1최적화 파이프라인 아키텍처

프로덕션 환경의 최적화 시스템은 데이터 수집부터 최적 파라미터 적용까지의 전체 파이프라인을 자동화해야 합니다. 실시간 데이터 스트리밍, 모델 추론, 의사결정, 실행까지의 End-to-End 파이프라인을 구축합니다.

최적화 시스템 통합 아키텍처
OPC UA / SCADA / PLC
실시간 설비 데이터
MES
생산 실행 시스템
ERP
자원 계획 시스템
데이터 수집 & 전처리
OPC UA Connector Preprocessor
최적화 서비스 (FastAPI/Redis)
GET /optimize 캐시 조회 → 최적화
POST /optimize/async 백그라운드
모니터링 & 알림
Prometheus | Grafana | Alert
from dataclasses import dataclass, field from typing import Dict, List, Any, Optional, Callable from datetime import datetime from abc import ABC, abstractmethod import asyncio import json import numpy as np @dataclass class OptimizationConfig: """최적화 파이프라인 설정""" optimization_interval: int = 300 # 최적화 주기 (초) lookahead_horizon: int = 3600 # 예측 지평 (초) min_confidence: float = 0.85 # 최소 신뢰도 enable_auto_apply: bool = False # 자동 적용 여부 constraints: Dict[str, tuple] = field(default_factory=dict) objective_weights: Dict[str, float] = field(default_factory=dict) class DataConnector(ABC): """데이터 소스 추상 인터페이스""" @abstractmethod async def fetch_realtime_data(self) -> Dict[str, Any]: pass @abstractmethod async def fetch_historical_data(self, start: datetime, end: datetime) -> List[Dict]: pass class OPCUAConnector(DataConnector): """OPC UA 기반 PLC/SCADA 연결""" def __init__(self, endpoint: str, node_ids: Dict[str, str]): self.endpoint = endpoint self.node_ids = node_ids self.client = None async def connect(self): from asyncua import Client self.client = Client(self.endpoint) await self.client.connect() async def fetch_realtime_data(self) -> Dict[str, Any]: data = {} for name, node_id in self.node_ids.items(): node = self.client.get_node(node_id) data[name] = await node.read_value() return data async def write_setpoint(self, node_id: str, value: Any): """설정값 쓰기""" node = self.client.get_node(node_id) await node.write_value(value) class OptimizationPipeline: """End-to-End 최적화 파이프라인""" def __init__(self, config: OptimizationConfig): self.config = config self.data_connectors: List[DataConnector] = [] self.preprocessors: List[Callable] = [] self.optimizer: 'ProcessOptimizer' = None self.validators: List[Callable] = [] self.actuators: List[Callable] = [] self.history: List[Dict] = [] def add_data_source(self, connector: DataConnector): self.data_connectors.append(connector) def add_preprocessor(self, func: Callable): self.preprocessors.append(func) def set_optimizer(self, optimizer): self.optimizer = optimizer async def collect_data(self) -> Dict[str, Any]: """모든 소스에서 데이터 수집""" all_data = {} tasks = [conn.fetch_realtime_data() for conn in self.data_connectors] results = await asyncio.gather(*tasks) for result in results: all_data.update(result) return all_data def preprocess(self, raw_data: Dict) -> np.ndarray: """데이터 전처리 파이프라인""" data = raw_data for preprocessor in self.preprocessors: data = preprocessor(data) return data async def run_optimization_cycle(self) -> Dict[str, Any]: """단일 최적화 사이클 실행""" # 1. 데이터 수집 raw_data = await self.collect_data() # 2. 전처리 processed_data = self.preprocess(raw_data) # 3. 최적화 실행 result = self.optimizer.optimize( current_state=processed_data, constraints=self.config.constraints, weights=self.config.objective_weights ) # 4. 검증 for validator in self.validators: if not validator(result): result['status'] = 'validation_failed' return result # 5. 기록 self.history.append({ 'timestamp': datetime.now().isoformat(), 'input': raw_data, 'output': result }) return result

2MES/ERP 시스템 연동

최적화 시스템은 MES(Manufacturing Execution System), ERP(Enterprise Resource Planning) 시스템과 연동하여 생산 계획, 재고, 품질 데이터를 활용합니다. REST API와 데이터베이스 연동으로 실시간 정보를 교환합니다.

import aiohttp from sqlalchemy import create_engine, text from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession class MESConnector: """MES 시스템 연동""" def __init__(self, base_url: str, api_key: str): self.base_url = base_url self.headers = {'Authorization': f'Bearer {api_key}'} async def get_production_orders(self) -> List[Dict]: """생산 오더 조회""" async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/api/v1/production-orders", headers=self.headers ) as resp: return await resp.json() async def get_equipment_status(self, equipment_id: str) -> Dict: """설비 상태 조회""" async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/api/v1/equipment/{equipment_id}/status", headers=self.headers ) as resp: return await resp.json() async def update_process_parameters(self, equipment_id: str, params: Dict[str, float]) -> bool: """공정 파라미터 업데이트""" async with aiohttp.ClientSession() as session: async with session.put( f"{self.base_url}/api/v1/equipment/{equipment_id}/parameters", headers=self.headers, json=params ) as resp: return resp.status == 200 class ERPConnector: """ERP 시스템 연동 (SAP, Oracle 등)""" def __init__(self, db_url: str): self.engine = create_async_engine(db_url) async def get_material_costs(self, material_ids: List[str]) -> Dict[str, float]: """자재 원가 조회""" async with AsyncSession(self.engine) as session: result = await session.execute( text(""" SELECT material_id, unit_cost FROM mm_material_master WHERE material_id IN :ids """), {'ids': tuple(material_ids)} ) return {row[0]: row[1] for row in result} async def get_energy_rates(self, timestamp: datetime) -> Dict[str, float]: """시간대별 에너지 요금 조회""" async with AsyncSession(self.engine) as session: result = await session.execute( text(""" SELECT rate_type, rate_value FROM utility_rates WHERE valid_from <= :ts AND valid_to > :ts """), {'ts': timestamp} ) return {row[0]: row[1] for row in result} class IntegratedOptimizationSystem: """MES/ERP 통합 최적화 시스템""" def __init__(self, mes: MESConnector, erp: ERPConnector): self.mes = mes self.erp = erp async def build_optimization_context(self, equipment_id: str) -> Dict: """최적화를 위한 통합 컨텍스트 구성""" # 병렬 데이터 수집 orders_task = self.mes.get_production_orders() status_task = self.mes.get_equipment_status(equipment_id) energy_task = self.erp.get_energy_rates(datetime.now()) orders, status, energy_rates = await asyncio.gather( orders_task, status_task, energy_task ) # 자재 비용 조회 material_ids = [o['material_id'] for o in orders] material_costs = await self.erp.get_material_costs(material_ids) return { 'production_orders': orders, 'equipment_status': status, 'energy_rates': energy_rates, 'material_costs': material_costs, 'timestamp': datetime.now().isoformat() }

3실시간 최적화 서비스

최적화 시스템은 마이크로서비스로 배포되어 REST API를 통해 다른 시스템과 통합됩니다. FastAPI 기반의 비동기 서비스로 높은 처리량과 낮은 지연시간을 제공합니다.

from fastapi import FastAPI, BackgroundTasks, HTTPException from pydantic import BaseModel from typing import Optional import redis.asyncio as redis app = FastAPI(title="Process Optimization Service") class OptimizationRequest(BaseModel): equipment_id: str current_state: Dict[str, float] constraints: Optional[Dict[str, tuple]] = None objective_weights: Optional[Dict[str, float]] = None class OptimizationResponse(BaseModel): request_id: str status: str optimal_parameters: Dict[str, float] predicted_improvement: Dict[str, float] confidence: float execution_time_ms: float class OptimizationService: """실시간 최적화 서비스""" def __init__(self): self.redis = None self.optimizer_cache: Dict[str, 'ProcessOptimizer'] = {} self.running_tasks: Dict[str, asyncio.Task] = {} async def startup(self): self.redis = await redis.from_url("redis://localhost:6379") async def get_cached_result(self, cache_key: str) -> Optional[Dict]: """캐시된 최적화 결과 조회""" result = await self.redis.get(cache_key) return json.loads(result) if result else None async def cache_result(self, cache_key: str, result: Dict, ttl: int = 60): """최적화 결과 캐싱""" await self.redis.setex(cache_key, ttl, json.dumps(result)) def get_optimizer(self, equipment_id: str) -> 'ProcessOptimizer': """설비별 최적화기 로드""" if equipment_id not in self.optimizer_cache: # 모델 로드 (실제로는 MLflow 등에서 로드) self.optimizer_cache[equipment_id] = ProcessOptimizer.load(equipment_id) return self.optimizer_cache[equipment_id] async def optimize(self, request: OptimizationRequest) -> OptimizationResponse: """최적화 실행""" import time import uuid start_time = time.time() request_id = str(uuid.uuid4()) # 캐시 확인 cache_key = f"opt:{request.equipment_id}:{hash(str(request.current_state))}" cached = await self.get_cached_result(cache_key) if cached: cached['request_id'] = request_id cached['status'] = 'cached' return OptimizationResponse(**cached) # 최적화 실행 optimizer = self.get_optimizer(request.equipment_id) result = optimizer.optimize( current_state=np.array(list(request.current_state.values())), constraints=request.constraints or {}, weights=request.objective_weights or {} ) response_data = { 'request_id': request_id, 'status': 'success', 'optimal_parameters': result['parameters'], 'predicted_improvement': result['improvement'], 'confidence': result['confidence'], 'execution_time_ms': (time.time() - start_time) * 1000 } # 캐싱 await self.cache_result(cache_key, response_data) return OptimizationResponse(**response_data) service = OptimizationService() @app.on_event("startup") async def startup_event(): await service.startup() @app.post("/optimize", response_model=OptimizationResponse) async def optimize(request: OptimizationRequest): """동기 최적화 API""" return await service.optimize(request) @app.post("/optimize/async") async def optimize_async(request: OptimizationRequest, background_tasks: BackgroundTasks): """비동기 최적화 API (긴 작업용)""" import uuid task_id = str(uuid.uuid4()) background_tasks.add_task(service.optimize, request) return {"task_id": task_id, "status": "processing"}

4모니터링 및 운영 관리

프로덕션 최적화 시스템은 지속적인 모니터링과 성능 관리가 필수입니다. Prometheus 메트릭, Grafana 대시보드, 알람 시스템으로 시스템 건강 상태를 추적합니다.

from prometheus_client import Counter, Histogram, Gauge, generate_latest from fastapi.responses import PlainTextResponse import logging # Prometheus 메트릭 정의 OPTIMIZATION_REQUESTS = Counter( 'optimization_requests_total', 'Total optimization requests', ['equipment_id', 'status'] ) OPTIMIZATION_LATENCY = Histogram( 'optimization_latency_seconds', 'Optimization request latency', ['equipment_id'], buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0] ) IMPROVEMENT_GAUGE = Gauge( 'optimization_improvement_percent', 'Predicted improvement percentage', ['equipment_id', 'metric'] ) MODEL_CONFIDENCE = Gauge( 'model_confidence', 'Optimization model confidence', ['equipment_id'] ) class OptimizationMonitor: """최적화 시스템 모니터링""" def __init__(self): self.logger = logging.getLogger('optimization') self.alert_thresholds = { 'min_confidence': 0.7, 'max_latency_ms': 5000, 'min_improvement': 0.01 } def record_optimization(self, equipment_id: str, result: Dict, latency_ms: float): """최적화 결과 기록""" # Prometheus 메트릭 업데이트 OPTIMIZATION_REQUESTS.labels( equipment_id=equipment_id, status=result['status'] ).inc() OPTIMIZATION_LATENCY.labels(equipment_id=equipment_id).observe(latency_ms / 1000) MODEL_CONFIDENCE.labels(equipment_id=equipment_id).set(result['confidence']) for metric, value in result.get('predicted_improvement', {}).items(): IMPROVEMENT_GAUGE.labels( equipment_id=equipment_id, metric=metric ).set(value * 100) # 알람 체크 self._check_alerts(equipment_id, result, latency_ms) def _check_alerts(self, equipment_id: str, result: Dict, latency_ms: float): """알람 조건 확인""" alerts = [] if result['confidence'] < self.alert_thresholds['min_confidence']: alerts.append(f"Low confidence: {result['confidence']:.2f}") if latency_ms > self.alert_thresholds['max_latency_ms']: alerts.append(f"High latency: {latency_ms:.0f}ms") if alerts: self._send_alert(equipment_id, alerts) async def _send_alert(self, equipment_id: str, alerts: List[str]): """알람 전송 (Slack, Email 등)""" message = f"Optimization Alert [{equipment_id}]: " + ", ".join(alerts) self.logger.warning(message) # 실제 알람 전송 로직 @app.get("/metrics") async def metrics(): """Prometheus 메트릭 엔드포인트""" return PlainTextResponse(generate_latest()) @app.get("/health") async def health_check(): """헬스 체크""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "cached_optimizers": len(service.optimizer_cache) }

운영 베스트 프랙티스: 최적화 모델은 주기적으로 재학습하고, A/B 테스트로 새 모델을 검증한 후 점진적으로 배포합니다. 모델 드리프트 감지로 성능 저하를 조기에 발견합니다.