1최적화 파이프라인 아키텍처
프로덕션 환경의 최적화 시스템은 데이터 수집부터 최적 파라미터 적용까지의 전체 파이프라인을 자동화해야 합니다. 실시간 데이터 스트리밍, 모델 추론, 의사결정, 실행까지의 End-to-End 파이프라인을 구축합니다.
최적화 시스템 통합 아키텍처
OPC UA / SCADA / PLC
실시간 설비 데이터
MES
생산 실행 시스템
ERP
자원 계획 시스템
데이터 수집 & 전처리
OPC UA Connector
→
Preprocessor
최적화 서비스 (FastAPI/Redis)
GET
/optimize
→
캐시 조회 → 최적화
POST
/optimize/async
→
백그라운드
모니터링 & 알림
Prometheus
|
Grafana
|
Alert
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from abc import ABC, abstractmethod
import asyncio
import json
import numpy as np
@dataclass
class OptimizationConfig:
"""최적화 파이프라인 설정"""
optimization_interval: int = 300 # 최적화 주기 (초)
lookahead_horizon: int = 3600 # 예측 지평 (초)
min_confidence: float = 0.85 # 최소 신뢰도
enable_auto_apply: bool = False # 자동 적용 여부
constraints: Dict[str, tuple] = field(default_factory=dict)
objective_weights: Dict[str, float] = field(default_factory=dict)
class DataConnector(ABC):
"""데이터 소스 추상 인터페이스"""
@abstractmethod
async def fetch_realtime_data(self) -> Dict[str, Any]:
pass
@abstractmethod
async def fetch_historical_data(self, start: datetime, end: datetime) -> List[Dict]:
pass
class OPCUAConnector(DataConnector):
"""OPC UA 기반 PLC/SCADA 연결"""
def __init__(self, endpoint: str, node_ids: Dict[str, str]):
self.endpoint = endpoint
self.node_ids = node_ids
self.client = None
async def connect(self):
from asyncua import Client
self.client = Client(self.endpoint)
await self.client.connect()
async def fetch_realtime_data(self) -> Dict[str, Any]:
data = {}
for name, node_id in self.node_ids.items():
node = self.client.get_node(node_id)
data[name] = await node.read_value()
return data
async def write_setpoint(self, node_id: str, value: Any):
"""설정값 쓰기"""
node = self.client.get_node(node_id)
await node.write_value(value)
class OptimizationPipeline:
"""End-to-End 최적화 파이프라인"""
def __init__(self, config: OptimizationConfig):
self.config = config
self.data_connectors: List[DataConnector] = []
self.preprocessors: List[Callable] = []
self.optimizer: 'ProcessOptimizer' = None
self.validators: List[Callable] = []
self.actuators: List[Callable] = []
self.history: List[Dict] = []
def add_data_source(self, connector: DataConnector):
self.data_connectors.append(connector)
def add_preprocessor(self, func: Callable):
self.preprocessors.append(func)
def set_optimizer(self, optimizer):
self.optimizer = optimizer
async def collect_data(self) -> Dict[str, Any]:
"""모든 소스에서 데이터 수집"""
all_data = {}
tasks = [conn.fetch_realtime_data() for conn in self.data_connectors]
results = await asyncio.gather(*tasks)
for result in results:
all_data.update(result)
return all_data
def preprocess(self, raw_data: Dict) -> np.ndarray:
"""데이터 전처리 파이프라인"""
data = raw_data
for preprocessor in self.preprocessors:
data = preprocessor(data)
return data
async def run_optimization_cycle(self) -> Dict[str, Any]:
"""단일 최적화 사이클 실행"""
# 1. 데이터 수집
raw_data = await self.collect_data()
# 2. 전처리
processed_data = self.preprocess(raw_data)
# 3. 최적화 실행
result = self.optimizer.optimize(
current_state=processed_data,
constraints=self.config.constraints,
weights=self.config.objective_weights
)
# 4. 검증
for validator in self.validators:
if not validator(result):
result['status'] = 'validation_failed'
return result
# 5. 기록
self.history.append({
'timestamp': datetime.now().isoformat(),
'input': raw_data,
'output': result
})
return result2MES/ERP 시스템 연동
최적화 시스템은 MES(Manufacturing Execution System), ERP(Enterprise Resource Planning) 시스템과 연동하여 생산 계획, 재고, 품질 데이터를 활용합니다. REST API와 데이터베이스 연동으로 실시간 정보를 교환합니다.
import aiohttp
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
class MESConnector:
"""MES 시스템 연동"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {api_key}'}
async def get_production_orders(self) -> List[Dict]:
"""생산 오더 조회"""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}/api/v1/production-orders",
headers=self.headers
) as resp:
return await resp.json()
async def get_equipment_status(self, equipment_id: str) -> Dict:
"""설비 상태 조회"""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}/api/v1/equipment/{equipment_id}/status",
headers=self.headers
) as resp:
return await resp.json()
async def update_process_parameters(self, equipment_id: str,
params: Dict[str, float]) -> bool:
"""공정 파라미터 업데이트"""
async with aiohttp.ClientSession() as session:
async with session.put(
f"{self.base_url}/api/v1/equipment/{equipment_id}/parameters",
headers=self.headers,
json=params
) as resp:
return resp.status == 200
class ERPConnector:
"""ERP 시스템 연동 (SAP, Oracle 등)"""
def __init__(self, db_url: str):
self.engine = create_async_engine(db_url)
async def get_material_costs(self, material_ids: List[str]) -> Dict[str, float]:
"""자재 원가 조회"""
async with AsyncSession(self.engine) as session:
result = await session.execute(
text("""
SELECT material_id, unit_cost
FROM mm_material_master
WHERE material_id IN :ids
"""),
{'ids': tuple(material_ids)}
)
return {row[0]: row[1] for row in result}
async def get_energy_rates(self, timestamp: datetime) -> Dict[str, float]:
"""시간대별 에너지 요금 조회"""
async with AsyncSession(self.engine) as session:
result = await session.execute(
text("""
SELECT rate_type, rate_value
FROM utility_rates
WHERE valid_from <= :ts AND valid_to > :ts
"""),
{'ts': timestamp}
)
return {row[0]: row[1] for row in result}
class IntegratedOptimizationSystem:
"""MES/ERP 통합 최적화 시스템"""
def __init__(self, mes: MESConnector, erp: ERPConnector):
self.mes = mes
self.erp = erp
async def build_optimization_context(self, equipment_id: str) -> Dict:
"""최적화를 위한 통합 컨텍스트 구성"""
# 병렬 데이터 수집
orders_task = self.mes.get_production_orders()
status_task = self.mes.get_equipment_status(equipment_id)
energy_task = self.erp.get_energy_rates(datetime.now())
orders, status, energy_rates = await asyncio.gather(
orders_task, status_task, energy_task
)
# 자재 비용 조회
material_ids = [o['material_id'] for o in orders]
material_costs = await self.erp.get_material_costs(material_ids)
return {
'production_orders': orders,
'equipment_status': status,
'energy_rates': energy_rates,
'material_costs': material_costs,
'timestamp': datetime.now().isoformat()
}3실시간 최적화 서비스
최적화 시스템은 마이크로서비스로 배포되어 REST API를 통해 다른 시스템과 통합됩니다. FastAPI 기반의 비동기 서비스로 높은 처리량과 낮은 지연시간을 제공합니다.
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
import redis.asyncio as redis
app = FastAPI(title="Process Optimization Service")
class OptimizationRequest(BaseModel):
equipment_id: str
current_state: Dict[str, float]
constraints: Optional[Dict[str, tuple]] = None
objective_weights: Optional[Dict[str, float]] = None
class OptimizationResponse(BaseModel):
request_id: str
status: str
optimal_parameters: Dict[str, float]
predicted_improvement: Dict[str, float]
confidence: float
execution_time_ms: float
class OptimizationService:
"""실시간 최적화 서비스"""
def __init__(self):
self.redis = None
self.optimizer_cache: Dict[str, 'ProcessOptimizer'] = {}
self.running_tasks: Dict[str, asyncio.Task] = {}
async def startup(self):
self.redis = await redis.from_url("redis://localhost:6379")
async def get_cached_result(self, cache_key: str) -> Optional[Dict]:
"""캐시된 최적화 결과 조회"""
result = await self.redis.get(cache_key)
return json.loads(result) if result else None
async def cache_result(self, cache_key: str, result: Dict, ttl: int = 60):
"""최적화 결과 캐싱"""
await self.redis.setex(cache_key, ttl, json.dumps(result))
def get_optimizer(self, equipment_id: str) -> 'ProcessOptimizer':
"""설비별 최적화기 로드"""
if equipment_id not in self.optimizer_cache:
# 모델 로드 (실제로는 MLflow 등에서 로드)
self.optimizer_cache[equipment_id] = ProcessOptimizer.load(equipment_id)
return self.optimizer_cache[equipment_id]
async def optimize(self, request: OptimizationRequest) -> OptimizationResponse:
"""최적화 실행"""
import time
import uuid
start_time = time.time()
request_id = str(uuid.uuid4())
# 캐시 확인
cache_key = f"opt:{request.equipment_id}:{hash(str(request.current_state))}"
cached = await self.get_cached_result(cache_key)
if cached:
cached['request_id'] = request_id
cached['status'] = 'cached'
return OptimizationResponse(**cached)
# 최적화 실행
optimizer = self.get_optimizer(request.equipment_id)
result = optimizer.optimize(
current_state=np.array(list(request.current_state.values())),
constraints=request.constraints or {},
weights=request.objective_weights or {}
)
response_data = {
'request_id': request_id,
'status': 'success',
'optimal_parameters': result['parameters'],
'predicted_improvement': result['improvement'],
'confidence': result['confidence'],
'execution_time_ms': (time.time() - start_time) * 1000
}
# 캐싱
await self.cache_result(cache_key, response_data)
return OptimizationResponse(**response_data)
service = OptimizationService()
@app.on_event("startup")
async def startup_event():
await service.startup()
@app.post("/optimize", response_model=OptimizationResponse)
async def optimize(request: OptimizationRequest):
"""동기 최적화 API"""
return await service.optimize(request)
@app.post("/optimize/async")
async def optimize_async(request: OptimizationRequest,
background_tasks: BackgroundTasks):
"""비동기 최적화 API (긴 작업용)"""
import uuid
task_id = str(uuid.uuid4())
background_tasks.add_task(service.optimize, request)
return {"task_id": task_id, "status": "processing"}4모니터링 및 운영 관리
프로덕션 최적화 시스템은 지속적인 모니터링과 성능 관리가 필수입니다. Prometheus 메트릭, Grafana 대시보드, 알람 시스템으로 시스템 건강 상태를 추적합니다.
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi.responses import PlainTextResponse
import logging
# Prometheus 메트릭 정의
OPTIMIZATION_REQUESTS = Counter(
'optimization_requests_total',
'Total optimization requests',
['equipment_id', 'status']
)
OPTIMIZATION_LATENCY = Histogram(
'optimization_latency_seconds',
'Optimization request latency',
['equipment_id'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
IMPROVEMENT_GAUGE = Gauge(
'optimization_improvement_percent',
'Predicted improvement percentage',
['equipment_id', 'metric']
)
MODEL_CONFIDENCE = Gauge(
'model_confidence',
'Optimization model confidence',
['equipment_id']
)
class OptimizationMonitor:
"""최적화 시스템 모니터링"""
def __init__(self):
self.logger = logging.getLogger('optimization')
self.alert_thresholds = {
'min_confidence': 0.7,
'max_latency_ms': 5000,
'min_improvement': 0.01
}
def record_optimization(self, equipment_id: str, result: Dict,
latency_ms: float):
"""최적화 결과 기록"""
# Prometheus 메트릭 업데이트
OPTIMIZATION_REQUESTS.labels(
equipment_id=equipment_id,
status=result['status']
).inc()
OPTIMIZATION_LATENCY.labels(equipment_id=equipment_id).observe(latency_ms / 1000)
MODEL_CONFIDENCE.labels(equipment_id=equipment_id).set(result['confidence'])
for metric, value in result.get('predicted_improvement', {}).items():
IMPROVEMENT_GAUGE.labels(
equipment_id=equipment_id,
metric=metric
).set(value * 100)
# 알람 체크
self._check_alerts(equipment_id, result, latency_ms)
def _check_alerts(self, equipment_id: str, result: Dict, latency_ms: float):
"""알람 조건 확인"""
alerts = []
if result['confidence'] < self.alert_thresholds['min_confidence']:
alerts.append(f"Low confidence: {result['confidence']:.2f}")
if latency_ms > self.alert_thresholds['max_latency_ms']:
alerts.append(f"High latency: {latency_ms:.0f}ms")
if alerts:
self._send_alert(equipment_id, alerts)
async def _send_alert(self, equipment_id: str, alerts: List[str]):
"""알람 전송 (Slack, Email 등)"""
message = f"Optimization Alert [{equipment_id}]: " + ", ".join(alerts)
self.logger.warning(message)
# 실제 알람 전송 로직
@app.get("/metrics")
async def metrics():
"""Prometheus 메트릭 엔드포인트"""
return PlainTextResponse(generate_latest())
@app.get("/health")
async def health_check():
"""헬스 체크"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"cached_optimizers": len(service.optimizer_cache)
}운영 베스트 프랙티스: 최적화 모델은 주기적으로 재학습하고, A/B 테스트로 새 모델을 검증한 후 점진적으로 배포합니다. 모델 드리프트 감지로 성능 저하를 조기에 발견합니다.