데이터 동기화 전략, 클라우드 모델 업데이트, 페더레이티드 러닝을 통한 분산 AI 시스템 구축
엣지-클라우드 하이브리드 아키텍처는 엣지의 실시간 추론 능력과 클라우드의 컴퓨팅 자원을 결합합니다. 지연시간이 중요한 추론은 엣지에서, 대규모 학습과 분석은 클라우드에서 수행합니다.
엣지와 클라우드 간 데이터 동기화는 네트워크 대역폭, 지연시간, 데이터 중요도를 고려하여 최적의 전략을 선택해야 합니다.
알람, 이상 탐지 결과 등 즉시 전송이 필요한 데이터. MQTT/WebSocket 활용
집계된 메트릭, 통계 데이터. 주기적 배치 전송으로 대역폭 최적화
원본 이미지, 로그 파일 등. 조건부 업로드 또는 요청 시에만 전송
import asyncio import json from dataclasses import dataclass, asdict from enum import Enum from typing import List, Optional from datetime import datetime import aiohttp import aiomqtt class DataPriority(Enum): HOT = 1 # 즉시 전송 WARM = 2 # 배치 전송 COLD = 3 # 선택적 전송 @dataclass class SyncData: data_id: str priority: DataPriority payload: dict timestamp: datetime size_bytes: int class EdgeCloudSync: """엣지-클라우드 데이터 동기화 관리자""" def __init__( self, mqtt_broker: str, cloud_api_url: str, batch_interval_sec: int = 60, batch_max_size: int = 100 ): self.mqtt_broker = mqtt_broker self.cloud_api_url = cloud_api_url self.batch_interval = batch_interval_sec self.batch_max_size = batch_max_size # 배치 버퍼 self.warm_buffer: List[SyncData] = [] self.cold_buffer: List[SyncData] = [] # 연결 상태 self.is_connected = False self.offline_queue: List[SyncData] = [] async def start(self): """동기화 서비스 시작""" await asyncio.gather( self._mqtt_loop(), self._batch_upload_loop(), self._connection_monitor() ) async def sync(self, data: SyncData): """데이터 동기화 (우선순위 기반)""" if data.priority == DataPriority.HOT: await self._send_realtime(data) elif data.priority == DataPriority.WARM: self.warm_buffer.append(data) if len(self.warm_buffer) >= self.batch_max_size: await self._flush_warm_buffer() else: self.cold_buffer.append(data) async def _send_realtime(self, data: SyncData): """실시간 MQTT 전송""" if not self.is_connected: self.offline_queue.append(data) return try: async with aiomqtt.Client(self.mqtt_broker) as client: topic = f"edge/{data.data_id}/realtime" payload = json.dumps(asdict(data), default=str) await client.publish(topic, payload) except Exception as e: self.offline_queue.append(data) async def _flush_warm_buffer(self): """배치 업로드""" if not self.warm_buffer: return batch = self.warm_buffer.copy() self.warm_buffer.clear() try: async with aiohttp.ClientSession() as session: payload = [asdict(d) for d in batch] async with session.post( f"{self.cloud_api_url}/batch", json=payload ) as resp: if resp.status != 200: self.warm_buffer.extend(batch) # 재시도 except Exception: self.warm_buffer.extend(batch) async def _batch_upload_loop(self): """주기적 배치 업로드""" while True: await asyncio.sleep(self.batch_interval) await self._flush_warm_buffer()
클라우드에서 학습된 새 모델을 엣지 디바이스에 안전하게 배포하는 것은 하이브리드 시스템의 핵심 과제입니다. A/B 테스트, 점진적 롤아웃, 롤백 전략이 필요합니다.
import hashlib import os from dataclasses import dataclass from typing import Optional, Callable from enum import Enum class UpdateStrategy(Enum): IMMEDIATE = "immediate" # 즉시 적용 CANARY = "canary" # 일부 노드 먼저 BLUE_GREEN = "blue_green" # 무중단 전환 SHADOW = "shadow" # 병렬 실행 비교 @dataclass class ModelVersion: version: str checksum: str download_url: str metadata: dict min_accuracy: float class ModelUpdateManager: """엣지 모델 업데이트 관리자""" def __init__( self, model_dir: str, cloud_registry_url: str, strategy: UpdateStrategy = UpdateStrategy.CANARY ): self.model_dir = model_dir self.registry_url = cloud_registry_url self.strategy = strategy self.current_version: Optional[ModelVersion] = None self.previous_version: Optional[ModelVersion] = None self.pending_version: Optional[ModelVersion] = None async def check_for_updates(self) -> Optional[ModelVersion]: """새 버전 확인""" async with aiohttp.ClientSession() as session: async with session.get( f"{self.registry_url}/latest" ) as resp: data = await resp.json() latest = ModelVersion(**data) if (self.current_version is None or latest.version != self.current_version.version): return latest return None async def download_model(self, version: ModelVersion) -> str: """모델 다운로드 및 검증""" local_path = os.path.join(self.model_dir, f"model_{version.version}.engine") # 다운로드 async with aiohttp.ClientSession() as session: async with session.get(version.download_url) as resp: with open(local_path, "wb") as f: async for chunk in resp.content.iter_chunked(8192): f.write(chunk) # 체크섬 검증 if not self._verify_checksum(local_path, version.checksum): os.remove(local_path) raise ValueError("Checksum verification failed") return local_path async def apply_update( self, version: ModelVersion, model_path: str, validate_fn: Callable[[str], float] ) -> bool: """업데이트 적용 (전략에 따라)""" if self.strategy == UpdateStrategy.SHADOW: # Shadow 모드: 병렬 실행 후 정확도 비교 new_accuracy = validate_fn(model_path) if new_accuracy < version.min_accuracy: return False elif self.strategy == UpdateStrategy.BLUE_GREEN: # Blue-Green: 새 모델 로드 후 원자적 전환 self.pending_version = version # 실제 전환은 외부에서 atomic_switch() 호출 # 버전 이력 관리 self.previous_version = self.current_version self.current_version = version return True async def rollback(self) -> bool: """이전 버전으로 롤백""" if self.previous_version is None: return False self.current_version, self.previous_version = \ self.previous_version, self.current_version return True def _verify_checksum(self, path: str, expected: str) -> bool: """SHA256 체크섬 검증""" sha256 = hashlib.sha256() with open(path, "rb") as f: for block in iter(lambda: f.read(65536), b""): sha256.update(block) return sha256.hexdigest() == expected
페더레이티드 러닝(Federated Learning)은 원본 데이터를 중앙 서버로 전송하지 않고, 각 엣지에서 로컬 학습을 수행한 후 모델 가중치만 집계하는 분산 학습 방식입니다.
import torch import torch.nn as nn from typing import List, Dict import copy class FederatedClient: """엣지 디바이스의 페더레이티드 클라이언트""" def __init__( self, model: nn.Module, local_data_loader, client_id: str, local_epochs: int = 5 ): self.model = model self.data_loader = local_data_loader self.client_id = client_id self.local_epochs = local_epochs self.optimizer = torch.optim.SGD(model.parameters(), lr=0.01) def receive_global_model(self, global_weights: Dict): """글로벌 모델 수신""" self.model.load_state_dict(global_weights) def local_train(self) -> Dict: """로컬 학습 수행""" self.model.train() criterion = nn.CrossEntropyLoss() for epoch in range(self.local_epochs): for data, target in self.data_loader: self.optimizer.zero_grad() output = self.model(data) loss = criterion(output, target) loss.backward() self.optimizer.step() # 학습된 가중치 반환 return copy.deepcopy(self.model.state_dict()) def get_gradient_update(self, global_weights: Dict) -> Dict: """가중치 변화량 (gradient) 계산""" local_weights = self.model.state_dict() gradient = {} for key in local_weights: gradient[key] = local_weights[key] - global_weights[key] return gradient class FederatedServer: """중앙 집계 서버""" def __init__(self, model: nn.Module): self.global_model = model self.global_weights = copy.deepcopy(model.state_dict()) self.round_num = 0 def aggregate( self, client_weights: List[Dict], client_data_sizes: List[int] ) -> Dict: """FedAvg 알고리즘으로 가중치 집계""" total_size = sum(client_data_sizes) aggregated = {} for key in self.global_weights: aggregated[key] = torch.zeros_like(self.global_weights[key]) for weights, size in zip(client_weights, client_data_sizes): # 데이터 크기에 비례한 가중 평균 weight = size / total_size aggregated[key] += weights[key] * weight self.global_weights = aggregated self.global_model.load_state_dict(aggregated) self.round_num += 1 return aggregated def differential_privacy_aggregate( self, client_weights: List[Dict], noise_scale: float = 0.1 ) -> Dict: """차등 프라이버시가 적용된 집계""" aggregated = self.aggregate( client_weights, [1] * len(client_weights) ) # 가우시안 노이즈 추가 for key in aggregated: noise = torch.randn_like(aggregated[key]) * noise_scale aggregated[key] += noise self.global_weights = aggregated return aggregated
페더레이티드 러닝은 데이터 프라이버시를 보장하면서 분산 학습이 가능하지만, Non-IID 데이터, 통신 비용, 시스템 이질성 등의 도전 과제가 있습니다.
네트워크 연결이 불안정한 제조 환경에서도 엣지 AI 시스템은 독립적으로 동작해야 합니다. 오프라인 큐잉과 자동 재연결 메커니즘이 필수적입니다.
import sqlite3 import pickle from pathlib import Path class OfflineQueueManager: """오프라인 데이터 큐 관리""" def __init__(self, db_path: str = "offline_queue.db"): self.db_path = Path(db_path) self._init_db() def _init_db(self): """SQLite DB 초기화""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, priority INTEGER, data BLOB, timestamp REAL, retry_count INTEGER DEFAULT 0 ) """) def enqueue(self, data: SyncData): """데이터를 로컬 큐에 저장""" with sqlite3.connect(self.db_path) as conn: conn.execute( "INSERT INTO queue (priority, data, timestamp) VALUES (?, ?, ?)", (data.priority.value, pickle.dumps(data), data.timestamp.timestamp()) ) def dequeue_batch(self, limit: int = 100) -> List[SyncData]: """우선순위 순으로 배치 추출""" with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( "SELECT id, data FROM queue ORDER BY priority, timestamp LIMIT ?", (limit,) ) rows = cursor.fetchall() data_list = [] ids = [] for row_id, data_blob in rows: data_list.append(pickle.loads(data_blob)) ids.append(row_id) # 추출한 항목 삭제 if ids: conn.execute( f"DELETE FROM queue WHERE id IN ({','.join('?' * len(ids))})", ids ) return data_list def get_queue_size(self) -> int: """대기 중인 항목 수""" with sqlite3.connect(self.db_path) as conn: cursor = conn.execute("SELECT COUNT(*) FROM queue") return cursor.fetchone()[0] class HybridSyncManager(EdgeCloudSync): """오프라인 지원 하이브리드 동기화""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.offline_queue = OfflineQueueManager() async def sync(self, data: SyncData): """연결 상태에 따른 동기화""" if self.is_connected: try: await super().sync(data) except Exception: self.offline_queue.enqueue(data) else: self.offline_queue.enqueue(data) async def flush_offline_queue(self): """오프라인 큐 비우기 (연결 복구 시)""" while self.offline_queue.get_queue_size() > 0: batch = self.offline_queue.dequeue_batch() for data in batch: try: await super().sync(data) except Exception: self.offline_queue.enqueue(data) break
엣지와 클라우드 전체 시스템의 상태를 통합 모니터링하여 이상 징후를 조기에 감지하고 대응합니다.
from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, List import statistics @dataclass class EdgeNodeStatus: node_id: str is_online: bool last_heartbeat: datetime model_version: str inference_fps: float queue_depth: int cpu_usage: float gpu_usage: float memory_usage: float class HybridSystemMonitor: """하이브리드 시스템 통합 모니터링""" def __init__(self, heartbeat_timeout_sec: int = 60): self.heartbeat_timeout = timedelta(seconds=heartbeat_timeout_sec) self.nodes: Dict[str, EdgeNodeStatus] = {} self.alerts: List[dict] = [] def update_node_status(self, status: EdgeNodeStatus): """노드 상태 업데이트""" self.nodes[status.node_id] = status self._check_alerts(status) def _check_alerts(self, status: EdgeNodeStatus): """알람 조건 확인""" if status.queue_depth > 1000: self._create_alert( status.node_id, "HIGH_QUEUE_DEPTH", f"Queue depth: {status.queue_depth}" ) if status.inference_fps < 10: self._create_alert( status.node_id, "LOW_FPS", f"Inference FPS: {status.inference_fps}" ) if status.gpu_usage > 95: self._create_alert( status.node_id, "HIGH_GPU_USAGE", f"GPU usage: {status.gpu_usage}%" ) def get_system_summary(self) -> dict: """시스템 전체 요약""" online_nodes = [n for n in self.nodes.values() if n.is_online] return { "total_nodes": len(self.nodes), "online_nodes": len(online_nodes), "avg_fps": statistics.mean([n.inference_fps for n in online_nodes]) if online_nodes else 0, "total_queue_depth": sum(n.queue_depth for n in online_nodes), "active_alerts": len(self.alerts), "model_versions": list(set(n.model_version for n in online_nodes)) }