Chapter 8 8 / 10

엣지-클라우드 하이브리드

데이터 동기화 전략, 클라우드 모델 업데이트, 페더레이티드 러닝을 통한 분산 AI 시스템 구축

1하이브리드 아키텍처 개요

엣지-클라우드 하이브리드 아키텍처는 엣지의 실시간 추론 능력과 클라우드의 컴퓨팅 자원을 결합합니다. 지연시간이 중요한 추론은 엣지에서, 대규모 학습과 분석은 클라우드에서 수행합니다.

Edge-Cloud Hybrid Architecture
Cloud Layer
Model Registry
Data Lake
Training Server
Dashboard
Data/Logs
Model/Config Updates
Edge Gateway
Sync Manager
Buffer Queue
Local Cache
▲ ▼
Edge AI Node 1
Edge AI Node 2
Edge AI Node 3
Edge AI Node N

E엣지 처리 (Edge)

  • 실시간 추론 (<20ms)
  • 로컬 데이터 전처리
  • 오프라인 동작 지원
  • 민감 데이터 로컬 처리

C클라우드 처리 (Cloud)

  • 대규모 모델 학습
  • 장기 데이터 분석
  • 중앙 모니터링
  • 모델 버전 관리

2데이터 동기화 전략

엣지와 클라우드 간 데이터 동기화는 네트워크 대역폭, 지연시간, 데이터 중요도를 고려하여 최적의 전략을 선택해야 합니다.

1

실시간 스트리밍 (Hot Data)

알람, 이상 탐지 결과 등 즉시 전송이 필요한 데이터. MQTT/WebSocket 활용

2

배치 업로드 (Warm Data)

집계된 메트릭, 통계 데이터. 주기적 배치 전송으로 대역폭 최적화

3

선택적 동기화 (Cold Data)

원본 이미지, 로그 파일 등. 조건부 업로드 또는 요청 시에만 전송

import asyncio
import json
from dataclasses import dataclass, asdict
from enum import Enum
from typing import List, Optional
from datetime import datetime
import aiohttp
import aiomqtt

class DataPriority(Enum):
    HOT = 1    # 즉시 전송
    WARM = 2   # 배치 전송
    COLD = 3   # 선택적 전송

@dataclass
class SyncData:
    data_id: str
    priority: DataPriority
    payload: dict
    timestamp: datetime
    size_bytes: int

class EdgeCloudSync:
    """엣지-클라우드 데이터 동기화 관리자"""

    def __init__(
        self,
        mqtt_broker: str,
        cloud_api_url: str,
        batch_interval_sec: int = 60,
        batch_max_size: int = 100
    ):
        self.mqtt_broker = mqtt_broker
        self.cloud_api_url = cloud_api_url
        self.batch_interval = batch_interval_sec
        self.batch_max_size = batch_max_size

        # 배치 버퍼
        self.warm_buffer: List[SyncData] = []
        self.cold_buffer: List[SyncData] = []

        # 연결 상태
        self.is_connected = False
        self.offline_queue: List[SyncData] = []

    async def start(self):
        """동기화 서비스 시작"""
        await asyncio.gather(
            self._mqtt_loop(),
            self._batch_upload_loop(),
            self._connection_monitor()
        )

    async def sync(self, data: SyncData):
        """데이터 동기화 (우선순위 기반)"""
        if data.priority == DataPriority.HOT:
            await self._send_realtime(data)
        elif data.priority == DataPriority.WARM:
            self.warm_buffer.append(data)
            if len(self.warm_buffer) >= self.batch_max_size:
                await self._flush_warm_buffer()
        else:
            self.cold_buffer.append(data)

    async def _send_realtime(self, data: SyncData):
        """실시간 MQTT 전송"""
        if not self.is_connected:
            self.offline_queue.append(data)
            return

        try:
            async with aiomqtt.Client(self.mqtt_broker) as client:
                topic = f"edge/{data.data_id}/realtime"
                payload = json.dumps(asdict(data), default=str)
                await client.publish(topic, payload)
        except Exception as e:
            self.offline_queue.append(data)

    async def _flush_warm_buffer(self):
        """배치 업로드"""
        if not self.warm_buffer:
            return

        batch = self.warm_buffer.copy()
        self.warm_buffer.clear()

        try:
            async with aiohttp.ClientSession() as session:
                payload = [asdict(d) for d in batch]
                async with session.post(
                    f"{self.cloud_api_url}/batch",
                    json=payload
                ) as resp:
                    if resp.status != 200:
                        self.warm_buffer.extend(batch)  # 재시도
        except Exception:
            self.warm_buffer.extend(batch)

    async def _batch_upload_loop(self):
        """주기적 배치 업로드"""
        while True:
            await asyncio.sleep(self.batch_interval)
            await self._flush_warm_buffer()

3모델 업데이트 전략

클라우드에서 학습된 새 모델을 엣지 디바이스에 안전하게 배포하는 것은 하이브리드 시스템의 핵심 과제입니다. A/B 테스트, 점진적 롤아웃, 롤백 전략이 필요합니다.

import hashlib
import os
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum

class UpdateStrategy(Enum):
    IMMEDIATE = "immediate"      # 즉시 적용
    CANARY = "canary"            # 일부 노드 먼저
    BLUE_GREEN = "blue_green"    # 무중단 전환
    SHADOW = "shadow"            # 병렬 실행 비교

@dataclass
class ModelVersion:
    version: str
    checksum: str
    download_url: str
    metadata: dict
    min_accuracy: float

class ModelUpdateManager:
    """엣지 모델 업데이트 관리자"""

    def __init__(
        self,
        model_dir: str,
        cloud_registry_url: str,
        strategy: UpdateStrategy = UpdateStrategy.CANARY
    ):
        self.model_dir = model_dir
        self.registry_url = cloud_registry_url
        self.strategy = strategy

        self.current_version: Optional[ModelVersion] = None
        self.previous_version: Optional[ModelVersion] = None
        self.pending_version: Optional[ModelVersion] = None

    async def check_for_updates(self) -> Optional[ModelVersion]:
        """새 버전 확인"""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.registry_url}/latest"
            ) as resp:
                data = await resp.json()
                latest = ModelVersion(**data)

                if (self.current_version is None or
                    latest.version != self.current_version.version):
                    return latest
        return None

    async def download_model(self, version: ModelVersion) -> str:
        """모델 다운로드 및 검증"""
        local_path = os.path.join(self.model_dir, f"model_{version.version}.engine")

        # 다운로드
        async with aiohttp.ClientSession() as session:
            async with session.get(version.download_url) as resp:
                with open(local_path, "wb") as f:
                    async for chunk in resp.content.iter_chunked(8192):
                        f.write(chunk)

        # 체크섬 검증
        if not self._verify_checksum(local_path, version.checksum):
            os.remove(local_path)
            raise ValueError("Checksum verification failed")

        return local_path

    async def apply_update(
        self,
        version: ModelVersion,
        model_path: str,
        validate_fn: Callable[[str], float]
    ) -> bool:
        """업데이트 적용 (전략에 따라)"""

        if self.strategy == UpdateStrategy.SHADOW:
            # Shadow 모드: 병렬 실행 후 정확도 비교
            new_accuracy = validate_fn(model_path)
            if new_accuracy < version.min_accuracy:
                return False

        elif self.strategy == UpdateStrategy.BLUE_GREEN:
            # Blue-Green: 새 모델 로드 후 원자적 전환
            self.pending_version = version
            # 실제 전환은 외부에서 atomic_switch() 호출

        # 버전 이력 관리
        self.previous_version = self.current_version
        self.current_version = version

        return True

    async def rollback(self) -> bool:
        """이전 버전으로 롤백"""
        if self.previous_version is None:
            return False

        self.current_version, self.previous_version = \
            self.previous_version, self.current_version
        return True

    def _verify_checksum(self, path: str, expected: str) -> bool:
        """SHA256 체크섬 검증"""
        sha256 = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(65536), b""):
                sha256.update(block)
        return sha256.hexdigest() == expected

4페더레이티드 러닝

페더레이티드 러닝(Federated Learning)은 원본 데이터를 중앙 서버로 전송하지 않고, 각 엣지에서 로컬 학습을 수행한 후 모델 가중치만 집계하는 분산 학습 방식입니다.

Federated Learning Cycle
1 Global Model Distribution
Central Server
W_global
Edge 1
Edge 2
Edge N
2 Local Training (Privacy Preserved)
Edge 1: Train on Local Data delta-W1
Edge 2: Train on Local Data delta-W2
Edge N: Train on Local Data delta-Wn
3 Gradient/Weight Aggregation
Central Server
delta-W1, delta-W2, ..., delta-Wn
[Edges]
W_new = Aggregate(delta-W1, delta-W2, ..., delta-Wn)
4 Repeat Cycle
import torch
import torch.nn as nn
from typing import List, Dict
import copy

class FederatedClient:
    """엣지 디바이스의 페더레이티드 클라이언트"""

    def __init__(
        self,
        model: nn.Module,
        local_data_loader,
        client_id: str,
        local_epochs: int = 5
    ):
        self.model = model
        self.data_loader = local_data_loader
        self.client_id = client_id
        self.local_epochs = local_epochs
        self.optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    def receive_global_model(self, global_weights: Dict):
        """글로벌 모델 수신"""
        self.model.load_state_dict(global_weights)

    def local_train(self) -> Dict:
        """로컬 학습 수행"""
        self.model.train()
        criterion = nn.CrossEntropyLoss()

        for epoch in range(self.local_epochs):
            for data, target in self.data_loader:
                self.optimizer.zero_grad()
                output = self.model(data)
                loss = criterion(output, target)
                loss.backward()
                self.optimizer.step()

        # 학습된 가중치 반환
        return copy.deepcopy(self.model.state_dict())

    def get_gradient_update(self, global_weights: Dict) -> Dict:
        """가중치 변화량 (gradient) 계산"""
        local_weights = self.model.state_dict()
        gradient = {}
        for key in local_weights:
            gradient[key] = local_weights[key] - global_weights[key]
        return gradient


class FederatedServer:
    """중앙 집계 서버"""

    def __init__(self, model: nn.Module):
        self.global_model = model
        self.global_weights = copy.deepcopy(model.state_dict())
        self.round_num = 0

    def aggregate(
        self,
        client_weights: List[Dict],
        client_data_sizes: List[int]
    ) -> Dict:
        """FedAvg 알고리즘으로 가중치 집계"""
        total_size = sum(client_data_sizes)
        aggregated = {}

        for key in self.global_weights:
            aggregated[key] = torch.zeros_like(self.global_weights[key])

            for weights, size in zip(client_weights, client_data_sizes):
                # 데이터 크기에 비례한 가중 평균
                weight = size / total_size
                aggregated[key] += weights[key] * weight

        self.global_weights = aggregated
        self.global_model.load_state_dict(aggregated)
        self.round_num += 1

        return aggregated

    def differential_privacy_aggregate(
        self,
        client_weights: List[Dict],
        noise_scale: float = 0.1
    ) -> Dict:
        """차등 프라이버시가 적용된 집계"""
        aggregated = self.aggregate(
            client_weights, [1] * len(client_weights)
        )

        # 가우시안 노이즈 추가
        for key in aggregated:
            noise = torch.randn_like(aggregated[key]) * noise_scale
            aggregated[key] += noise

        self.global_weights = aggregated
        return aggregated

페더레이티드 러닝은 데이터 프라이버시를 보장하면서 분산 학습이 가능하지만, Non-IID 데이터, 통신 비용, 시스템 이질성 등의 도전 과제가 있습니다.

5오프라인 동작 지원

네트워크 연결이 불안정한 제조 환경에서도 엣지 AI 시스템은 독립적으로 동작해야 합니다. 오프라인 큐잉과 자동 재연결 메커니즘이 필수적입니다.

import sqlite3
import pickle
from pathlib import Path

class OfflineQueueManager:
    """오프라인 데이터 큐 관리"""

    def __init__(self, db_path: str = "offline_queue.db"):
        self.db_path = Path(db_path)
        self._init_db()

    def _init_db(self):
        """SQLite DB 초기화"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    priority INTEGER,
                    data BLOB,
                    timestamp REAL,
                    retry_count INTEGER DEFAULT 0
                )
            """)

    def enqueue(self, data: SyncData):
        """데이터를 로컬 큐에 저장"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO queue (priority, data, timestamp) VALUES (?, ?, ?)",
                (data.priority.value, pickle.dumps(data), data.timestamp.timestamp())
            )

    def dequeue_batch(self, limit: int = 100) -> List[SyncData]:
        """우선순위 순으로 배치 추출"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT id, data FROM queue ORDER BY priority, timestamp LIMIT ?",
                (limit,)
            )
            rows = cursor.fetchall()

            data_list = []
            ids = []
            for row_id, data_blob in rows:
                data_list.append(pickle.loads(data_blob))
                ids.append(row_id)

            # 추출한 항목 삭제
            if ids:
                conn.execute(
                    f"DELETE FROM queue WHERE id IN ({','.join('?' * len(ids))})",
                    ids
                )

            return data_list

    def get_queue_size(self) -> int:
        """대기 중인 항목 수"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("SELECT COUNT(*) FROM queue")
            return cursor.fetchone()[0]


class HybridSyncManager(EdgeCloudSync):
    """오프라인 지원 하이브리드 동기화"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.offline_queue = OfflineQueueManager()

    async def sync(self, data: SyncData):
        """연결 상태에 따른 동기화"""
        if self.is_connected:
            try:
                await super().sync(data)
            except Exception:
                self.offline_queue.enqueue(data)
        else:
            self.offline_queue.enqueue(data)

    async def flush_offline_queue(self):
        """오프라인 큐 비우기 (연결 복구 시)"""
        while self.offline_queue.get_queue_size() > 0:
            batch = self.offline_queue.dequeue_batch()
            for data in batch:
                try:
                    await super().sync(data)
                except Exception:
                    self.offline_queue.enqueue(data)
                    break

6하이브리드 시스템 모니터링

엣지와 클라우드 전체 시스템의 상태를 통합 모니터링하여 이상 징후를 조기에 감지하고 대응합니다.

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List
import statistics

@dataclass
class EdgeNodeStatus:
    node_id: str
    is_online: bool
    last_heartbeat: datetime
    model_version: str
    inference_fps: float
    queue_depth: int
    cpu_usage: float
    gpu_usage: float
    memory_usage: float

class HybridSystemMonitor:
    """하이브리드 시스템 통합 모니터링"""

    def __init__(self, heartbeat_timeout_sec: int = 60):
        self.heartbeat_timeout = timedelta(seconds=heartbeat_timeout_sec)
        self.nodes: Dict[str, EdgeNodeStatus] = {}
        self.alerts: List[dict] = []

    def update_node_status(self, status: EdgeNodeStatus):
        """노드 상태 업데이트"""
        self.nodes[status.node_id] = status
        self._check_alerts(status)

    def _check_alerts(self, status: EdgeNodeStatus):
        """알람 조건 확인"""
        if status.queue_depth > 1000:
            self._create_alert(
                status.node_id, "HIGH_QUEUE_DEPTH",
                f"Queue depth: {status.queue_depth}"
            )

        if status.inference_fps < 10:
            self._create_alert(
                status.node_id, "LOW_FPS",
                f"Inference FPS: {status.inference_fps}"
            )

        if status.gpu_usage > 95:
            self._create_alert(
                status.node_id, "HIGH_GPU_USAGE",
                f"GPU usage: {status.gpu_usage}%"
            )

    def get_system_summary(self) -> dict:
        """시스템 전체 요약"""
        online_nodes = [n for n in self.nodes.values() if n.is_online]

        return {
            "total_nodes": len(self.nodes),
            "online_nodes": len(online_nodes),
            "avg_fps": statistics.mean([n.inference_fps for n in online_nodes]) if online_nodes else 0,
            "total_queue_depth": sum(n.queue_depth for n in online_nodes),
            "active_alerts": len(self.alerts),
            "model_versions": list(set(n.model_version for n in online_nodes))
        }