Chapter 8 10 / 10

엣지 AI 보안과 운영

모델 보호, 산업 보안 표준, 모니터링과 유지보수를 통한 안전하고 안정적인 엣지 AI 운영 전략

1엣지 AI 보안 위협

엣지 디바이스는 물리적 접근이 가능하고 네트워크 경계 외부에 위치하여 다양한 보안 위협에 노출됩니다. AI 모델 자체도 공격 대상이 될 수 있습니다.

Edge AI Security Threat Model
Model-Level Attacks
Model Extraction: Model theft via inference API
Adversarial Attack: Misclassification via adversarial inputs
Model Inversion: Training data reconstruction
Device-Level Attacks
Physical Tampering: Physical device access/manipulation
Firmware Attack: Malicious firmware injection
Side-Channel: Power/timing analysis attacks
Network-Level Attacks
MITM: Man-in-the-middle (model update tampering)
DoS: Denial of Service attacks
Data Exfiltration: Sensor data leakage

M모델 도용

추론 요청/응답 분석으로 모델 복제 시도

A적대적 공격

미세한 노이즈로 AI 판단 오류 유도

P물리적 접근

디바이스 탈취 후 모델/데이터 추출

N네트워크 공격

통신 가로채기 및 악성 업데이트 주입

2모델 보호 기법

AI 모델은 기업의 핵심 지적 재산입니다. 모델 암호화, 난독화, 하드웨어 보안 모듈(HSM)을 활용하여 모델을 보호합니다.

import hashlib
import hmac
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class ModelProtection:
    """AI 모델 보호 유틸리티"""

    def __init__(self, device_id: str, master_key: bytes):
        # 디바이스별 고유 키 파생
        self.device_key = self._derive_key(master_key, device_id)
        self.cipher = Fernet(self.device_key)

    def _derive_key(self, master_key: bytes, salt: str) -> bytes:
        """PBKDF2로 디바이스 고유 키 파생"""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt.encode(),
            iterations=100000
        )
        key = base64.urlsafe_b64encode(kdf.derive(master_key))
        return key

    def encrypt_model(self, model_path: str) -> str:
        """모델 파일 암호화"""
        with open(model_path, "rb") as f:
            model_data = f.read()

        encrypted = self.cipher.encrypt(model_data)
        encrypted_path = model_path + ".enc"

        with open(encrypted_path, "wb") as f:
            f.write(encrypted)

        return encrypted_path

    def decrypt_to_memory(self, encrypted_path: str) -> bytes:
        """암호화된 모델을 메모리로 복호화 (파일 미생성)"""
        with open(encrypted_path, "rb") as f:
            encrypted_data = f.read()

        decrypted = self.cipher.decrypt(encrypted_data)
        return decrypted

    def verify_integrity(self, model_path: str, expected_hash: str) -> bool:
        """모델 무결성 검증"""
        sha256 = hashlib.sha256()
        with open(model_path, "rb") as f:
            for block in iter(lambda: f.read(65536), b""):
                sha256.update(block)
        return hmac.compare_digest(sha256.hexdigest(), expected_hash)


class SecureModelLoader:
    """보안 모델 로더"""

    def __init__(self, protection: ModelProtection):
        self.protection = protection
        self._model_cache = None

    def load_secure(self, encrypted_path: str, expected_hash: str):
        """암호화된 모델 안전하게 로드"""
        # 1. 복호화 (메모리에서만)
        model_data = self.protection.decrypt_to_memory(encrypted_path)

        # 2. 무결성 검증
        actual_hash = hashlib.sha256(model_data).hexdigest()
        if not hmac.compare_digest(actual_hash, expected_hash):
            raise SecurityError("Model integrity check failed")

        # 3. 메모리에서 직접 로드 (TensorRT)
        import tensorrt as trt
        logger = trt.Logger(trt.Logger.WARNING)
        runtime = trt.Runtime(logger)
        engine = runtime.deserialize_cuda_engine(model_data)

        # 4. 메모리 정리
        del model_data

        return engine

모델 암호화 키는 절대 디바이스에 평문으로 저장하지 마세요. TPM(Trusted Platform Module) 또는 HSM을 사용하여 키를 보호하세요.

3적대적 공격 방어

적대적 공격(Adversarial Attack)은 입력에 미세한 변조를 가해 AI 모델의 오분류를 유도합니다. 입력 검증과 방어 기법으로 이를 탐지하고 완화합니다.

import numpy as np
from typing import Tuple, Optional
import cv2

class AdversarialDefense:
    """적대적 공격 방어 시스템"""

    def __init__(
        self,
        noise_threshold: float = 0.05,
        confidence_threshold: float = 0.7
    ):
        self.noise_threshold = noise_threshold
        self.confidence_threshold = confidence_threshold
        self.history = []  # 최근 입력 히스토리

    def input_sanitization(self, image: np.ndarray) -> np.ndarray:
        """입력 전처리로 적대적 노이즈 완화"""
        # 1. JPEG 압축 (노이즈 제거)
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
        _, encoded = cv2.imencode(".jpg", image, encode_param)
        decoded = cv2.imdecode(encoded, cv2.IMREAD_COLOR)

        # 2. 가우시안 블러 (미세 노이즈 제거)
        blurred = cv2.GaussianBlur(decoded, (3, 3), 0)

        return blurred

    def detect_anomaly(
        self,
        image: np.ndarray,
        prediction: np.ndarray,
        confidence: float
    ) -> Tuple[bool, str]:
        """이상 입력 탐지"""
        reasons = []

        # 1. 신뢰도 기반 탐지
        if confidence < self.confidence_threshold:
            reasons.append(f"Low confidence: {confidence:.3f}")

        # 2. 예측 분포 이상 탐지
        entropy = -np.sum(prediction * np.log(prediction + 1e-10))
        if entropy > 2.0:  # 높은 엔트로피 = 불확실한 예측
            reasons.append(f"High entropy: {entropy:.3f}")

        # 3. 입력 통계 이상 탐지
        mean_intensity = np.mean(image)
        std_intensity = np.std(image)
        if std_intensity < 10 or std_intensity > 100:
            reasons.append(f"Abnormal intensity distribution")

        # 4. 히스토리 기반 일관성 검사
        if self.history:
            prev_pred = self.history[-1]
            diff = np.abs(prediction - prev_pred).max()
            if diff > 0.8:  # 급격한 예측 변화
                reasons.append(f"Sudden prediction shift: {diff:.3f}")

        self.history.append(prediction)
        if len(self.history) > 10:
            self.history.pop(0)

        is_anomaly = len(reasons) > 0
        return is_anomaly, "; ".join(reasons)

    def ensemble_defense(
        self,
        image: np.ndarray,
        models: list
    ) -> Tuple[np.ndarray, bool]:
        """앙상블 기반 방어"""
        predictions = []
        for model in models:
            # 각 모델에 다른 전처리 적용
            variants = [
                image,
                self.input_sanitization(image),
                cv2.flip(image, 1),  # 좌우 반전
            ]
            for v in variants:
                pred = model.predict(v)
                predictions.append(pred)

        # 예측 일관성 확인
        predictions = np.array(predictions)
        variance = np.var(predictions, axis=0).mean()
        consensus = variance < 0.1  # 낮은 분산 = 일관된 예측

        # 중앙값 기반 최종 예측
        final_pred = np.median(predictions, axis=0)

        return final_pred, consensus

4산업 보안 표준 준수

제조 환경의 엣지 AI는 IEC 62443, NIST Cybersecurity Framework 등 산업 보안 표준을 준수해야 합니다.

IEC 62443 Zone 분리

엣지 디바이스를 보안 영역(Zone)으로 분리하고 Conduit를 통해 제어된 통신만 허용

최소 권한 원칙

AI 추론 프로세스는 필요한 최소한의 시스템 권한만 부여. root 실행 금지

암호화 통신 (TLS 1.3)

모든 외부 통신은 TLS 1.3으로 암호화. 인증서 핀닝 적용

감사 로깅

모든 보안 이벤트, 접근 시도, 모델 업데이트를 불변 로그로 기록

취약점 관리

정기적인 취약점 스캔 및 패치 적용. CVE 모니터링

import ssl
import certifi
from datetime import datetime
import json
import logging

class SecurityAuditLogger:
    """보안 감사 로거 (IEC 62443 준수)"""

    def __init__(self, log_path: str, device_id: str):
        self.device_id = device_id
        self.logger = logging.getLogger("security_audit")
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_event(
        self,
        event_type: str,
        severity: str,
        details: dict,
        user: str = "system"
    ):
        """보안 이벤트 기록"""
        event = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "device_id": self.device_id,
            "event_type": event_type,
            "severity": severity,  # INFO, WARNING, CRITICAL
            "user": user,
            "details": details
        }
        self.logger.info(json.dumps(event))

    def log_model_update(self, old_version: str, new_version: str, success: bool):
        """모델 업데이트 이벤트"""
        self.log_event(
            "MODEL_UPDATE",
            "INFO" if success else "WARNING",
            {
                "old_version": old_version,
                "new_version": new_version,
                "success": success
            }
        )

    def log_anomaly_detected(self, anomaly_type: str, details: str):
        """이상 탐지 이벤트"""
        self.log_event(
            "ANOMALY_DETECTED",
            "CRITICAL",
            {"type": anomaly_type, "details": details}
        )

    def log_access_attempt(self, endpoint: str, ip: str, success: bool):
        """접근 시도 기록"""
        self.log_event(
            "ACCESS_ATTEMPT",
            "INFO" if success else "WARNING",
            {"endpoint": endpoint, "source_ip": ip, "success": success}
        )

5실시간 모니터링

엣지 AI 시스템의 안정적인 운영을 위해 성능, 건강 상태, 이상 징후를 실시간으로 모니터링합니다.

메트릭 정상 범위 경고 임계값 위험 임계값
추론 지연시간 < 20ms 20-50ms > 50ms
GPU 사용률 30-70% 70-90% > 90%
메모리 사용률 < 60% 60-80% > 80%
GPU 온도 < 65°C 65-80°C > 80°C
모델 정확도 > 95% 90-95% < 90%
import time
import psutil
import subprocess
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class SystemMetrics:
    timestamp: float
    cpu_percent: float
    memory_percent: float
    gpu_percent: Optional[float]
    gpu_temp: Optional[float]
    inference_latency_ms: float
    inference_fps: float
    model_accuracy: float

class EdgeAIMonitor:
    """엣지 AI 시스템 모니터"""

    def __init__(self, alert_callback=None):
        self.alert_callback = alert_callback
        self.metrics_history: List[SystemMetrics] = []

        # 임계값 설정
        self.thresholds = {
            "latency_warning": 20,
            "latency_critical": 50,
            "gpu_temp_warning": 65,
            "gpu_temp_critical": 80,
            "memory_warning": 60,
            "memory_critical": 80,
            "accuracy_warning": 95,
            "accuracy_critical": 90
        }

    def collect_metrics(self, inference_latency: float, accuracy: float) -> SystemMetrics:
        """시스템 메트릭 수집"""
        gpu_percent, gpu_temp = self._get_gpu_stats()

        metrics = SystemMetrics(
            timestamp=time.time(),
            cpu_percent=psutil.cpu_percent(),
            memory_percent=psutil.virtual_memory().percent,
            gpu_percent=gpu_percent,
            gpu_temp=gpu_temp,
            inference_latency_ms=inference_latency,
            inference_fps=1000 / inference_latency if inference_latency > 0 else 0,
            model_accuracy=accuracy
        )

        self.metrics_history.append(metrics)
        if len(self.metrics_history) > 1000:
            self.metrics_history.pop(0)

        # 알람 체크
        self._check_alerts(metrics)

        return metrics

    def _get_gpu_stats(self) -> tuple:
        """GPU 통계 조회 (Jetson/NVIDIA)"""
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=utilization.gpu,temperature.gpu",
                 "--format=csv,noheader,nounits"],
                capture_output=True, text=True, timeout=5
            )
            if result.returncode == 0:
                parts = result.stdout.strip().split(", ")
                return float(parts[0]), float(parts[1])
        except:
            pass
        return None, None

    def _check_alerts(self, metrics: SystemMetrics):
        """임계값 기반 알람 생성"""
        # 지연시간 알람
        if metrics.inference_latency_ms > self.thresholds["latency_critical"]:
            self._send_alert(AlertLevel.CRITICAL, "HIGH_LATENCY",
                f"Latency {metrics.inference_latency_ms:.1f}ms exceeds threshold")
        elif metrics.inference_latency_ms > self.thresholds["latency_warning"]:
            self._send_alert(AlertLevel.WARNING, "HIGH_LATENCY",
                f"Latency {metrics.inference_latency_ms:.1f}ms elevated")

        # GPU 온도 알람
        if metrics.gpu_temp and metrics.gpu_temp > self.thresholds["gpu_temp_critical"]:
            self._send_alert(AlertLevel.CRITICAL, "HIGH_TEMPERATURE",
                f"GPU temperature {metrics.gpu_temp}°C critical")

        # 정확도 알람
        if metrics.model_accuracy < self.thresholds["accuracy_critical"]:
            self._send_alert(AlertLevel.CRITICAL, "LOW_ACCURACY",
                f"Model accuracy {metrics.model_accuracy:.1f}% below threshold")

    def _send_alert(self, level: AlertLevel, alert_type: str, message: str):
        """알람 전송"""
        if self.alert_callback:
            self.alert_callback(level, alert_type, message)

6유지보수 전략

엣지 AI 시스템의 장기적인 안정성을 위해 예방적 유지보수, 모델 드리프트 모니터링, 자동 복구 메커니즘을 구현합니다.

from datetime import datetime, timedelta
from typing import Callable
import numpy as np

class ModelDriftDetector:
    """모델 드리프트 탐지"""

    def __init__(self, window_size: int = 1000, drift_threshold: float = 0.1):
        self.window_size = window_size
        self.drift_threshold = drift_threshold
        self.baseline_distribution: Optional[np.ndarray] = None
        self.current_window: List[np.ndarray] = []

    def set_baseline(self, predictions: List[np.ndarray]):
        """기준 분포 설정"""
        self.baseline_distribution = np.mean(predictions, axis=0)

    def add_prediction(self, prediction: np.ndarray) -> tuple:
        """예측 추가 및 드리프트 확인"""
        self.current_window.append(prediction)
        if len(self.current_window) > self.window_size:
            self.current_window.pop(0)

        if len(self.current_window) < self.window_size // 2:
            return False, 0.0

        # KL Divergence 계산
        current_dist = np.mean(self.current_window, axis=0)
        kl_div = self._kl_divergence(self.baseline_distribution, current_dist)

        is_drift = kl_div > self.drift_threshold
        return is_drift, kl_div

    def _kl_divergence(self, p: np.ndarray, q: np.ndarray) -> float:
        """KL Divergence 계산"""
        p = np.clip(p, 1e-10, 1)
        q = np.clip(q, 1e-10, 1)
        return np.sum(p * np.log(p / q))


class AutoRecovery:
    """자동 복구 시스템"""

    def __init__(
        self,
        restart_fn: Callable,
        rollback_fn: Callable,
        max_restarts: int = 3,
        restart_window_hours: int = 1
    ):
        self.restart_fn = restart_fn
        self.rollback_fn = rollback_fn
        self.max_restarts = max_restarts
        self.restart_window = timedelta(hours=restart_window_hours)

        self.restart_history: List[datetime] = []
        self.error_count = 0

    async def handle_error(self, error_type: str) -> str:
        """에러 처리 및 자동 복구"""
        self.error_count += 1

        # 최근 재시작 횟수 확인
        now = datetime.now()
        recent_restarts = [
            r for r in self.restart_history
            if now - r < self.restart_window
        ]

        if len(recent_restarts) < self.max_restarts:
            # 서비스 재시작 시도
            self.restart_history.append(now)
            await self.restart_fn()
            return "RESTARTED"
        else:
            # 재시작 한계 초과 - 롤백
            await self.rollback_fn()
            self.restart_history.clear()
            return "ROLLED_BACK"


class MaintenanceScheduler:
    """예방적 유지보수 스케줄러"""

    def __init__(self):
        self.tasks = []

    def schedule(
        self,
        task_name: str,
        task_fn: Callable,
        interval_hours: int,
        window_start: int = 2,  # 새벽 2시
        window_end: int = 4       # 새벽 4시
    ):
        """유지보수 작업 스케줄링 (비생산 시간대)"""
        self.tasks.append({
            "name": task_name,
            "fn": task_fn,
            "interval": timedelta(hours=interval_hours),
            "window": (window_start, window_end),
            "last_run": None
        })

    async def run_due_tasks(self):
        """실행 대기 중인 작업 수행"""
        now = datetime.now()
        current_hour = now.hour

        for task in self.tasks:
            # 유지보수 윈도우 확인
            start, end = task["window"]
            if not (start <= current_hour < end):
                continue

            # 실행 주기 확인
            if task["last_run"] is None or now - task["last_run"] >= task["interval"]:
                await task["fn"]()
                task["last_run"] = now

Chapter 8에서는 엣지 AI의 핵심 기술들을 학습했습니다. 다음 Chapter 9에서는 Physical AI 시스템을 실제로 개발하는 실전 방법론을 다룹니다.