Назад к блогу

Server API

24 Янв, 2026

## Структура проекта и организация модулей

Кодовая база TCP-сервера организована по модульному принципу, что обеспечивает чёткое разделение ответственности и упрощает поддержку системы.

**Архитектурное решение:** Иерархическая организация позволяет инкапсулировать связанную функциональность и минимизировать циклические зависимости. Модули сгруппированы по функциональным доменам: безопасность, обработка пакетов, мониторинг и управление базами данных.

## Менеджер соединений: детали реализации

Файл `connection_manager.rs` содержит реализацию централизованного управления TCP-соединениями. Основная структура выглядит следующим образом:

«`rust
pub struct ConnectionManager {
active_connections: Arc, mpsc::Sender<()>>>>,
}

impl ConnectionManager {
pub fn new() -> Self {
Self {
active_connections: Arc::new(RwLock::new(HashMap::new())),
}
}

pub async fn register_connection(&self, session_id: Vec, shutdown_tx: mpsc::Sender<()>) {
let session_id_clone = session_id.clone();
let mut connections = self.active_connections.write().await;
connections.insert(session_id, shutdown_tx);
info!(«Connection registered for session: {}», hex::encode(&session_id_clone));
}

pub async fn unregister_connection(&self, session_id: &[u8]) {
let mut connections = self.active_connections.write().await;
connections.remove(session_id);
info!(«Connection unregistered for session: {}», hex::encode(session_id));
}

pub async fn force_disconnect(&self, session_id: &[u8]) {
if let Some(shutdown_tx) = self.active_connections.write().await.remove(session_id) {
let _ = shutdown_tx.send(()).await;
info!(«Forced disconnect for session: {}», hex::encode(session_id));
}
}
}
«`

**Ключевые особенности реализации:**
— Использование `Arc>` обеспечивает потокобезопасный доступ к карте соединений
— Каждый элемент карты содержит канал `mpsc::Sender<()>` для принудительного отключения
— Метод `force_disconnect()` отправляет сигнал через канал, что инициирует корректное завершение обработки соединения

## Основной цикл обработки клиентских соединений

Функция `handle_client_connection()` реализует полный жизненный цикл соединения:

«`rust
pub async fn handle_client_connection(
stream: TcpStream,
peer: SocketAddr,
session_keys: Arc,
dispatcher: Arc,
session_manager: Arc,
heartbeat_manager: Arc,
_connection_manager: Arc,
) -> anyhow::Result<()> {
// Инициализация SecurityAudit и SecurityMetrics
let _ = SecurityAudit::initialize().await;

let timer = SecurityMetrics::processing_time().start_timer();
let (out_tx, out_rx) = mpsc::channel::>(32768);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
let (reader, writer) = stream.into_split();

// Регистрируем соединение
_connection_manager.register_connection(
session_keys.session_id.to_vec(),
shutdown_tx
).await;

// Writer task
let writer_task = tokio::spawn(write_task(writer, out_rx));

// Main processing loop с поддержкой принудительного закрытия
let process_result = tokio::select! {
result = process_loop(
reader,
peer,
session_keys.clone(),
dispatcher,
out_tx.clone(),
session_manager.clone(),
) => {
result
}
_ = shutdown_rx.recv() => {
info!(target: «server», «{} forcibly disconnected by heartbeat timeout», peer);
Ok(())
}
};

// Cleanup
writer_task.abort();
session_manager.force_remove_session(&session_keys.session_id).await;
_connection_manager.unregister_connection(&session_keys.session_id).await;
drop(timer);
SecurityMetrics::active_connections().dec();

info!(target: «server», «{} closed», peer);
process_result
}
«`

**Архитектурные паттерны:**
— Разделение потока на читающую и пишущую части (`into_split()`) позволяет независимо управлять вводом и выводом
— Использование `tokio::select!` обеспечивает обработку как нормального потока данных, так и принудительного отключения
— Паттерн RAII (Resource Acquisition Is Initialization) применяется для гарантированного освобождения ресурсов

## Система безопасности: многоуровневая архитектура

### Rate Limiting с микросекундной точностью

Модуль `micro_limiter.rs` реализует высокоточную систему ограничения скорости запросов:

«`rust
pub struct HighPrecisionRateLimiter {
// Существующие счетчики
small_packets: Vec,
medium_packets: Vec,
large_packets: Vec,
control_packets: Vec,

// Новая система отслеживания пакетов по IP
packet_history: std::sync::RwLock>,

// Конфигурация
config: RateLimitConfig,
}

impl HighPrecisionRateLimiter {
const WINDOW_SIZE: usize = 1_000_000;

#[inline(always)]
pub fn check_limit(&self, priority: PacketPriority, now_micros: u64) -> bool {
let (counters, limit_per_second) = match priority {
PacketPriority::HandshakeCritical => (&self.control_packets, self.config.control_packets_per_second),
PacketPriority::SmallHighPriority => (&self.small_packets, self.config.small_packets_per_second),
PacketPriority::DataTransferMedium => (&self.medium_packets, self.config.medium_packets_per_second),
PacketPriority::LargeBackground => (&self.large_packets, self.config.large_packets_per_second),
PacketPriority::HeartbeatLow => (&self.control_packets, self.config.control_packets_per_second),
};

let bucket_idx = (now_micros % Self::WINDOW_SIZE as u64) as usize;
let current_count = counters[bucket_idx].fetch_add(1, Ordering::Relaxed);

// Увеличиваем бёрст-лимит для большей терпимости
let burst_limit = limit_per_second * self.config.burst_window_micros / 1_000_000;
current_count < burst_limit } } ``` **Оптимизации производительности:** - Выравнивание кэша для атомарных счетчиков предотвращает false sharing - Скользящее окно в 1 миллион микросекунд обеспечивает высокую точность - Раздельные счетчики для разных приоритетов пакетов позволяют дифференцированно ограничивать трафик ### Обнаружение аномалий трафика Модуль `anomaly_detector.rs` реализует систему обнаружения подозрительных паттернов: ```rust impl AnomalyDetector { pub fn detect_anomalies(&self, ip_stats: &IPStats, global_stats: &TrafficStats) -> Vec<(AnomalyType, f64)> {
let mut anomalies = Vec::new();

// 1. Проверка частоты пакетов
if ip_stats.packet_rate > self.thresholds.max_packets_per_second {
let severity = (ip_stats.packet_rate / self.thresholds.max_packets_per_second).min(1.0);
anomalies.push((AnomalyType::HighPacketRate, severity));
}

// 2. Проверка burst-трафика
if global_stats.packets_per_second > 0.0 {
let burst_ratio = ip_stats.packet_rate / global_stats.packets_per_second;
if burst_ratio > self.thresholds.max_burst_multiplier {
let severity = (burst_ratio / self.thresholds.max_burst_multiplier).min(1.0);
anomalies.push((AnomalyType::BurstTraffic, severity));
}
}

// 3. Проверка мелких пакетов (симптом DDoS)
if ip_stats.avg_packet_size < 100.0 { let small_packet_ratio = 1.0 - (ip_stats.avg_packet_size / 100.0).min(1.0); if small_packet_ratio > self.thresholds.small_packet_ratio_threshold {
anomalies.push((AnomalyType::SmallPacketFlood, small_packet_ratio));
}
}

// 4. Проверка flood соединений
if ip_stats.connection_count > self.thresholds.max_connections_per_minute {
let severity = (ip_stats.connection_count as f64 / self.thresholds.max_connections_per_minute as f64).min(1.0);
anomalies.push((AnomalyType::ConnectionFlood, severity));
}

anomalies
}
}
«`

**Алгоритмические особенности:**
— Взвешенная система скоринга учитывает несколько факторов одновременно
— Пороги настраиваются через `DetectionThresholds`
— Метод `calculate_anomaly_score()` комбинирует различные типы аномалий в единый показатель

## Парсинг и обработка сетевых пакетов

### Декодирование пакетов с криптографической проверкой

Модуль `packet_parser.rs` реализует полную цепочку валидации и дешифрования:

«`rust
impl PacketParser {
pub fn decode_packet(ctx: &SessionKeys, data: &[u8]) -> ProtocolResult<(u8, Vec)> {
// Проверка минимальной длины пакета
let minimal = 2 + 2 + 1 + NONCE_SIZE + 16 + SIGNATURE_SIZE;
if data.len() < minimal { return Err(ProtocolError::MalformedPacket { details: format!("Packet too short: {} bytes, expected at least {}", data.len(), minimal) }); } // Проверка магических байт if !constant_time_eq(&data[0..2], &HEADER_MAGIC) { return Err(ProtocolError::MalformedPacket { details: "Invalid magic bytes".to_string() }); } // Извлечение длины пакета let length = u16::from_be_bytes([data[2], data[3]]) as usize; if length > MAX_PAYLOAD_SIZE {
return Err(ProtocolError::MalformedPacket {
details: format!(«Packet too large: {} bytes, max: {}», length, MAX_PAYLOAD_SIZE)
});
}

// Проверка HMAC подписи
let hmac_start = 4 + length — SIGNATURE_SIZE;
let signed_part = &data[0..hmac_start];
let signature = &data[hmac_start..hmac_start + SIGNATURE_SIZE];

let mut mac = ::new_from_slice(&ctx.sign_key)
.map_err(|_| ProtocolError::Crypto {
source: CryptoError::InvalidKeyLength {
expected: 32,
actual: ctx.sign_key.len()
}
})?;

mac.update(signed_part);
let computed_tag = mac.finalize().into_bytes();

if !constant_time_eq(&computed_tag, signature) {
return Err(ProtocolError::Crypto {
source: CryptoError::HmacVerificationFailed
});
}

// Дешифрование AES-GCM
let payload = &signed_part[5..]; // Пропускаем header (2+2+1)
let nonce_bytes = &payload[..NONCE_SIZE];
let ciphertext = &payload[NONCE_SIZE..];

let nonce = GenericArray::from_slice(nonce_bytes);
let plaintext = ctx.aead_cipher
.decrypt(
nonce,
Payload {
msg: ciphertext,
aad: &data[0..5], // AAD = магические байты + длина + тип
}
)
.map_err(|e| ProtocolError::Crypto {
source: CryptoError::DecryptionFailed {
reason: e.to_string()
}
})?;

Ok((data[4], plaintext)) // Возвращаем тип пакета и расшифрованные данные
}
}
«`

**Криптографические гарантии:**
— Использование `constant_time_eq` предотвращает timing attacks при сравнении HMAC
— AES-GCM обеспечивает конфиденциальность и аутентичность одновременно
— AAD (Additional Authenticated Data) включает заголовок пакета, что предотвращает подмену метаданных

### Конвейерная обработка пакетов

Модуль `orchestrator.rs` реализует паттерн цепочки обязанностей для обработки пакетов:

«`rust
pub struct PipelineOrchestrator {
stages: Vec>,
}

impl PipelineOrchestrator {
pub fn new() -> Self {
Self { stages: Vec::new() }
}

pub fn add_stage(mut self, stage: S) -> Self {
self.stages.push(Box::new(stage));
self
}

pub async fn execute(&self, mut context: PipelineContext) -> Result, StageError> {
let start_time = std::time::Instant::now();

for (i, stage) in self.stages.iter().enumerate() {
info!(«Executing pipeline stage {}», i + 1);
stage.execute(&mut context).await?;
}

let processing_time = start_time.elapsed();
info!(«Pipeline execution completed in {:?}», processing_time);

context.encrypted_response
.ok_or_else(|| StageError::ProcessingFailed(«No response generated».to_string()))
}
}
«`

**Архитектурные преимущества:**
— Возможность добавлять/удалять стадии обработки без изменения основной логики
— Изоляция ошибок: сбой на одной стадии не приводит к падению всей системы
— Легкость тестирования каждой стадии независимо

## Система управления конфигурацией

Модуль `config.rs` реализует типобезопасную систему конфигурации с поддержкой hot-reload:

«`rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
pub congestion_control: CongestionConfig,
pub rate_limiting: RateLimitingConfig,
pub reputation: ReputationConfig,
pub auditing: AuditingConfig,
pub test_mode: TestModeConfig,
}

pub struct ConfigManager {
config: RwLock,
config_path: Option,
}

impl ConfigManager {
pub async fn update_config(&self, update_fn: F) -> Result<(), String>
where
F: FnOnce(&mut SecurityConfig),
{
let mut config = self.config.write().await;
update_fn(&mut config);

// Валидация конфигурации
self.validate_config(&config)?;

// Автосохранение при изменении
self.save_config().await?;

Ok(())
}

fn validate_config(&self, config: &SecurityConfig) -> Result<(), String> {
if config.congestion_control.anomaly_threshold_soft >= config.congestion_control.anomaly_threshold_hard {
return Err(«anomaly_threshold_soft must be less than anomaly_threshold_hard».to_string());
}

if config.rate_limiting.max_packets_per_second == 0 {
return Err(«max_packets_per_second must be greater than 0».to_string());
}

Ok(())
}
}
«`

**Особенности реализации:**
— Автоматическая сериализация/десериализация через Serde
— Валидация конфигурации при каждом изменении
— Поддержка атомарных обновлений через замыкания

## Мониторинг и метрики

Модуль `security_metrics.rs` интегрируется с Prometheus для сбора метрик:

«`rust
impl SecurityMetrics {
pub fn register(registry: &Registry) -> anyhow::Result<()> {
let metrics = Self::new()?;
registry.register(Box::new(metrics.replay_attacks.clone()))?;
registry.register(Box::new(metrics.rate_limit_hits.clone()))?;
registry.register(Box::new(metrics.failed_handshakes.clone()))?;
registry.register(Box::new(metrics.successful_sessions.clone()))?;
registry.register(Box::new(metrics.active_connections.clone()))?;
registry.register(Box::new(metrics.processing_time.clone()))?;
registry.register(Box::new(metrics.nonce_cache_size.clone()))?;
registry.register(Box::new(metrics.congestion_accepted.clone()))?;
registry.register(Box::new(metrics.congestion_rate_limited.clone()))?;
registry.register(Box::new(metrics.congestion_banned.clone()))?;
registry.register(Box::new(metrics.system_load.clone()))?;
Ok(())
}
}
«`

**Собранные метрики включают:**
— Количество обнаруженных replay-атак
— Срабатывания rate limiting
— Успешные и неуспешные handshake-процедуры
— Активные соединения и нагрузку системы

## Адаптивные алгоритмы контроля перегрузки

### AIMD алгоритм для динамического регулирования лимитов

Модуль `aimd_algorithm.rs` реализует классический алгоритм Additive Increase/Multiplicative Decrease:

«`rust
impl AIMDAlgorithm {
pub async fn record_success(&self, ip: IpAddr) -> u64 {
let mut states = self.ip_states.write().await;
let state = states.entry(ip).or_insert_with(|| IPLimitState {
current_limit: self.config.initial_limit,
used_this_window: 0,
last_updated: Instant::now(),
success_count: 0,
failure_count: 0,
});

state.success_count += 1;
state.used_this_window += 1;

// Additive Increase: увеличиваем лимит при успехе
if state.success_count >= 10 {
state.current_limit = (state.current_limit + self.config.additive_increase)
.min(self.config.max_limit);
state.success_count = 0;
}

state.current_limit
}

pub async fn record_failure(&self, ip: IpAddr) -> u64 {
let mut states = self.ip_states.write().await;
let state = states.entry(ip).or_insert_with(|| IPLimitState {
current_limit: self.config.initial_limit,
used_this_window: 0,
last_updated: Instant::now(),
success_count: 0,
failure_count: 0,
});

state.failure_count += 1;

// Multiplicative Decrease: уменьшаем лимит при перегрузке
if state.failure_count >= 3 {
state.current_limit = ((state.current_limit as f64) * self.config.multiplicative_decrease) as u64;
state.current_limit = state.current_limit.max(self.config.min_limit);
state.failure_count = 0;
}

state.current_limit
}
}
«`

**Принцип работы:**
— При успешной обработке пакетов лимит постепенно увеличивается (additive increase)
— При обнаружении проблем лимит резко уменьшается (multiplicative decrease)
— Алгоритм автоматически находит оптимальный баланс между пропускной способностью и стабильностью

## Заключение технической документации

Представленная кодовая база демонстрирует современный подход к разработке высоконагруженных сетевых серверов на Rust. Ключевые инженерные решения включают:

1. **Асинхронную архитектуру** на основе Tokio Runtime
2. **Многоуровневую систему безопасности** с адаптивными алгоритмами
3. **Модульную организацию кода** с чёткими границами ответственности
4. **Комплексный мониторинг** и самодиагностику
5. **Конфигурируемость** всех аспектов работы системы

Код следует принципам идиоматического Rust, включая обработку ошибок через `Result`, использование трейт-объектов для полиморфизма и гарантии безопасности через систему владения.