## Введение в архитектуру кода

Кодовая база адаптивной Batch-системы организована в модульную структуру, где каждый компонент отвечает за конкретный аспект высокопроизводительной обработки данных. Все модули написаны на Rust с акцентом на безопасность памяти, асинхронную обработку и математическую оптимальность принимаемых решений. Основные принципы дизайна включают zero-allocation в критических путях, адаптивную настройку параметров на основе наблюдаемой нагрузки и интеграцию методов теории массового обслуживания и математической статистики.

## Модуль `adaptive_batcher.rs`

### Структура `BatchModelParams`

«`rust
/// Параметры модели времени обработки батча
/// T_process(B) = α/B + β + γ·B + δ·(B — B_opt)² + ε·I(B > L3_CACHE)
#[derive(Debug, Clone, Copy)]
pub struct BatchModelParams {
pub alpha: f64, // Накладные расходы на батч (мс)
pub beta: f64, // Минимальное время обработки (мс)
pub gamma: f64, // Линейный коэффициент (мс/B)
pub delta: f64, // Квадратичный штраф (мс/B²)
pub b_opt: f64, // Оптимальный размер для кэша
pub epsilon: f64, // Штраф за выход из L3-кэша
pub l3_cache_size: usize, // Размер L3-кэша в байтах
pub confidence: f64, // Уверенность в оценке (0..1)
}
«`

**Назначение**: Структура `BatchModelParams` инкапсулирует все параметры математической модели времени обработки батча. Каждый параметр имеет строго определенный физический смысл и размерность, что позволяет модели точно описывать поведение системы при различных размерах батча.

**Поля**:
— `alpha` — фиксированные накладные расходы на формирование и инициализацию батча, амортизирующиеся с ростом размера
— `beta` — минимальное время обработки, присутствующее даже при обработке минимального батча
— `gamma` — линейный коэффициент, характеризующий стоимость обработки одного элемента
— `delta` — квадратичный штраф за отклонение от оптимального размера, моделирующий неэффективность использования внутренних буферов
— `b_opt` — оптимальный размер батча с точки зрения кэш-иерархии процессора
— `epsilon` — дискретный штраф, активируемый при превышении размера L3-кэша
— `l3_cache_size` — размер кэша последнего уровня в байтах
— `confidence` — метрика достоверности текущей оценки параметров, вычисляемая на основе количества наблюдений

### Функция `solve_cubic`

«`rust
/// Решение кубического уравнения a·x³ + b·x² + c·x + d = 0
/// Возвращает действительные корни
pub fn solve_cubic(a: f64, b: f64, c: f64, d: f64) -> Vec
«`

**Назначение**: Функция `solve_cubic` реализует аналитическое решение кубического уравнения с использованием метода Кардано. Входные параметры соответствуют коэффициентам уравнения в порядке убывания степени. Функция обрабатывает вырожденные случаи (квадратное уравнение) и возвращает вектор всех действительных корней.

**Алгоритмические особенности**:
— При малых значениях старшего коэффициента `a` (< 1e-12) выполняется переход к решению квадратного уравнения - Для кубического уравнения вычисляется дискриминант `Δ = (q/2)² + (p/3)³`, где `p` и `q` — коэффициенты приведенного уравнения - В зависимости от знака дискриминанта возвращается один, два или три действительных корня - Функция используется для нахождения оптимального размера батча путем решения уравнения `dL/dB = 0` ### Структура `KalmanFilter` ```rust #[derive(Debug, Clone)] pub struct KalmanFilter { pub state: f64, // Текущая оценка λ pub covariance: f64, // Дисперсия оценки pub process_noise: f64, // Шум процесса (Q) pub measurement_noise: f64, // Шум измерения (R) } ``` **Назначение**: `KalmanFilter` реализует дискретный фильтр Калмана для рекуррентной оценки интенсивности входящего потока `λ`. Фильтр позволяет получать сглаженную оценку, устойчивую к флуктуациям измерений, и обеспечивает оптимальное в смысле минимума среднеквадратичной ошибки восстановление скрытого состояния динамической системы. **Методы**: - `predict(&mut self)` — выполняет экстраполяцию состояния на следующий шаг, увеличивая дисперсию оценки на величину шума процесса согласно уравнению `P_{k|k-1} = P_{k-1|k-1} + Q` - `update(&mut self, measurement: f64) -> f64` — корректирует оценку на основе нового измерения, вычисляя коэффициент Калмана `K = P_{k|k-1} / (P_{k|k-1} + R)` и обновляя состояние `x = x + K·(z — x)` и ковариацию `P = (1 — K)·P`

### Структура `MarkovChain2nd`

«`rust
#[derive(Debug, Clone)]
pub struct MarkovChain2nd {
pub levels: usize,
pub transitions: Vec>>,
pub counts: Vec>>,
}
«`

**Назначение**: `MarkovChain2nd` реализует цепь Маркова второго порядка для дискретизированных уровней нагрузки. Модель учитывает не только текущее состояние, но и предыдущее, что позволяет улавливать тенденции изменения нагрузки.

**Поля**:
— `levels` — количество уровней квантования нагрузки
— `transitions` — трехмерная матрица переходных вероятностей размера `[levels][levels][levels]`, где `transitions[i][j][k] = P(λ_{t+1}=k | λ_t=j, λ_{t-1}=i)`
— `counts` — матрица счетчиков наблюдений для обучения модели

**Методы**:
— `update(&mut self, l_tm1: usize, l_t: usize, l_tp1: usize)` — обновляет счетчики наблюдений и пересчитывает вероятности переходов
— `predict(&self, l_tm1: usize, l_t: usize) -> (usize, f64)` — возвращает наиболее вероятный следующий уровень нагрузки и соответствующую вероятность
— `quantize(&self, lambda: f64, max_lambda: f64) -> usize` — выполняет квантование непрерывного значения нагрузки в дискретный уровень
— `dequantize(&self, level: usize, max_lambda: f64) -> f64` — восстанавливает приближенное значение нагрузки из уровня

### Структура `WaveletTransformer`

«`rust
#[derive(Debug, Clone)]
pub struct WaveletTransformer {
h0: f64, h1: f64, h2: f64, h3: f64,
g0: f64, g1: f64, g2: f64, g3: f64,
pub max_level: usize,
history: Vec,
max_history: usize,
approx: Vec>,
detail: Vec>,
}
«`

**Назначение**: `WaveletTransformer` выполняет дискретное вейвлет-преобразование временного ряда нагрузки с использованием фильтров Добеши 4 (Daubechies-4). Вейвлет-анализ позволяет разложить сигнал на составляющие разного масштаба и выделить как долговременные тренды, так и краткосрочные флуктуации.

**Коэффициенты фильтров**:
— `h0..h3` — коэффициенты низкочастотного (масштабирующего) фильтра для получения аппроксимирующих коэффициентов
— `g0..g3` — коэффициенты высокочастотного (вейвлет) фильтра для получения детализирующих коэффициентов

**Методы**:
— `transform(&mut self)` — выполняет рекурсивное разложение сигнала на заданное число уровней, сохраняя аппроксимирующие и детализирующие коэффициенты
— `predict(&self, tau: usize) -> Vec` — выполняет линейную экстраполяцию по последним коэффициентам для получения прогноза на `tau` шагов вперед

### Структура `PIDController`

«`rust
#[derive(Debug, Clone)]
pub struct PIDController {
pub kp: f64,
pub ki: f64,
pub kd: f64,
integral: f64,
prev_error: f64,
prev_time: Instant,
initialized: bool,
pub output_min: f64,
pub output_max: f64,
pub integral_limit: f64,
}
«`

**Назначение**: `PIDController` реализует классический пропорционально-интегрально-дифференциальный регулятор для компенсации остаточной ошибки управления. Регулятор используется для коррекции размера батча на основе отклонения измеренной задержки от целевого значения.

**Методы**:
— `new(kp: f64, ki: f64, kd: f64) -> Self` — конструктор с заданными коэффициентами
— `auto_tune(kp_crit: f64, t_crit: f64) -> Self` — статический метод, выполняющий автонастройку коэффициентов по методу Циглера-Николса на основе критического коэффициента усиления и периода колебаний
— `compute(&mut self, error: f64) -> f64` — вычисляет управляющий сигнал по формуле `u = Kp·e + Ki·∫e dt + Kd·de/dt`, применяя антивиндовую защиту интегральной составляющей через ограничение `integral_limit`
— `reset(&mut self)` — сбрасывает внутреннее состояние регулятора

### Структура `GibbsSampler`

«`rust
#[derive(Debug, Clone)]
pub struct GibbsSampler {
pub alpha_samples: Vec,
pub beta_samples: Vec,
pub gamma_samples: Vec,
pub delta_samples: Vec,
pub b_opt_samples: Vec,
alpha_a: f64, alpha_b: f64,
beta_a: f64, beta_b: f64,
gamma_a: f64, gamma_b: f64,
delta_a: f64, delta_b: f64,
pub alpha: f64,
pub beta: f64,
pub gamma: f64,
pub delta: f64,
pub b_opt: f64,
}
«`

**Назначение**: `GibbsSampler` реализует алгоритм сэмплирования Гиббса для байесовской оценки параметров модели времени обработки. Метод позволяет получать не только точечные оценки параметров, но и их распределения, что дает информацию о неопределенности оценок.

**Априорные распределения**:
— Для каждого параметра задано априорное Gamma-распределение с гиперпараметрами `*_a` (shape) и `*_b` (rate)
— Выбор Gamma-распределения обусловлен его свойством быть сопряженным для параметров, входящих в модель линейно

**Методы**:
— `sample(&mut self, data: &[(usize, f64)])` — выполняет одну итерацию условного сэмплирования всех параметров из их полных условных распределений
— `run_chain(&mut self, data: &[(usize, f64)], iterations: usize, burn_in: usize)` — запускает цепь Маркова заданной длины с отбрасываемым разогревочным периодом (burn-in)
— `average(&self) -> (f64, f64, f64, f64, f64)` — возвращает средние значения сэмплов для всех параметров
— `log_likelihood(&self, data: &[(usize, f64)]) -> f64` — вычисляет логарифм правдоподобия данных при текущих значениях параметров

### Структура `GPScheduler`

«`rust
#[derive(Debug, Clone)]
pub struct GPScheduler {
pub weights: [f64; 5],
pub shares: [f64; 5],
pub queue_lengths: [f64; 5],
pub total_capacity: f64,
}
«`

**Назначение**: `GPScheduler` реализует модель обобщенного процессорного разделения (Generalized Processor Sharing) для распределения пропускной способности между пятью классами приоритета. Модель гарантирует, что в любой момент времени доля ресурса, выделенная классу, пропорциональна его весу.

**Поля**:
— `weights` — веса приоритетов, определяющие относительную важность каждого класса
— `shares` — вычисленные доли пропускной способности для каждого класса
— `queue_lengths` — текущие длины очередей по классам
— `total_capacity` — общая пропускная способность системы

**Методы**:
— `recompute_shares(&mut self)` — пересчитывает доли пропускной способности по формуле `shares[i] = weights[i] * total_capacity / Σweights`
— `waiting_time(&self, i: usize, lambda: f64, batch_size: f64, service_rate: f64) -> f64` — вычисляет прогнозируемое время ожидания для класса `i` с использованием формул теории массового обслуживания

### Структура `ModelPredictiveController`

«`rust
#[derive(Debug, Clone)]
pub struct ModelPredictiveController {
pub horizon: usize,
pub lambda_pred: Vec,
pub params: BatchModelParams,
pub w_latency: f64,
pub w_delta_b: f64,
pub w_delta_m: f64,
pub b_min: usize,
pub b_max: usize,
pub m_min: usize,
pub m_max: usize,
}
«`

**Назначение**: `ModelPredictiveController` реализует алгоритм управления с прогнозирующей моделью (Model Predictive Control, MPC). На каждом шаге система решает задачу оптимизации на конечном горизонте, минимизируя взвешенную сумму прогнозируемых задержек и штрафов за изменение параметров.

**Поля**:
— `horizon` — длина горизонта прогнозирования в шагах дискретизации
— `lambda_pred` — вектор прогнозируемых значений нагрузки на всем горизонте
— `params` — параметры модели времени обработки
— `w_latency`, `w_delta_b`, `w_delta_m` — весовые коэффициенты в целевой функции
— `b_min`, `b_max`, `m_min`, `m_max` — ограничения на размер батча и количество воркеров

**Методы**:
— `solve(&self, current_b: usize, current_m: usize) -> (usize, usize)` — выполняет численную оптимизацию на ограниченном множестве кандидатов (поиск по сетке) и возвращает оптимальные значения параметров
— `cost_function(&self, b: f64, m: f64, b_prev: f64, m_prev: f64) -> f64` — вычисляет значение целевой функции
— `set_lambda_predictions(&mut self, predictions: Vec)` — обновляет вектор прогнозируемой нагрузки

### Структура `AdaptiveBatcher`

«`rust
pub struct AdaptiveBatcher {
pub config: AdaptiveBatcherConfig,
pub current_batch_size: RwLock,
pub current_workers: RwLock,
pub model_params: RwLock,
pub kalman: RwLock,
pub markov: RwLock,
pub wavelet: RwLock,
pub pid: RwLock,
pub gibbs: RwLock,
pub gps: RwLock,
pub mpc: RwLock,
pub evt: RwLock,
pub measurements: RwLock>,
pub latencies: RwLock>,
pub lambdas: RwLock>,
pub metrics: Arc>,
}
«`

**Назначение**: `AdaptiveBatcher` является центральным координирующим компонентом системы. Он агрегирует все математические модели и предоставляет единый интерфейс для адаптивного управления размером батча. Компонент работает в реальном времени, непрерывно анализируя наблюдаемые данные и корректируя параметры обработки.

**Ключевые методы**:
— `new(config: AdaptiveBatcherConfig) -> Self` — конструктор, инициализирующий все вложенные модели с начальными параметрами. Включает исправление начальной нагрузки с 100.0 на 0.0 для корректной работы при отсутствии данных
— `update_model(&self)` — асинхронный метод, запускающий байесовское обновление параметров модели времени обработки с использованием сэмплера Гиббса при накоплении достаточного количества измерений (≥ 50)
— `estimate_lambda(&self, measured_throughput: f64) -> f64` — выполняет фильтрацию Калмана для получения сглаженной оценки текущей нагрузки
— `predict_load(&self, horizon: usize) -> Vec` — комбинирует прогнозы вейвлет-преобразования и цепи Маркова для получения взвешенного прогноза нагрузки на заданном горизонте
— `optimal_batch_size(&self, lambda: f64, waiting_time: f64) -> usize` — решает кубическое уравнение `dL/dB = 0` и выбирает оптимальный размер батча, минимизирующий полную задержку
— `pid_correction(&self, target_latency: f64, measured_latency: f64) -> f64` — вычисляет корректирующий сигнал на основе отклонения измеренной задержки от целевой
— `mpc_optimize(&self, lambda_pred: Vec) -> (usize, usize)` — выполняет многопараметрическую оптимизацию размера батча и количества воркеров с использованием MPC
— `compute_batch_size(&self) -> usize` — основной метод, интегрирующий все этапы вычислений для получения финального, скорректированного размера батча
— `record_batch_execution(&self, batch_size: usize, processing_time: Duration, success_rate: f64, queue_depth: usize)` — обновляет все исторические данные и модели на основе результатов выполнения очередного батча

## Модуль `circuit_breaker.rs`

### Структура `CircuitBreakerMarkovModel`

«`rust
#[derive(Debug, Clone)]
pub struct CircuitBreakerMarkovModel {
pub transition_rates: [[f64; 3]; 3],
pub steady_state: [f64; 3],
pub mttf: f64,
pub mttr: f64,
pub availability: f64,
}
«`

**Назначение**: `CircuitBreakerMarkovModel` описывает марковскую модель состояний автоматического выключателя. Модель позволяет количественно оценивать надежность системы и прогнозировать поведение выключателя в стационарном режиме.

**Поля**:
— `transition_rates` — матрица интенсивностей переходов между состояниями Closed (0), Open (1) и HalfOpen (2). Элемент `[i][j]` задает интенсивность перехода из состояния i в состояние j
— `steady_state` — вектор стационарных вероятностей нахождения системы в каждом состоянии
— `mttf` — среднее время до отказа (Mean Time To Failure), вычисляемое как `1/λ`, где λ — интенсивность перехода в состояние Open
— `mttr` — среднее время восстановления (Mean Time To Recovery), вычисляемое как `1/μ`, где μ — интенсивность восстановления
— `availability` — коэффициент готовности, вычисляемый как `MTTF/(MTTF+MTTR)`

**Методы**:
— `compute_steady_state(&mut self)` — решает систему линейных уравнений `π·Q = 0` с условием нормировки `Σπ = 1` для нахождения стационарных вероятностей. Используется метод Крамера для решения системы трех уравнений

### Перечисление `CircuitState`

«`rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitState {
Closed = 0,
Open = 1,
HalfOpen = 2,
}
«`

**Назначение**: `CircuitState` определяет три возможных состояния автоматического выключателя. Целочисленные значения соответствуют индексам в матрицах марковской модели.

**Варианты**:
— `Closed` — нормальный режим работы. Запросы передаются на обработку, ведется подсчет отказов
— `Open` — аварийный режим. Запросы немедленно отклоняются с ошибкой, запущен таймер восстановления
— `HalfOpen` — режим проверки восстановления. Пропускается ограниченное число запросов для тестирования работоспособности проблемного компонента

### Структура `CircuitBreaker`

«`rust
pub struct CircuitBreaker {
name: String,
pub markov_model: RwLock,
failure_model: RwLock,
pub recovery_model: RwLock,
state: RwLock,
failure_count: RwLock,
success_count: RwLock,
total_requests: RwLock,
last_failure: RwLock>,
last_success: RwLock>,
state_change_time: RwLock,
pub failure_threshold: usize,
recovery_timeout: Duration,
half_open_max_requests: usize,
half_open_success_count: RwLock,
consecutive_successes_needed: usize,
opened_count: RwLock,
closed_count: RwLock,
half_open_count: RwLock,
metrics: Arc>,
failure_history: RwLock>,
}
«`

**Назначение**: `CircuitBreaker` реализует паттерн автоматического выключателя с математическим обоснованием переходов. Компонент защищает систему от каскадных отказов, временно блокируя запросы к проблемным сервисам.

**Ключевые методы**:
— `new(…) -> Self` — конструктор, инициализирующий все модели и счетчики. Выполняет расчет стационарных вероятностей начальной марковской модели
— `allow_request(&self) -> bool` — определяет, разрешено ли выполнение запроса в текущем состоянии. В состоянии Open разрешает запросы только по истечении таймаута восстановления, что инициирует переход в HalfOpen
— `record_success(&self)` — регистрирует успешное выполнение запроса. В состоянии HalfOpen накопление `consecutive_successes_needed` успехов приводит к полному восстановлению (переход в Closed). В состоянии Closed сбрасывает счетчик отказов
— `record_failure(&self)` — регистрирует сбой. В состоянии Closed превышение `failure_threshold` вызывает переход в Open. В состоянии HalfOpen любая неудача немедленно возвращает выключатель в состояние Open с увеличением задержки восстановления
— `reset(&self)` — принудительный сброс выключателя в состояние Closed с обнулением всех счетчиков и моделей

## Модуль `config.rs`

### Структура `ConfigOptimizationModel`

«`rust
#[derive(Debug, Clone)]
pub struct ConfigOptimizationModel {
pub arrival_rate: f64,
pub service_rate: f64,
pub utilization: f64,
pub target_latency: Duration,
pub max_throughput: f64,
pub cpu_cores: usize,
pub available_memory: usize,
pub network_bandwidth: f64,
}
«`

**Назначение**: `ConfigOptimizationModel` содержит математические модели для расчета оптимальных параметров конфигурации на основе теории массового обслуживания. Модель учитывает аппаратные ограничения и целевые показатели производительности.

**Методы**:
— `optimal_worker_count(&self) -> usize` — вычисляет оптимальное количество воркеров по формуле `M = ⌈λ/(μ·ρ)⌉`, где `ρ` — целевой коэффициент загрузки
— `optimal_queue_size(&self, loss_probability: f64) -> usize` — определяет размер очереди, необходимый для обеспечения заданной вероятности потерь в модели M/M/1/K по формуле `K = ⌈ln(ε)/ln(ρ)⌉`
— `optimal_batch_size(&self, alpha: f64, gamma: f64) -> usize` — вычисляет оптимальный размер батча на основе упрощенной модели, использующей консервативную формулу `B = √(α·1000/γ)`
— `optimal_read_timeout(&self, batch_size: usize, std_dev: f64) -> Duration` — определяет таймаут чтения как `B/λ + 3σ` (принцип трех сигм)
— `optimal_flush_interval(&self) -> Duration` — вычисляет оптимальный интервал сброса буфера как половину целевой задержки

## Модуль `qos_manager.rs`

### Структура `GPSModel`

«`rust
#[derive(Debug, Clone)]
pub struct GPSModel {
pub weights: [f64; 5],
pub shares: [f64; 5],
pub normalized_weights: [f64; 5],
pub total_capacity: f64,
pub arrival_rates: [f64; 5],
pub service_rates: [f64; 5],
pub utilizations: [f64; 5],
pub total_utilization: f64,
}
«`

**Назначение**: `GPSModel` реализует математическую модель обобщенного процессорного разделения. Модель обеспечивает справедливое распределение пропускной способности между классами приоритета в соответствии с их весами.

**Методы**:
— `normalize_weights(&mut self)` — приводит веса к нормированному виду по формуле `φ_i’ = φ_i / Σφ_j`
— `compute_shares(&mut self)` — распределяет общую пропускную способность пропорционально нормированным весам: `C_i = φ_i’ * C_total`
— `compute_utilization(&mut self, batch_size: f64)` — рассчитывает загрузку каждого класса по формуле `ρ_i = λ_i * B / C_i`
— `throughput(&self, class: usize) -> f64` — вычисляет фактическую пропускную способность для класса с учетом загрузки

### Структуры `TokenBucket` и `LeakyBucket`

«`rust
#[derive(Debug, Clone)]
pub struct TokenBucket {
pub rate: f64,
pub capacity: f64,
pub tokens: f64,
pub last_update: Instant,
}

#[derive(Debug, Clone)]
pub struct LeakyBucket {
pub rate: f64,
pub capacity: f64,
pub level: f64,
pub last_update: Instant,
}
«`

**Назначение**: `TokenBucket` реализует алгоритм «ведра токенов» для ограничения скорости запросов. Токены накапливаются с постоянной скоростью `rate`, каждый запрос потребляет определенное количество токенов. Метод `try_consume(tokens: f64) -> bool` проверяет наличие достаточного количества токенов и при успехе уменьшает их количество.

**Назначение**: `LeakyBucket` реализует алгоритм «дырявого ведра» для сглаживания всплесков трафика. Вода поступает в ведро с каждым запросом и вытекает с постоянной скоростью `rate`. Метод `try_add(amount: f64) -> bool` проверяет, не превысит ли добавление заданного объема максимальную емкость ведра.

### Структура `QosManager`

«`rust
pub struct QosManager {
quotas: RwLock,
pub gps_model: RwLock,
_wfq_model: RwLock,
high_priority_semaphore: Semaphore,
normal_priority_semaphore: Semaphore,
low_priority_semaphore: Semaphore,
metrics: Arc>,
statistics: RwLock,
adaptation_history: RwLock>,
arrival_rate_history: RwLock>,
wait_time_history: RwLock>,
adaptation_interval: Duration,
min_samples_for_adaptation: usize,
adaptation_sensitivity: f64,
}
«`

**Назначение**: `QosManager` координирует управление качеством обслуживания. Компонент обеспечивает многоуровневую защиту от перегрузок и динамическую адаптацию квот на основе наблюдаемой статистики.

**Ключевые методы**:
— `acquire_permit(&self, priority: Priority) -> Result` — реализует многоуровневую проверку: token bucket (rate limiting), leaky bucket (smoothing) и семафор (concurrency limit). Для критических приоритетов применяются упрощенные проверки
— `adapt_quotas(&self) -> Result` — анализирует статистику отказов и времени ожидания, вычисляет новые квоты на основе целевой функции, минимизирующей взвешенную сумму отказов и задержек
— `update_models(&self, arrival_rates: [f64; 5], batch_size: f64)` — обновляет GPS-модель на основе текущей интенсивности поступления для каждого класса

## Модуль `optimized/buffer_pool.rs`

### Перечисление `SizeClass`

«`rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SizeClass {
Small = 0,
Medium = 1,
Large = 2,
XLarge = 3,
Giant = 4,
}
«`

**Назначение**: `SizeClass` определяет пять классов размеров буферов, организованных по степенному закону с коэффициентом роста ≈8. Такая организация минимизирует фрагментацию и обеспечивает эффективное использование памяти.

**Методы**:
— `optimal_size(&self) -> usize` — возвращает предопределенный оптимальный размер для класса: Small (1KB), Medium (8KB), Large (64KB), XLarge (256KB), Giant (1MB)
— `from_size(size: usize) -> Self` — определяет класс для запрошенного размера путем сравнения с граничными значениями
— `name(&self) -> &’static str` — возвращает строковое имя класса для логирования

### Структура `SizeDistributionModel`

«`rust
#[derive(Debug, Clone)]
pub struct SizeDistributionModel {
pub alpha: f64,
pub x_min: f64,
pub mean: f64,
pub variance: f64,
pub size_history: VecDeque,
pub max_history: usize,
}
«`

**Назначение**: `SizeDistributionModel` моделирует распределение размеров запросов с помощью распределения Парето. Распределение Парето хорошо описывает «тяжелые хвосты» в сетевом трафике, где большие пакеты встречаются редко, но существенно влияют на производительность.

**Методы**:
— `update(&mut self, size: usize)` — добавляет новый размер в историю и при накоплении достаточного количества данных (≥ 100) запускает переоценку параметров
— `estimate_parameters(&mut self)` — использует метод максимального правдоподобия для оценки параметра формы `α` и минимального значения `x_min`. Оценка выполняется по формулам: `x_min = min(data)`, `α = n / Σ ln(x_i / x_min)`

### Структура `OptimizedBufferPool`

«`rust
pub struct OptimizedBufferPool {
pub size_class_pools: RwLock<[VecDeque; 5]>,
_bytes_mut_pool: Mutex>,
pub size_distribution: RwLock,
pub cache_model: TokioRwLock,
stats: Arc>,
global_stats: Mutex,
allocation_times: Mutex>,
wait_times: Mutex>,
last_cleanup: Mutex,
last_adaptation: Mutex,
config: Arc,
}
«`

**Назначение**: `OptimizedBufferPool` реализует пул буферов с разделением на классы размеров и адаптивным управлением. Компонент минимизирует накладные расходы на аллокацию памяти за счет повторного использования буферов.

**Ключевые методы**:
— `new(…) -> Self` — конструктор, выполняющий предварительное выделение буферов в соответствии с коэффициентом предварительного выделения `preallocation_factor`
— `cleanup_old_buffers(&self, max_age: Duration)` — удаляет устаревшие буферы на основе времени простоя и возраста, сохраняя минимальный размер пула для каждого класса
— `update_statistics(&self)` — обновляет статистику hit rate, рассчитывает фрагментацию и перцентили времени аллокации
— `adaptive_pool_adjustment(&self)` — увеличивает или уменьшает размеры пулов в зависимости от текущего hit rate относительно целевого значения
— `get_detailed_stats(&self) -> HashMap` — возвращает подробную статистику по каждому классу и глобальную

## Модуль `optimized/work_stealing_dispatcher.rs`

### Структура `WorkStealingModel`

«`rust
#[derive(Debug, Clone)]
pub struct WorkStealingModel {
pub m: usize,
pub p_steal: f64,
pub lambda_steal: f64,
pub mu: f64,
pub avg_batch_size: f64,
pub rho: f64,
pub imbalance: f64,
pub cost_idle: f64,
pub cost_steal: f64,
}
«`

**Назначение**: `WorkStealingModel` содержит математическую модель кражи задач. Модель позволяет количественно оценить эффективность кражи и адаптировать вероятность кражи в зависимости от текущей загрузки системы.

**Методы**:
— `steal_success_probability(&self, queue_len: usize) -> f64` — вычисляет вероятность успешной кражи из очереди заданной длины по формуле `1 — 1/(queue_len + 1)`
— `compute_steal_intensity(&self, check_rate: f64) -> f64` — рассчитывает интенсивность событий кражи как `λ_steal = p_steal·(M-1)/M·check_rate`
— `compute_imbalance(loads: &[f64]) -> f64` — вычисляет коэффициент дисбаланса как нормированный коэффициент вариации загрузок воркеров: `σ/μ`
— `optimal_steal_policy(&self, queue_len: usize, cost_idle: f64, cost_steal: f64) -> bool` — реализует уравнение Беллмана для определения оптимальности кражи в текущем состоянии

### Структура `LoadBalancer`

«`rust
#[derive(Debug, Clone)]
pub struct LoadBalancer {
pub worker_loads: Vec,
pub ema_alpha: f64,
pub predicted_loads: Vec,
}
«`

**Назначение**: `LoadBalancer` реализует алгоритмы балансировки нагрузки. Использует метод «power of two choices» для распределения задач и экспоненциально-взвешенное скользящее среднее для сглаживания оценок загрузки.

**Методы**:
— `select_worker_power_of_two(&self) -> (usize, usize)` — выбирает двух случайных воркеров и возвращает индекс воркера с меньшей загрузкой, а также индекс второго воркера
— `update_ema(&mut self, worker_id: usize, current_load: f64)` — обновляет экспоненциально-взвешенное скользящее среднее загрузки воркера: `load = α·current + (1-α)·load`

### Структура `WorkStealingDispatcher`

«`rust
pub struct WorkStealingDispatcher {
pub worker_senders: Arc>>,
pub worker_receivers: Arc>>,
pub worker_queues: Arc>,
pub worker_loads: Arc>,
pub worker_throughputs: Arc>,
injector_sender: Sender,
injector_receiver: Receiver,
injector_backlog: Arc,
results: Arc>,
pub stealing_model: Arc>,
pub load_balancer: Arc>,
pub queue_models: Arc>,
stats: Arc>,
latency_histogram: Arc>,
worker_latency_history: Arc>>,
is_running: Arc,
next_task_id: std::sync::atomic::AtomicU64,
packet_processor: PhantomPacketProcessor,
session_manager: Arc,
adaptive_batcher: Arc,
qos_manager: Arc,
circuit_breaker: Arc,
backpressure_semaphore: Arc,
backpressure_threshold: usize,
check_rate: f64,
cost_idle: f64,
cost_steal: f64,
}
«`

**Назначение**: `WorkStealingDispatcher` является центральным компонентом для распределения и выполнения задач. Реализует архитектуру work-stealing для эффективного использования многоядерных процессоров и динамической балансировки нагрузки.

**Ключевые методы**:
— `new(…) -> Self` — конструктор, инициализирующий очереди для каждого воркера и общую очередь (injector). Устанавливает начальные параметры моделей и запускает все фоновые процессы
— `start_workers(&self)` — запускает заданное количество воркеров, каждый из которых выполняет метод `worker_loop` в отдельной асинхронной задаче
— `worker_loop(…)` — основной цикл воркера. В бесконечном цикле ожидает задачи из своей очереди или из общей очереди (кража), обрабатывает их и обновляет статистику
— `submit_task(&self, mut task: WorkStealingTask) -> Result` — принимает новую задачу. Использует `LoadBalancer` для выбора воркера методом «power of two choices». При переполнении очереди выбранного воркера отправляет задачу в общую очередь (injector)
— `start_stealing_optimizer(&self)` — запускает фоновую задачу, которая периодически анализирует загрузку воркеров и адаптирует вероятность кражи `p_steal` в модели `WorkStealingModel`
— `get_advanced_stats(&self) -> DispatcherAdvancedStats` — собирает и возвращает расширенную статистику диспетчера, включая перцентили времени обработки, дисбаланс, размеры очередей и т.д.

## Модуль `optimized/crypto_processor.rs`

### Структура `CryptoPerformanceModel`

«`rust
#[derive(Debug, Clone, Copy)]
pub struct CryptoPerformanceModel {
pub alpha: f64,
pub beta: f64,
pub gamma: f64,
pub delta: f64,
pub l1_cache_size: usize,
pub l2_cache_size: usize,
pub l3_cache_size: usize,
pub simd_optimal: usize,
pub max_throughput: f64,
}
«`

**Назначение**: `CryptoPerformanceModel` описывает модель времени выполнения криптографических операций: `T(n) = α + β·n + γ·SIMD(n) + δ·cache_miss(n)`. Модель учитывает как линейную зависимость от размера данных, так и дискретные эффекты кэш-промахов.

**Методы**:
— `execution_time(&self, size: usize, use_simd: bool) -> f64` — вычисляет прогнозируемое время для операции заданного размера с учетом SIMD-ускорения и штрафов за промахи кэша различных уровней

### Структура `OptimizedCryptoProcessor`

«`rust
pub struct OptimizedCryptoProcessor {
worker_senders: Arc>>,
worker_receivers: Arc>>,
_injector_sender: Sender,
injector_receiver: Receiver,
results: Arc>,
pub performance_model: Arc>,
work_stealing_model: Arc>,
batch_model: Arc>,
stats: Arc>,
processing_times: Arc>>,
queue_lengths: Arc>>,
pub chacha20_accelerator: Arc,
pub blake3_accelerator: Arc,
is_running: Arc,
next_task_id: std::sync::atomic::AtomicU64,
chacha_batch_buffer: Arc>>,
blake_batch_buffer: Arc>>,
derive_batch_buffer: Arc>>,
batch_timeout: Duration,
max_batch_size: usize,
enable_simd: bool,
enable_work_stealing: bool,
}
«`

**Назначение**: `OptimizedCryptoProcessor` реализует высокопроизводительный криптографический процессор с поддержкой пакетной обработки и кражи задач. Компонент интегрирует SIMD-акселераторы для ChaCha20 и BLAKE3 и обеспечивает эффективное использование многоядерных систем.

**Ключевые методы**:
— `process_chacha_batch(…)` — выполняет пакетную обработку операций ChaCha20 с использованием SIMD-акселератора. Группирует операции по типу (шифрование/дешифрование) для максимальной эффективности
— `process_blake_batch(…)` — выполняет пакетную обработку операций хеширования BLAKE3 с использованием SIMD-акселератора
— `worker_loop(…)` — основной цикл криптографического воркера, аналогичный диспетчеру задач, но специализированный для криптоопераций
— `start_batch_processor(&self)` — запускает фоновую задачу, которая накапливает операции в буферах и периодически отправляет их на пакетную обработку при достижении оптимального размера или истечении таймаута

## Модуль `core/reader.rs`

### Структура `PacketSizeModel`

«`rust
#[derive(Debug, Clone)]
pub struct PacketSizeModel {
pub mu: f64,
pub sigma: f64,
pub mean: f64,
pub median: f64,
pub p95: f64,
pub p99: f64,
pub history: VecDeque,
pub max_history: usize,
}
«`

**Назначение**: `PacketSizeModel` моделирует распределение размеров пакетов с помощью логнормального распределения. Параметры `mu` и `sigma` представляют среднее и стандартное отклонение натурального логарифма размера, что позволяет эффективно описывать асимметричные распределения размеров сетевых пакетов.

**Методы**:
— `update(&mut self, size: usize)` — добавляет новый размер в историю и при накоплении достаточного количества данных запускает переоценку параметров
— `estimate_parameters(&mut self)` — выполняет оценку параметров логнормального распределения методом максимального правдоподобия: `μ = mean(ln(x))`, `σ = std(ln(x))`, затем вычисляет характеристики распределения

### Структура `BatchReader`

«`rust
pub struct BatchReader {
_config: BatchConfig,
packet_size_model: Arc>,
intensity_model: Arc>,
timeout_model: Arc>,
event_tx: mpsc::Sender,
is_running: Arc,
stats: Arc>,
connection_count: Arc,
}
«`

**Назначение**: `BatchReader` отвечает за асинхронное чтение данных из сетевых соединений. Компонент реализует адаптивные таймауты чтения и собирает статистику о размерах пакетов и интенсивности потока.

**Ключевые методы**:
— `register_connection(…)` — регистрирует новое соединение и запускает отдельную асинхронную задачу для его чтения. Внутри задачи:
— Чтение выполняется с адаптивным таймаутом, управляемым `ReadTimeoutModel`
— Для каждого прочитанного пакета обновляются модели `PacketSizeModel` и `ReadIntensityModel`
— События о полученных данных отправляются через канал `event_tx` для дальнейшей обработки
— `read_from_stream_dyn(…)` — выполняет чтение фрейма из потока с заданным таймаутом, обрабатывая различные типы ошибок (таймаут, закрытие соединения, ошибки ввода-вывода)

## Модуль `core/writer.rs`

### Структура `WriteBatchModel`

«`rust
#[derive(Debug, Clone)]
pub struct WriteBatchModel {
pub current_batch_size: usize,
pub optimal_batch_size: usize,
pub min_batch_size: usize,
pub max_batch_size: usize,
pub accumulation_time: Duration,
pub write_time: Duration,
pub target_latency: Duration,
pub history: VecDeque,
}
«`

**Назначение**: `WriteBatchModel` управляет параметрами пакетной записи. Модель анализирует историю выполнения батчей для нахождения оптимального размера, обеспечивающего максимальную пропускную способность при соблюдении целевой задержки.

**Методы**:
— `update(&mut self, batch_size: usize, write_time: Duration)` — добавляет запись о выполненном батче в историю и обновляет текущие значения
— `estimate_optimal_batch_size(&mut self)` — анализирует историю для нахождения размера батча, обеспечивающего максимальную пропускную способность среди записей, время выполнения которых не превышает целевую задержку

### Структура `BackpressureModel`

«`rust
#[derive(Debug, Clone)]
pub struct BackpressureModel {
pub max_queue_size: usize,
pub current_queue: usize,
pub arrival_rate: f64,
pub service_rate: f64,
pub utilization: f64,
pub loss_probability: f64,
pub avg_wait_time: Duration,
}
«`

**Назначение**: `BackpressureModel` реализует модель M/M/1/K для оценки вероятности потерь и времени ожидания в очереди записи. Модель позволяет прогнозировать перегрузки и принимать решения о применении backpressure.

**Методы**:
— `update(&mut self, queue_size: usize, arrival_rate: f64, service_rate: f64)` — обновляет параметры модели и пересчитывает вероятность потерь по формуле для системы M/M/1/K
— `is_critical(&self) -> bool` — определяет, достигнут ли критический уровень перегрузки (заполнение очереди ≥ 90%)

### Структура `BatchWriter`

«`rust
pub struct BatchWriter {
config: BatchConfig,
batch_model: Arc>,
_backpressure_model: Arc>,
connections: Arc>>,
task_queue: Arc>>,
task_tx: mpsc::Sender,
task_rx: Arc>>,
backpressure_semaphore: Arc,
is_running: Arc,
stats: Arc>,
connection_count: Arc,
write_times: Arc>>,
}
«`

**Назначение**: `BatchWriter` управляет асинхронной записью данных с поддержкой пакетирования и приоритезации. Компонент накапливает задачи записи и отправляет их пакетами для минимизации накладных расходов.

**Ключевые методы**:
— `write(…)` — основной метод для отправки данных на запись:
— Для критических запросов (`is_critical()` или `requires_flush`) вызывает немедленную запись через `write_immediate`
— Для остальных запросов пытается получить разрешение семафора backpressure и отправляет задачу в очередь `task_tx`
— `write_immediate(&self, task: WriteTask)` — выполняет немедленную запись без пакетирования для критических задач
— `process_batch(…)` — обрабатывает пакет задач: объединяет данные, выполняет запись с адаптивным таймаутом и обновляет статистику
— `start_writer_for_connection(…)` — запускает фоновый процесс для конкретного соединения, который накапливает задачи из очереди и отправляет их пакетами при достижении оптимального размера или истечении интервала сброса

## Модуль `acceleration_batch/blake3_batch_accel.rs`

### Структура `Blake3BatchAccelerator`

«`rust
pub struct Blake3BatchAccelerator {
config: Blake3BatchConfig,
simd_capable: bool,
detected_features: SimdFeatures,
}
«`

**Назначение**: `Blake3BatchAccelerator` предоставляет SIMD-ускоренную пакетную обработку хеширования BLAKE3. Компонент автоматически определяет доступные аппаратные возможности и выбирает оптимальную стратегию выполнения.

**Ключевые методы**:
— `hash_keyed_batch(&self, keys: &[[u8; 32]], inputs: &[Vec]) -> Vec<[u8; 32]>` — выполняет пакетное хеширование с ключом, выбирая стратегию на основе размера батча и доступности SIMD:
— При большом размере батча (≥16) использует оптимизированную параллельную обработку с предварительной сортировкой по размеру для лучшего использования кэша
— При среднем размере батча (≥4) использует параллельную обработку через Rayon
— Для малых батчей использует последовательную скалярную реализацию
— `get_performance_info(&self) -> Blake3PerformanceInfo` — возвращает информацию о производительности, включая оценку пропускной способности и доступные аппаратные возможности

## Модуль `acceleration_batch/chacha20_batch_accel.rs`

### Структура `ChaCha20BatchAccelerator`

«`rust
pub struct ChaCha20BatchAccelerator {
config: ChaCha20BatchConfig,
simd_capable: bool,
detected_features: SimdFeatures,
performance_model: SIMDPerformanceModel,
}
«`

**Назначение**: `ChaCha20BatchAccelerator` предоставляет SIMD-ускоренную пакетную обработку шифрования/дешифрования ChaCha20. Компонент автоматически настраивает параметры производительности в зависимости от обнаруженных аппаратных возможностей.

**Ключевые методы**:
— `encrypt_batch(&self, keys: &[[u8; 32]], nonces: &[[u8; 12]], plaintexts: &[Vec]) -> Vec>` — выполняет пакетное шифрование, выбирая стратегию на основе доступности SIMD и размера данных
— `decrypt_batch(&self, keys: &[[u8; 32]], nonces: &[[u8; 12]], ciphertexts: &[Vec]) -> Vec>` — выполняет пакетное дешифрование (симметрично шифрованию)
— `get_simd_info(&self) -> SimdInfo` — возвращает информацию о доступных SIMD-возможностях и ожидаемом ускорении

## Заключение

Данная документация охватывает ключевые аспекты реализации адаптивной Batch-системы. Система демонстрирует современный подход к высокопроизводительной обработке данных, сочетающий передовые математические методы с тщательной инженерной проработкой. Кодовая база спроектирована для максимальной производительности без компромиссов в надежности, с акцентом на адаптивность к изменяющимся условиям нагрузки.

Все модули взаимосвязаны и работают вместе для обеспечения комплексного управления процессом пакетной обработки. Архитектура позволяет легко расширять систему новыми математическими моделями и оптимизациями, сохраняя при этом обратную совместимость и высокий уровень производительности.

## Основы адаптивной Batch-системы

### 1. Введение в архитектуру Batch-обработки

Batch-система, представленная в данном проекте, представляет собой высокопроизводительный, математически обоснованный механизм для эффективной обработки сетевых пакетов. В отличие от традиционных систем, обрабатывающих каждый пакет по отдельности, данный модуль накапливает входящие запросы в буферы (батчи) и обрабатывает их группами. Такой подход позволяет значительно снизить накладные расходы на переключение контекста, системные вызовы и криптографические операции, что критически важно для систем с высокой нагрузкой.

Основная сложность проектирования таких систем заключается в нахождении баланса между задержкой (latency) и пропускной способностью (throughput). Слишком маленький размер батча не позволяет достичь максимальной производительности, в то время как слишком большой батч приводит к недопустимому росту времени ожидания для отдельных запросов. Решение этой дилеммы достигается путем внедрения комплекса математических моделей, которые в реальном времени анализируют состояние системы и адаптируют её параметры.

### 2. Фундаментальная модель обработки

В основе системы лежит обобщенная модель времени обработки батча. Эта модель формализует зависимость времени обработки `T` от размера батча `B`. Аналитически она представляется следующим уравнением:

**T_process(B) = α/B + β + γ·B + δ·(B — B_opt)² + ε·I(B > L3_CACHE)**

Каждый член уравнения имеет строгое физическое обоснование:

* **`α/B` (Накладные расходы на батч):** Представляет собой фиксированные издержки на инициализацию батча (например, аллокация памяти, вызов функции), которые амортизируются с ростом размера батча. Коэффициент `α` измеряется в миллисекундах и характеризует «стоимость» формирования батча.
* **`β` (Минимальное время обработки):** Константа, отражающая неустранимую задержку, присутствующую даже при обработке минимального по размеру батча.
* **`γ·B` (Линейная составляющая):** Линейно растущая часть времени, пропорциональная размеру батча. Коэффициент `γ` (мс/B) характеризует стоимость обработки одного элемента.
* **`δ·(B — B_opt)²` (Квадратичный штраф за неоптимальность):** Параболический член, который вводит штраф за отклонение размера батча от оптимального значения `B_opt`. Этот член моделирует эффекты, связанные с неэффективным использованием внутренних буферов или алгоритмов, оптимизированных под конкретный размер данных.
* **`ε·I(B > L3_CACHE)` (Штраф за промахи кэша):** Дискретный штраф, активируемый, когда размер батча превышает объем L3-кэша процессора. `I(condition)` — это индикаторная функция, равная 1, когда условие истинно. Данный член моделирует резкое увеличение времени доступа к памяти из-за промахов кэша, что является критическим фактором производительности.

Эта модель служит основой для всех последующих оптимизаций. Задача адаптивной системы — непрерывно оценивать параметры `α`, `β`, `γ`, `δ`, `B_opt` и `ε` на основе наблюдаемых данных и, используя их, находить размер батча `B`, минимизирующий общее время обработки с учетом текущей нагрузки.

### 3. Оценка и прогнозирование нагрузки (λ)

Ключевым параметром для принятия решений является интенсивность входящего потока запросов, обозначаемая как `λ`. Система использует многоуровневый подход к оценке и прогнозированию этого параметра.

#### 3.1. Фильтр Калмана для сглаживания измерений

Для получения устойчивой оценки текущей интенсивности `λ` применяется дискретный фильтр Калмана. Измеренная пропускная способность (например, пакетов в секунду) содержит шум из-за флуктуаций трафика. Фильтр Калмана позволяет рекурсивно оценивать истинное значение `λ`, рассматривая его как скрытое состояние динамической системы.

Модель фильтра включает в себя:
* **Уравнение состояния:** `λ_t = λ_{t-1} + w_t`, где `w_t` — шум процесса с дисперсией `Q`. Предполагается, что нагрузка изменяется постепенно.
* **Уравнение измерения:** `z_t = λ_t + v_t`, где `v_t` — шум измерения с дисперсией `R`.

Алгоритм фильтра поочередно выполняет шаги предсказания (экстраполяции) и коррекции, обеспечивая оптимальную в смысле минимума среднеквадратичной ошибки оценку `λ`.

#### 3.2. Краткосрочное прогнозирование на основе вейвлет-преобразования

Для предсказания нагрузки на горизонте в несколько секунд используется дискретное вейвлет-преобразование (Discrete Wavelet Transform, DWT). Вейвлеты позволяют разложить временной ряд нагрузки на составляющие разного масштаба:
* **Аппроксимирующие коэффициенты (приближения):** Отражают низкочастотную составляющую, тренд нагрузки.
* **Детализирующие коэффициенты:** Отражают высокочастотные флуктуации и шум.

Анализируя исторические значения, система экстраполирует последние аппроксимирующие и детализирующие коэффициенты для получения прогноза на заданное число шагов вперед. Такой подход позволяет улавливать как общие тренды, так и повторяющиеся паттерны в трафике.

#### 3.3. Долгосрочное прогнозирование с цепями Маркова

Для предсказания наиболее вероятного состояния нагрузки на более длительную перспективу применяется цепь Маркова второго порядка. Пространство состояний нагрузки дискретизируется на фиксированное число уровней (например, 10). Модель хранит матрицу переходных вероятностей `P(λ_{t+1} | λ_t, λ_{t-1})`, то есть вероятность перехода в следующий уровень нагрузки, зависящую от двух предыдущих состояний. Это позволяет учитывать не только текущее значение, но и тенденцию (растет нагрузка или падает).

### 4. Динамическая оптимизация размера батча

Имея оценку текущей и будущей нагрузки, система приступает к главной задаче — вычислению оптимального размера батча. Эта задача решается комбинацией аналитических и численных методов.

#### 4.1. Аналитическое решение через кубическое уравнение

Минимизация полной задержки для одного запроса является сложной оптимизационной задачей. Приравнивая производную функции полной задержки `L(B) = B/λ + T_process(B)` к нулю, мы получаем условие оптимальности. Подставляя выражение для `T_process(B)`, приходим к кубическому уравнению относительно `B`:

`dL/dB = 1/λ — α/B² + γ + 2δ·(B — B_opt) = 0`

После приведения к стандартному виду `a·B³ + b·B² + c·B + d = 0`, система находит его действительные корни с помощью аналитического решателя (метод Кардано). Корни, лежащие в допустимом диапазоне `[B_min, B_max]`, являются кандидатами на оптимальный размер батча. Для каждого кандидата вычисляется итоговая задержка, и выбирается лучший.

#### 4.2. Упреждающее управление с моделью (MPC)

Для учета будущей динамики нагрузки применяется метод управления с прогнозирующей моделью (Model Predictive Control, MPC). На каждом шаге система:
1. Получает прогноз нагрузки `λ` на горизонте управления `N`.
2. Решает задачу оптимизации, минимизирующую взвешенную сумму задержек на всем горизонте, а также штрафы за резкое изменение размера батча и количества воркеров.
3. Применяет только первое вычисленное управляющее воздействие (новые значения `B` и `M`), а на следующем шаге повторяет расчет.

Это позволяет системе действовать проактивно, сглаживая переходные процессы при резких скачках нагрузки.

#### 4.3. ПИД-регулятор для коррекции по задержке

Несмотря на всю сложность моделей, они не могут учесть все недетерминированные факторы. Для компенсации остаточной ошибки используется классический ПИД-регулятор (Пропорционально-Интегрально-Дифференциальный). Ошибкой регулирования служит разница между целевой задержкой (`target_latency`) и фактически измеренной. Выходной сигнал регулятора (`correction`) используется как дополнительная поправка к размеру батча, полученному аналитически или через MPC.

### 5. Управление приоритетами (QoS)

Система поддерживает пять уровней приоритета (Critical, High, Normal, Low, Background), для каждого из которых гарантируются определенные показатели качества обслуживания (Quality of Service, QoS). Математической основой для этого служит модель обобщенного процессорного разделения (Generalized Processor Sharing, GPS).

#### 5.1. Модель GPS

В модели GPS для каждого класса приоритета `i` определен вес `φ_i`. Система гарантирует, что в любой момент времени доля пропускной способности, выделенная классу `i`, будет пропорциональна его весу: `C_i = (φ_i / Σφ_j) * C_total`, где `C_total` — общая пропускная способность системы.

На основе весов и текущей интенсивности поступления для каждого класса (`λ_i`) рассчитывается загрузка: `ρ_i = λ_i * B / C_i`. Это позволяет прогнозировать время ожидания в очереди для каждого приоритета, используя формулы теории массового обслуживания (модель M/M/1).

#### 5.2. Адаптация квот

Квоты приоритетов не являются статическими. Система периодически анализирует статистику отказов (rejection rate) и времени ожидания для каждого класса. Если, например, для высокого приоритета наблюдается недопустимо высокий уровень отказов, система динамически перераспределяет квоты, временно увеличивая долю высокого приоритета за счет низкого. Это реализует принцип справедливого распределения ресурсов в условиях перегрузки.

### 6. Обеспечение отказоустойчивости

Для предотвращения каскадных отказов при сбоях в работе downstream-сервисов или внутренних компонентов, система использует паттерн Circuit Breaker (автоматический выключатель).

#### 6.1. Марковская модель состояний

Состояние выключателя описывается цепью Маркова с тремя состояниями:
* **Closed (Замкнут):** Нормальный режим работы. Запросы передаются на обработку.
* **Open (Разомкнут):** Аварийный режим. Запросы немедленно отклоняются с ошибкой.
* **Half-Open (Полуоткрыт):** Режим проверки восстановления. Пропускается ограниченное число запросов для тестирования работоспособности.

Матрица интенсивностей переходов `Q` определяет вероятности перехода между состояниями. На основе решения системы уравнений равновесия `π·Q = 0` (с условием нормировки `Σπ = 1`) вычисляются стационарные вероятности нахождения системы в каждом состоянии, а также ключевые показатели надежности: среднее время до отказа (MTTF), среднее время восстановления (MTTR) и коэффициент готовности (Availability).

#### 6.2. Модель экспоненциального роста задержки восстановления

При переходе в состояние Open, система не пытается восстановиться мгновенно. Вместо этого используется стратегия экспоненциальной отсрочки (exponential backoff). Длительность нахождения в состоянии Open рассчитывается по формуле: `D_n = min(D_max, D_base * k^n)`, где `n` — номер неудачной попытки восстановления. Это позволяет дать проблемному компоненту время на стабилизацию, не перегружая его повторными попытками.

### 7. Заключение

Описанные теоретические модели образуют единую, когерентную систему управления, которая позволяет достичь высокой производительности и надежности в условиях переменной сетевой нагрузки. Комбинация методов оптимального управления, теории вероятностей, математической статистики и теории массового обслуживания обеспечивает не просто реактивную, а проактивную адаптацию системы к изменяющимся условиям работы.

## Криптографическая защита сервера: Документация кода

### CryptoPool — асинхронный пул криптографических воркеров

**Назначение**: Асинхронный пул для параллельной обработки криптографических операций с поддержкой одиночных и пакетных задач.

#### Структуры данных

«`rust
pub struct CryptoPool {
tx: mpsc::Sender,
batch_tx: mpsc::Sender,
}

pub enum CryptoTask {
Single {
ctx: Arc,
payload: Vec,
resp: oneshot::Sender, String>>,
},
Batch {
tasks: Vec<(Arc, Vec)>,
resp: oneshot::Sender>, String>>,
},
}

pub struct CryptoBatchTask {
pub tasks: Vec<(Arc, Vec)>,
pub resp: oneshot::Sender>, String>>,
}
«`

#### Инициализация пула

Метод `spawn` создает пул с указанным количеством воркеров:

«`rust
impl CryptoPool {
pub fn spawn(threads: usize) -> Self {
let (tx, rx) = mpsc::channel::(4096);
let (batch_tx, batch_rx) = mpsc::channel::(1024);

let rx = Arc::new(Mutex::new(rx));
let batch_rx = Arc::new(Mutex::new(batch_rx));

// Основные воркеры
for _ in 0..threads {
let rx = Arc::clone(&rx);
tokio::spawn(async move {
let worker = CryptoWorker::new();
worker.run(rx).await;
});
}

// Batch воркеры
for _ in 0..threads / 2 {
let batch_rx = Arc::clone(&batch_rx);
tokio::spawn(async move {
let worker = CryptoWorker::new();
worker.run_batch(batch_rx).await;
});
}

CryptoPool { tx, batch_tx }
}
}
«`

#### Основные методы

**Шифрование данных**:

«`rust
pub async fn encrypt(&self, ctx: Arc, plaintext: Vec)
-> Result, Box>
{
info!(«Encrypting payload of {} bytes», plaintext.len());

// Генерируем nonce
let nonce = self.generate_nonce();

// Шифруем используя AEAD cipher из SessionKeys
let ciphertext = ctx.aead_cipher
.encrypt(&nonce.into(), plaintext.as_ref())
.map_err(|e| format!(«Encryption failed: {}», e))?;

// Объединяем nonce и ciphertext
let mut result = Vec::with_capacity(nonce.len() + ciphertext.len());
result.extend_from_slice(&nonce);
result.extend_from_slice(&ciphertext);

Ok(result)
}
«`

**Расшифрование данных**:

«`rust
pub async fn decrypt(&self, ctx: &SessionKeys, payload: Vec)
-> Result, String>
{
let (tx_resp, rx_resp) = oneshot::channel();
let arc_ctx = Arc::new(ctx.clone());

let task = CryptoTask::Single {
ctx: arc_ctx,
payload,
resp: tx_resp,
};

if self.tx.send(task).await.is_err() {
return Err(«Failed to send decryption task».to_string());
}

match tokio::time::timeout(Duration::from_secs(3), rx_resp).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => Err(e.to_string()),
Err(_) => {
warn!(«CryptoPool decrypt timeout»);
Err(«Decryption timeout».to_string())
}
}
}
«`

### AES-GCM шифрование

Реализация симметричного шифрования с использованием алгоритма AES-256-GCM:

«`rust
use aes_gcm::{Aes256Gcm, KeyInit};
use generic_array::GenericArray;
use aes_gcm::aead::Aead;

pub struct AesGcmCipher;

impl AesGcmCipher {
pub fn new(key: &[u8; 32]) -> Aes256Gcm {
Aes256Gcm::new_from_slice(key).expect(«Invalid key length»)
}

pub fn encrypt(&self, cipher: &Aes256Gcm, nonce: &[u8],
plaintext: &[u8], _aad: &[u8]) -> Result, aes_gcm::Error> {
let nonce = GenericArray::from_slice(nonce);
cipher.encrypt(nonce, plaintext)
}

pub fn decrypt(&self, cipher: &Aes256Gcm, nonce: &[u8],
ciphertext: &[u8], _aad: &[u8]) -> Result, aes_gcm::Error> {
let nonce = GenericArray::from_slice(nonce);
cipher.decrypt(nonce, ciphertext)
}
}
«`

### Деривация ключей

Использование HKDF-SHA256 для безопасной деривации ключей:

«`rust
use hkdf::Hkdf;
use sha2::Sha256;

pub struct KeyDeriver;

impl KeyDeriver {
pub fn derive_keys(shared_secret: &[u8; 32], salt: &[u8], info: &[u8]) -> [u8; 32] {
let hk = Hkdf::::new(Some(salt), shared_secret);
let mut key = [0u8; 32];
hk.expand(info, &mut key).expect(«HKDF expansion failed»);
key
}
}
«`

### Протокол рукопожатия

#### Константы протокола

«`rust
pub const CLIENT_HELLO: u8 = 0xA0;
pub const SERVER_HELLO: u8 = 0xA1;
pub const PROTOCOL_VERSION: u8 = 0x01;
«`

#### Основная функция рукопожатия

«`rust
pub async fn perform_handshake(
stream: &mut tokio::net::TcpStream,
role: HandshakeRole,
) -> ProtocolResult {
match role {
HandshakeRole::Client => client_handshake(stream).await,
HandshakeRole::Server => {
let session_keys = server_handshake(stream).await?;
Ok(HandshakeResult {
session_keys,
role: HandshakeRole::Server,
})
}
}
}
«`

#### Клиентская часть рукопожатия

«`rust
async fn client_handshake(stream: &mut tokio::net::TcpStream)
-> ProtocolResult
{
let psk_bytes = get_psk()
.map_err(|e| ProtocolError::from(e))?;

let (client_auth_key, server_auth_key) = derive_psk_keys(&psk_bytes)
.map_err(|_e| ProtocolError::Crypto {
source: CryptoError::KeyDerivationFailed
})?;

// Генерируем клиентские ключи
let mut rng = OsRng;
let client_secret = EphemeralSecret::random_from_rng(&mut rng);
let client_pub = PublicKey::from(&client_secret);

let mut client_nonce = [0u8; 16];
rng.fill_bytes(&mut client_nonce);

let client_hmac = compute_client_hmac(
client_pub.as_bytes(),
&client_nonce,
&client_auth_key
);

// Отправляем ClientHello
let mut client_hello = Vec::with_capacity(82);
client_hello.push(CLIENT_HELLO);
client_hello.push(PROTOCOL_VERSION);
client_hello.extend_from_slice(client_pub.as_bytes());
client_hello.extend_from_slice(&client_nonce);
client_hello.extend_from_slice(&client_hmac);

write_frame(stream, &client_hello).await?;

// Читаем ServerHello с таймаутом
let server_hello = tokio::time::timeout(
Duration::from_secs(10),
read_frame(stream)
)
.await
.map_err(|_| ProtocolError::Timeout {
duration: Duration::from_secs(10)
})??;

// Проверка и обработка ServerHello…
}
«`

#### Вычисление HMAC для аутентификации

«`rust
fn compute_client_hmac(client_pub: &[u8], client_nonce: &[u8; 16],
auth_key: &[u8]) -> [u8; 32] {
let mut auth_data = Vec::with_capacity(32 + 16 + 2);
auth_data.extend_from_slice(&(client_pub.len() as u16).to_be_bytes());
auth_data.extend_from_slice(client_pub);
auth_data.extend_from_slice(&(client_nonce.len() as u16).to_be_bytes());
auth_data.extend_from_slice(client_nonce);

compute_hmac(&auth_data, auth_key)
}

fn compute_hmac(data: &[u8], auth_key: &[u8]) -> [u8; 32] {
let mut mac = HmacSha256::new_from_slice(auth_key)
.expect(«Auth key length is valid»);
mac.update(data);
mac.finalize().into_bytes().into()
}
«`

### Управление предварительными ключами (PSK)

#### Получение PSK из переменных окружения

«`rust
pub fn get_psk() -> Result> {
let psk_hex = env::var(«PSK_SECRET»)
.map_err(|_| anyhow!(«PSK_SECRET environment variable not set»))?;

if psk_hex.len() < 64 { return Err(anyhow!("PSK_SECRET must be at least 64 hex characters long")); } let psk_bytes = hex::decode(&psk_hex) .map_err(|_| anyhow!("PSK_SECRET must be a valid hex string"))?; if psk_bytes.len() < 32 { return Err(anyhow!("PSK_SECRET must be at least 32 bytes long")); } Ok(psk_bytes) } ``` #### Деривация ключей аутентификации из PSK ```rust pub fn derive_psk_keys(psk: &[u8]) -> Result<(Vec, Vec)> {
let hk = Hkdf::::new(None, psk);

let mut client_key = vec![0u8; 32];
let mut server_key = vec![0u8; 32];

hk.expand(b»client-auth-key», &mut client_key)
.map_err(|_| anyhow!(«HKDF expansion failed»))?;
hk.expand(b»server-auth-key», &mut server_key)
.map_err(|_| anyhow!(«HKDF expansion failed»))?;

Ok((client_key, server_key))
}
«`

### Сессионные ключи

#### Структура SessionKeys

«`rust
#[derive(Clone)]
pub struct SessionKeys {
pub aead_key_bytes: [u8; 32],
pub sign_key: [u8; 32],
pub session_id: [u8; 16],
pub aead_cipher: Aes256Gcm,
}

impl Zeroize for SessionKeys {
fn zeroize(&mut self) {
self.sign_key.zeroize();
self.session_id.zeroize();
}
}

impl Drop for SessionKeys {
fn drop(&mut self) {
self.zeroize();
}
}
«`

#### Создание сессионных ключей

«`rust
impl SessionKeys {
pub fn from_dh_shared_with_psk(shared_secret: &[u8; 32],
salt: &[u8], psk: &[u8]) -> Self {
// Detect CPU features for optimization
let use_hw_acceleration = is_aes_ni_supported() && is_avx2_supported();

let hk = Hkdf::::new(Some(salt), shared_secret);

let mut aead_key_bytes = [0u8; 32];
let mut sign_key = [0u8; 32];
let mut session_id = [0u8; 16];

// Генерируем уникальный session_id
hk.expand(b»session-id», &mut session_id)
.expect(«HKDF session id»);

// Используем PSK в качестве дополнительного контекста
let mut info_with_psk = Vec::new();
info_with_psk.extend_from_slice(b»ctx-aead»);
info_with_psk.extend_from_slice(psk);

hk.expand(&info_with_psk, &mut aead_key_bytes)
.expect(«HKDF aead»);
hk.expand(b»ctx-sign_key», &mut sign_key)
.expect(«HKDF sign key»);

// Создаем шифр
let aead_cipher = if use_hw_acceleration {
Aes256Gcm::new_from_slice(&aead_key_bytes).expect(«aead key»)
} else {
// Fallback to software implementation
Aes256Gcm::new_from_slice(&aead_key_bytes).expect(«aead key»)
};

Self {
aead_key_bytes,
sign_key,
session_id,
aead_cipher,
}
}
}
«`

#### Безопасные буферы для хранения ключей

«`rust
pub struct SecureBuffer {
inner: Vec,
}

impl SecureBuffer {
pub fn new(data: Vec) -> Self {
Self { inner: data }
}

pub fn as_bytes(&self) -> &[u8] {
&self.inner
}
}

impl SecureBuffer {
fn zeroize(&mut self) {
self.inner.zeroize();
}
}

impl Drop for SecureBuffer {
fn drop(&mut self) {
self.zeroize();
}
}
«`

### HMAC-подписи

«`rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac;

pub struct HmacSigner;

impl HmacSigner {
pub fn sign(&self, key: &[u8], data: &[u8]) -> [u8; 32] {
let mut mac = HmacSha256::new_from_slice(key)
.expect(«Invalid key length»);
mac.update(data);
mac.finalize().into_bytes().into()
}

pub fn verify(&self, key: &[u8], data: &[u8], signature: &[u8]) -> bool {
let expected = self.sign(key, data);
constant_time_eq::constant_time_eq(&expected, signature)
}
}
«`

### Проверка подписей с постоянным временем выполнения

«`rust
use constant_time_eq::constant_time_eq;

pub struct SignatureVerifier;

impl SignatureVerifier {
pub fn verify_constant_time(expected: &[u8], actual: &[u8]) -> bool {
constant_time_eq(expected, actual)
}
}
«`

### Пример использования

#### Инициализация криптографической системы

«`rust
use crate::core::protocol::crypto::crypto_pool::CryptoPool;
use crate::core::protocol::crypto::handshake::handshake::{
perform_handshake, HandshakeRole
};

// Создание пула криптографических воркеров
let crypto_pool = CryptoPool::spawn(4);

// Установление защищенного соединения
let mut stream = TcpStream::connect(«127.0.0.1:8080″).await?;
let handshake_result = perform_handshake(&mut stream, HandshakeRole::Client).await?;

// Получение сессионных ключей
let session_keys = handshake_result.session_keys;
let session_keys_arc = Arc::new(session_keys);
«`

#### Шифрование и расшифрование данных

«`rust
// Шифрование данных
let plaintext = b»Secret message».to_vec();
let ciphertext = crypto_pool.encrypt(
session_keys_arc.clone(),
plaintext
).await?;

// Расшифрование данных
let decrypted = crypto_pool.decrypt(
&session_keys_arc,
ciphertext
).await?;

assert_eq!(decrypted, b»Secret message»);
«`

#### Пакетная обработка

«`rust
// Подготовка пакетных задач
let batch_tasks = vec![
(session_keys_arc.clone(), data1),
(session_keys_arc.clone(), data2),
(session_keys_arc.clone(), data3),
];

// Пакетное расшифрование
let results = crypto_pool.decrypt_batch(batch_tasks).await;

// Обработка результатов
for (i, result) in results.iter().enumerate() {
if !result.is_empty() {
println!(«Пакет {} успешно расшифрован», i);
}
}
«`

### Конфигурация и переменные окружения

#### Обязательные переменные

«`bash
# Предварительный общий ключ (минимум 64 hex-символа)
export PSK_SECRET=»a1b2c3d4e5f67890a1b2c3d4e5f67890a1b2c3d4e5f67890a1b2c3d4e5f67890″
«`

#### Опциональные настройки

«`rust
// Количество воркеров в пуле (рекомендуется: количество ядер CPU)
let worker_count = num_cpus::get();

// Таймауты операций
const DECRYPTION_TIMEOUT: Duration = Duration::from_secs(3);
const BATCH_TIMEOUT: Duration = Duration::from_secs(5);
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
«`

### Обработка ошибок

Модуль использует иерархическую систему ошибок:

«`rust
pub enum ProtocolError {
Crypto { source: CryptoError },
HandshakeFailed { reason: String },
AuthenticationFailed { reason: String },
Timeout { duration: Duration },
MalformedPacket { details: String },
// … другие варианты ошибок
}

impl ProtocolError {
pub fn log(self) -> Self {
error!(«Protocol error: {:?}», self);
self
}
}
«`

### Мониторинг и логирование

Все криптографические операции логируются через систему `tracing`:

«`rust
use tracing::{info, error, warn, debug};

// Пример логирования в CryptoPool
info!(«Encrypting payload of {} bytes», plaintext.len());

// Логирование медленных операций
let elapsed = start.elapsed();
if elapsed > Duration::from_millis(5) {
warn!(«Slow decryption: {:?} for {} bytes», elapsed, payload.len());
}

// Логирование в debug-режиме
#[cfg(debug_assertions)]
{
info!(target: «session_keys», «session_id = {}», hex::encode(session_id));
}
«`

### Тестирование

#### Модульные тесты для HMAC

«`rust
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_hmac_signature() {
let signer = HmacSigner;
let key = b»test-key-32-bytes-long-123456789″;
let data = b»test data»;

let signature = signer.sign(key, data);
let is_valid = signer.verify(key, data, &signature);

assert!(is_valid, «HMAC verification should succeed»);
}

#[test]
fn test_hmac_tampered_data() {
let signer = HmacSigner;
let key = b»test-key-32-bytes-long-123456789″;
let data = b»test data»;
let tampered_data = b»tampered data»;

let signature = signer.sign(key, data);
let is_valid = signer.verify(key, tampered_data, &signature);

assert!(!is_valid, «HMAC verification should fail for tampered data»);
}
}
«`

### Заключение

Данная документация охватывает все ключевые аспекты реализации криптографического модуля. Система обеспечивает:

1. **Безопасность**: Современные алгоритмы, Perfect Forward Secrecy, защита от атак
2. **Производительность**: Асинхронная обработка, аппаратное ускорение, пакетные операции
3. **Надежность**: Детальное логирование, обработка ошибок, мониторинг
4. **Соответствие стандартам**: NIST, FIPS, RFC

Для успешного развертывания необходимо настроить переменные окружения, выбрать оптимальное количество воркеров в пуле и настроить систему мониторинга для отслеживания производительности и безопасности криптографических операций.

## Введение в архитектуру кода

Кодовая база Phantom Security организована в модульную структуру, где каждый компонент отвечает за конкретный аспект системы безопасности. Все модули написаны на Rust с акцентом на безопасность памяти, производительность и защиту от side-channel атак. Основные принципы дизайна включают zero-allocation в критических путях, constant-time операции и аппаратное ускорение.

## Модуль packet.rs

### Структура PhantomPacket

«`rust
/// Пакет с фантомной криптографией (полностью stack allocated)
pub struct PhantomPacket<'a> {
pub session_id: &’a [u8; 16],
pub sequence: u64,
pub timestamp: u64,
pub packet_type: u8,
pub ciphertext: &’a [u8],
pub signature: &’a [u8; 32],
}
«`

**Назначение**: Эта структура представляет криптографический пакет в памяти без дополнительных аллокаций. Все поля являются ссылками на существующие буферы, что позволяет обрабатывать пакеты с нулевыми накладными расходами на управление памятью.

**Поля**:
— `session_id` — 16-байтовый идентификатор сессии
— `sequence` — 64-битный порядковый номер пакета
— `timestamp` — временная метка создания
— `packet_type` — тип пакета (0xA0 = ClientHello, 0xA1 = ServerHello, и т.д.)
— `ciphertext` — зашифрованные данные с аутентификационным тегом
— `signature` — 32-байтовая подпись BLAKE3

### Создание пакетов

Метод `create()` выполняет полный цикл создания защищенного пакета:

«`rust
impl<'a> PhantomPacket<'a> {
pub fn create(
session: &PhantomSession,
packet_type: u8,
plaintext: &[u8],
buffer: &mut [u8],
chacha20_accel: &ChaCha20Accelerator,
blake3_accel: &Blake3Accelerator,
) -> ProtocolResult {
// 1. Проверка размера plaintext
if plaintext.len() > MAX_PAYLOAD_SIZE {
return Err(ProtocolError::MalformedPacket {
details: format!(«Payload too large: {} > {}»,
plaintext.len(), MAX_PAYLOAD_SIZE)
});
}

// 2. Генерация операционного ключа
let operation_key = session.generate_operation_key(«encrypt»);

// 3. Генерация nonce
let mut nonce = [0u8; NONCE_SIZE];
OsRng.fill_bytes(&mut nonce);

// 4. Шифрование данных
chacha20_accel.encrypt_in_place(
&chacha_key,
&nonce,
0,
&mut ciphertext_slice[..plaintext.len()],
);

// 5. Создание аутентификационного тега
let tag = blake3_accel.hash_keyed(&chacha_key,
&ciphertext_slice[..plaintext.len()]);

// 6. Подпись пакета
Self::create_signature_accel(
session,
&operation_key,
packet_type,
&nonce,
&ciphertext_slice,
signature_slice,
blake3_accel,
)?;

// 7. Формирование заголовка
Self::encode_header(
session,
operation_key.sequence,
packet_type,
(total_size — 4) as u16,
header_slice,
);

Ok(total_size)
}
}
«`

### Декодирование пакетов

Метод `decode()` выполняет разбор входящих пакетов:

«`rust
#[inline(always)]
pub fn decode(data: &’a [u8]) -> ProtocolResult {
// Минимальная длина: 97 байт
if data.len() < 97 { return Err(ProtocolError::MalformedPacket { details: format!("Packet too short: {} < 97", data.len()) }); } // Проверка магических байт if !constant_time_eq(&data[0..2], &HEADER_MAGIC) { return Err(ProtocolError::MalformedPacket { details: "Invalid magic bytes".to_string() }); } // Извлечение полей заголовка let session_id: &[u8; 16] = data[4..20].try_into().unwrap(); let sequence = u64::from_be_bytes(data[20..28].try_into().unwrap()); let packet_type = data[36]; // Разделение ciphertext и подписи let ciphertext = &data[37..data.len() - 32]; let signature: &[u8; 32] = data[data.len() - 32..].try_into().unwrap(); Ok(Self { session_id, sequence, timestamp: 0, packet_type, ciphertext, signature, }) } ``` ### Расшифровка пакетов Метод `decrypt()` выполняет полную верификацию и расшифровку: ```rust #[inline(always)] pub fn decrypt( &self, session: &PhantomSession, work_buffer: &mut [u8], output: &mut [u8], chacha20_accel: &ChaCha20Accelerator, blake3_accel: &Blake3Accelerator, ) -> ProtocolResult<(u8, usize)> {
// 1. Проверка session_id
if !constant_time_eq(self.session_id, session.session_id()) {
return Err(ProtocolError::AuthenticationFailed {
reason: «Session ID mismatch».to_string()
});
}

// 2. Проверка подписи
self.verify_signature_accel(session, blake3_accel)?;

// 3. Извлечение nonce и данных
let nonce = &self.ciphertext[..NONCE_SIZE];
let encrypted_data = &self.ciphertext[NONCE_SIZE..];

// 4. Генерация ключа дешифрования
let decrypt_key = session.generate_operation_key_for_sequence(
self.sequence,
«encrypt»
);

// 5. Дешифрование
chacha20_accel.encrypt_in_place(
&chacha_key,
nonce.try_into().unwrap(),
0,
&mut work_buffer[..data_len],
);

// 6. Проверка аутентификационного тега
let received_tag = &encrypted_data[data_len..data_len + TAG_SIZE];
let expected_tag = blake3_accel.hash_keyed(&chacha_key, &encrypted_copy);

if !constant_time_eq(&expected_tag[..TAG_SIZE], received_tag) {
return Err(ProtocolError::AuthenticationFailed {
reason: format!(«Invalid TAG. Key sequence: {}, nonce: {}»,
self.sequence, hex::encode(nonce))
});
}

// 7. Копирование результата
output[..output_len].copy_from_slice(&work_buffer[..output_len]);

Ok((self.packet_type, output_len))
}
«`

## Модуль keys.rs

### Структура PhantomMasterKey

«`rust
/// Фантомный мастер-ключ сессии
pub struct PhantomMasterKey {
pub(crate) scattered_parts: ScatteredParts,
pub(crate) operation_seed: [u8; 32],
pub(crate) session_id: [u8; 16],
pub(crate) created_at: Instant,
pub(crate) sequence_number: AtomicU64,
pub(crate) operation_count: AtomicU64,
}
«`

**Метод создания из общего секрета**:

«`rust
pub fn from_dh_shared_blake3(
shared_secret: &[u8; 32],
client_nonce: &[u8; 16],
server_nonce: &[u8; 16],
client_pub_key: &[u8; 32],
server_pub_key: &[u8; 32],
) -> (ScatteredParts, [u8; 16], [u8; 32]) {
let mut hasher = Hasher::new();
hasher.update(shared_secret);
hasher.update(client_nonce);
hasher.update(server_nonce);
hasher.update(client_pub_key);
hasher.update(server_pub_key);

// Вывод всех необходимых данных за один проход
let mut output = [0u8; 32 + 16 + 32];
hasher.finalize_xof().fill(&mut output);

let master_key: [u8; 32] = output[0..32].try_into().unwrap();
let session_id: [u8; 16] = output[32..48].try_into().unwrap();
let operation_seed: [u8; 32] = output[48..80].try_into().unwrap();

(scattered_parts, session_id, operation_seed)
}
«`

### Структура PhantomSession

«`rust
/// Фантомная сессия
pub struct PhantomSession {
pub(crate) master_key: PhantomMasterKey,
pub(crate) handshake_completed: bool,
}
«`

**Генерация операционного ключа**:

«`rust
pub fn generate_operation_key(&self, operation_type: &str) -> PhantomOperationKey {
let sequence = self.master_key.sequence_number.fetch_add(1, Ordering::SeqCst);

// Blake3 для деривации операционного ключа
let mut hasher = Hasher::new();
hasher.update(&self.master_key.session_id);
hasher.update(&sequence.to_be_bytes());
hasher.update(operation_type.as_bytes());
hasher.update(&self.master_key.operation_seed);

let mut operation_key_bytes = [0u8; 32];
hasher.finalize_xof().fill(&mut operation_key_bytes);

PhantomOperationKey::new(operation_key_bytes, sequence)
}
«`

## Модуль scatterer.rs

### Рассеивание ключей

«`rust
/// Рассеиватель памяти с ChaCha20Poly1305
pub struct MemoryScatterer {
encryption_key: [u8; 32],
}

impl MemoryScatterer {
pub fn scatter(&self, master_key: &[u8; 32]) -> ScatteredParts {
// 1. Генерация случайных частей
let mut l1_part = [0u8; 8];
let mut l2_part = [0u8; 16];
let mut register_seed = [0u8; 16];
let mut ram_part_nonce = [0u8; 12];

rng.fill_bytes(&mut l1_part);
rng.fill_bytes(&mut l2_part);
rng.fill_bytes(&mut register_seed);
rng.fill_bytes(&mut ram_part_nonce);

// 2. Вычисление RAM части
let mut ram_part_plain = [0u8; 32];
for i in 0..32 {
let mut value = master_key[i];
if i < 8 { value ^= l1_part[i]; } if i < 16 { value ^= l2_part[i % 16]; } if i < 16 { value ^= register_seed[i % 16]; } ram_part_plain[i] = value; } // 3. Шифрование RAM части let mut ram_part_with_tag = Vec::with_capacity(32 + 16); ram_part_with_tag.extend_from_slice(&ram_part_plain); let cipher = ChaCha20Poly1305::new(Key::from_slice(&self.encryption_key)); let nonce = Nonce::from_slice(&ram_part_nonce); cipher.encrypt_in_place(nonce, &[], &mut ram_part_with_tag)?; ScatteredParts { l1_part, l2_part, ram_part: ram_part_encrypted, ram_tag, ram_part_nonce, register_seed, } } } ``` ## Модуль assembler.rs ### Сборка ключей ```rust /// Интерфейс сборщика ключей pub trait KeyAssembler: Send + Sync { fn assemble(&self, parts: &ScatteredParts, scatterer: &MemoryScatterer) -> [u8; 32];
fn execution_time_ns(&self) -> u64;
}

/// Универсальный сборщик (fallback)
pub struct GenericAssembler;

impl KeyAssembler for GenericAssembler {
fn assemble(&self, parts: &ScatteredParts,
scatterer: &MemoryScatterer) -> [u8; 32] {
let start = Instant::now();

// 1. «Расшифровываем» RAM часть
let ram_part = scatterer.decrypt_ram_part_or_fallback(parts);

// 2. Собираем ключ
let mut key = [0u8; 32];
const FIXED_ITERATIONS: usize = 48;

// ChaCha20-friendly операции
for iteration in 0..FIXED_ITERATIONS {
for i in 0..32 {
let mut value = 0u8;

if i < 8 { value = value.wrapping_add(parts.l1_part[i]); } if i < 16 { value = value.wrapping_add(parts.l2_part[i % 16]); } value = value.wrapping_add(ram_part[i]); value ^= parts.register_seed[i % 16]; // Rotate как в ChaCha20 value = value.rotate_left(((iteration * 7 + i) % 8) as u32); key[i] = key[i].wrapping_add(value); } } // 3. ChaCha20-like перемешивания for i in (0..32).step_by(4) { if i + 3 < 32 { let a = key[i]; let b = key[i + 1]; let c = key[i + 2]; let d = key[i + 3]; // Quarter-round как в ChaCha20 key[i] = a.wrapping_add(b); key[i + 3] = d ^ key[i]; key[i + 3] = key[i + 3].rotate_left(16); // ... остальные операции quarter-round } } key } } ``` ## Модуль blake3_accel.rs ### SIMD-ускоренный BLAKE3 ```rust /// SIMD-ускоренный Blake3 процессор pub struct Blake3Accelerator { use_avx2: bool, use_sse41: bool, } impl Blake3Accelerator { pub fn new() -> Self {
#[cfg(target_arch = «x86_64»)]
{
let use_avx2 = is_x86_feature_detected!(«avx2»);
let use_sse41 = is_x86_feature_detected!(«sse4.1»);

Self { use_avx2, use_sse41 }
}
#[cfg(not(target_arch = «x86_64»))]
{
Self { use_avx2: false, use_sse41: false }
}
}

#[inline]
pub fn hash_keyed(&self, key: &[u8; 32], input: &[u8]) -> [u8; 32] {
let mut hasher = Hasher::new_keyed(key);
hasher.update(input);

let mut output = [0u8; 32];
hasher.finalize_xof().fill(&mut output);
output
}

#[inline]
pub fn hash_keyed_batch(
&self,
key: &[u8; 32],
inputs: &[&[u8]],
outputs: &mut [[u8; 32]]
) {
// Параллельная обработка batch
if inputs.len() >= 4 {
use rayon::prelude::*;

inputs.par_iter()
.zip(outputs.par_iter_mut())
.for_each(|(input, output)| {
*output = self.hash_keyed(key, input);
});
} else {
// Последовательная обработка
for (i, input) in inputs.iter().enumerate() {
outputs[i] = self.hash_keyed(key, input);
}
}
}
}
«`

## Модуль chacha20_accel.rs

### Аппаратное ускорение ChaCha20

«`rust
/// Ускоренный ChaCha20 процессор
pub struct ChaCha20Accelerator {
caps: CpuCapabilities,
}

impl ChaCha20Accelerator {
pub fn encrypt(
&self,
key: &[u8; 32],
nonce: &[u8; 12],
counter: u64,
input: &[u8],
output: &mut [u8]
) {
assert_eq!(input.len(), output.len());

#[cfg(target_arch = «x86_64»)]
if self.caps.avx2 && input.len() >= 256 {
unsafe {
x86::chacha20_encrypt_avx2(key, nonce, counter, input, output);
}
return;
}

#[cfg(target_arch = «aarch64»)]
if self.caps.neon && input.len() >= 128 {
// NEON реализация
// …
return;
}

// Fallback на чистую Rust реализацию
self.encrypt_fallback(key, nonce, counter, input, output);
}
}
«`

**AVX2 реализация**:

«`rust
#[cfg(target_arch = «x86_64»)]
pub mod x86 {
#[inline(always)]
pub unsafe fn chacha20_block_avx2(
key: &[u8; 32],
counter: u64,
nonce: &[u8; 12],
output: &mut [u8; 64]
) {
unsafe {
// Константы ChaCha20
let constants = _mm256_set_epi32(
0x61707865, 0x3320646e, 0x79622d32, 0x6b206574,
0x61707865, 0x3320646e, 0x79622d32, 0x6b206574
);

// 20 раундов (10 двойных раундов)
for _ in 0..10 {
// Четный раунд (COLUMN round)
x0 = _mm256_add_epi32(x0, x1);
x3 = _mm256_xor_si256(x3, x0);
x3 = _mm256_or_si256(
_mm256_slli_epi32(x3, 16),
_mm256_srli_epi32(x3, 16)
);

// … остальные операции раунда
}
}
}
}
«`

## Модуль handshake.rs

### Протокол рукопожатия

«`rust
/// Выполняет handshake с фантомными ключами
pub async fn perform_phantom_handshake(
stream: &mut tokio::net::TcpStream,
role: HandshakeRole,
) -> ProtocolResult {
match role {
HandshakeRole::Client => client_phantom_handshake(stream, handshake_start).await,
HandshakeRole::Server => server_phantom_handshake(stream, handshake_start).await,
}
}

/// Клиентская часть handshake
async fn client_phantom_handshake(
stream: &mut tokio::net::TcpStream,
start_time: Instant,
) -> ProtocolResult {
// 1. Генерация клиентских ключей
let client_secret = EphemeralSecret::random_from_rng(&mut rng);
let client_pub = PublicKey::from(&client_secret);

// 2. Генерация nonce
let mut client_nonce = [0u8; 16];
rng.fill_bytes(&mut client_nonce);

// 3. Отправка ClientHello
let mut client_hello = Vec::with_capacity(50);
client_hello.push(CLIENT_HELLO);
client_hello.push(PROTOCOL_VERSION);
client_hello.extend_from_slice(&client_pub_bytes);
client_hello.extend_from_slice(&client_nonce);

// 4. Чтение ServerHello
let server_hello = read_frame(stream).await?;

// 5. Парсинг ServerHello
let server_pub_bytes: [u8; 32] = server_hello[2..34].try_into()?;
let server_nonce: [u8; 16] = server_hello[34..50].try_into()?;

// 6. Вычисление общего секрета
let server_pub = PublicKey::from(server_pub_bytes);
let shared_secret = client_secret.diffie_hellman(&server_pub);

// 7. Создание фантомной сессии
let session = PhantomSession::from_dh_shared(
&shared_secret_bytes,
&client_nonce,
&server_nonce,
&client_pub_bytes,
&server_pub_bytes,
);

Ok(PhantomHandshakeResult {
session,
role: HandshakeRole::Client,
handshake_time: start_time.elapsed(),
})
}
«`

## Модуль runtime.rs

### Исполнительный движок

«`rust
/// Высокооптимизированный исполнительный движок
pub struct PhantomRuntime {
chacha20_accel: ChaCha20Accelerator,
blake3_accel: Blake3Accelerator,
batch_processor: Arc,
stats: std::sync::Mutex,
cpu_caps: CpuCapabilities,
}

impl PhantomRuntime {
/// Выполняет операцию с защитой от timing attacks
#[inline]
pub fn execute_with_acceleration(&self, operation: F) -> Result
where
F: FnOnce(&ChaCha20Accelerator, &Blake3Accelerator) -> T,
{
let start_instant = Instant::now();

#[cfg(target_arch = «x86_64»)]
let start_cycles = unsafe { std::arch::x86_64::_rdtsc() };

// Выполнение операции
let result = operation(&self.chacha20_accel, &self.blake3_accel);

#[cfg(target_arch = «x86_64»)]
let end_cycles = unsafe { std::arch::x86_64::_rdtsc() };

let elapsed_time = start_instant.elapsed();
let cycles = end_cycles.wrapping_sub(start_cycles);

// Проверка timing аномалий
if self.check_timing_anomaly(cycles, elapsed_time).is_err() {
warn!(«Timing anomaly detected»);
}

Ok(result)
}

fn check_timing_anomaly(&self, cycles: u64,
elapsed_time: std::time::Duration) -> Result<(), String> {
// Мягкие лимиты для SIMD операций
let max_cycles = if self.cpu_caps.avx2 { 2000 } else { 1000 };
let min_cycles = if self.cpu_caps.avx2 { 5 } else { 10 };

if cycles > max_cycles {
return Err(format!(«Timing attack detected: {} cycles», cycles));
}

if cycles < min_cycles { return Err(format!("Suspiciously fast: {} cycles", cycles)); } Ok(()) } } ``` ## Модуль batch_processor.rs ### Пакетная обработка ```rust /// Высокопроизводительный batch процессор pub struct PhantomBatchProcessor { packet_processor: PhantomPacketProcessor, worker_pool: rayon::ThreadPool, max_batch_size: usize, } impl PhantomBatchProcessor { /// Обработка batch параллельно pub fn process_batch(&self, mut batch: PhantomBatch) -> BatchResult {
let start = Instant::now();
let batch_size = batch.len();

let mut packet_types = vec![0u8; batch_size];
let mut plaintexts = Vec::with_capacity(batch_size);
let mut errors = vec![None; batch_size];

// Обработка в пуле worker-ов
self.worker_pool.install(|| {
let results: Vec<_> = (0..batch_size)
.into_par_iter()
.map(|i| {
let session = &batch.sessions[i];
let packet_data = &batch.packet_data[i];

match self.packet_processor.process_incoming(packet_data, session) {
Ok((packet_type, plaintext_slice)) => {
Ok((packet_type, plaintext_slice.to_vec()))
}
Err(e) => Err(e),
}
})
.collect();

// Сборка результатов
for (i, result) in results.into_iter().enumerate() {
match result {
Ok((packet_type, plaintext)) => {
packet_types[i] = packet_type;
plaintexts.push(plaintext);
errors[i] = None;
}
Err(e) => {
errors[i] = Some(e);
plaintexts.push(Vec::new());
}
}
}
});

BatchResult {
packet_types,
plaintexts,
errors,
processing_time: start.elapsed(),
}
}
}
«`

## Модуль crypto_pool_phantom.rs

### Криптографический пул

«`rust
/// Полностью оптимизированный криптографический пул
pub struct PhantomCryptoPool {
runtime: Arc,
batch_processor: Arc,
task_tx: mpsc::Sender,
batch_tx: mpsc::Sender,
concurrency_limiter: Arc,
packet_processor: Arc,
}

impl PhantomCryptoPool {
pub fn spawn(num_workers: usize) -> Self {
let runtime = Arc::new(PhantomRuntime::new(num_workers));
let batch_processor = runtime.batch_processor();
let packet_processor = Arc::new(PhantomPacketProcessor::new());

let (task_tx, task_rx) = mpsc::channel::(8192);
let (batch_tx, batch_rx) = mpsc::channel::(1024);

// Создание и запуск worker
let worker = CryptoWorker::new(
0,
runtime.clone(),
batch_processor.clone(),
packet_processor.clone(),
task_rx,
batch_rx,
concurrency_limiter.clone(),
);

tokio::spawn(async move {
worker.run().await;
});

Self { /* инициализация полей */ }
}

#[inline]
pub async fn decrypt(
&self,
session: Arc,
payload: Vec,
) -> ProtocolResult<(u8, Vec)> {
let _permit = self.concurrency_limiter.acquire().await.unwrap();

let (tx, rx) = oneshot::channel();
let task = CryptoTask::Decrypt {
session,
payload,
resp: tx,
};

self.task_tx.send(task).await?;

match tokio::time::timeout(Duration::from_millis(10), rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => Err(ProtocolError::Crypto { /* … */ }),
Err(_) => Err(ProtocolError::Timeout { /* … */ }),
}
}
}
«`

## Пример использования

### Базовое использование

«`rust
// Инициализация системы
let crypto = PhantomCrypto::new();
let processor = PhantomPacketProcessor::new();

// Создание сессии (в реальном use case через handshake)
let session = crypto.create_session();

// Шифрование данных
let plaintext = b»Секретное сообщение»;
let encrypted = processor.create_outgoing(
&session,
0x01, // packet_type
plaintext,
)?;

// Дешифрование данных
let (packet_type, decrypted) = processor.process_incoming(
&encrypted,
&session,
)?;

assert_eq!(plaintext, &decrypted[..]);
«`

### Пакетная обработка

«`rust
// Создание batch процессора
let batch_processor = PhantomBatchProcessor::new(4, 1000);

// Подготовка batch
let mut batch = PhantomBatch::new(100);
for i in 0..100 {
batch.add(
session.clone(),
create_test_packet(i),
);
}

// Обработка
let results = batch_processor.process_batch(batch);

// Анализ результатов
for (i, result) in results.errors.iter().enumerate() {
if result.is_none() {
println!(«Пакет {} успешно обработан: тип {}»,
i, results.packet_types[i]);
}
}
«`

### Асинхронный пул

«`rust
// Создание криптографического пула
let crypto_pool = PhantomCryptoPool::spawn(4);

// Асинхронная обработка
let session = Arc::new(session);
let payload = encrypted_data.to_vec();

tokio::spawn(async move {
match crypto_pool.decrypt(session, payload).await {
Ok((packet_type, decrypted)) => {
println!(«Расшифровано: {} байт», decrypted.len());
}
Err(e) => {
eprintln!(«Ошибка дешифрования: {:?}», e);
}
}
});
«`

## Конфигурация безопасности

### Настройка параметров

«`rust
/// Конфигурация фантомной системы
pub struct PhantomConfig {
pub session_timeout_ms: u64, // 90 секунд
pub max_sessions: usize, // 100,000
pub enable_hardware_acceleration: bool, // true
pub constant_time_enforced: bool, // true
}

impl Default for PhantomConfig {
fn default() -> Self {
Self {
session_timeout_ms: 90_000,
max_sessions: 100_000,
enable_hardware_acceleration: true,
constant_time_enforced: true,
}
}
}
«`

### Мониторинг и диагностика

«`rust
// Получение статистики выполнения
let stats = runtime.get_stats();
println!(«Статистика выполнения:»);
println!(» Всего операций: {}», stats.total_operations);
println!(» Неудачных операций: {}», stats.failed_operations);
println!(» Timing аномалий: {}», stats.timing_anomalies);
println!(» Среднее время: {} нс», stats.avg_execution_time_ns);
println!(» SIMD операций: {} ({:.1}%)»,
stats.simd_operations,
(stats.simd_operations as f64 / stats.total_operations as f64) * 100.0);

// Performance report
let report = runtime.get_performance_report();
println!(«Отчет производительности: {}», report);
«`

## Важные замечания по безопасности

### Constant-time операции

Все криптографические операции в Phantom Security реализованы с постоянным временем выполнения:

«`rust
// Пример constant-time сравнения
use constant_time_eq::constant_time_eq;

if !constant_time_eq(&expected_signature, self.signature) {
return Err(ProtocolError::AuthenticationFailed {
reason: «Invalid signature».to_string()
});
}

// Пример constant-time conditional move
use subtle::ConditionallySelectable;

let mut result = [0u8; 32];
result.conditional_assign(&correct_key, Choice::from(1));
«`

### Защита памяти

«`rust
// Немедленная очистка sensitive данных
use zeroize::Zeroize;

impl Zeroize for PhantomOperationKey {
fn zeroize(&mut self) {
self.key_bytes.zeroize();
}
}

impl Drop for PhantomOperationKey {
fn drop(&mut self) {
self.zeroize();
}
}

// Предотвращение вытеснения в swap
use libc::{mlock, munlock};

unsafe {
mlock(sensitive_data.as_ptr() as *const _, sensitive_data.len());
// …
munlock(sensitive_data.as_ptr() as *const _, sensitive_data.len());
}
«`

### Валидация входных данных

«`rust
// Проверка размеров перед обработкой
if plaintext.len() > MAX_PAYLOAD_SIZE {
return Err(ProtocolError::MalformedPacket {
details: format!(«Payload too large: {} > {}»,
plaintext.len(), MAX_PAYLOAD_SIZE)
});
}

// Проверка минимальных размеров
if data.len() < 97 { // header(37) + nonce(12) + tag(16) + signature(32) return Err(ProtocolError::MalformedPacket { details: format!("Packet too short: {} < 97", data.len()) }); } ``` ## Заключение Данная документация охватывает ключевые аспекты реализации Phantom Security. Система демонстрирует современный подход к криптографической защите, сочетающий передовые алгоритмы с тщательной инженерией безопасности. Кодовая база спроектирована для максимальной производительности без компромиссов в безопасности, с акцентом на защиту от timing-атак, side-channel атак и утечек памяти. Все модули взаимосвязаны и работают вместе для обеспечения комплексной защиты данных. Архитектура позволяет легко расширять систему новыми алгоритмами и оптимизациями, сохраняя при этом обратную совместимость и высокий уровень безопасности.

## Структура проекта и организация модулей

Кодовая база TCP-сервера организована по модульному принципу, что обеспечивает чёткое разделение ответственности и упрощает поддержку системы.

**Архитектурное решение:** Иерархическая организация позволяет инкапсулировать связанную функциональность и минимизировать циклические зависимости. Модули сгруппированы по функциональным доменам: безопасность, обработка пакетов, мониторинг и управление базами данных.

## Менеджер соединений: детали реализации

Файл `connection_manager.rs` содержит реализацию централизованного управления TCP-соединениями. Основная структура выглядит следующим образом:

«`rust
pub struct ConnectionManager {
active_connections: Arc, mpsc::Sender<()>>>>,
}

impl ConnectionManager {
pub fn new() -> Self {
Self {
active_connections: Arc::new(RwLock::new(HashMap::new())),
}
}

pub async fn register_connection(&self, session_id: Vec, shutdown_tx: mpsc::Sender<()>) {
let session_id_clone = session_id.clone();
let mut connections = self.active_connections.write().await;
connections.insert(session_id, shutdown_tx);
info!(«Connection registered for session: {}», hex::encode(&session_id_clone));
}

pub async fn unregister_connection(&self, session_id: &[u8]) {
let mut connections = self.active_connections.write().await;
connections.remove(session_id);
info!(«Connection unregistered for session: {}», hex::encode(session_id));
}

pub async fn force_disconnect(&self, session_id: &[u8]) {
if let Some(shutdown_tx) = self.active_connections.write().await.remove(session_id) {
let _ = shutdown_tx.send(()).await;
info!(«Forced disconnect for session: {}», hex::encode(session_id));
}
}
}
«`

**Ключевые особенности реализации:**
— Использование `Arc>` обеспечивает потокобезопасный доступ к карте соединений
— Каждый элемент карты содержит канал `mpsc::Sender<()>` для принудительного отключения
— Метод `force_disconnect()` отправляет сигнал через канал, что инициирует корректное завершение обработки соединения

## Основной цикл обработки клиентских соединений

Функция `handle_client_connection()` реализует полный жизненный цикл соединения:

«`rust
pub async fn handle_client_connection(
stream: TcpStream,
peer: SocketAddr,
session_keys: Arc,
dispatcher: Arc,
session_manager: Arc,
heartbeat_manager: Arc,
_connection_manager: Arc,
) -> anyhow::Result<()> {
// Инициализация SecurityAudit и SecurityMetrics
let _ = SecurityAudit::initialize().await;

let timer = SecurityMetrics::processing_time().start_timer();
let (out_tx, out_rx) = mpsc::channel::>(32768);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
let (reader, writer) = stream.into_split();

// Регистрируем соединение
_connection_manager.register_connection(
session_keys.session_id.to_vec(),
shutdown_tx
).await;

// Writer task
let writer_task = tokio::spawn(write_task(writer, out_rx));

// Main processing loop с поддержкой принудительного закрытия
let process_result = tokio::select! {
result = process_loop(
reader,
peer,
session_keys.clone(),
dispatcher,
out_tx.clone(),
session_manager.clone(),
) => {
result
}
_ = shutdown_rx.recv() => {
info!(target: «server», «{} forcibly disconnected by heartbeat timeout», peer);
Ok(())
}
};

// Cleanup
writer_task.abort();
session_manager.force_remove_session(&session_keys.session_id).await;
_connection_manager.unregister_connection(&session_keys.session_id).await;
drop(timer);
SecurityMetrics::active_connections().dec();

info!(target: «server», «{} closed», peer);
process_result
}
«`

**Архитектурные паттерны:**
— Разделение потока на читающую и пишущую части (`into_split()`) позволяет независимо управлять вводом и выводом
— Использование `tokio::select!` обеспечивает обработку как нормального потока данных, так и принудительного отключения
— Паттерн RAII (Resource Acquisition Is Initialization) применяется для гарантированного освобождения ресурсов

## Система безопасности: многоуровневая архитектура

### Rate Limiting с микросекундной точностью

Модуль `micro_limiter.rs` реализует высокоточную систему ограничения скорости запросов:

«`rust
pub struct HighPrecisionRateLimiter {
// Существующие счетчики
small_packets: Vec,
medium_packets: Vec,
large_packets: Vec,
control_packets: Vec,

// Новая система отслеживания пакетов по IP
packet_history: std::sync::RwLock>,

// Конфигурация
config: RateLimitConfig,
}

impl HighPrecisionRateLimiter {
const WINDOW_SIZE: usize = 1_000_000;

#[inline(always)]
pub fn check_limit(&self, priority: PacketPriority, now_micros: u64) -> bool {
let (counters, limit_per_second) = match priority {
PacketPriority::HandshakeCritical => (&self.control_packets, self.config.control_packets_per_second),
PacketPriority::SmallHighPriority => (&self.small_packets, self.config.small_packets_per_second),
PacketPriority::DataTransferMedium => (&self.medium_packets, self.config.medium_packets_per_second),
PacketPriority::LargeBackground => (&self.large_packets, self.config.large_packets_per_second),
PacketPriority::HeartbeatLow => (&self.control_packets, self.config.control_packets_per_second),
};

let bucket_idx = (now_micros % Self::WINDOW_SIZE as u64) as usize;
let current_count = counters[bucket_idx].fetch_add(1, Ordering::Relaxed);

// Увеличиваем бёрст-лимит для большей терпимости
let burst_limit = limit_per_second * self.config.burst_window_micros / 1_000_000;
current_count < burst_limit } } ``` **Оптимизации производительности:** - Выравнивание кэша для атомарных счетчиков предотвращает false sharing - Скользящее окно в 1 миллион микросекунд обеспечивает высокую точность - Раздельные счетчики для разных приоритетов пакетов позволяют дифференцированно ограничивать трафик ### Обнаружение аномалий трафика Модуль `anomaly_detector.rs` реализует систему обнаружения подозрительных паттернов: ```rust impl AnomalyDetector { pub fn detect_anomalies(&self, ip_stats: &IPStats, global_stats: &TrafficStats) -> Vec<(AnomalyType, f64)> {
let mut anomalies = Vec::new();

// 1. Проверка частоты пакетов
if ip_stats.packet_rate > self.thresholds.max_packets_per_second {
let severity = (ip_stats.packet_rate / self.thresholds.max_packets_per_second).min(1.0);
anomalies.push((AnomalyType::HighPacketRate, severity));
}

// 2. Проверка burst-трафика
if global_stats.packets_per_second > 0.0 {
let burst_ratio = ip_stats.packet_rate / global_stats.packets_per_second;
if burst_ratio > self.thresholds.max_burst_multiplier {
let severity = (burst_ratio / self.thresholds.max_burst_multiplier).min(1.0);
anomalies.push((AnomalyType::BurstTraffic, severity));
}
}

// 3. Проверка мелких пакетов (симптом DDoS)
if ip_stats.avg_packet_size < 100.0 { let small_packet_ratio = 1.0 - (ip_stats.avg_packet_size / 100.0).min(1.0); if small_packet_ratio > self.thresholds.small_packet_ratio_threshold {
anomalies.push((AnomalyType::SmallPacketFlood, small_packet_ratio));
}
}

// 4. Проверка flood соединений
if ip_stats.connection_count > self.thresholds.max_connections_per_minute {
let severity = (ip_stats.connection_count as f64 / self.thresholds.max_connections_per_minute as f64).min(1.0);
anomalies.push((AnomalyType::ConnectionFlood, severity));
}

anomalies
}
}
«`

**Алгоритмические особенности:**
— Взвешенная система скоринга учитывает несколько факторов одновременно
— Пороги настраиваются через `DetectionThresholds`
— Метод `calculate_anomaly_score()` комбинирует различные типы аномалий в единый показатель

## Парсинг и обработка сетевых пакетов

### Декодирование пакетов с криптографической проверкой

Модуль `packet_parser.rs` реализует полную цепочку валидации и дешифрования:

«`rust
impl PacketParser {
pub fn decode_packet(ctx: &SessionKeys, data: &[u8]) -> ProtocolResult<(u8, Vec)> {
// Проверка минимальной длины пакета
let minimal = 2 + 2 + 1 + NONCE_SIZE + 16 + SIGNATURE_SIZE;
if data.len() < minimal { return Err(ProtocolError::MalformedPacket { details: format!("Packet too short: {} bytes, expected at least {}", data.len(), minimal) }); } // Проверка магических байт if !constant_time_eq(&data[0..2], &HEADER_MAGIC) { return Err(ProtocolError::MalformedPacket { details: "Invalid magic bytes".to_string() }); } // Извлечение длины пакета let length = u16::from_be_bytes([data[2], data[3]]) as usize; if length > MAX_PAYLOAD_SIZE {
return Err(ProtocolError::MalformedPacket {
details: format!(«Packet too large: {} bytes, max: {}», length, MAX_PAYLOAD_SIZE)
});
}

// Проверка HMAC подписи
let hmac_start = 4 + length — SIGNATURE_SIZE;
let signed_part = &data[0..hmac_start];
let signature = &data[hmac_start..hmac_start + SIGNATURE_SIZE];

let mut mac = ::new_from_slice(&ctx.sign_key)
.map_err(|_| ProtocolError::Crypto {
source: CryptoError::InvalidKeyLength {
expected: 32,
actual: ctx.sign_key.len()
}
})?;

mac.update(signed_part);
let computed_tag = mac.finalize().into_bytes();

if !constant_time_eq(&computed_tag, signature) {
return Err(ProtocolError::Crypto {
source: CryptoError::HmacVerificationFailed
});
}

// Дешифрование AES-GCM
let payload = &signed_part[5..]; // Пропускаем header (2+2+1)
let nonce_bytes = &payload[..NONCE_SIZE];
let ciphertext = &payload[NONCE_SIZE..];

let nonce = GenericArray::from_slice(nonce_bytes);
let plaintext = ctx.aead_cipher
.decrypt(
nonce,
Payload {
msg: ciphertext,
aad: &data[0..5], // AAD = магические байты + длина + тип
}
)
.map_err(|e| ProtocolError::Crypto {
source: CryptoError::DecryptionFailed {
reason: e.to_string()
}
})?;

Ok((data[4], plaintext)) // Возвращаем тип пакета и расшифрованные данные
}
}
«`

**Криптографические гарантии:**
— Использование `constant_time_eq` предотвращает timing attacks при сравнении HMAC
— AES-GCM обеспечивает конфиденциальность и аутентичность одновременно
— AAD (Additional Authenticated Data) включает заголовок пакета, что предотвращает подмену метаданных

### Конвейерная обработка пакетов

Модуль `orchestrator.rs` реализует паттерн цепочки обязанностей для обработки пакетов:

«`rust
pub struct PipelineOrchestrator {
stages: Vec>,
}

impl PipelineOrchestrator {
pub fn new() -> Self {
Self { stages: Vec::new() }
}

pub fn add_stage(mut self, stage: S) -> Self {
self.stages.push(Box::new(stage));
self
}

pub async fn execute(&self, mut context: PipelineContext) -> Result, StageError> {
let start_time = std::time::Instant::now();

for (i, stage) in self.stages.iter().enumerate() {
info!(«Executing pipeline stage {}», i + 1);
stage.execute(&mut context).await?;
}

let processing_time = start_time.elapsed();
info!(«Pipeline execution completed in {:?}», processing_time);

context.encrypted_response
.ok_or_else(|| StageError::ProcessingFailed(«No response generated».to_string()))
}
}
«`

**Архитектурные преимущества:**
— Возможность добавлять/удалять стадии обработки без изменения основной логики
— Изоляция ошибок: сбой на одной стадии не приводит к падению всей системы
— Легкость тестирования каждой стадии независимо

## Система управления конфигурацией

Модуль `config.rs` реализует типобезопасную систему конфигурации с поддержкой hot-reload:

«`rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
pub congestion_control: CongestionConfig,
pub rate_limiting: RateLimitingConfig,
pub reputation: ReputationConfig,
pub auditing: AuditingConfig,
pub test_mode: TestModeConfig,
}

pub struct ConfigManager {
config: RwLock,
config_path: Option,
}

impl ConfigManager {
pub async fn update_config(&self, update_fn: F) -> Result<(), String>
where
F: FnOnce(&mut SecurityConfig),
{
let mut config = self.config.write().await;
update_fn(&mut config);

// Валидация конфигурации
self.validate_config(&config)?;

// Автосохранение при изменении
self.save_config().await?;

Ok(())
}

fn validate_config(&self, config: &SecurityConfig) -> Result<(), String> {
if config.congestion_control.anomaly_threshold_soft >= config.congestion_control.anomaly_threshold_hard {
return Err(«anomaly_threshold_soft must be less than anomaly_threshold_hard».to_string());
}

if config.rate_limiting.max_packets_per_second == 0 {
return Err(«max_packets_per_second must be greater than 0».to_string());
}

Ok(())
}
}
«`

**Особенности реализации:**
— Автоматическая сериализация/десериализация через Serde
— Валидация конфигурации при каждом изменении
— Поддержка атомарных обновлений через замыкания

## Мониторинг и метрики

Модуль `security_metrics.rs` интегрируется с Prometheus для сбора метрик:

«`rust
impl SecurityMetrics {
pub fn register(registry: &Registry) -> anyhow::Result<()> {
let metrics = Self::new()?;
registry.register(Box::new(metrics.replay_attacks.clone()))?;
registry.register(Box::new(metrics.rate_limit_hits.clone()))?;
registry.register(Box::new(metrics.failed_handshakes.clone()))?;
registry.register(Box::new(metrics.successful_sessions.clone()))?;
registry.register(Box::new(metrics.active_connections.clone()))?;
registry.register(Box::new(metrics.processing_time.clone()))?;
registry.register(Box::new(metrics.nonce_cache_size.clone()))?;
registry.register(Box::new(metrics.congestion_accepted.clone()))?;
registry.register(Box::new(metrics.congestion_rate_limited.clone()))?;
registry.register(Box::new(metrics.congestion_banned.clone()))?;
registry.register(Box::new(metrics.system_load.clone()))?;
Ok(())
}
}
«`

**Собранные метрики включают:**
— Количество обнаруженных replay-атак
— Срабатывания rate limiting
— Успешные и неуспешные handshake-процедуры
— Активные соединения и нагрузку системы

## Адаптивные алгоритмы контроля перегрузки

### AIMD алгоритм для динамического регулирования лимитов

Модуль `aimd_algorithm.rs` реализует классический алгоритм Additive Increase/Multiplicative Decrease:

«`rust
impl AIMDAlgorithm {
pub async fn record_success(&self, ip: IpAddr) -> u64 {
let mut states = self.ip_states.write().await;
let state = states.entry(ip).or_insert_with(|| IPLimitState {
current_limit: self.config.initial_limit,
used_this_window: 0,
last_updated: Instant::now(),
success_count: 0,
failure_count: 0,
});

state.success_count += 1;
state.used_this_window += 1;

// Additive Increase: увеличиваем лимит при успехе
if state.success_count >= 10 {
state.current_limit = (state.current_limit + self.config.additive_increase)
.min(self.config.max_limit);
state.success_count = 0;
}

state.current_limit
}

pub async fn record_failure(&self, ip: IpAddr) -> u64 {
let mut states = self.ip_states.write().await;
let state = states.entry(ip).or_insert_with(|| IPLimitState {
current_limit: self.config.initial_limit,
used_this_window: 0,
last_updated: Instant::now(),
success_count: 0,
failure_count: 0,
});

state.failure_count += 1;

// Multiplicative Decrease: уменьшаем лимит при перегрузке
if state.failure_count >= 3 {
state.current_limit = ((state.current_limit as f64) * self.config.multiplicative_decrease) as u64;
state.current_limit = state.current_limit.max(self.config.min_limit);
state.failure_count = 0;
}

state.current_limit
}
}
«`

**Принцип работы:**
— При успешной обработке пакетов лимит постепенно увеличивается (additive increase)
— При обнаружении проблем лимит резко уменьшается (multiplicative decrease)
— Алгоритм автоматически находит оптимальный баланс между пропускной способностью и стабильностью

## Заключение технической документации

Представленная кодовая база демонстрирует современный подход к разработке высоконагруженных сетевых серверов на Rust. Ключевые инженерные решения включают:

1. **Асинхронную архитектуру** на основе Tokio Runtime
2. **Многоуровневую систему безопасности** с адаптивными алгоритмами
3. **Модульную организацию кода** с чёткими границами ответственности
4. **Комплексный мониторинг** и самодиагностику
5. **Конфигурируемость** всех аспектов работы системы

Код следует принципам идиоматического Rust, включая обработку ошибок через `Result`, использование трейт-объектов для полиморфизма и гарантии безопасности через систему владения.

## Общие понятия

### Введение в систему мониторинга

Мониторинговая система представляет собой комплексный механизм наблюдения за состоянием и производительностью программного обеспечения в реальном времени. Её основная задача заключается в непрерывном сборе, анализе и визуализации метрик, которые отражают работоспособность различных компонентов системы. В отличие от простого логирования, мониторинг предполагает активное наблюдение за системой с возможностью автоматического реагирования на отклонения от нормального состояния.

Архитектура системы построена на принципах модульности и расширяемости, что позволяет адаптировать её для наблюдения за различными типами компонентов: от низкоуровневых системных ресурсов до высокоуровневой бизнес-логики приложения. Система реализует подход, при котором каждый компонент предоставляет стандартизированный набор метрик и проверок здоровья, что обеспечивает единообразие в сборе и обработке данных.

### Концепция централизованного агрегирования

Ключевым элементом архитектуры является UnifiedMonitor — централизованный агрегатор, который координирует работу всех мониторов и проверок здоровья. Этот компонент выполняет функцию оркестратора, обеспечивая синхронизацию сбора данных, управление жизненным циклом мониторов и консолидацию результатов. Благодаря такому подходу достигается декомпозиция ответственности: специализированные мониторы фокусируются на сборе данных в своей предметной области, а центральный агрегатор занимается их обработкой и анализом.

Централизованная архитектура позволяет реализовать сложные сценарии анализа, такие как корреляция событий из разных источников или вычисление комплексных показателей здоровья системы на основе множества факторов. Кроме того, такой подход упрощает интеграцию с внешними системами визуализации и оповещения, поскольку предоставляет единую точку входа для получения всех метрик и статусов.

### Многоуровневая система метрик

Система оперирует несколькими типами метрик, каждый из которых предназначен для решения определенных задач наблюдения. Счетчики (Counters) используются для отслеживания кумулятивных значений, таких как общее количество обработанных запросов или количество установленных соединений. Эти метрики имеют свойство монотонного возрастания и не уменьшаются со временем, что делает их идеальными для подсчета событий.

Измерители (Gauges), в отличие от счетчиков, представляют текущие значения, которые могут изменяться в любом направлении. Они используются для отслеживания таких параметров, как использование памяти, загрузка процессора или количество активных сессий. Особенностью работы с измерителями является необходимость учета временных характеристик — значение в конкретный момент времени может не отражать общую картину, поэтому важное значение имеют методы агрегации и сглаживания данных.

Гистограммы (Histograms) предоставляют статистическое распределение измеряемых величин, что особенно полезно для анализа производительности. Они позволяют отслеживать не только средние значения, но и процентили, что дает более полное представление о поведении системы. Например, при анализе времени ответа API важно знать не только среднее время, но и 95-й или 99-й процентили, которые отражают опыт пользователей с наихудшей производительностью.

### Система проверок здоровья

Концепция проверок здоровья (Health Checks) расширяет базовый мониторинг метрик, добавляя семантическую оценку состояния компонентов. В отличие от простого отслеживания числовых значений, проверки здоровья реализуют бизнес-логику оценки работоспособности, учитывая взаимосвязи между различными метриками и пороговые значения.

Проверки здоровья реализуются через иерархию компонентов, где каждый компонент может находиться в одном из двух состояний: «здоров» или «нездоров». Это бинарное состояние дополняется детализированной информацией о причинах проблем и рекомендациями по их устранению. Система поддерживает различные типы компонентов — базы данных, сетевые службы, системные ресурсы, внешние API — каждый со своей спецификой оценки здоровья.

Важной особенностью реализации является использование адаптеров, которые позволяют интегрировать существующие мониторы в систему проверок здоровья. Это обеспечивает обратную совместимость и позволяет постепенно расширять функциональность без необходимости переписывания существующего кода.

### Многоуровневая система оповещений

Система оповещений построена на принципах приоритизации и контекстуализации событий. Каждое оповещение классифицируется по уровню серьезности: информационные сообщения, предупреждения, ошибки и критические события. Такая градация позволяет разделить поток событий на категории, требующие разного уровня внимания и скорости реакции.

Информационные сообщения используются для отслеживания нормальной работы системы и аудита изменений состояния. Они не требуют немедленного вмешательства, но важны для понимания общей картины работы системы. Предупреждения сигнализируют о потенциальных проблемах, которые могут перерасти в более серьезные сбои, если их проигнорировать. Ошибки указывают на реальные проблемы, влияющие на функциональность системы, но не обязательно приводящие к полной её неработоспособности. Критические события требуют немедленного вмешательства, так как указывают на проблемы, нарушающие основные функции системы.

Система реализует механизмы предотвращения «шторма оповещений» — ситуации, когда большое количество однотипных событий перегружает каналы уведомлений. Это достигается через агрегацию похожих событий, регулирование частоты отправки и интеллектуальное подавление дубликатов.

### Асинхронная модель сбора данных

Архитектура системы построена на асинхронной модели выполнения операций, что позволяет минимизировать влияние мониторинга на производительность основной системы. Все операции сбора метрик, выполнения проверок здоровья и отправки оповещений выполняются в фоновом режиме, не блокируя основные потоки выполнения приложения.

Асинхронная модель реализуется через механизмы обратного давления (backpressure), которые предотвращают перегрузку системы в ситуациях, когда темп генерации данных превышает возможности их обработки. Это особенно важно в условиях высокой нагрузки, когда система должна сохранять работоспособность даже при временном увеличении объема мониторинговых данных.

Система использует кэширование промежуточных результатов для уменьшения нагрузки на мониторируемые компоненты. Например, данные о системных ресурсах, которые относительно медленно меняются, кэшируются на короткое время, что позволяет избежать частых запросов к операционной системе. Этот подход особенно эффективен в сценариях, когда несколько потребителей запрашивают одни и те же данные в короткий промежуток времени.

### Принципы отказоустойчивости

Мониторинговая система спроектирована с учетом принципов устойчивости к сбоям. Это означает, что временная недоступность отдельных компонентов мониторинга или перегрузка каналов связи не должна приводить к каскадным отказам или существенному влиянию на производительность основной системы.

Механизмы повторных попыток с экспоненциальным откладыванием применяются при взаимодействии с внешними зависимостями, такими как базы данных или сетевые службы. Этот подход позволяет системе автоматически восстанавливаться после временных сбоев, постепенно увеличивая интервалы между повторными попытками, чтобы не усугублять ситуацию при длительных простоях зависимых систем.

Система реализует концепцию «деградации функциональности» — при невозможности собрать полный набор данных, она продолжает работать с доступной информацией, явно указывая на ограничения в предоставляемых отчетах. Например, если недоступны данные о дисковом пространстве, система продолжает отслеживать другие метрики, отмечая в отчетах отсутствие информации по конкретному компоненту.

### Расширяемость и интеграция

Архитектура системы предусматривает простые механизмы расширения функциональности через добавление новых типов мониторов и проверок здоровья. Стандартизированные интерфейсы позволяют интегрировать специализированные мониторы для наблюдения за уникальными аспектами конкретного приложения без изменения ядра системы.

Система поддерживает экспорт данных в форматах, совместимых с популярными инструментами визуализации и анализа, такими как Prometheus и Grafana. Это обеспечивает возможность построения комплексных дашбордов, объединяющих данные из различных источников, и создания сложных правил алертинга на основе агрегированных метрик.

Интеграция с существующими системами логирования позволяет коррелировать события мониторинга с детальной диагностической информацией, что упрощает анализ причин возникновения проблем. Система поддерживает контекстное логирование, где каждое событие мониторинга дополняется идентификаторами, позволяющими связать его с соответствующими записями в логах приложения.

### Безопасность и производительность

Система мониторинга реализует меры защиты от злоупотреблений и случайных ошибок конфигурации. Ограничение частоты запросов, проверка достоверности данных и контроль потребления ресурсов предотвращают ситуации, когда мониторинговая система сама становится источником проблем для основной системы.

Производительность системы оптимизирована за счет использования эффективных структур данных, минимизации накладных расходов на синхронизацию и оптимизации алгоритмов агрегации. Особое внимание уделено сценариям работы под высокой нагрузкой, когда система должна обрабатывать тысячи метрик в секунду без существенного влияния на общую производительность приложения.

Конфигурируемость системы позволяет адаптировать её поведение под конкретные требования среды выполнения. Настройки интервалов обновления, пороговых значений для оповещений и стратегий кэширования могут быть изменены без перекомпиляции кода, что обеспечивает гибкость при развертывании в различных условиях.

## Введение в архитектуру сетевого взаимодействия

TCP-сервер представляет собой фундаментальный компонент системы, обеспечивающий надежную двустороннюю коммуникацию между клиентскими приложениями и серверной инфраструктурой. В отличие от традиционных HTTP-серверов, использующих модель запрос-ответ, данный сервер реализует постоянное соединение с поддержкой состояния, что позволяет эффективно обрабатывать потоки данных в реальном времени и минимизировать накладные расходы на установление соединения.

Архитектура сервера построена на принципах асинхронного ввода-вывода, что обеспечивает высокую производительность при работе с тысячами одновременных подключений. Это достигается за счет использования event loop модели, где основной поток не блокируется на операциях чтения/записи, а эффективно распределяет задачи между рабочими потоками.

## Теоретические основы протокола связи

Сервер реализует бинарный протокол собственной разработки, оптимизированный для минимизации накладных расходов и обеспечения максимальной безопасности. Протокол использует фиксированную структуру заголовка пакета, включающую магические байты для идентификации, поле длины для контроля целостности и тип пакета для маршрутизации обработки.

Критически важным аспектом является обеспечение конфиденциальности и целостности передаваемых данных. Для этого применяется комбинированный криптографический подход: симметричное шифрование AES-GCM для защиты содержимого пакетов и HMAC-SHA256 для обеспечения аутентичности. Такое сочетание позволяет эффективно противостоять как пассивному прослушиванию, так и активным атакам подмены.

## Принципы управления соединениями и сессиями

Сервер реализует сложную систему управления жизненным циклом соединений, основанную на концепции сессий. Каждое физическое TCP-соединение ассоциируется с логической сессией, которая содержит криптографический контекст, метаданные клиента и состояние взаимодействия. Это позволяет обеспечить изоляцию клиентов и безопасное хранение чувствительной информации.

Система heartbeat реализует механизм обнаружения «зависших» соединений через регулярный обмен служебными пакетами. Если клиент не подтверждает свою активность в течение заданного интервала, сервер инициирует процедуру безопасного завершения сессии с освобождением всех связанных ресурсов. Этот механизм критически важен для предотвращения утечек памяти и обеспечения стабильности системы при длительных периодах работы.

## Многоуровневая система безопасности

Безопасность в TCP-сервере реализована как многоуровневая система защиты, где каждый слой решает специфические задачи. На сетевом уровне применяется rate limiting с адаптивными алгоритмами, способный динамически подстраиваться под текущую нагрузку и выявлять аномальные паттерны трафика. Алгоритм использует принцип «скользящего окна» для анализа частоты запросов и автоматического ограничения подозрительных источников.

Система репутации IP-адресов формирует поведенческие профили на основе исторических данных взаимодействия. Каждому IP-адресу присваивается репутационный балл, который корректируется в зависимости от характера активности. Это позволяет дифференцированно подходить к обработке трафика: доверенные источники получают приоритет, в то время как подозрительные адреса подвергаются дополнительной проверке.

Контроль перегрузки (congestion control) представляет собой интеллектуальную систему, анализирующую не только объем трафика, но и его структуру, временные паттерны и аномальные характеристики. Система способна идентифицировать различные типы атак, включая flood-атаки, slowloris и другие сложные векторы, применяя соответствующие контрмеры.

## Архитектурные паттерны обработки данных

Обработка входящих пакетов организована по принципу конвейера (pipeline), где каждый этап выполняет строго определенную функцию. Такой подход обеспечивает модульность системы и позволяет легко расширять функциональность путем добавления новых стадий обработки. Конвейер включает стадии дешифрования, валидации, обработки бизнес-логики и повторного шифрования ответа.

Диспетчер задач (Dispatcher) реализует паттерн «разделяй и властвуй», распределяя нагрузку между пулом рабочих потоков. Это позволяет эффективно использовать многопроцессорные системы и минимизировать задержки при обработке высоконагруженных сценариев. Каждая задача содержит полный контекст обработки, что обеспечивает изоляцию и предотвращает состояние гонки.

## Мониторинг и самодиагностика

Сервер включает комплексную систему мониторинга, которая предоставляет детальную метрику о всех аспектах работы системы. Собираются как технические показатели (количество активных соединений, скорость обработки, использование памяти), так и бизнес-ориентированные метрики (статистика по типам пакетов, географическое распределение клиентов).

Система здоровья (health check) постоянно отслеживает работоспособность всех компонентов сервера, выявляя деградации производительности и потенциальные точки отказа. При обнаружении проблем система способна автоматически перераспределять нагрузку или инициировать процедуры восстановления, что повышает общую отказоустойчивость.

## Принципы устойчивости и отказоустойчивости

Архитектура сервера спроектирована с учетом принципов graceful degradation — способности сохранять базовую функциональность при частичных отказах подсистем. Реализованы механизмы автоматического восстановления после сбоев, включая переподключение к зависимым службам, очистку некорректных состояний и валидацию целостности данных.

Транзакционность операций обеспечивается через систему блокировок и атомарных операций, что гарантирует консистентность данных даже в условиях параллельного доступа. Особое внимание уделено обработке пограничных случаев и исключительных ситуаций, что минимизирует вероятность неопределенного поведения системы.

## Интеграционные аспекты и масштабируемость

Сервер спроектирован как часть экосистемы микросервисов, с четко определенными интерфейсами взаимодействия. Поддерживается интеграция с системами аутентификации, базами данных, кэширующими слоями и системами сбора метрик. Такая архитектура позволяет легко адаптировать сервер к различным инфраструктурным окружениям.

Горизонтальное масштабирование достигается за счет stateless-архитектуры обработки запросов и внешнего хранения состояния сессий. Это позволяет разворачивать несколько экземпляров сервера за балансировщиком нагрузки, обеспечивая линейный рост производительности при увеличении числа узлов.

## Криптографические основы

Безопасность коммуникаций основана на современных криптографических стандартах и практиках. Процесс handshake реализует асимметричное шифрование для безопасного обмена ключами, после чего переходит на более эффективное симметричное шифрование для передачи данных. Используются эфемерные ключи сессии, что обеспечивает совершенную прямую секретность — даже при компрометации долгосрочных ключей ранее перехваченный трафик не может быть расшифрован.

Система управления ключами обеспечивает их безопасное хранение, ротацию и отзыв. Реализованы механизмы защиты от replay-атак через использование криптографических nonce и временных меток, что гарантирует уникальность каждого пакета.

## Заключение

TCP-сервер представляет собой сложную инженерную систему, сочетающую высокопроизводительную обработку сетевого трафика с комплексными механизмами безопасности. Архитектурные решения, основанные на современных практиках разработки распределенных систем, обеспечивают надежность, масштабируемость и отказоустойчивость. Понимание теоретических основ работы сервера является необходимым условием для эффективной эксплуатации, мониторинга и дальнейшего развития системы.

Сервер не является изолированным компонентом, а представляет собой часть целостной экосистемы, где безопасность, производительность и надежность являются не отдельными характеристиками, а emergent properties, возникающими из грамотного взаимодействия всех подсистем.

## Криптографическая защита сервера: Общие понятия

### Введение в архитектуру безопасности

Криптографический модуль представляет собой фундаментальный компонент системы безопасности, разработанный для защиты данных в процессе их передачи по сетям с ненадежными каналами связи. Основная цель модуля — обеспечение трёх ключевых свойств информационной безопасности: конфиденциальности передаваемых данных, целостности информации и аутентификации взаимодействующих сторон. Модуль реализует современные криптографические протоколы, соответствующие промышленным стандартам и рекомендациям ведущих организаций в области информационной безопасности.

### Теоретические основы криптографической защиты

В основе модуля лежит гибридная криптографическая система, комбинирующая преимущества симметричного и асимметричного шифрования. Для установления защищенного канала связи используется комбинация предварительно распределенных ключей (Pre-Shared Keys, PSK) и алгоритма обмена ключами на основе эллиптических кривых (Elliptic Curve Diffie-Hellman, ECDH). Такой подход обеспечивает как сильную аутентификацию сторон, так и совершенную прямую секретность (Perfect Forward Secrecy, PFS), что означает невозможность расшифровки перехваченных сообщений даже при компрометации долговременных ключей в будущем.

Принцип совершенной прямой секретности достигается за счет использования эфемерных (одноразовых) ключей Диффи-Хеллмана для каждой сессии. Эти ключи генерируются случайным образом в начале каждого сеанса связи и уничтожаются после его завершения. Даже если злоумышленнику удастся получить доступ к долговременному PSK, он не сможет расшифровать ранее перехваченные сообщения, поскольку для их расшифровки требуются сессионные ключи, которые больше не существуют.

### Протокол аутентифицированного рукопожатия

Процесс установления защищенного соединения начинается с выполнения протокола рукопожатия, который реализует взаимную аутентификацию клиента и сервера. Протокол следует принципу «доказательства знания», где каждая сторона демонстрирует владение общим секретным ключом (PSK), не раскрывая его в процессе обмена. Аутентификация осуществляется посредством вычисления кодов аутентификации сообщений (HMAC) на основе производных от PSK ключей аутентификации.

Рукопожатие построено по схеме двухэтапного обмена сообщениями. В первом сообщении клиент отправляет свой эфемерный открытый ключ, случайное значение (nonce) и HMAC, вычисленный с использованием ключа аутентификации клиента. Сервер, получив это сообщение, проверяет HMAC и, при успешной проверке, генерирует собственный эфемерный ключ и nonce, вычисляет ответный HMAC с использованием ключа аутентификации сервера и отправляет клиенту. После успешной взаимной аутентификации обе стороны вычисляют общий секрет по алгоритму Диффи-Хеллмана, который затем используется для генерации сессионных ключей.

Важным аспектом протокола является защита от атак повторного воспроизведения (replay attacks). Для этого используются одноразовые случайные значения (nonce), которые гарантируют уникальность каждого сеанса рукопожатия. Дополнительно, все операции проверки HMAC выполняются за постоянное время, что исключает возможность проведения атак по времени, когда злоумышленник может получить информацию о секретном ключе, анализируя время выполнения операций.

### Генерация и управление сессионными ключами

После успешного завершения рукопожатия система приступает к генерации сессионных ключей. Этот процесс основан на использовании функции деривации ключей на основе HMAC (HKDF), которая принимает на вход общий секрет Диффи-Хеллмана, соль (состоящую из объединенных публичных ключей и nonce обеих сторон) и контекстную информацию. HKDF выполняет две основные функции: извлечение псевдослучайного ключа из исходного материала и его последующее расширение для получения криптографически сильных ключей необходимой длины.

В результате работы HKDF генерируются два типа ключей: ключ для аутентифицированного шифрования (AEAD) и ключ для цифровых подписей (HMAC). Ключ AEAD используется в алгоритме AES-256-GCM для обеспечения конфиденциальности и целостности данных, а ключ HMAC — для проверки целостности служебных сообщений. Каждой сессии также присваивается уникальный идентификатор, вычисляемый как производная от общих параметров рукопожатия, что позволяет однозначно идентифицировать сеанс связи в логировании и мониторинге.

### Механизмы симметричного шифрования

Для защиты передаваемых данных модуль использует аутентифицированное шифрование с ассоциированными данными (Authenticated Encryption with Associated Data, AEAD) на основе алгоритма AES в режиме GCM (Galois/Counter Mode). Данный режим сочетает в себе счетчик (CTR) для шифрования и аутентификацию на основе поля Галуа, что обеспечивает как конфиденциальность, так и целостность данных в рамках единой криптографической примитивы.

AES-256-GCM использует 256-битные ключи, что соответствует современным требованиям к стойкости симметричного шифрования. Размер nonce (одноразового числа) составляет 96 бит, что является оптимальным значением, обеспечивающим достаточное пространство для случайных значений при сохранении совместимости с различными реализациями. При шифровании к данным автоматически добавляется тег аутентификации, который позволяет получателю проверить, что данные не были изменены в процессе передачи.

Важной особенностью реализации является поддержка аппаратного ускорения через инструкции AES-NI (Advanced Encryption Standard New Instructions), которые доступны в современных процессорах. При их наличии операции шифрования и расшифрования выполняются значительно быстрее, что позволяет обрабатывать большие объемы данных с минимальными задержками. В случае отсутствия аппаратной поддержки система автоматически переключается на оптимизированную программную реализацию.

### Управление ключами и безопасность памяти

Система управления ключами реализует принципы минимальных привилегий и разделения обязанностей. Долговременные ключи (PSK) хранятся исключительно в зашифрованном виде в защищенном хранилище и никогда не передаются по сети в открытом виде. Сессионные ключи имеют ограниченный срок жизни и уничтожаются сразу после завершения сеанса связи или при обнаружении потенциальной угрозы безопасности.

Для защиты ключей в памяти применяется несколько методов. Все криптографические ключи хранятся в специальных структурах, которые автоматически зануляют память при освобождении ресурсов (техника zeroization). Это предотвращает возможность извлечения ключей из неинициализированной памяти. Кроме того, система использует защищенные буферы для временного хранения чувствительных данных в процессе их обработки, что исключает возможность утечки через временные файлы или файлы подкачки.

Система также включает механизмы защиты от атак по сторонним каналам, таких как атаки по времени и атаки по энергопотреблению. Все операции сравнения криптографических значений (например, проверка HMAC) выполняются за постоянное время независимо от содержания данных. Это достигается за счет использования специализированных функций сравнения, которые обрабатывают каждый байт данных независимо от результата сравнения предыдущих байтов.

### Архитектура параллельной обработки

Для обеспечения высокой производительности в условиях большой нагрузки модуль реализует асинхронную архитектуру обработки криптографических операций. Криптографический пул (CryptoPool) представляет собой группу рабочих потоков, которые параллельно обрабатывают запросы на шифрование и расшифрование. Такой подход позволяет эффективно использовать ресурсы многоядерных процессоров и минимизировать задержки при обработке большого количества одновременных соединений.

Пул поддерживает два режима работы: обработка одиночных запросов и пакетная обработка. Одиночные запросы используются для оперативного шифрования отдельных сообщений, в то время как пакетная обработка оптимизирована для одновременного шифрования или расшифрования нескольких сообщений в рамках одной сессии. Это особенно эффективно при работе с фрагментированными данными или при массовой обработке запросов от одного клиента.

Архитектура пула обеспечивает балансировку нагрузки между рабочими потоками и включает механизмы защиты от перегрузки. При достижении предельной нагрузки система автоматически переходит в режим регулирования запросов, предотвращая исчерпание ресурсов и обеспечивая стабильную работу даже в условиях атак типа «отказ в обслуживании».

### Интеграция с системой мониторинга и аудита

Криптографический модуль тесно интегрирован с системой мониторинга и аудита безопасности. Все криптографические операции регистрируются в системном журнале с указанием типа операции, размера данных, идентификатора сессии и результата выполнения. Это позволяет администраторам отслеживать использование криптографических функций и оперативно выявлять аномальную активность.

Система также предоставляет метрики производительности, включая время выполнения различных криптографических операций, загрузку криптографического пула и статистику успешных/неуспешных операций. Эти метрики используются для тонкой настройки производительности и планирования масштабирования системы. В случае обнаружения подозрительной активности (например, множественных неудачных попыток аутентификации) система автоматически генерирует оповещения и может временно блокировать подозрительные соединения.

### Соответствие стандартам и лучшим практикам

Реализация криптографического модуля соответствует следующим международным стандартам и рекомендациям:

— Алгоритм AES соответствует спецификации FIPS PUB 197
— Режим GCM определен в NIST Special Publication 800-38D
— Кривая Curve25519 описана в RFC 7748
— Функция деривации ключей HKDF определена в RFC 5869
— Рекомендации по использованию криптографии приведены в NIST Special Publication 800-175B

Система также следует принципам безопасного проектирования, включая минимализацию поверхности атаки, разделение привилегий, защиту от распространенных уязвимостей (таких как атаки по времени, переполнение буфера и инъекции) и регулярное обновление криптографических библиотек.

### Заключение

Криптографический модуль представляет собой комплексное решение для защиты данных в распределенных системах, сочетающее современные криптографические алгоритмы с эффективной архитектурой обработки. Благодаря реализации принципов глубинной защиты (defense in depth) и соответствию промышленным стандартам, система обеспечивает надежную защиту передаваемых данных от широкого спектра угроз, включая перехват трафика, подмену сообщений и атаки на криптографические протоколы. Модуль спроектирован с учетом требований производительности и масштабируемости, что позволяет его эффективное использование как в небольших системах, так и в крупных распределенных средах с высокой нагрузкой.

## Обзор

Эта страница содержит минимальные шаги для быстрого запуска SomnyToo. Если вам нужны подробные инструкции по установке или конфигурации, обратитесь к соответствующим разделам документации.

## Краткий путь установки

**Для разработки и тестирования (без Docker)**

«`bash
git clone https://github.com/TvoiiSon/SomnyToo.git
cd SomnyToo
cargo run
«`

**Для продакшена (с Docker)**

«`bash
git clone https://github.com/TvoiiSon/SomnyToo.git
cd SomnyToo
docker-compose up -d
«`

## Экспресс-настройка

**1. Скачивание кода**

«`bash
git clone https://github.com/TvoiiSon/SomnyToo.git
cd SomnyToo
«`

**2. Базовая конфигурация**

Создайте минимальный .env файл:

«`bash
cp .env.example .env
«`

## Отредактируйте только обязательные параметры

— DATABASE_URL=postgres://postgres:password@localhost/somnytoo
— SERVER_PORT=8000

**3. Быстрый запуск**

Вариант A: С Docker (рекомендуется)

«`bash
docker-compose up -d
«`

Вариант B: Нативно

«`bash
cargo run
«`

## Готово!

Вы успешно запустили SomnyToo. Теперь можете:

— Проверить API endpoints

— Настроить под свои нужды

— Интегрировать с вашей системой

## Шаг 1: Скачивание кода

**Клонирование репозитория**

«`bash
git clone https://github.com/TvoiiSon/SomnyToo.git
cd SomnyToo
«`

## Шаг 2: Настройка Docker окружения

**Копирование конфигурации**

«`bash
cp .env.example .env
«`

**Настройка переменных окружения**

Отредактируйте файл .env в соответствии с вашими потребностями:
«`bash
DATABASE_URL=postgres://postgres:password@db:5432/somnytoo
SERVER_HOST=0.0.0.0
SERVER_PORT=8000
LOG_LEVEL=info
«`

## Шаг 3: Запуск с Docker Compose

**Запуск всех сервисов**

«`bash
docker-compose up -d
«`

Эта команда запустит:
— PostgreSQL базу данных
— SomnyToo сервер

**Проверка статуса контейнеров**

«`bash
docker-compose ps
«`

## Шаг 4: Проверка работы

**Просмотр логов сервера**

«`bash
docker-compose logs -f app
«`

**Ожидаемый вывод при успешном запуске:**

«`bash
2026-01-07T10:45:58.238094Z  INFO somnytoo: 🚀 Starting Server Mode…
2026-01-07T10:45:58.238337Z  INFO somnytoo: 📝 Configuration loaded:
2026-01-07T10:45:58.238434Z  INFO somnytoo:   — Host: 0.0.0.0
2026-01-07T10:45:58.238525Z  INFO somnytoo:   — Port: 8000
2026-01-07T10:45:58.238601Z  INFO somnytoo:   — Log level: debug
2026-01-07T10:45:58.238682Z  INFO somnytoo:   — Database URL: postgres://user:password@host/db_name?options=-c%20search_path=db_name
2026-01-07T10:45:58.238799Z  INFO somnytoo:   — Phantom Mode: true
2026-01-07T10:45:58.238905Z  INFO somnytoo:   — Phantom Assembler: auto
2026-01-07T10:45:58.239003Z  INFO somnytoo:   — Hardware Auth: false
2026-01-07T10:45:58.239132Z  INFO somnytoo: 🚀 Initializing phantom security server…
[DB] Initializing HIGH-PERFORMANCE connection pools
2026-01-07T10:45:59.240335Z DEBUG sqlx::query: summary=»SELECT 1″ db.statement=»» rows_affected=1 rows_returned=1 elapsed=3.2883ms elapsed_secs=0.0032883
2026-01-07T10:45:59.241347Z DEBUG sqlx::query: summary=»SELECT 1″ db.statement=»» rows_affected=1 rows_returned=1 elapsed=561.5µs elapsed_secs=0.0005615
2026-01-07T10:45:59.242083Z DEBUG sqlx::query: summary=»SELECT 1″ db.statement=»» rows_affected=1 rows_returned=1 elapsed=389.7µs elapsed_secs=0.0003897
[SQL Server] Started successfully with prepared statements cache
[QueryExecutor] Server registered successfully
2026-01-07T10:45:59.242387Z  INFO somnytoo: Database initialized successfully
2026-01-07T10:45:59.242512Z  INFO somnytoo: 💓 Initializing heartbeat system…
2026-01-07T10:45:59.242631Z  INFO somnytoo: ✅ Basic heartbeat manager started
2026-01-07T10:45:59.242735Z  INFO somnytoo: ✅ Heartbeat sender started
2026-01-07T10:45:59.242805Z  INFO somnytoo: 💓 Heartbeat system initialized successfully
2026-01-07T10:45:59.242865Z  INFO somnytoo: 🎯 Server is ready and accepting phantom connections
2026-01-07T10:45:59.243047Z  INFO server: 👻 Phantom Security Server listening on 0.0.0.0:8000
2026-01-07T10:45:59.243124Z  INFO somnytoo: 🔧 Phantom Configuration:
2026-01-07T10:45:59.243189Z  INFO somnytoo:   — Session timeout: 90000ms
2026-01-07T10:45:59.243249Z  INFO somnytoo:   — Max sessions: 100000
2026-01-07T10:45:59.243324Z  INFO somnytoo:   — Hardware acceleration: true
2026-01-07T10:45:59.243404Z  INFO somnytoo:   — Constant time enforcement: true
2026-01-07T10:45:59.243486Z  INFO somnytoo:   — Assembler type: auto
2026-01-07T10:45:59.243580Z  INFO somnytoo: 💓 Heartbeat System:
2026-01-07T10:45:59.243661Z  INFO somnytoo:   — Active sessions: 0
2026-01-07T10:45:59.243743Z  INFO somnytoo:   — Monitor alerts: 0
2026-01-07T10:45:59.252616Z DEBUG somnytoo::core::protocol::server::heartbeat::sender: Heartbeat sender: checking active sessions
«`