1베이지안 최적화 개요

베이지안 최적화(Bayesian Optimization)는 비용이 높은 블랙박스 함수의 전역 최적점을 효율적으로 찾는 방법입니다. 제조 공정에서 파라미터 변경 실험은 비용과 시간이 많이 들기 때문에, 적은 실험 횟수로 최적점을 찾는 것이 중요합니다.

베이지안 최적화 루프
초기 샘플링 (LHS)
파라미터 공간 균등 커버
서로게이트 모델 (Gaussian Process)
μ(x) : 예측 평균
σ²(x) : 예측 분산 (불확실성)
획득 함수 (Acquisition Function)
EI
Expected
Improvement
UCB
Upper
Confidence Bound
PI
Probability of
Improvement
다음 평가점 선택
실험 수행
↻ 반복 (수렴까지)
베이지안 최적화의 핵심 구성요소

1) 서로게이트 모델 (Surrogate Model): 목적 함수를 근사하는 확률 모델 (주로 Gaussian Process)
2) 획득 함수 (Acquisition Function): 다음에 평가할 점을 선택하는 기준 (EI, UCB, PI 등)

Python
"""
베이지안 최적화 기반 공정 파라미터 튜닝
"""
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize
from typing import Tuple, List, Callable, Optional

class GaussianProcess:
    """
    Gaussian Process Regression
    공정 응답을 모델링하는 서로게이트 모델
    """
    def __init__(
        self,
        length_scale: float = 1.0,
        noise: float = 1e-6,
        kernel: str = 'rbf'
    ):
        self.length_scale = length_scale
        self.noise = noise
        self.kernel_type = kernel
        self.X_train = None
        self.y_train = None
        self.K_inv = None

    def kernel(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
        """RBF (Squared Exponential) 커널"""
        if self.kernel_type == 'rbf':
            # Pairwise squared distance
            dist_sq = np.sum(X1**2, axis=1, keepdims=True) + \
                      np.sum(X2**2, axis=1) - 2 * X1 @ X2.T
            return np.exp(-0.5 * dist_sq / self.length_scale**2)
        elif self.kernel_type == 'matern52':
            # Matern 5/2 kernel
            dist = np.sqrt(np.sum((X1[:, None] - X2[None, :])**2, axis=2))
            sqrt5 = np.sqrt(5.0)
            return (1 + sqrt5 * dist / self.length_scale +
                    5 * dist**2 / (3 * self.length_scale**2)) * \
                   np.exp(-sqrt5 * dist / self.length_scale)

    def fit(self, X: np.ndarray, y: np.ndarray):
        """학습 데이터로 GP 피팅"""
        self.X_train = X.copy()
        self.y_train = y.copy()

        # 커널 행렬 계산 (with noise)
        K = self.kernel(X, X) + self.noise * np.eye(len(X))
        self.K_inv = np.linalg.inv(K)

    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """예측 평균과 분산 반환"""
        K_s = self.kernel(X, self.X_train)
        K_ss = self.kernel(X, X)

        # 예측 평균
        mu = K_s @ self.K_inv @ self.y_train

        # 예측 분산
        var = np.diag(K_ss - K_s @ self.K_inv @ K_s.T)
        var = np.maximum(var, 1e-10)  # 수치 안정성

        return mu, var


class BayesianOptimizer:
    """
    베이지안 최적화 엔진
    """
    def __init__(
        self,
        bounds: np.ndarray,
        acquisition: str = 'ei',
        xi: float = 0.01,
        kappa: float = 2.0
    ):
        """
        Args:
            bounds: (n_dims, 2) 형태의 파라미터 경계
            acquisition: 획득 함수 종류 ('ei', 'ucb', 'pi')
            xi: EI/PI의 exploration 파라미터
            kappa: UCB의 exploration 파라미터
        """
        self.bounds = bounds
        self.n_dims = bounds.shape[0]
        self.acquisition = acquisition
        self.xi = xi
        self.kappa = kappa

        self.gp = GaussianProcess(length_scale=1.0, noise=1e-6)
        self.X_observed = []
        self.y_observed = []
        self.best_y = float('inf')
        self.best_x = None

    def _acquisition_function(self, X: np.ndarray) -> np.ndarray:
        """획득 함수 계산"""
        if len(self.X_observed) < 2:
            return np.random.random(len(X))

        mu, var = self.gp.predict(X)
        std = np.sqrt(var)

        if self.acquisition == 'ei':
            # Expected Improvement
            improvement = self.best_y - mu - self.xi
            Z = improvement / (std + 1e-9)
            ei = improvement * norm.cdf(Z) + std * norm.pdf(Z)
            return ei

        elif self.acquisition == 'ucb':
            # Upper Confidence Bound (for minimization: lower is better)
            return -(mu - self.kappa * std)

        elif self.acquisition == 'pi':
            # Probability of Improvement
            Z = (self.best_y - mu - self.xi) / (std + 1e-9)
            return norm.cdf(Z)

    def suggest_next(self, n_restarts: int = 10) -> np.ndarray:
        """다음 평가할 점 제안"""
        if len(self.X_observed) < 2:
            # 초기에는 랜덤 샘플링
            return np.random.uniform(
                self.bounds[:, 0], self.bounds[:, 1]
            )

        # 획득 함수 최대화 (다중 시작점)
        best_x = None
        best_acq = float('-inf')

        for _ in range(n_restarts):
            x0 = np.random.uniform(self.bounds[:, 0], self.bounds[:, 1])

            def neg_acq(x):
                return -self._acquisition_function(x.reshape(1, -1))[0]

            result = minimize(
                neg_acq,
                x0,
                bounds=[(b[0], b[1]) for b in self.bounds],
                method='L-BFGS-B'
            )

            if -result.fun > best_acq:
                best_acq = -result.fun
                best_x = result.x

        return best_x

    def update(self, x: np.ndarray, y: float):
        """관측 결과로 모델 업데이트"""
        self.X_observed.append(x)
        self.y_observed.append(y)

        if y < self.best_y:
            self.best_y = y
            self.best_x = x.copy()

        # GP 재학습
        X = np.array(self.X_observed)
        Y = np.array(self.y_observed)
        self.gp.fit(X, Y)

    def optimize(
        self,
        objective: Callable,
        n_iterations: int = 50,
        n_initial: int = 5
    ) -> Tuple[np.ndarray, float]:
        """베이지안 최적화 실행"""
        # 초기 랜덤 샘플링
        for _ in range(n_initial):
            x = np.random.uniform(self.bounds[:, 0], self.bounds[:, 1])
            y = objective(x)
            self.update(x, y)

        # 베이지안 최적화 루프
        for i in range(n_iterations - n_initial):
            x_next = self.suggest_next()
            y_next = objective(x_next)
            self.update(x_next, y_next)

            print(f"Iter {i+n_initial}: x={x_next}, y={y_next:.6f}, best={self.best_y:.6f}")

        return self.best_x, self.best_y

2제조 공정 파라미터 튜닝

실제 제조 환경에서 베이지안 최적화를 적용하는 예시입니다. 사출 성형 공정의 온도, 압력, 사이클 타임을 최적화합니다.

Python
"""
사출 성형 공정 파라미터 최적화 예시
"""

class InjectionMoldingSimulator:
    """
    사출 성형 공정 시뮬레이터
    실제로는 물리 시뮬레이션 또는 실험 결과
    """
    def __init__(self, noise_std: float = 0.1):
        self.noise_std = noise_std
        self.call_count = 0

        # 최적 파라미터 (숨겨진 ground truth)
        self.optimal_temp = 225.0
        self.optimal_pressure = 85.0
        self.optimal_cycle = 18.0

    def evaluate(self, params: np.ndarray) -> float:
        """
        파라미터로부터 불량률 계산
        params: [온도(°C), 압력(MPa), 사이클타임(s)]
        """
        self.call_count += 1
        temp, pressure, cycle = params

        # 불량률 함수 (2차 함수로 모델링)
        defect_rate = (
            0.01 * (temp - self.optimal_temp)**2 +
            0.008 * (pressure - self.optimal_pressure)**2 +
            0.015 * (cycle - self.optimal_cycle)**2 +
            np.random.normal(0, self.noise_std)
        )
        return max(0, defect_rate)


# 최적화 실행
def run_injection_optimization():
    # 파라미터 경계: [온도, 압력, 사이클타임]
    bounds = np.array([
        [190, 260],   # 온도: 190-260°C
        [50, 120],    # 압력: 50-120 MPa
        [10, 30]      # 사이클타임: 10-30s
    ])

    simulator = InjectionMoldingSimulator(noise_std=0.05)
    optimizer = BayesianOptimizer(bounds, acquisition='ei')

    best_x, best_y = optimizer.optimize(
        objective=simulator.evaluate,
        n_iterations=30,
        n_initial=5
    )

    print(f"\n최적화 결과:")
    print(f"  최적 온도: {best_x[0]:.1f}°C")
    print(f"  최적 압력: {best_x[1]:.1f} MPa")
    print(f"  최적 사이클타임: {best_x[2]:.1f}s")
    print(f"  최소 불량률: {best_y:.4f}")
    print(f"  총 실험 횟수: {simulator.call_count}")

    return optimizer

# optimizer = run_injection_optimization()
실무 팁: 초기 샘플링 전략

Latin Hypercube Sampling(LHS)을 사용하면 초기 샘플이 파라미터 공간을 더 균등하게 커버합니다. scipy.stats.qmc.LatinHypercube를 활용하세요.

3다변량 공정 최적화

여러 품질 특성을 동시에 고려하는 다변량 최적화 예시입니다.

Python
"""
다변량 공정 최적화: 여러 품질 특성 동시 최적화
"""
from dataclasses import dataclass
from typing import Dict

@dataclass
class QualityMetrics:
    """품질 메트릭"""
    defect_rate: float
    surface_roughness: float
    dimensional_accuracy: float
    cycle_time: float

class MultivariateProcessOptimizer:
    """
    다변량 공정 최적화
    여러 품질 특성의 가중 합을 최소화
    """
    def __init__(
        self,
        bounds: np.ndarray,
        weights: Dict[str, float]
    ):
        self.bounds = bounds
        self.weights = weights
        self.optimizer = BayesianOptimizer(bounds, acquisition='ei')

    def weighted_objective(
        self,
        params: np.ndarray,
        process_model: Callable
    ) -> float:
        """가중 목적 함수"""
        metrics = process_model(params)

        total = (
            self.weights.get('defect_rate', 0) * metrics.defect_rate +
            self.weights.get('surface_roughness', 0) * metrics.surface_roughness +
            self.weights.get('dimensional_accuracy', 0) * metrics.dimensional_accuracy +
            self.weights.get('cycle_time', 0) * metrics.cycle_time
        )
        return total


def multi_quality_process_model(params: np.ndarray) -> QualityMetrics:
    """
    복수 품질 특성 모델
    실제로는 ML 모델 또는 물리 시뮬레이션
    """
    temp, pressure, speed, cooling = params

    # 각 품질 특성 계산
    defect = 0.01 * (temp - 220)**2 + 0.005 * (pressure - 80)**2
    roughness = 0.1 * abs(speed - 100) + 0.05 * abs(cooling - 15)
    accuracy = 0.02 * abs(temp - 215) + 0.01 * abs(pressure - 85)
    time = 0.5 * (30 - speed) + cooling

    return QualityMetrics(
        defect_rate=max(0, defect + np.random.normal(0, 0.01)),
        surface_roughness=max(0, roughness + np.random.normal(0, 0.02)),
        dimensional_accuracy=max(0, accuracy + np.random.normal(0, 0.01)),
        cycle_time=max(0, time + np.random.normal(0, 0.5))
    )


# 다변량 최적화 실행
bounds = np.array([
    [180, 260],   # 온도
    [50, 120],    # 압력
    [50, 150],    # 속도
    [5, 25]       # 냉각시간
])

weights = {
    'defect_rate': 10.0,      # 불량률 가중치 높음
    'surface_roughness': 2.0,
    'dimensional_accuracy': 5.0,
    'cycle_time': 1.0
}

multivar_optimizer = MultivariateProcessOptimizer(bounds, weights)
print("다변량 최적화기 초기화 완료")

4실시간 적응형 최적화

공정 조건이 시간에 따라 변하는 환경에서의 적응형 최적화 방법입니다.

Python
"""
적응형 베이지안 최적화
공정 드리프트에 대응하는 온라인 최적화
"""

class AdaptiveBayesianOptimizer:
    """
    적응형 베이지안 최적화
    - 시간 가중 GP로 최근 관측에 더 큰 가중치
    - 주기적 재최적화
    """
    def __init__(
        self,
        bounds: np.ndarray,
        decay_rate: float = 0.95,
        reoptimize_interval: int = 10
    ):
        self.bounds = bounds
        self.decay_rate = decay_rate
        self.reoptimize_interval = reoptimize_interval

        self.X_history = []
        self.y_history = []
        self.t_history = []
        self.current_time = 0

        self.current_optimum = None

    def time_weighted_kernel(
        self,
        X1: np.ndarray,
        X2: np.ndarray,
        t1: np.ndarray,
        t2: np.ndarray
    ) -> np.ndarray:
        """시간 가중 RBF 커널"""
        # 공간 거리
        spatial_dist = np.sum(X1**2, axis=1, keepdims=True) + \
                       np.sum(X2**2, axis=1) - 2 * X1 @ X2.T

        # 시간 가중치
        time_diff = np.abs(t1[:, None] - t2[None, :])
        time_weight = self.decay_rate ** time_diff

        return np.exp(-0.5 * spatial_dist) * time_weight

    def update(self, x: np.ndarray, y: float):
        """새 관측으로 업데이트"""
        self.X_history.append(x)
        self.y_history.append(y)
        self.t_history.append(self.current_time)
        self.current_time += 1

        # 주기적 재최적화
        if self.current_time % self.reoptimize_interval == 0:
            self._reoptimize()

    def _reoptimize(self):
        """현재 상태에서 최적점 재탐색"""
        # 최근 데이터만 사용 (시간 가중)
        weights = np.array([
            self.decay_rate ** (self.current_time - t)
            for t in self.t_history
        ])

        # 가중 평균으로 현재 최적점 추정
        X = np.array(self.X_history)
        y = np.array(self.y_history)

        # 가중 최소값 찾기
        weighted_scores = y * (1 / weights)  # 최근일수록 가중치 높음
        best_idx = np.argmin(y[-20:]) if len(y) > 20 else np.argmin(y)
        self.current_optimum = X[best_idx]

        print(f"[재최적화] t={self.current_time}, 현재 최적: {self.current_optimum}")

    def get_current_optimum(self) -> np.ndarray:
        """현재 추정 최적점 반환"""
        if self.current_optimum is None:
            return np.mean(self.bounds, axis=1)
        return self.current_optimum


# 드리프트가 있는 공정 시뮬레이션
class DriftingProcess:
    """시간에 따라 최적점이 변하는 공정"""
    def __init__(self, drift_rate: float = 0.1):
        self.drift_rate = drift_rate
        self.time = 0

    def evaluate(self, params: np.ndarray) -> float:
        self.time += 1

        # 시간에 따른 최적점 이동
        optimal_temp = 220 + self.drift_rate * self.time
        optimal_pressure = 80 + 0.5 * np.sin(self.time / 10)

        defect = (
            0.01 * (params[0] - optimal_temp)**2 +
            0.005 * (params[1] - optimal_pressure)**2
        )
        return defect + np.random.normal(0, 0.01)


print("적응형 최적화 모듈 로드 완료")