Назад к блогу

Server API

17 Янв, 2026

## Архитектура кодовой базы

Кодовая база TCP Сервера организована в модульную структуру, где каждый файл отвечает за конкретную функциональность. Основные компоненты системы расположены в директории `core/protocol/server/` и взаимодействуют через четко определенные интерфейсы.

## Модуль управления соединениями

### connection_manager_phantom.rs

**Назначение**: Централизованный реестр активных соединений с механизмами принудительного отключения.

«`rust
#[derive(Clone)]
pub struct PhantomConnectionManager {
active_connections: Arc, mpsc::Sender<()>>>>,
}
«`

Ключевой структурой является `PhantomConnectionManager`, который использует `Arc>` для потокобезопасного доступа к хэш-таблице активных соединений. Каждая запись содержит идентификатор сессии (ключ) и канал для отправки сигнала отключения (значение).

«`rust
pub async fn handle_phantom_client_connection(
stream: TcpStream,
peer: SocketAddr,
session: Arc,
phantom_crypto_pool: Arc,
phantom_session_manager: Arc,
connection_manager: Arc,
heartbeat_manager: Arc,
packet_service: Arc,
) -> anyhow::Result<()>
«`

Функция `handle_phantom_client_connection` является точкой входа для обработки нового клиентского соединения. Она выполняет следующие шаги:

1. **Регистрация соединения** в менеджере соединений
2. **Инициализация heartbeat** через менеджер heartbeat
3. **Запуск writer task** для асинхронной отправки данных
4. **Основной цикл обработки** с поддержкой принудительного закрытия
5. **Очистка ресурсов** при завершении соединения

### Механизм heartbeat

«`rust
async fn phantom_write_task(
writer: tokio::net::tcp::OwnedWriteHalf,
heartbeat_manager: Arc,
session_id: Vec,
_peer: SocketAddr,
)
«`

Отдельная задача `phantom_write_task` отвечает за отправку heartbeat-пакетов каждые 30 секунд. Это обеспечивает поддержание активности соединения и позволяет обнаруживать «висящие» подключения.

## Модуль управления сессиями

### session_manager_phantom.rs

**Назначение**: Управление жизненным циклом фантомных сессий с интеграцией системы heartbeat.

«`rust
pub struct PhantomSessionEntry {
pub session: Arc,
pub addr: SocketAddr,
pub created_at: std::time::Instant,
pub last_activity: std::time::Instant,
pub operation_count: u64,
}
«`

Структура `PhantomSessionEntry` хранит полную информацию о сессии, включая временные метки и счетчик операций. Это позволяет реализовать сложную логику очистки неактивных сессий.

«`rust
pub async fn cleanup_expired_sessions(&self, max_age_seconds: u64) -> usize {
let now = std::time::Instant::now();
let max_age = std::time::Duration::from_secs(max_age_seconds);

let mut expired_ids = Vec::new();

{
let sessions = self.sessions.read().await;
for (session_id, entry) in sessions.iter() {
if now.duration_since(entry.created_at) > max_age {
expired_ids.push(session_id.clone());
}
}
}

let count = expired_ids.len();
for session_id in expired_ids {
self.force_remove_session(&session_id).await;
}

if count > 0 {
info!(«👻 Cleaned up {} expired phantom sessions», count);
}

count
}
«`

Метод `cleanup_expired_sessions` демонстрирует подход к очистке устаревших сессий. Важной особенностью является разделение фазы идентификации и удаления для минимизации времени блокировки.

## Основной серверный модуль

### tcp_server_phantom.rs

**Назначение**: Обработка входящих TCP-подключений и координация работы всех подсистем.

«`rust
pub async fn handle_phantom_connection(
mut stream: TcpStream,
peer: SocketAddr,
phantom_config: PhantomConfig,
session_manager: Arc,
connection_manager: Arc,
crypto_pool: Arc,
heartbeat_manager: Arc,
packet_service: Arc,
) -> anyhow::Result<()>
«`

Функция `handle_phantom_connection` представляет полный цикл обработки подключения:

«`rust
// Выполняем фантомный handshake с таймаутом
let handshake_result = match timeout(
Duration::from_secs(30),
perform_phantom_handshake(&mut stream, HandshakeRole::Server)
).await {
Ok(Ok(result)) => {
let handshake_time = handshake_start.elapsed();
info!(target: «server»,
«👻 Phantom handshake successful for {} in {:?}, session: {}»,
peer, handshake_time, hex::encode(&result.session.session_id()));
result
},
Ok(Err(e)) => {
let handshake_time = handshake_start.elapsed();
warn!(target: «server»,
«👻 Phantom handshake failed for {} after {:?}: {}»,
peer, handshake_time, e);
return Ok(());
}
Err(_) => {
error!(target: «server», «👻 Phantom handshake timeout for {}», peer);
return Ok(());
}
};
«`

Криптографическое рукопожатие выполняется с таймаутом 30 секунд. Реализация использует паттерн match для обработки всех возможных исходов: успех, ошибка и таймаут.

## Система безопасности

### security_audit.rs

**Назначение**: Централизованная система аудита безопасности с асинхронной обработкой событий.

«`rust
static GLOBAL_AUDIT: Lazy> = Lazy::new(|| {
Mutex::new(SecurityAudit { tx: None })
});
«`

Используется паттерн singleton с ленивой инициализацией через `once_cell`. Это гарантирует единственный экземпляр системы аудита во всем приложении.

«`rust
pub async fn log_handshake_success(peer: SocketAddr, session_id: [u8; 16]) {
if let Err(e) = Self::send_event(AuditEvent::HandshakeSuccess { peer, session_id }).await {
error!(«Failed to log handshake success: {}», e);
}
}
«`

Каждый метод логирования защищен обработкой ошибок, что предотвращает падение основного приложения при проблемах в системе аудита.

### security_metrics.rs

**Назначение**: Интеграция с Prometheus для сбора метрик безопасности.

«`rust
#[derive(Clone)]
pub struct SecurityMetrics {
replay_attacks: Counter,
rate_limit_hits: Counter,
failed_handshakes: Counter,
successful_sessions: Counter,
active_connections: Gauge,
processing_time: Histogram,
nonce_cache_size: Gauge,
congestion_accepted: Counter,
congestion_rate_limited: Counter,
congestion_banned: Counter,
system_load: Gauge,
}
«`

Структура содержит все метрики безопасности, сгруппированные по назначению. Использование различных типов метрик (Counter, Gauge, Histogram) позволяет точно отражать различные аспекты работы системы.

## Система контроля перегрузки

### controller.rs

**Назначение**: Главный координатор системы контроля перегрузки, принимающий решение о принятии или отклонении пакетов.

«`rust
pub struct CongestionController {
analyzer: Arc,
limiter: Arc,
reputation: Arc,
monitor: Arc,
auth_manager: Arc,
config_manager: Arc,
config: RwLock,
is_enabled: RwLock,
}
«`

Архитектура построена на dependency injection через трейт-объекты, что позволяет легко заменять реализации компонентов.

«`rust
pub async fn should_accept_packet(
&self,
source_ip: IpAddr,
packet_data: &[u8],
packet_priority: PacketPriority,
connection_metrics: &ConnectionMetrics,
session_id: Option>,
) -> Decision
«`

Основной метод `should_accept_packet` реализует многоступенчатую проверку:

1. **Проверка включения системы** — если система отключена, пакет принимается
2. **Валидация входных данных** — проверка IP, размера пакета, ID сессии
3. **Анализ репутации IP** — проверка blacklist/whitelist
4. **Обнаружение аномалий** — анализ паттернов трафика
5. **Принятие финального решения** — на основе всех факторов

### Реализация алгоритма AIMD

#### aimd_algorithm.rs

«`rust
pub async fn record_success(&self, ip: IpAddr) -> u64 {
let mut states = self.ip_states.write().await;
let state = states.entry(ip).or_insert_with(|| IPLimitState {
current_limit: self.config.initial_limit,
used_this_window: 0,
last_updated: Instant::now(),
success_count: 0,
failure_count: 0,
});

state.success_count += 1;
state.used_this_window += 1;

// Additive Increase: увеличиваем лимит при успехе
if state.success_count >= 10 { // Каждые 10 успехов
state.current_limit = (state.current_limit + self.config.additive_increase)
.min(self.config.max_limit);
state.success_count = 0;
}

state.current_limit
}
«`

Алгоритм AIMD (Additive Increase/Multiplicative Decrease) реализован через два метода: `record_success` для постепенного увеличения лимитов и `record_failure` для резкого уменьшения при проблемах.

## Система обработки пакетов

### dispatcher.rs

**Назначение**: Распределение задач обработки между рабочими потоками с поддержкой приоритетов.

«`rust
pub struct Dispatcher {
tx: mpsc::Sender,
phantom_crypto_pool: Arc,
}

pub struct Work {
pub ctx: Arc,
pub raw_payload: Vec,
pub client_ip: SocketAddr,
pub reply: oneshot::Sender>,
pub received_at: Instant,
pub priority: Priority,
pub is_large: bool,
}
«`

Архитектура «диспетчер-воркеры» использует каналы Tokio для передачи задач. Каждая задача содержит всю необходимую контекстную информацию и канал для отправки ответа.

«`rust
impl Dispatcher {
pub fn spawn(
num_workers: usize,
phantom_crypto_pool: Arc,
phantom_packet_service: Arc,
) -> Self {
let (tx, rx) = mpsc::channel::(65536);
let rx = Arc::new(Mutex::new(rx));

info!(«🚀 Starting Dispatcher with {} workers», num_workers);
info!(» — Crypto pool: {}», phantom_crypto_pool.get_stats());
info!(» — Max queue size: 65536″);

for worker_id in 0..num_workers {
let rx = Arc::clone(&rx);
let phantom_crypto_pool = Arc::clone(&phantom_crypto_pool);
let phantom_packet_service = Arc::clone(&phantom_packet_service);

tokio::spawn(async move {
let mut worker = DispatcherWorker::new(
worker_id,
rx,
phantom_crypto_pool,
phantom_packet_service
);
worker.run().await;
});
}

Dispatcher {
tx,
phantom_crypto_pool
}
}
}
«`

Метод `spawn` создает указанное количество воркеров, каждый из которых работает в отдельной задаче Tokio. Размер очереди ограничен 65536 задачами для предотвращения неограниченного роста памяти.

### packet_service.rs

**Назначение**: Бизнес-логика обработки различных типов пакетов.

«`rust
pub async fn process_packet(
&self,
session: Arc,
packet_type: u8,
payload: Vec,
client_ip: SocketAddr,
) -> Result>
«`

Основной метод `process_packet` использует match-выражение для маршрутизации обработки в зависимости от типа пакета:

«`rust
let response_data = match packet_type {
0x01 => { // Ping packet
let ping_start = Instant::now();
let result = self.handle_ping(payload, session.clone(), client_ip).await?;
let ping_time = ping_start.elapsed();
debug!(«Ping processing took {:?}», ping_time);
result
}
0x10 => { // Heartbeat packet
let heartbeat_start = Instant::now();
let result = self.handle_heartbeat(session.session_id(), client_ip).await?;
let heartbeat_time = heartbeat_start.elapsed();
debug!(«Heartbeat processing took {:?}», heartbeat_time);
result
}
_ => {
let unknown_start = Instant::now();
let result = self.handle_unknown_packet(packet_type, payload, session.clone(), client_ip).await?;
let unknown_time = unknown_start.elapsed();
warn!(«Unknown packet processing took {:?}», unknown_time);
result
}
};
«`

Каждый тип пакета обрабатывается в отдельном методе, что обеспечивает чистоту кода и легкость расширения.

## Пайплайн обработки

### orchestrator.rs

**Назначение**: Координация выполнения последовательности этапов обработки.

«`rust
pub struct PipelineOrchestrator {
stages: Vec>,
}

impl PipelineOrchestrator {
pub fn new() -> Self {
Self { stages: Vec::new() }
}

pub fn add_stage(mut self, stage: S) -> Self {
self.stages.push(Box::new(stage));
self
}

pub async fn execute(&self, mut context: PipelineContext) -> Result, StageError> {
let start_time = std::time::Instant::now();

for (i, stage) in self.stages.iter().enumerate() {
info!(«Executing pipeline stage {}», i + 1);
stage.execute(&mut context).await?;
}

let processing_time = start_time.elapsed();
info!(«Pipeline execution completed in {:?}», processing_time);

context.encrypted_response
.ok_or_else(|| StageError::ProcessingFailed(«No response generated».to_string()))
}
}
«`

Паттерн «строитель» (builder pattern) позволяет гибко конфигурировать пайплайн через цепочку вызовов `add_stage`.

### Этапы пайплайна

#### decryption.rs

«`rust
pub struct PhantomDecryptionStage {
crypto_pool: Arc,
}

#[async_trait]
impl PipelineStage for PhantomDecryptionStage {
async fn execute(&self, context: &mut PipelineContext) -> Result<(), StageError> {
info!(«Decrypting phantom packet for session {}»,
hex::encode(context.phantom_session.session_id()));

let result = self.crypto_pool.decrypt(
context.phantom_session.clone(),
context.raw_payload.clone()
).await
.map_err(|e| StageError::DecryptionFailed(e.to_string()))?;

context.packet_type = Some(result.0);
context.decrypted_data = Some(result.1);

info!(«Successfully decrypted phantom packet type: 0x{:02X}», result.0);
Ok(())
}
}
«`

Этап дешифрования использует фантомный криптопул для обработки зашифрованных данных. Ошибки преобразуются в типизированные `StageError`.

#### processing.rs

«`rust
pub struct PhantomProcessingStage {
packet_service: Arc,
client_ip: SocketAddr,
}

#[async_trait]
impl PipelineStage for PhantomProcessingStage {
async fn execute(&self, context: &mut PipelineContext) -> Result<(), StageError> {
let packet_type = context.packet_type
.ok_or_else(|| StageError::ProcessingFailed(«No packet type available».to_string()))?;

let decrypted_data = context.decrypted_data
.take()
.ok_or_else(|| StageError::ProcessingFailed(«No decrypted data available».to_string()))?;

info!(«Processing phantom packet type: 0x{:02X} from {}»,
packet_type, self.client_ip);

let processing_result = self.packet_service
.process_packet(
context.phantom_session.clone(),
packet_type,
decrypted_data,
self.client_ip,
)
.await
.map_err(|e| StageError::ProcessingFailed(e.to_string()))?;

context.processed_data = Some(processing_result.response);
Ok(())
}
}
«`

Этап обработки делегирует бизнес-логику `PhantomPacketService`. Важной особенностью является использование `take()` для перемещения данных из контекста, что предотвращает их повторное использование.

#### encryption.rs

«`rust
pub struct PhantomEncryptionStage {
response_packet_type: u8,
crypto_pool: Arc,
}

#[async_trait]
impl PipelineStage for PhantomEncryptionStage {
async fn execute(&self, context: &mut PipelineContext) -> Result<(), StageError> {
let processed_data = context.processed_data
.take()
.ok_or_else(|| StageError::EncryptionFailed(«No processed data available».to_string()))?;

info!(«Encrypting phantom response of {} bytes», processed_data.len());

let encrypted_response = self.crypto_pool.encrypt(
context.phantom_session.clone(),
self.response_packet_type,
processed_data,
).await
.map_err(|e| StageError::EncryptionFailed(e.to_string()))?;

context.encrypted_response = Some(encrypted_response);
Ok(())
}
}
«`

Завершающий этап шифрует подготовленный ответ перед отправкой клиенту. Тип пакета ответа задается при создании этапа.

## Система репутации IP

### reputation_manager.rs

**Назначение**: Управление репутацией IP-адресов с автоматическим blacklisting.

«`rust
pub struct ReputationManagerImpl {
ip_database: IPDatabase,
offense_tracker: OffenseTracker,
scoring: IPScoring,
auto_blacklist_enabled: RwLock,
cache: ReputationCache,
}
«`

Композитная структура объединяет несколько компонентов:
— `IPDatabase` — постоянное хранилище записей об IP
— `OffenseTracker` — отслеживание нарушений во временном окне
— `IPScoring` — расчет репутационных scores
— `ReputationCache` — кэш для ускорения доступа

«`rust
#[async_trait]
impl ReputationManager for ReputationManagerImpl {
async fn get_score(&self, ip: IpAddr) -> ReputationScore {
// Пытаемся получить из кэша
if let Some(cached_score) = self.cache.get(ip).await {
return cached_score;
}

// Создаем или получаем запись IP
let ip_record = match self.ip_database.get_or_create_record(ip).await {
Ok(record) => record,
Err(e) => {
error!(«Failed to get/create record for {}: {}», ip, e);
// Возвращаем нейтральный score при ошибке
return ReputationScore {
score: 0.5,
offenses: 0,
last_offense: None,
is_whitelisted: false,
is_blacklisted: false,
};
}
};

// … расчет score …

// Сохраняем в кэш
self.cache.put(ip, reputation_score.clone()).await;
reputation_score
}
}
«`

Метод `get_score` демонстрирует многоуровневую архитектуру с кэшированием. При ошибках доступа к данным возвращается нейтральный score вместо паники.

## Вспомогательные модули

### frame_reader.rs и frame_writer.rs

**Назначение**: Чтение и запись структурированных сетевых кадров.

«`rust
pub async fn read_frame(reader: &mut R) -> Result>
where
R: AsyncRead + Unpin,
{
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
let mut data = vec![0u8; len];
reader.read_exact(&mut data).await?;
Ok(data)
}
«`

Функция `read_frame` читает кадры формата «длина + данные», где длина кодируется 4 байтами в big-endian. Такой формат обеспечивает надежное разделение сообщений в потоке.

«`rust
pub async fn write_frame(writer: &mut W, data: &[u8]) -> anyhow::Result<()>
where
W: AsyncWrite + Unpin,
{
let len = (data.len() as u32).to_be_bytes();
writer.write_all(&len).await?;
writer.write_all(data).await?;
writer.flush().await?;
Ok(())
}
«`

Соответствующая функция `write_frame` записывает данные в том же формате. Вызов `flush()` гарантирует, что данные действительно отправлены в сеть.

## Конфигурация и валидация

### config.rs

**Назначение**: Централизованное управление настройками безопасности.

«`rust
pub struct ConfigManager {
config: RwLock,
config_path: Option,
}

impl ConfigManager {
pub async fn update_config(&self, update_fn: F) -> Result<(), String>
where
F: FnOnce(&mut SecurityConfig),
{
let mut config = self.config.write().await;
update_fn(&mut config);

// Валидация конфигурации
self.validate_config(&config)?;

// Автосохранение при изменении
self.save_config().await?;

Ok(())
}

fn validate_config(&self, config: &SecurityConfig) -> Result<(), String> {
if config.congestion_control.anomaly_threshold_soft >=
config.congestion_control.anomaly_threshold_hard {
return Err(«anomaly_threshold_soft must be less than anomaly_threshold_hard».to_string());
}

if config.rate_limiting.max_packets_per_second == 0 {
return Err(«max_packets_per_second must be greater than 0».to_string());
}

Ok(())
}
}
«`

`ConfigManager` обеспечивает потокобезопасное обновление конфигурации с автоматической валидацией и сохранением. Паттерн «замыкание для обновления» позволяет выполнять атомарные изменения.

### validation.rs

**Назначение**: Валидация входных данных с учетом контекста безопасности.

«`rust
impl InputValidator {
pub fn validate_ip(ip: IpAddr, allow_private_ips: bool) -> Result<(), String> {
if ip.is_unspecified() {
return Err(«Unspecified IP address is not allowed».to_string());
}

if ip.is_multicast() {
return Err(«Multicast IP address is not allowed».to_string());
}

if ip.is_loopback() && !allow_private_ips {
return Err(«Loopback IP address is not allowed».to_string());
}

if !allow_private_ips {
if let IpAddr::V4(ipv4) = ip {
if Self::is_private_ipv4(ipv4) {
return Err(«Private IP address is not allowed».to_string());
}
}

if let IpAddr::V6(ipv6) = ip {
if Self::is_private_ipv6(ipv6) {
return Err(«Private IPv6 address is not allowed».to_string());
}
}
}

Ok(())
}
}
«`

Валидатор IP-адресов проверяет различные классы адресов, которые могут представлять угрозу безопасности. Параметр `allow_private_ips` позволяет гибко настраивать политику в тестовой среде.

## Паттерны проектирования в коде

### Стратегия (Strategy Pattern)

«`rust
#[async_trait]
pub trait TrafficAnalyzer: Send + Sync {
async fn analyze_packet(
&self,
packet: &PacketInfo,
metrics: &ConnectionMetrics,
) -> AnomalyScore;

async fn get_system_load(&self) -> LoadLevel;
}
«`

Трейты `TrafficAnalyzer`, `AdaptiveLimiter`, `ReputationManager` позволяют динамически заменять алгоритмы без изменения кода контроллера.

### Наблюдатель (Observer Pattern)

«`rust
pub enum AuditEvent {
HandshakeSuccess { peer: SocketAddr, session_id: [u8; 16] },
HandshakeFailure { peer: SocketAddr, reason: String },
ReplayAttack { peer: SocketAddr, nonce: [u8; 12] },
}
«`

Система аудита использует паттерн наблюдатель для рассылки событий безопасности всем заинтересованным компонентам.

### Цепочка обязанностей (Chain of Responsibility)

«`rust
pub struct PipelineOrchestrator {
stages: Vec>,
}
«`

Пайплайн обработки пакетов реализует цепочку обязанностей, где каждый этап может обработать запрос или передать его следующему.

## Оптимизации производительности

### Атомарные счетчики с выравниванием кэша

«`rust
#[repr(align(64))]
pub struct CacheAlignedAtomic(AtomicU64);

pub struct StripedCounter {
stripes: Vec,
stripe_count: usize,
}
«`

Структура `CacheAlignedAtomic` обеспечивает выравнивание по границе кэш-линии (64 байта), что предотвращает false sharing в многопоточных сценариях.

### LRU-кэширование

«`rust
pub struct ReputationCache {
cache: RwLock>,
ttl: Duration,
}
«`

Кэш репутации использует алгоритм LRU (Least Recently Used) для эффективного управления памятью и автоматического удаления устаревших записей.

### Пул криптографических контекстов

«`rust
pub struct PhantomCryptoPool {
contexts: RwLock, PhantomCryptoContext>>,
max_size: usize,
}
«`

Пул криптографических контекстов позволяет повторно использовать дорогие в создании объекты, уменьшая нагрузку на сборщик мусора и улучшая производительность.

## Заключение

Кодовая база TCP Сервера демонстрирует современные подходы к разработке высоконагруженных сетевых приложений на Rust. Архитектура построена вокруг принципов модульности, потокобезопасности и отказоустойчивости. Использование асинхронного программирования через Tokio в сочетании с продвинутыми структурами данных позволяет системе эффективно масштабироваться и обрабатывать тысячи одновременных соединений.

Ключевые сильные стороны реализации:
— **Четкое разделение ответственности** между модулями
— **Комплексная система безопасности** с защитой в глубину
— **Адаптивные алгоритмы** для обработки изменяющейся нагрузки
— **Детальный мониторинг и аудит** всех аспектов работы
— **Оптимизации производительности** на всех уровнях стека

Код служит отличной основой для построения промышленных систем обработки сетевого трафика с высокими требованиями к безопасности и производительности.