1다목적 최적화 문제 정의
제조 공정에서는 품질, 비용, 납기, 에너지 등 여러 목표를 동시에 최적화해야 합니다. 이러한 목표들은 종종 상충하여 하나를 개선하면 다른 것이 악화되는 트레이드오프 관계에 있습니다. 다목적 최적화(Multi-objective Optimization)는 이러한 상충 목표들의 최적 균형점을 찾습니다.
from dataclasses import dataclass, field
from typing import List, Callable, Tuple
import numpy as np
@dataclass
class OptimizationObjective:
"""최적화 목표 정의"""
name: str
func: Callable[[np.ndarray], float] # 목적 함수
direction: str = 'minimize' # 'minimize' or 'maximize'
weight: float = 1.0
target: float = None # 목표 제약조건 (Goal Programming)
@dataclass
class MultiObjectiveProblem:
"""다목적 최적화 문제"""
objectives: List[OptimizationObjective]
bounds: List[Tuple[float, float]] # 변수 범위
constraints: List[Callable] = field(default_factory=list)
def evaluate(self, x: np.ndarray) -> np.ndarray:
"""모든 목적 함수 평가"""
values = []
for obj in self.objectives:
val = obj.func(x)
if obj.direction == 'maximize':
val = -val # 최소화 문제로 변환
values.append(val)
return np.array(values)
def is_feasible(self, x: np.ndarray) -> bool:
"""제약조건 만족 여부"""
for constraint in self.constraints:
if not constraint(x):
return False
return True
# 제조 최적화 문제 예시: 품질 vs 비용 vs 사이클타임
def create_manufacturing_problem():
# 목적 함수 정의 (변수: [온도, 압력, 속도])
def quality_score(x):
# 품질은 최적 온도/압력에서 최대
temp_opt, press_opt = 180, 50
return -(100 - (x[0]-temp_opt)**2/100 - (x[1]-press_opt)**2/10)
def production_cost(x):
# 비용 = 에너지(온도, 압력) + 시간 비용
return 0.5 * x[0] + 2.0 * x[1] + 10 / x[2]
def cycle_time(x):
# 사이클타임은 속도에 반비례, 온도에 약간 의존
return 100 / x[2] + 0.1 * (200 - x[0])
objectives = [
OptimizationObjective('Quality', quality_score, 'minimize'), # 음수이므로 minimize
OptimizationObjective('Cost', production_cost, 'minimize'),
OptimizationObjective('CycleTime', cycle_time, 'minimize')
]
bounds = [(150, 220), (30, 80), (5, 20)] # 온도, 압력, 속도
return MultiObjectiveProblem(objectives, bounds)Pareto 최적: 어떤 목표도 악화시키지 않고는 다른 목표를 개선할 수 없는 해를 Pareto 최적해라 하며, 이러한 해들의 집합이 Pareto Frontier입니다.
2NSGA-II 알고리즘 구현
NSGA-II(Non-dominated Sorting Genetic Algorithm II)는 가장 널리 사용되는 다목적 진화 알고리즘입니다. 비지배 정렬과 혼잡 거리(Crowding Distance)를 사용하여 다양하고 균일한 Pareto Frontier를 찾습니다.
from typing import Dict
import random
class Individual:
"""NSGA-II 개체"""
def __init__(self, genes: np.ndarray):
self.genes = genes
self.objectives: np.ndarray = None
self.rank: int = 0
self.crowding_distance: float = 0
class NSGAII:
"""NSGA-II 다목적 최적화"""
def __init__(self, problem: MultiObjectiveProblem,
pop_size: int = 100,
generations: int = 200,
crossover_prob: float = 0.9,
mutation_prob: float = 0.1):
self.problem = problem
self.pop_size = pop_size
self.generations = generations
self.crossover_prob = crossover_prob
self.mutation_prob = mutation_prob
self.n_vars = len(problem.bounds)
def initialize_population(self) -> List[Individual]:
"""초기 집단 생성"""
population = []
for _ in range(self.pop_size):
genes = np.array([
np.random.uniform(lo, hi)
for lo, hi in self.problem.bounds
])
population.append(Individual(genes))
return population
def evaluate_population(self, population: List[Individual]):
"""집단 평가"""
for ind in population:
ind.objectives = self.problem.evaluate(ind.genes)
def dominates(self, obj1: np.ndarray, obj2: np.ndarray) -> bool:
"""obj1이 obj2를 지배하는지 확인 (모든 목적에서 같거나 좋고, 하나라도 더 좋음)"""
return np.all(obj1 <= obj2) and np.any(obj1 < obj2)
def fast_non_dominated_sort(self, population: List[Individual]) -> List[List[Individual]]:
"""비지배 정렬 - O(MN^2)"""
fronts = [[]]
domination_count = {}
dominated_solutions = {i: [] for i in range(len(population))}
for i, p in enumerate(population):
domination_count[i] = 0
for j, q in enumerate(population):
if i == j:
continue
if self.dominates(p.objectives, q.objectives):
dominated_solutions[i].append(j)
elif self.dominates(q.objectives, p.objectives):
domination_count[i] += 1
if domination_count[i] == 0:
p.rank = 0
fronts[0].append(p)
# 나머지 프론트 생성
front_idx = 0
while fronts[front_idx]:
next_front = []
for i, p in enumerate(population):
if p.rank == front_idx:
for j in dominated_solutions[i]:
domination_count[j] -= 1
if domination_count[j] == 0:
population[j].rank = front_idx + 1
next_front.append(population[j])
front_idx += 1
fronts.append(next_front)
return fronts[:-1] # 마지막 빈 프론트 제거
def calculate_crowding_distance(self, front: List[Individual]):
"""혼잡 거리 계산"""
if len(front) == 0:
return
n_obj = len(front[0].objectives)
for ind in front:
ind.crowding_distance = 0
for m in range(n_obj):
# 목적 m에 대해 정렬
front.sort(key=lambda x: x.objectives[m])
# 경계 해는 무한대
front[0].crowding_distance = float('inf')
front[-1].crowding_distance = float('inf')
# 중간 해들의 거리 계산
obj_range = front[-1].objectives[m] - front[0].objectives[m]
if obj_range == 0:
continue
for i in range(1, len(front) - 1):
front[i].crowding_distance += (
front[i + 1].objectives[m] - front[i - 1].objectives[m]
) / obj_range3진화 연산자 및 최적화 루프
NSGA-II의 핵심은 교차(Crossover)와 돌연변이(Mutation) 연산자입니다. SBX(Simulated Binary Crossover)와 다항식 돌연변이가 연속 변수에 효과적입니다.
class NSGAII(NSGAII):
"""NSGA-II 진화 연산자 추가"""
def sbx_crossover(self, parent1: Individual, parent2: Individual,
eta: float = 20) -> Tuple[Individual, Individual]:
"""Simulated Binary Crossover"""
child1_genes = np.copy(parent1.genes)
child2_genes = np.copy(parent2.genes)
if random.random() < self.crossover_prob:
for i in range(self.n_vars):
if random.random() < 0.5:
continue
if abs(parent1.genes[i] - parent2.genes[i]) < 1e-14:
continue
y1 = min(parent1.genes[i], parent2.genes[i])
y2 = max(parent1.genes[i], parent2.genes[i])
lo, hi = self.problem.bounds[i]
# SBX 수식
rand = random.random()
beta = 1.0 + (2.0 * (y1 - lo) / (y2 - y1))
alpha = 2.0 - beta ** -(eta + 1)
if rand <= 1.0 / alpha:
betaq = (rand * alpha) ** (1.0 / (eta + 1))
else:
betaq = (1.0 / (2.0 - rand * alpha)) ** (1.0 / (eta + 1))
c1 = 0.5 * ((y1 + y2) - betaq * (y2 - y1))
c2 = 0.5 * ((y1 + y2) + betaq * (y2 - y1))
child1_genes[i] = np.clip(c1, lo, hi)
child2_genes[i] = np.clip(c2, lo, hi)
return Individual(child1_genes), Individual(child2_genes)
def polynomial_mutation(self, individual: Individual, eta: float = 20):
"""다항식 돌연변이"""
for i in range(self.n_vars):
if random.random() < self.mutation_prob:
lo, hi = self.problem.bounds[i]
delta = (individual.genes[i] - lo) / (hi - lo)
rand = random.random()
if rand < 0.5:
delta_q = (2 * rand) ** (1 / (eta + 1)) - 1
else:
delta_q = 1 - (2 * (1 - rand)) ** (1 / (eta + 1))
individual.genes[i] += delta_q * (hi - lo)
individual.genes[i] = np.clip(individual.genes[i], lo, hi)
def tournament_selection(self, population: List[Individual]) -> Individual:
"""토너먼트 선택 (rank → crowding distance)"""
candidates = random.sample(population, 2)
if candidates[0].rank < candidates[1].rank:
return candidates[0]
elif candidates[0].rank > candidates[1].rank:
return candidates[1]
else:
return candidates[0] if candidates[0].crowding_distance > candidates[1].crowding_distance else candidates[1]
def optimize(self) -> List[Individual]:
"""NSGA-II 메인 루프"""
# 초기화
population = self.initialize_population()
self.evaluate_population(population)
for gen in range(self.generations):
# 자식 생성
offspring = []
while len(offspring) < self.pop_size:
p1 = self.tournament_selection(population)
p2 = self.tournament_selection(population)
c1, c2 = self.sbx_crossover(p1, p2)
self.polynomial_mutation(c1)
self.polynomial_mutation(c2)
offspring.extend([c1, c2])
self.evaluate_population(offspring)
# 부모 + 자식 결합
combined = population + offspring
# 비지배 정렬
fronts = self.fast_non_dominated_sort(combined)
# 새 집단 선택
new_population = []
for front in fronts:
self.calculate_crowding_distance(front)
if len(new_population) + len(front) <= self.pop_size:
new_population.extend(front)
else:
# 혼잡 거리로 정렬하여 선택
front.sort(key=lambda x: x.crowding_distance, reverse=True)
new_population.extend(front[:self.pop_size - len(new_population)])
break
population = new_population
if gen % 20 == 0:
print(f"Generation {gen}: Pareto front size = {len(fronts[0])}")
return fronts[0] # Pareto front 반환4트레이드오프 분석 및 의사결정 지원
Pareto Frontier가 도출되면 의사결정자가 최종 해를 선택해야 합니다. 가중합(Weighted Sum), TOPSIS, 이상점(Ideal Point) 기반 등 다양한 방법으로 최적 절충안을 추천할 수 있습니다.
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
class TradeoffAnalyzer:
"""트레이드오프 분석 및 의사결정 지원"""
def __init__(self, pareto_front: List[Individual],
objective_names: List[str]):
self.pareto_front = pareto_front
self.objective_names = objective_names
self.objectives_matrix = np.array([ind.objectives for ind in pareto_front])
def weighted_sum_selection(self, weights: List[float]) -> Individual:
"""가중합 기반 선택"""
weights = np.array(weights) / np.sum(weights)
# 정규화
normalized = (self.objectives_matrix - self.objectives_matrix.min(axis=0)) / \
(self.objectives_matrix.max(axis=0) - self.objectives_matrix.min(axis=0) + 1e-10)
scores = normalized @ weights
best_idx = np.argmin(scores)
return self.pareto_front[best_idx]
def topsis_selection(self, weights: List[float]) -> Individual:
"""TOPSIS (Technique for Order Preference by Similarity to Ideal Solution)"""
weights = np.array(weights) / np.sum(weights)
# 정규화
norm = self.objectives_matrix / np.sqrt((self.objectives_matrix**2).sum(axis=0))
weighted = norm * weights
# 이상해(최소값)와 비이상해(최대값)
ideal = weighted.min(axis=0)
anti_ideal = weighted.max(axis=0)
# 각 해의 거리 계산
d_ideal = np.sqrt(((weighted - ideal)**2).sum(axis=1))
d_anti = np.sqrt(((weighted - anti_ideal)**2).sum(axis=1))
# 상대적 근접도
closeness = d_anti / (d_ideal + d_anti + 1e-10)
best_idx = np.argmax(closeness)
return self.pareto_front[best_idx]
def knee_point_selection(self) -> Individual:
"""Knee Point 탐지 - 곡률이 가장 큰 지점"""
if len(self.pareto_front) < 3:
return self.pareto_front[0]
# 2D 경우 (첫 두 목적 사용)
points = self.objectives_matrix[:, :2]
# 정렬
sorted_idx = np.argsort(points[:, 0])
points = points[sorted_idx]
# 곡률 계산 (삼각형 면적 / 변 길이의 곱)
max_curvature = 0
knee_idx = 0
for i in range(1, len(points) - 1):
v1 = points[i] - points[i-1]
v2 = points[i+1] - points[i]
# 벡터 외적으로 면적, 내적으로 각도
cross = abs(v1[0] * v2[1] - v1[1] * v2[0])
norm_product = (np.linalg.norm(v1) * np.linalg.norm(v2)) + 1e-10
curvature = cross / norm_product
if curvature > max_curvature:
max_curvature = curvature
knee_idx = i
return self.pareto_front[sorted_idx[knee_idx]]
def visualize_pareto_2d(self, selected: Individual = None):
"""2D Pareto Frontier 시각화"""
fig, ax = plt.subplots(figsize=(10, 8))
obj1 = self.objectives_matrix[:, 0]
obj2 = self.objectives_matrix[:, 1]
ax.scatter(obj1, obj2, c='blue', s=50, alpha=0.7, label='Pareto Solutions')
if selected:
ax.scatter(selected.objectives[0], selected.objectives[1],
c='red', s=200, marker='*', label='Selected')
ax.set_xlabel(self.objective_names[0])
ax.set_ylabel(self.objective_names[1])
ax.set_title('Pareto Frontier - Trade-off Analysis')
ax.legend()
ax.grid(True, alpha=0.3)
return fig
# 사용 예시
def run_manufacturing_optimization():
problem = create_manufacturing_problem()
optimizer = NSGAII(problem, pop_size=100, generations=150)
pareto_front = optimizer.optimize()
analyzer = TradeoffAnalyzer(pareto_front, ['Quality', 'Cost', 'CycleTime'])
# 품질 중시 선택
quality_focused = analyzer.weighted_sum_selection([0.6, 0.2, 0.2])
print(f"품질 중시: 온도={quality_focused.genes[0]:.1f}, 압력={quality_focused.genes[1]:.1f}")
# 균형 선택 (Knee Point)
balanced = analyzer.knee_point_selection()
print(f"균형 선택: 온도={balanced.genes[0]:.1f}, 압력={balanced.genes[1]:.1f}")
return pareto_front, analyzer실전 적용: 제조 현장에서는 Pareto Frontier 전체를 시각화하여 경영진이 전략적 판단을 내리고, 운영팀은 실시간으로 상황에 맞는 운전점을 선택합니다.