1Model Predictive Control (MPC)
MPC는 공정 모델을 사용하여 미래 상태를 예측하고 최적 제어 입력을 계산하는 고급 제어 기법입니다. 매 제어 주기마다 롤링 호라이즌 최적화를 수행하여 다변수 시스템, 제약 조건, 비선형성을 효과적으로 처리합니다. 화학 공정, 반도체, 발전소 등 복잡한 공정에서 널리 사용됩니다.
계층적 실시간 제어 아키텍처
Level 1
상위 레벨: 경제성 최적화
주기: 1시간 | RTO (Real-Time Optimization)
최적 Setpoint
Level 2
중간 레벨: MPC / RL 제어
주기: 1분 | 다변수 최적 제어
MPC
(선형 모델)
x(k+1)=Ax+Bu
x(k+1)=Ax+Bu
DDPG
(RL 정책)
Actor-Critic
Actor-Critic
Adaptive MPC
(온라인 학습)
RLS 파라미터 추정
RLS 파라미터 추정
제어 출력
Level 3
하위 레벨: PID / 게인 스케줄링
주기: 초 단위 | 빠른 추종 제어
공정 (Plant)
밸브, 모터, 히터, 펌프 등 액추에이터
import numpy as np
from dataclasses import dataclass
from typing import List, Callable, Tuple
from scipy.optimize import minimize
@dataclass
class MPCConfig:
"""MPC 설정"""
horizon: int # 예측 호라이즌 (스텝 수)
control_horizon: int # 제어 호라이즌
dt: float # 샘플링 주기 (초)
state_weights: np.ndarray # 상태 가중치 Q
control_weights: np.ndarray # 제어 가중치 R
class LinearMPC:
"""선형 MPC 컨트롤러"""
def __init__(self, A: np.ndarray, B: np.ndarray, C: np.ndarray,
config: MPCConfig):
"""
선형 상태공간 모델: x(k+1) = Ax(k) + Bu(k), y = Cx
"""
self.A, self.B, self.C = A, B, C
self.config = config
self.n_states = A.shape[0]
self.n_inputs = B.shape[1]
def predict(self, x0: np.ndarray, u_seq: np.ndarray) -> np.ndarray:
"""미래 상태 예측"""
N = len(u_seq)
x_pred = np.zeros((N + 1, self.n_states))
x_pred[0] = x0
for k in range(N):
x_pred[k + 1] = self.A @ x_pred[k] + self.B @ u_seq[k]
return x_pred
def cost(self, u_flat: np.ndarray, x0: np.ndarray,
x_ref: np.ndarray) -> float:
"""목적 함수: 추적 오차 + 제어 입력"""
u_seq = u_flat.reshape(-1, self.n_inputs)
x_pred = self.predict(x0, u_seq)
Q = self.config.state_weights
R = self.config.control_weights
J = 0.0
for k in range(len(u_seq)):
state_error = x_pred[k] - x_ref
J += state_error @ Q @ state_error
J += u_seq[k] @ R @ u_seq[k]
return J
def solve(self, x0: np.ndarray, x_ref: np.ndarray,
u_bounds: Tuple[np.ndarray, np.ndarray] = None) -> np.ndarray:
"""최적 제어 시퀀스 계산"""
N = self.config.control_horizon
u0 = np.zeros(N * self.n_inputs)
bounds = None
if u_bounds:
lo, hi = u_bounds
bounds = [(lo[i % self.n_inputs], hi[i % self.n_inputs])
for i in range(N * self.n_inputs)]
result = minimize(self.cost, u0, args=(x0, x_ref),
method='SLSQP', bounds=bounds)
return result.x.reshape(-1, self.n_inputs)[0] # 첫 제어 입력2강화학습 기반 제어
강화학습(RL)은 명시적 모델 없이 시행착오를 통해 최적 제어 정책을 학습합니다. Deep Deterministic Policy Gradient(DDPG)나 Soft Actor-Critic(SAC) 같은 알고리즘은 연속 제어 문제에 적합하며, 복잡한 비선형 시스템에서도 효과적입니다.
import torch
import torch.nn as nn
class ActorNetwork(nn.Module):
"""DDPG Actor (정책 네트워크)"""
def __init__(self, state_dim: int, action_dim: int,
action_bounds: Tuple[float, float]):
super().__init__()
self.action_low, self.action_high = action_bounds
self.net = nn.Sequential(
nn.Linear(state_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim),
nn.Tanh()
)
def forward(self, state: torch.Tensor) -> torch.Tensor:
raw = self.net(state)
# Tanh 출력을 실제 action 범위로 스케일
scaled = (raw + 1) * (self.action_high - self.action_low) / 2 + self.action_low
return scaled
class CriticNetwork(nn.Module):
"""DDPG Critic (Q 네트워크)"""
def __init__(self, state_dim: int, action_dim: int):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 1)
)
def forward(self, state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
x = torch.cat([state, action], dim=-1)
return self.net(x)
class DDPGController:
"""DDPG 기반 제어기"""
def __init__(self, state_dim: int, action_dim: int,
action_bounds: Tuple[float, float]):
self.actor = ActorNetwork(state_dim, action_dim, action_bounds)
self.critic = CriticNetwork(state_dim, action_dim)
self.actor_optim = torch.optim.Adam(self.actor.parameters(), lr=1e-4)
self.critic_optim = torch.optim.Adam(self.critic.parameters(), lr=1e-3)
def get_action(self, state: np.ndarray) -> np.ndarray:
"""현재 상태에서 action 선택"""
with torch.no_grad():
state_t = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state_t)
return action.numpy()[0]
def train_step(self, batch: dict, gamma: float = 0.99):
"""한 스텝 학습"""
states = torch.FloatTensor(batch['states'])
actions = torch.FloatTensor(batch['actions'])
rewards = torch.FloatTensor(batch['rewards'])
next_states = torch.FloatTensor(batch['next_states'])
# Critic 업데이트
next_actions = self.actor(next_states)
target_q = rewards + gamma * self.critic(next_states, next_actions)
current_q = self.critic(states, actions)
critic_loss = nn.MSELoss()(current_q, target_q.detach())
self.critic_optim.zero_grad()
critic_loss.backward()
self.critic_optim.step()
# Actor 업데이트
actor_loss = -self.critic(states, self.actor(states)).mean()
self.actor_optim.zero_grad()
actor_loss.backward()
self.actor_optim.step()3적응형 제어
공정 특성이 시간에 따라 변화하는 경우(촉매 열화, 장비 마모 등) 적응형 제어가 필요합니다. 온라인 모델 업데이트와 파라미터 추정을 통해 제어기가 변화에 적응합니다. Recursive Least Squares(RLS)나 칼만 필터 기반 추정이 일반적입니다.
class AdaptiveMPC:
"""온라인 학습 기반 적응형 MPC"""
def __init__(self, base_model: LinearMPC, forgetting_factor: float = 0.99):
self.mpc = base_model
self.ff = forgetting_factor
self.P = np.eye(base_model.n_states) * 1000 # 공분산
self.history = []
def update_model(self, x_prev: np.ndarray, u: np.ndarray, x_curr: np.ndarray):
"""RLS로 모델 파라미터 업데이트"""
# 예측 오차
x_pred = self.mpc.A @ x_prev + self.mpc.B @ u
error = x_curr - x_pred
# 회귀자 (간단화된 버전)
phi = np.concatenate([x_prev, u])
# RLS 업데이트
K = self.P @ phi / (self.ff + phi @ self.P @ phi)
self.P = (self.P - np.outer(K, phi @ self.P)) / self.ff
# A, B 행렬 업데이트 (간단화)
# 실제로는 벡터화된 형태로 업데이트
self.history.append({'error': np.linalg.norm(error)})
def control(self, x: np.ndarray, x_ref: np.ndarray,
u_prev: np.ndarray = None) -> np.ndarray:
"""적응형 제어 실행"""
if u_prev is not None and len(self.history) > 0:
# 이전 데이터로 모델 업데이트
x_prev = self.history[-1].get('state')
if x_prev is not None:
self.update_model(x_prev, u_prev, x)
# MPC 풀기
u = self.mpc.solve(x, x_ref)
self.history.append({'state': x, 'action': u})
return u
class GainSchedulingController:
"""게인 스케줄링 제어기"""
def __init__(self, controllers: List[Tuple[Callable, dict]]):
"""controllers: [(condition_fn, controller_params), ...]"""
self.controllers = controllers
def select_controller(self, operating_point: dict) -> dict:
"""운전점에 따라 제어기 선택"""
for condition_fn, params in self.controllers:
if condition_fn(operating_point):
return params
return self.controllers[-1][1] # 기본값
def compute_output(self, error: float, operating_point: dict) -> float:
"""PID 출력 계산"""
params = self.select_controller(operating_point)
Kp, Ki, Kd = params['Kp'], params['Ki'], params['Kd']
return Kp * error # 간단화된 P 제어4실시간 제어 시스템 통합
MPC, RL, 적응형 제어를 통합하여 계층적 제어 시스템을 구성합니다. 상위 레벨은 최적화 기반 설정점을 계산하고, 하위 레벨은 실시간 추종 제어를 수행합니다.
class HierarchicalControlSystem:
"""계층적 제어 시스템"""
def __init__(self, optimizer_level: object, # 상위: 최적화
mpc_level: LinearMPC, # 중간: MPC
regulatory_level: object): # 하위: PID 등
self.optimizer = optimizer_level
self.mpc = mpc_level
self.regulatory = regulatory_level
self.setpoints = {}
def run_optimization(self, period_minutes: int = 60) -> dict:
"""상위 레벨: 경제성 최적화 (1시간 주기)"""
optimal_setpoints = self.optimizer.optimize()
self.setpoints = optimal_setpoints
return optimal_setpoints
def run_mpc(self, current_state: np.ndarray) -> np.ndarray:
"""중간 레벨: MPC (1분 주기)"""
x_ref = np.array(list(self.setpoints.values()))
return self.mpc.solve(current_state, x_ref)
def run_regulatory(self, measurement: float, setpoint: float) -> float:
"""하위 레벨: PID (초 단위)"""
return self.regulatory.compute(setpoint - measurement)
# 통합 실행 예시
def realtime_control_example():
# MPC 설정
A = np.array([[0.9, 0.1], [0, 0.8]])
B = np.array([[0.1], [0.2]])
C = np.eye(2)
config = MPCConfig(
horizon=20,
control_horizon=5,
dt=1.0,
state_weights=np.diag([1, 1]),
control_weights=np.array([[0.1]])
)
mpc = LinearMPC(A, B, C, config)
# 시뮬레이션
x = np.array([0.0, 0.0])
x_ref = np.array([1.0, 0.5])
for t in range(50):
u = mpc.solve(x, x_ref, u_bounds=(np.array([-1]), np.array([1])))
x = A @ x + B @ u
print(f"t={t}: state={x}, control={u}")적용: 실시간 제어 AI로 공정 변동 50% 감소, 전환 시간 30% 단축이 가능합니다.