Batch Processing API
## Введение в архитектуру кода
Кодовая база адаптивной Batch-системы организована в модульную структуру, где каждый компонент отвечает за конкретный аспект высокопроизводительной обработки данных. Все модули написаны на Rust с акцентом на безопасность памяти, асинхронную обработку и математическую оптимальность принимаемых решений. Основные принципы дизайна включают zero-allocation в критических путях, адаптивную настройку параметров на основе наблюдаемой нагрузки и интеграцию методов теории массового обслуживания и математической статистики.
## Модуль `adaptive_batcher.rs`
### Структура `BatchModelParams`
«`rust
/// Параметры модели времени обработки батча
/// T_process(B) = α/B + β + γ·B + δ·(B — B_opt)² + ε·I(B > L3_CACHE)
#[derive(Debug, Clone, Copy)]
pub struct BatchModelParams {
pub alpha: f64, // Накладные расходы на батч (мс)
pub beta: f64, // Минимальное время обработки (мс)
pub gamma: f64, // Линейный коэффициент (мс/B)
pub delta: f64, // Квадратичный штраф (мс/B²)
pub b_opt: f64, // Оптимальный размер для кэша
pub epsilon: f64, // Штраф за выход из L3-кэша
pub l3_cache_size: usize, // Размер L3-кэша в байтах
pub confidence: f64, // Уверенность в оценке (0..1)
}
«`
**Назначение**: Структура `BatchModelParams` инкапсулирует все параметры математической модели времени обработки батча. Каждый параметр имеет строго определенный физический смысл и размерность, что позволяет модели точно описывать поведение системы при различных размерах батча.
**Поля**:
— `alpha` — фиксированные накладные расходы на формирование и инициализацию батча, амортизирующиеся с ростом размера
— `beta` — минимальное время обработки, присутствующее даже при обработке минимального батча
— `gamma` — линейный коэффициент, характеризующий стоимость обработки одного элемента
— `delta` — квадратичный штраф за отклонение от оптимального размера, моделирующий неэффективность использования внутренних буферов
— `b_opt` — оптимальный размер батча с точки зрения кэш-иерархии процессора
— `epsilon` — дискретный штраф, активируемый при превышении размера L3-кэша
— `l3_cache_size` — размер кэша последнего уровня в байтах
— `confidence` — метрика достоверности текущей оценки параметров, вычисляемая на основе количества наблюдений
### Функция `solve_cubic`
«`rust
/// Решение кубического уравнения a·x³ + b·x² + c·x + d = 0
/// Возвращает действительные корни
pub fn solve_cubic(a: f64, b: f64, c: f64, d: f64) -> Vec
«`
**Назначение**: Функция `solve_cubic` реализует аналитическое решение кубического уравнения с использованием метода Кардано. Входные параметры соответствуют коэффициентам уравнения в порядке убывания степени. Функция обрабатывает вырожденные случаи (квадратное уравнение) и возвращает вектор всех действительных корней.
**Алгоритмические особенности**:
— При малых значениях старшего коэффициента `a` (< 1e-12) выполняется переход к решению квадратного уравнения
- Для кубического уравнения вычисляется дискриминант `Δ = (q/2)² + (p/3)³`, где `p` и `q` — коэффициенты приведенного уравнения
- В зависимости от знака дискриминанта возвращается один, два или три действительных корня
- Функция используется для нахождения оптимального размера батча путем решения уравнения `dL/dB = 0`
### Структура `KalmanFilter`
```rust
#[derive(Debug, Clone)]
pub struct KalmanFilter {
pub state: f64, // Текущая оценка λ
pub covariance: f64, // Дисперсия оценки
pub process_noise: f64, // Шум процесса (Q)
pub measurement_noise: f64, // Шум измерения (R)
}
```
**Назначение**: `KalmanFilter` реализует дискретный фильтр Калмана для рекуррентной оценки интенсивности входящего потока `λ`. Фильтр позволяет получать сглаженную оценку, устойчивую к флуктуациям измерений, и обеспечивает оптимальное в смысле минимума среднеквадратичной ошибки восстановление скрытого состояния динамической системы.
**Методы**:
- `predict(&mut self)` — выполняет экстраполяцию состояния на следующий шаг, увеличивая дисперсию оценки на величину шума процесса согласно уравнению `P_{k|k-1} = P_{k-1|k-1} + Q`
- `update(&mut self, measurement: f64) -> f64` — корректирует оценку на основе нового измерения, вычисляя коэффициент Калмана `K = P_{k|k-1} / (P_{k|k-1} + R)` и обновляя состояние `x = x + K·(z — x)` и ковариацию `P = (1 — K)·P`
### Структура `MarkovChain2nd`
«`rust
#[derive(Debug, Clone)]
pub struct MarkovChain2nd {
pub levels: usize,
pub transitions: Vec
pub counts: Vec
}
«`
**Назначение**: `MarkovChain2nd` реализует цепь Маркова второго порядка для дискретизированных уровней нагрузки. Модель учитывает не только текущее состояние, но и предыдущее, что позволяет улавливать тенденции изменения нагрузки.
**Поля**:
— `levels` — количество уровней квантования нагрузки
— `transitions` — трехмерная матрица переходных вероятностей размера `[levels][levels][levels]`, где `transitions[i][j][k] = P(λ_{t+1}=k | λ_t=j, λ_{t-1}=i)`
— `counts` — матрица счетчиков наблюдений для обучения модели
**Методы**:
— `update(&mut self, l_tm1: usize, l_t: usize, l_tp1: usize)` — обновляет счетчики наблюдений и пересчитывает вероятности переходов
— `predict(&self, l_tm1: usize, l_t: usize) -> (usize, f64)` — возвращает наиболее вероятный следующий уровень нагрузки и соответствующую вероятность
— `quantize(&self, lambda: f64, max_lambda: f64) -> usize` — выполняет квантование непрерывного значения нагрузки в дискретный уровень
— `dequantize(&self, level: usize, max_lambda: f64) -> f64` — восстанавливает приближенное значение нагрузки из уровня
### Структура `WaveletTransformer`
«`rust
#[derive(Debug, Clone)]
pub struct WaveletTransformer {
h0: f64, h1: f64, h2: f64, h3: f64,
g0: f64, g1: f64, g2: f64, g3: f64,
pub max_level: usize,
history: Vec
max_history: usize,
approx: Vec
detail: Vec
}
«`
**Назначение**: `WaveletTransformer` выполняет дискретное вейвлет-преобразование временного ряда нагрузки с использованием фильтров Добеши 4 (Daubechies-4). Вейвлет-анализ позволяет разложить сигнал на составляющие разного масштаба и выделить как долговременные тренды, так и краткосрочные флуктуации.
**Коэффициенты фильтров**:
— `h0..h3` — коэффициенты низкочастотного (масштабирующего) фильтра для получения аппроксимирующих коэффициентов
— `g0..g3` — коэффициенты высокочастотного (вейвлет) фильтра для получения детализирующих коэффициентов
**Методы**:
— `transform(&mut self)` — выполняет рекурсивное разложение сигнала на заданное число уровней, сохраняя аппроксимирующие и детализирующие коэффициенты
— `predict(&self, tau: usize) -> Vec
### Структура `PIDController`
«`rust
#[derive(Debug, Clone)]
pub struct PIDController {
pub kp: f64,
pub ki: f64,
pub kd: f64,
integral: f64,
prev_error: f64,
prev_time: Instant,
initialized: bool,
pub output_min: f64,
pub output_max: f64,
pub integral_limit: f64,
}
«`
**Назначение**: `PIDController` реализует классический пропорционально-интегрально-дифференциальный регулятор для компенсации остаточной ошибки управления. Регулятор используется для коррекции размера батча на основе отклонения измеренной задержки от целевого значения.
**Методы**:
— `new(kp: f64, ki: f64, kd: f64) -> Self` — конструктор с заданными коэффициентами
— `auto_tune(kp_crit: f64, t_crit: f64) -> Self` — статический метод, выполняющий автонастройку коэффициентов по методу Циглера-Николса на основе критического коэффициента усиления и периода колебаний
— `compute(&mut self, error: f64) -> f64` — вычисляет управляющий сигнал по формуле `u = Kp·e + Ki·∫e dt + Kd·de/dt`, применяя антивиндовую защиту интегральной составляющей через ограничение `integral_limit`
— `reset(&mut self)` — сбрасывает внутреннее состояние регулятора
### Структура `GibbsSampler`
«`rust
#[derive(Debug, Clone)]
pub struct GibbsSampler {
pub alpha_samples: Vec
pub beta_samples: Vec
pub gamma_samples: Vec
pub delta_samples: Vec
pub b_opt_samples: Vec
alpha_a: f64, alpha_b: f64,
beta_a: f64, beta_b: f64,
gamma_a: f64, gamma_b: f64,
delta_a: f64, delta_b: f64,
pub alpha: f64,
pub beta: f64,
pub gamma: f64,
pub delta: f64,
pub b_opt: f64,
}
«`
**Назначение**: `GibbsSampler` реализует алгоритм сэмплирования Гиббса для байесовской оценки параметров модели времени обработки. Метод позволяет получать не только точечные оценки параметров, но и их распределения, что дает информацию о неопределенности оценок.
**Априорные распределения**:
— Для каждого параметра задано априорное Gamma-распределение с гиперпараметрами `*_a` (shape) и `*_b` (rate)
— Выбор Gamma-распределения обусловлен его свойством быть сопряженным для параметров, входящих в модель линейно
**Методы**:
— `sample(&mut self, data: &[(usize, f64)])` — выполняет одну итерацию условного сэмплирования всех параметров из их полных условных распределений
— `run_chain(&mut self, data: &[(usize, f64)], iterations: usize, burn_in: usize)` — запускает цепь Маркова заданной длины с отбрасываемым разогревочным периодом (burn-in)
— `average(&self) -> (f64, f64, f64, f64, f64)` — возвращает средние значения сэмплов для всех параметров
— `log_likelihood(&self, data: &[(usize, f64)]) -> f64` — вычисляет логарифм правдоподобия данных при текущих значениях параметров
### Структура `GPScheduler`
«`rust
#[derive(Debug, Clone)]
pub struct GPScheduler {
pub weights: [f64; 5],
pub shares: [f64; 5],
pub queue_lengths: [f64; 5],
pub total_capacity: f64,
}
«`
**Назначение**: `GPScheduler` реализует модель обобщенного процессорного разделения (Generalized Processor Sharing) для распределения пропускной способности между пятью классами приоритета. Модель гарантирует, что в любой момент времени доля ресурса, выделенная классу, пропорциональна его весу.
**Поля**:
— `weights` — веса приоритетов, определяющие относительную важность каждого класса
— `shares` — вычисленные доли пропускной способности для каждого класса
— `queue_lengths` — текущие длины очередей по классам
— `total_capacity` — общая пропускная способность системы
**Методы**:
— `recompute_shares(&mut self)` — пересчитывает доли пропускной способности по формуле `shares[i] = weights[i] * total_capacity / Σweights`
— `waiting_time(&self, i: usize, lambda: f64, batch_size: f64, service_rate: f64) -> f64` — вычисляет прогнозируемое время ожидания для класса `i` с использованием формул теории массового обслуживания
### Структура `ModelPredictiveController`
«`rust
#[derive(Debug, Clone)]
pub struct ModelPredictiveController {
pub horizon: usize,
pub lambda_pred: Vec
pub params: BatchModelParams,
pub w_latency: f64,
pub w_delta_b: f64,
pub w_delta_m: f64,
pub b_min: usize,
pub b_max: usize,
pub m_min: usize,
pub m_max: usize,
}
«`
**Назначение**: `ModelPredictiveController` реализует алгоритм управления с прогнозирующей моделью (Model Predictive Control, MPC). На каждом шаге система решает задачу оптимизации на конечном горизонте, минимизируя взвешенную сумму прогнозируемых задержек и штрафов за изменение параметров.
**Поля**:
— `horizon` — длина горизонта прогнозирования в шагах дискретизации
— `lambda_pred` — вектор прогнозируемых значений нагрузки на всем горизонте
— `params` — параметры модели времени обработки
— `w_latency`, `w_delta_b`, `w_delta_m` — весовые коэффициенты в целевой функции
— `b_min`, `b_max`, `m_min`, `m_max` — ограничения на размер батча и количество воркеров
**Методы**:
— `solve(&self, current_b: usize, current_m: usize) -> (usize, usize)` — выполняет численную оптимизацию на ограниченном множестве кандидатов (поиск по сетке) и возвращает оптимальные значения параметров
— `cost_function(&self, b: f64, m: f64, b_prev: f64, m_prev: f64) -> f64` — вычисляет значение целевой функции
— `set_lambda_predictions(&mut self, predictions: Vec
### Структура `AdaptiveBatcher`
«`rust
pub struct AdaptiveBatcher {
pub config: AdaptiveBatcherConfig,
pub current_batch_size: RwLock
pub current_workers: RwLock
pub model_params: RwLock
pub kalman: RwLock
pub markov: RwLock
pub wavelet: RwLock
pub pid: RwLock
pub gibbs: RwLock
pub gps: RwLock
pub mpc: RwLock
pub evt: RwLock
pub measurements: RwLock
pub latencies: RwLock
pub lambdas: RwLock
pub metrics: Arc
}
«`
**Назначение**: `AdaptiveBatcher` является центральным координирующим компонентом системы. Он агрегирует все математические модели и предоставляет единый интерфейс для адаптивного управления размером батча. Компонент работает в реальном времени, непрерывно анализируя наблюдаемые данные и корректируя параметры обработки.
**Ключевые методы**:
— `new(config: AdaptiveBatcherConfig) -> Self` — конструктор, инициализирующий все вложенные модели с начальными параметрами. Включает исправление начальной нагрузки с 100.0 на 0.0 для корректной работы при отсутствии данных
— `update_model(&self)` — асинхронный метод, запускающий байесовское обновление параметров модели времени обработки с использованием сэмплера Гиббса при накоплении достаточного количества измерений (≥ 50)
— `estimate_lambda(&self, measured_throughput: f64) -> f64` — выполняет фильтрацию Калмана для получения сглаженной оценки текущей нагрузки
— `predict_load(&self, horizon: usize) -> Vec
— `optimal_batch_size(&self, lambda: f64, waiting_time: f64) -> usize` — решает кубическое уравнение `dL/dB = 0` и выбирает оптимальный размер батча, минимизирующий полную задержку
— `pid_correction(&self, target_latency: f64, measured_latency: f64) -> f64` — вычисляет корректирующий сигнал на основе отклонения измеренной задержки от целевой
— `mpc_optimize(&self, lambda_pred: Vec
— `compute_batch_size(&self) -> usize` — основной метод, интегрирующий все этапы вычислений для получения финального, скорректированного размера батча
— `record_batch_execution(&self, batch_size: usize, processing_time: Duration, success_rate: f64, queue_depth: usize)` — обновляет все исторические данные и модели на основе результатов выполнения очередного батча
—
## Модуль `circuit_breaker.rs`
### Структура `CircuitBreakerMarkovModel`
«`rust
#[derive(Debug, Clone)]
pub struct CircuitBreakerMarkovModel {
pub transition_rates: [[f64; 3]; 3],
pub steady_state: [f64; 3],
pub mttf: f64,
pub mttr: f64,
pub availability: f64,
}
«`
**Назначение**: `CircuitBreakerMarkovModel` описывает марковскую модель состояний автоматического выключателя. Модель позволяет количественно оценивать надежность системы и прогнозировать поведение выключателя в стационарном режиме.
**Поля**:
— `transition_rates` — матрица интенсивностей переходов между состояниями Closed (0), Open (1) и HalfOpen (2). Элемент `[i][j]` задает интенсивность перехода из состояния i в состояние j
— `steady_state` — вектор стационарных вероятностей нахождения системы в каждом состоянии
— `mttf` — среднее время до отказа (Mean Time To Failure), вычисляемое как `1/λ`, где λ — интенсивность перехода в состояние Open
— `mttr` — среднее время восстановления (Mean Time To Recovery), вычисляемое как `1/μ`, где μ — интенсивность восстановления
— `availability` — коэффициент готовности, вычисляемый как `MTTF/(MTTF+MTTR)`
**Методы**:
— `compute_steady_state(&mut self)` — решает систему линейных уравнений `π·Q = 0` с условием нормировки `Σπ = 1` для нахождения стационарных вероятностей. Используется метод Крамера для решения системы трех уравнений
### Перечисление `CircuitState`
«`rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitState {
Closed = 0,
Open = 1,
HalfOpen = 2,
}
«`
**Назначение**: `CircuitState` определяет три возможных состояния автоматического выключателя. Целочисленные значения соответствуют индексам в матрицах марковской модели.
**Варианты**:
— `Closed` — нормальный режим работы. Запросы передаются на обработку, ведется подсчет отказов
— `Open` — аварийный режим. Запросы немедленно отклоняются с ошибкой, запущен таймер восстановления
— `HalfOpen` — режим проверки восстановления. Пропускается ограниченное число запросов для тестирования работоспособности проблемного компонента
### Структура `CircuitBreaker`
«`rust
pub struct CircuitBreaker {
name: String,
pub markov_model: RwLock
failure_model: RwLock
pub recovery_model: RwLock
state: RwLock
failure_count: RwLock
success_count: RwLock
total_requests: RwLock
last_failure: RwLock
**Назначение**: `CircuitBreaker` реализует паттерн автоматического выключателя с математическим обоснованием переходов. Компонент защищает систему от каскадных отказов, временно блокируя запросы к проблемным сервисам.
**Ключевые методы**:
— `new(…) -> Self` — конструктор, инициализирующий все модели и счетчики. Выполняет расчет стационарных вероятностей начальной марковской модели
— `allow_request(&self) -> bool` — определяет, разрешено ли выполнение запроса в текущем состоянии. В состоянии Open разрешает запросы только по истечении таймаута восстановления, что инициирует переход в HalfOpen
— `record_success(&self)` — регистрирует успешное выполнение запроса. В состоянии HalfOpen накопление `consecutive_successes_needed` успехов приводит к полному восстановлению (переход в Closed). В состоянии Closed сбрасывает счетчик отказов
— `record_failure(&self)` — регистрирует сбой. В состоянии Closed превышение `failure_threshold` вызывает переход в Open. В состоянии HalfOpen любая неудача немедленно возвращает выключатель в состояние Open с увеличением задержки восстановления
— `reset(&self)` — принудительный сброс выключателя в состояние Closed с обнулением всех счетчиков и моделей
—
## Модуль `config.rs`
### Структура `ConfigOptimizationModel`
«`rust
#[derive(Debug, Clone)]
pub struct ConfigOptimizationModel {
pub arrival_rate: f64,
pub service_rate: f64,
pub utilization: f64,
pub target_latency: Duration,
pub max_throughput: f64,
pub cpu_cores: usize,
pub available_memory: usize,
pub network_bandwidth: f64,
}
«`
**Назначение**: `ConfigOptimizationModel` содержит математические модели для расчета оптимальных параметров конфигурации на основе теории массового обслуживания. Модель учитывает аппаратные ограничения и целевые показатели производительности.
**Методы**:
— `optimal_worker_count(&self) -> usize` — вычисляет оптимальное количество воркеров по формуле `M = ⌈λ/(μ·ρ)⌉`, где `ρ` — целевой коэффициент загрузки
— `optimal_queue_size(&self, loss_probability: f64) -> usize` — определяет размер очереди, необходимый для обеспечения заданной вероятности потерь в модели M/M/1/K по формуле `K = ⌈ln(ε)/ln(ρ)⌉`
— `optimal_batch_size(&self, alpha: f64, gamma: f64) -> usize` — вычисляет оптимальный размер батча на основе упрощенной модели, использующей консервативную формулу `B = √(α·1000/γ)`
— `optimal_read_timeout(&self, batch_size: usize, std_dev: f64) -> Duration` — определяет таймаут чтения как `B/λ + 3σ` (принцип трех сигм)
— `optimal_flush_interval(&self) -> Duration` — вычисляет оптимальный интервал сброса буфера как половину целевой задержки
—
## Модуль `qos_manager.rs`
### Структура `GPSModel`
«`rust
#[derive(Debug, Clone)]
pub struct GPSModel {
pub weights: [f64; 5],
pub shares: [f64; 5],
pub normalized_weights: [f64; 5],
pub total_capacity: f64,
pub arrival_rates: [f64; 5],
pub service_rates: [f64; 5],
pub utilizations: [f64; 5],
pub total_utilization: f64,
}
«`
**Назначение**: `GPSModel` реализует математическую модель обобщенного процессорного разделения. Модель обеспечивает справедливое распределение пропускной способности между классами приоритета в соответствии с их весами.
**Методы**:
— `normalize_weights(&mut self)` — приводит веса к нормированному виду по формуле `φ_i’ = φ_i / Σφ_j`
— `compute_shares(&mut self)` — распределяет общую пропускную способность пропорционально нормированным весам: `C_i = φ_i’ * C_total`
— `compute_utilization(&mut self, batch_size: f64)` — рассчитывает загрузку каждого класса по формуле `ρ_i = λ_i * B / C_i`
— `throughput(&self, class: usize) -> f64` — вычисляет фактическую пропускную способность для класса с учетом загрузки
### Структуры `TokenBucket` и `LeakyBucket`
«`rust
#[derive(Debug, Clone)]
pub struct TokenBucket {
pub rate: f64,
pub capacity: f64,
pub tokens: f64,
pub last_update: Instant,
}
#[derive(Debug, Clone)]
pub struct LeakyBucket {
pub rate: f64,
pub capacity: f64,
pub level: f64,
pub last_update: Instant,
}
«`
**Назначение**: `TokenBucket` реализует алгоритм «ведра токенов» для ограничения скорости запросов. Токены накапливаются с постоянной скоростью `rate`, каждый запрос потребляет определенное количество токенов. Метод `try_consume(tokens: f64) -> bool` проверяет наличие достаточного количества токенов и при успехе уменьшает их количество.
**Назначение**: `LeakyBucket` реализует алгоритм «дырявого ведра» для сглаживания всплесков трафика. Вода поступает в ведро с каждым запросом и вытекает с постоянной скоростью `rate`. Метод `try_add(amount: f64) -> bool` проверяет, не превысит ли добавление заданного объема максимальную емкость ведра.
### Структура `QosManager`
«`rust
pub struct QosManager {
quotas: RwLock
pub gps_model: RwLock
_wfq_model: RwLock
high_priority_semaphore: Semaphore,
normal_priority_semaphore: Semaphore,
low_priority_semaphore: Semaphore,
metrics: Arc
statistics: RwLock
adaptation_history: RwLock
arrival_rate_history: RwLock
wait_time_history: RwLock
adaptation_interval: Duration,
min_samples_for_adaptation: usize,
adaptation_sensitivity: f64,
}
«`
**Назначение**: `QosManager` координирует управление качеством обслуживания. Компонент обеспечивает многоуровневую защиту от перегрузок и динамическую адаптацию квот на основе наблюдаемой статистики.
**Ключевые методы**:
— `acquire_permit(&self, priority: Priority) -> Result
— `adapt_quotas(&self) -> Result
— `update_models(&self, arrival_rates: [f64; 5], batch_size: f64)` — обновляет GPS-модель на основе текущей интенсивности поступления для каждого класса
—
## Модуль `optimized/buffer_pool.rs`
### Перечисление `SizeClass`
«`rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SizeClass {
Small = 0,
Medium = 1,
Large = 2,
XLarge = 3,
Giant = 4,
}
«`
**Назначение**: `SizeClass` определяет пять классов размеров буферов, организованных по степенному закону с коэффициентом роста ≈8. Такая организация минимизирует фрагментацию и обеспечивает эффективное использование памяти.
**Методы**:
— `optimal_size(&self) -> usize` — возвращает предопределенный оптимальный размер для класса: Small (1KB), Medium (8KB), Large (64KB), XLarge (256KB), Giant (1MB)
— `from_size(size: usize) -> Self` — определяет класс для запрошенного размера путем сравнения с граничными значениями
— `name(&self) -> &’static str` — возвращает строковое имя класса для логирования
### Структура `SizeDistributionModel`
«`rust
#[derive(Debug, Clone)]
pub struct SizeDistributionModel {
pub alpha: f64,
pub x_min: f64,
pub mean: f64,
pub variance: f64,
pub size_history: VecDeque
pub max_history: usize,
}
«`
**Назначение**: `SizeDistributionModel` моделирует распределение размеров запросов с помощью распределения Парето. Распределение Парето хорошо описывает «тяжелые хвосты» в сетевом трафике, где большие пакеты встречаются редко, но существенно влияют на производительность.
**Методы**:
— `update(&mut self, size: usize)` — добавляет новый размер в историю и при накоплении достаточного количества данных (≥ 100) запускает переоценку параметров
— `estimate_parameters(&mut self)` — использует метод максимального правдоподобия для оценки параметра формы `α` и минимального значения `x_min`. Оценка выполняется по формулам: `x_min = min(data)`, `α = n / Σ ln(x_i / x_min)`
### Структура `OptimizedBufferPool`
«`rust
pub struct OptimizedBufferPool {
pub size_class_pools: RwLock<[VecDeque
_bytes_mut_pool: Mutex
pub size_distribution: RwLock
pub cache_model: TokioRwLock
stats: Arc
global_stats: Mutex
allocation_times: Mutex
wait_times: Mutex
last_cleanup: Mutex
last_adaptation: Mutex
config: Arc
}
«`
**Назначение**: `OptimizedBufferPool` реализует пул буферов с разделением на классы размеров и адаптивным управлением. Компонент минимизирует накладные расходы на аллокацию памяти за счет повторного использования буферов.
**Ключевые методы**:
— `new(…) -> Self` — конструктор, выполняющий предварительное выделение буферов в соответствии с коэффициентом предварительного выделения `preallocation_factor`
— `cleanup_old_buffers(&self, max_age: Duration)` — удаляет устаревшие буферы на основе времени простоя и возраста, сохраняя минимальный размер пула для каждого класса
— `update_statistics(&self)` — обновляет статистику hit rate, рассчитывает фрагментацию и перцентили времени аллокации
— `adaptive_pool_adjustment(&self)` — увеличивает или уменьшает размеры пулов в зависимости от текущего hit rate относительно целевого значения
— `get_detailed_stats(&self) -> HashMap
—
## Модуль `optimized/work_stealing_dispatcher.rs`
### Структура `WorkStealingModel`
«`rust
#[derive(Debug, Clone)]
pub struct WorkStealingModel {
pub m: usize,
pub p_steal: f64,
pub lambda_steal: f64,
pub mu: f64,
pub avg_batch_size: f64,
pub rho: f64,
pub imbalance: f64,
pub cost_idle: f64,
pub cost_steal: f64,
}
«`
**Назначение**: `WorkStealingModel` содержит математическую модель кражи задач. Модель позволяет количественно оценить эффективность кражи и адаптировать вероятность кражи в зависимости от текущей загрузки системы.
**Методы**:
— `steal_success_probability(&self, queue_len: usize) -> f64` — вычисляет вероятность успешной кражи из очереди заданной длины по формуле `1 — 1/(queue_len + 1)`
— `compute_steal_intensity(&self, check_rate: f64) -> f64` — рассчитывает интенсивность событий кражи как `λ_steal = p_steal·(M-1)/M·check_rate`
— `compute_imbalance(loads: &[f64]) -> f64` — вычисляет коэффициент дисбаланса как нормированный коэффициент вариации загрузок воркеров: `σ/μ`
— `optimal_steal_policy(&self, queue_len: usize, cost_idle: f64, cost_steal: f64) -> bool` — реализует уравнение Беллмана для определения оптимальности кражи в текущем состоянии
### Структура `LoadBalancer`
«`rust
#[derive(Debug, Clone)]
pub struct LoadBalancer {
pub worker_loads: Vec
pub ema_alpha: f64,
pub predicted_loads: Vec
}
«`
**Назначение**: `LoadBalancer` реализует алгоритмы балансировки нагрузки. Использует метод «power of two choices» для распределения задач и экспоненциально-взвешенное скользящее среднее для сглаживания оценок загрузки.
**Методы**:
— `select_worker_power_of_two(&self) -> (usize, usize)` — выбирает двух случайных воркеров и возвращает индекс воркера с меньшей загрузкой, а также индекс второго воркера
— `update_ema(&mut self, worker_id: usize, current_load: f64)` — обновляет экспоненциально-взвешенное скользящее среднее загрузки воркера: `load = α·current + (1-α)·load`
### Структура `WorkStealingDispatcher`
«`rust
pub struct WorkStealingDispatcher {
pub worker_senders: Arc
pub worker_receivers: Arc
pub worker_queues: Arc
pub worker_loads: Arc
pub worker_throughputs: Arc
injector_sender: Sender
injector_receiver: Receiver
injector_backlog: Arc
results: Arc
pub stealing_model: Arc
pub load_balancer: Arc
pub queue_models: Arc
stats: Arc
latency_histogram: Arc
worker_latency_history: Arc
is_running: Arc
next_task_id: std::sync::atomic::AtomicU64,
packet_processor: PhantomPacketProcessor,
session_manager: Arc
adaptive_batcher: Arc
qos_manager: Arc
circuit_breaker: Arc
backpressure_semaphore: Arc
backpressure_threshold: usize,
check_rate: f64,
cost_idle: f64,
cost_steal: f64,
}
«`
**Назначение**: `WorkStealingDispatcher` является центральным компонентом для распределения и выполнения задач. Реализует архитектуру work-stealing для эффективного использования многоядерных процессоров и динамической балансировки нагрузки.
**Ключевые методы**:
— `new(…) -> Self` — конструктор, инициализирующий очереди для каждого воркера и общую очередь (injector). Устанавливает начальные параметры моделей и запускает все фоновые процессы
— `start_workers(&self)` — запускает заданное количество воркеров, каждый из которых выполняет метод `worker_loop` в отдельной асинхронной задаче
— `worker_loop(…)` — основной цикл воркера. В бесконечном цикле ожидает задачи из своей очереди или из общей очереди (кража), обрабатывает их и обновляет статистику
— `submit_task(&self, mut task: WorkStealingTask) -> Result
— `start_stealing_optimizer(&self)` — запускает фоновую задачу, которая периодически анализирует загрузку воркеров и адаптирует вероятность кражи `p_steal` в модели `WorkStealingModel`
— `get_advanced_stats(&self) -> DispatcherAdvancedStats` — собирает и возвращает расширенную статистику диспетчера, включая перцентили времени обработки, дисбаланс, размеры очередей и т.д.
—
## Модуль `optimized/crypto_processor.rs`
### Структура `CryptoPerformanceModel`
«`rust
#[derive(Debug, Clone, Copy)]
pub struct CryptoPerformanceModel {
pub alpha: f64,
pub beta: f64,
pub gamma: f64,
pub delta: f64,
pub l1_cache_size: usize,
pub l2_cache_size: usize,
pub l3_cache_size: usize,
pub simd_optimal: usize,
pub max_throughput: f64,
}
«`
**Назначение**: `CryptoPerformanceModel` описывает модель времени выполнения криптографических операций: `T(n) = α + β·n + γ·SIMD(n) + δ·cache_miss(n)`. Модель учитывает как линейную зависимость от размера данных, так и дискретные эффекты кэш-промахов.
**Методы**:
— `execution_time(&self, size: usize, use_simd: bool) -> f64` — вычисляет прогнозируемое время для операции заданного размера с учетом SIMD-ускорения и штрафов за промахи кэша различных уровней
### Структура `OptimizedCryptoProcessor`
«`rust
pub struct OptimizedCryptoProcessor {
worker_senders: Arc
worker_receivers: Arc
_injector_sender: Sender
injector_receiver: Receiver
results: Arc
pub performance_model: Arc
work_stealing_model: Arc
batch_model: Arc
stats: Arc
processing_times: Arc
queue_lengths: Arc
pub chacha20_accelerator: Arc
pub blake3_accelerator: Arc
is_running: Arc
next_task_id: std::sync::atomic::AtomicU64,
chacha_batch_buffer: Arc
blake_batch_buffer: Arc
derive_batch_buffer: Arc
batch_timeout: Duration,
max_batch_size: usize,
enable_simd: bool,
enable_work_stealing: bool,
}
«`
**Назначение**: `OptimizedCryptoProcessor` реализует высокопроизводительный криптографический процессор с поддержкой пакетной обработки и кражи задач. Компонент интегрирует SIMD-акселераторы для ChaCha20 и BLAKE3 и обеспечивает эффективное использование многоядерных систем.
**Ключевые методы**:
— `process_chacha_batch(…)` — выполняет пакетную обработку операций ChaCha20 с использованием SIMD-акселератора. Группирует операции по типу (шифрование/дешифрование) для максимальной эффективности
— `process_blake_batch(…)` — выполняет пакетную обработку операций хеширования BLAKE3 с использованием SIMD-акселератора
— `worker_loop(…)` — основной цикл криптографического воркера, аналогичный диспетчеру задач, но специализированный для криптоопераций
— `start_batch_processor(&self)` — запускает фоновую задачу, которая накапливает операции в буферах и периодически отправляет их на пакетную обработку при достижении оптимального размера или истечении таймаута
—
## Модуль `core/reader.rs`
### Структура `PacketSizeModel`
«`rust
#[derive(Debug, Clone)]
pub struct PacketSizeModel {
pub mu: f64,
pub sigma: f64,
pub mean: f64,
pub median: f64,
pub p95: f64,
pub p99: f64,
pub history: VecDeque
pub max_history: usize,
}
«`
**Назначение**: `PacketSizeModel` моделирует распределение размеров пакетов с помощью логнормального распределения. Параметры `mu` и `sigma` представляют среднее и стандартное отклонение натурального логарифма размера, что позволяет эффективно описывать асимметричные распределения размеров сетевых пакетов.
**Методы**:
— `update(&mut self, size: usize)` — добавляет новый размер в историю и при накоплении достаточного количества данных запускает переоценку параметров
— `estimate_parameters(&mut self)` — выполняет оценку параметров логнормального распределения методом максимального правдоподобия: `μ = mean(ln(x))`, `σ = std(ln(x))`, затем вычисляет характеристики распределения
### Структура `BatchReader`
«`rust
pub struct BatchReader {
_config: BatchConfig,
packet_size_model: Arc
intensity_model: Arc
timeout_model: Arc
event_tx: mpsc::Sender
is_running: Arc
stats: Arc
connection_count: Arc
}
«`
**Назначение**: `BatchReader` отвечает за асинхронное чтение данных из сетевых соединений. Компонент реализует адаптивные таймауты чтения и собирает статистику о размерах пакетов и интенсивности потока.
**Ключевые методы**:
— `register_connection(…)` — регистрирует новое соединение и запускает отдельную асинхронную задачу для его чтения. Внутри задачи:
— Чтение выполняется с адаптивным таймаутом, управляемым `ReadTimeoutModel`
— Для каждого прочитанного пакета обновляются модели `PacketSizeModel` и `ReadIntensityModel`
— События о полученных данных отправляются через канал `event_tx` для дальнейшей обработки
— `read_from_stream_dyn(…)` — выполняет чтение фрейма из потока с заданным таймаутом, обрабатывая различные типы ошибок (таймаут, закрытие соединения, ошибки ввода-вывода)
—
## Модуль `core/writer.rs`
### Структура `WriteBatchModel`
«`rust
#[derive(Debug, Clone)]
pub struct WriteBatchModel {
pub current_batch_size: usize,
pub optimal_batch_size: usize,
pub min_batch_size: usize,
pub max_batch_size: usize,
pub accumulation_time: Duration,
pub write_time: Duration,
pub target_latency: Duration,
pub history: VecDeque
}
«`
**Назначение**: `WriteBatchModel` управляет параметрами пакетной записи. Модель анализирует историю выполнения батчей для нахождения оптимального размера, обеспечивающего максимальную пропускную способность при соблюдении целевой задержки.
**Методы**:
— `update(&mut self, batch_size: usize, write_time: Duration)` — добавляет запись о выполненном батче в историю и обновляет текущие значения
— `estimate_optimal_batch_size(&mut self)` — анализирует историю для нахождения размера батча, обеспечивающего максимальную пропускную способность среди записей, время выполнения которых не превышает целевую задержку
### Структура `BackpressureModel`
«`rust
#[derive(Debug, Clone)]
pub struct BackpressureModel {
pub max_queue_size: usize,
pub current_queue: usize,
pub arrival_rate: f64,
pub service_rate: f64,
pub utilization: f64,
pub loss_probability: f64,
pub avg_wait_time: Duration,
}
«`
**Назначение**: `BackpressureModel` реализует модель M/M/1/K для оценки вероятности потерь и времени ожидания в очереди записи. Модель позволяет прогнозировать перегрузки и принимать решения о применении backpressure.
**Методы**:
— `update(&mut self, queue_size: usize, arrival_rate: f64, service_rate: f64)` — обновляет параметры модели и пересчитывает вероятность потерь по формуле для системы M/M/1/K
— `is_critical(&self) -> bool` — определяет, достигнут ли критический уровень перегрузки (заполнение очереди ≥ 90%)
### Структура `BatchWriter`
«`rust
pub struct BatchWriter {
config: BatchConfig,
batch_model: Arc
_backpressure_model: Arc
connections: Arc
task_queue: Arc
task_tx: mpsc::Sender
task_rx: Arc
backpressure_semaphore: Arc
is_running: Arc
stats: Arc
connection_count: Arc
write_times: Arc
}
«`
**Назначение**: `BatchWriter` управляет асинхронной записью данных с поддержкой пакетирования и приоритезации. Компонент накапливает задачи записи и отправляет их пакетами для минимизации накладных расходов.
**Ключевые методы**:
— `write(…)` — основной метод для отправки данных на запись:
— Для критических запросов (`is_critical()` или `requires_flush`) вызывает немедленную запись через `write_immediate`
— Для остальных запросов пытается получить разрешение семафора backpressure и отправляет задачу в очередь `task_tx`
— `write_immediate(&self, task: WriteTask)` — выполняет немедленную запись без пакетирования для критических задач
— `process_batch(…)` — обрабатывает пакет задач: объединяет данные, выполняет запись с адаптивным таймаутом и обновляет статистику
— `start_writer_for_connection(…)` — запускает фоновый процесс для конкретного соединения, который накапливает задачи из очереди и отправляет их пакетами при достижении оптимального размера или истечении интервала сброса
—
## Модуль `acceleration_batch/blake3_batch_accel.rs`
### Структура `Blake3BatchAccelerator`
«`rust
pub struct Blake3BatchAccelerator {
config: Blake3BatchConfig,
simd_capable: bool,
detected_features: SimdFeatures,
}
«`
**Назначение**: `Blake3BatchAccelerator` предоставляет SIMD-ускоренную пакетную обработку хеширования BLAKE3. Компонент автоматически определяет доступные аппаратные возможности и выбирает оптимальную стратегию выполнения.
**Ключевые методы**:
— `hash_keyed_batch(&self, keys: &[[u8; 32]], inputs: &[Vec
— При большом размере батча (≥16) использует оптимизированную параллельную обработку с предварительной сортировкой по размеру для лучшего использования кэша
— При среднем размере батча (≥4) использует параллельную обработку через Rayon
— Для малых батчей использует последовательную скалярную реализацию
— `get_performance_info(&self) -> Blake3PerformanceInfo` — возвращает информацию о производительности, включая оценку пропускной способности и доступные аппаратные возможности
—
## Модуль `acceleration_batch/chacha20_batch_accel.rs`
### Структура `ChaCha20BatchAccelerator`
«`rust
pub struct ChaCha20BatchAccelerator {
config: ChaCha20BatchConfig,
simd_capable: bool,
detected_features: SimdFeatures,
performance_model: SIMDPerformanceModel,
}
«`
**Назначение**: `ChaCha20BatchAccelerator` предоставляет SIMD-ускоренную пакетную обработку шифрования/дешифрования ChaCha20. Компонент автоматически настраивает параметры производительности в зависимости от обнаруженных аппаратных возможностей.
**Ключевые методы**:
— `encrypt_batch(&self, keys: &[[u8; 32]], nonces: &[[u8; 12]], plaintexts: &[Vec
— `decrypt_batch(&self, keys: &[[u8; 32]], nonces: &[[u8; 12]], ciphertexts: &[Vec
— `get_simd_info(&self) -> SimdInfo` — возвращает информацию о доступных SIMD-возможностях и ожидаемом ускорении
—
## Заключение
Данная документация охватывает ключевые аспекты реализации адаптивной Batch-системы. Система демонстрирует современный подход к высокопроизводительной обработке данных, сочетающий передовые математические методы с тщательной инженерной проработкой. Кодовая база спроектирована для максимальной производительности без компромиссов в надежности, с акцентом на адаптивность к изменяющимся условиям нагрузки.
Все модули взаимосвязаны и работают вместе для обеспечения комплексного управления процессом пакетной обработки. Архитектура позволяет легко расширять систему новыми математическими моделями и оптимизациями, сохраняя при этом обратную совместимость и высокий уровень производительности.