1스케줄링 문제 정의
생산 스케줄링은 제한된 자원(기계, 작업자, 시간)을 활용하여 작업들을 최적의 순서로 배치하는 조합 최적화 문제입니다. Job Shop Scheduling Problem(JSSP)은 NP-hard 문제로 알려져 있으며, AI 기반 접근법이 효과적입니다.
Job Shop 스케줄링 문제 구조
작업 (Job)
Job 1
O1 → O2 → O3
Job 2
O1 → O2 → O3
→
기계 (Machine)
Machine 1
Machine 2
Machine 3
→
간트 차트
M1
M2
M3
AI 스케줄링 알고리즘 비교
유전 알고리즘
- 염색체 인코딩
- 교차 & 돌연변이
- Population 탐색
강화학습
- 상태-행동 학습
- MDP 모델링
- Online 적응
제약 프로그래밍
- 수학적 정형화
- CP/MIP 솔버
- 최적해 보장
스케줄링 문제의 분류
- Flow Shop: 모든 작업이 동일한 기계 순서
- Job Shop: 작업마다 다른 기계 순서
- Flexible Job Shop: 작업이 여러 대안 기계에서 처리 가능
- Open Shop: 작업 순서에 제약 없음
"""
생산 스케줄링 문제 정의
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional
from enum import Enum
@dataclass
class Operation:
"""단위 작업"""
job_id: int
op_idx: int
machine_id: int
processing_time: int
def __repr__(self):
return f"J{self.job_id}-O{self.op_idx}(M{self.machine_id},{self.processing_time})"
@dataclass
class Job:
"""작업(여러 공정의 시퀀스)"""
job_id: int
operations: List[Operation]
due_date: Optional[int] = None
priority: int = 1
@dataclass
class Machine:
"""기계"""
machine_id: int
name: str
efficiency: float = 1.0
@dataclass
class Schedule:
"""스케줄 결과"""
assignments: Dict[int, List[Tuple[Operation, int, int]]] # machine_id -> [(op, start, end)]
makespan: int = 0
total_tardiness: int = 0
def add_assignment(self, machine_id: int, op: Operation, start: int):
if machine_id not in self.assignments:
self.assignments[machine_id] = []
end = start + op.processing_time
self.assignments[machine_id].append((op, start, end))
self.makespan = max(self.makespan, end)
class JobShopProblem:
"""Job Shop Scheduling Problem"""
def __init__(
self,
jobs: List[Job],
machines: List[Machine]
):
self.jobs = jobs
self.machines = machines
self.n_jobs = len(jobs)
self.n_machines = len(machines)
def calculate_makespan(self, schedule: Schedule) -> int:
"""Makespan (총 완료 시간) 계산"""
return schedule.makespan
def calculate_tardiness(self, schedule: Schedule) -> int:
"""납기 지연 총합 계산"""
total_tardiness = 0
for job in self.jobs:
if job.due_date:
# 작업 완료 시간 찾기
last_op = job.operations[-1]
for machine_id, assignments in schedule.assignments.items():
for op, start, end in assignments:
if op.job_id == job.job_id and op.op_idx == len(job.operations) - 1:
tardiness = max(0, end - job.due_date)
total_tardiness += tardiness
return total_tardiness
# 예시 문제 생성
def create_sample_problem(n_jobs: int = 5, n_machines: int = 3) -> JobShopProblem:
"""샘플 JSSP 생성"""
np.random.seed(42)
machines = [Machine(i, f"Machine-{i}") for i in range(n_machines)]
jobs = []
for j in range(n_jobs):
# 각 작업은 모든 기계를 한 번씩 방문 (랜덤 순서)
machine_order = np.random.permutation(n_machines)
operations = [
Operation(
job_id=j,
op_idx=i,
machine_id=int(machine_order[i]),
processing_time=np.random.randint(5, 20)
)
for i in range(n_machines)
]
due_date = sum(op.processing_time for op in operations) + np.random.randint(10, 50)
jobs.append(Job(j, operations, due_date))
return JobShopProblem(jobs, machines)
problem = create_sample_problem(5, 3)
print(f"Job Shop Problem: {problem.n_jobs} jobs, {problem.n_machines} machines")
2유전 알고리즘 스케줄러
유전 알고리즘(Genetic Algorithm)은 스케줄링 문제에 널리 사용되는 메타휴리스틱입니다. 염색체는 작업 순서를 인코딩하고, 교차와 돌연변이로 탐색합니다.
Python"""
유전 알고리즘 기반 Job Shop 스케줄러
"""
import random
from typing import List, Tuple
class GeneticScheduler:
"""유전 알고리즘 스케줄러"""
def __init__(
self,
problem: JobShopProblem,
population_size: int = 50,
mutation_rate: float = 0.1,
crossover_rate: float = 0.8
):
self.problem = problem
self.population_size = population_size
self.mutation_rate = mutation_rate
self.crossover_rate = crossover_rate
# 총 작업 수 (각 작업의 모든 공정)
self.total_ops = sum(len(j.operations) for j in problem.jobs)
def _create_chromosome(self) -> List[int]:
"""
염색체 생성: Job-based representation
각 작업이 n_machines번 나타나는 순열
"""
chromosome = []
for job in self.problem.jobs:
chromosome.extend([job.job_id] * len(job.operations))
random.shuffle(chromosome)
return chromosome
def _decode_chromosome(self, chromosome: List[int]) -> Schedule:
"""염색체 → 스케줄 디코딩"""
schedule = Schedule(assignments={})
# 각 작업의 현재 공정 인덱스
job_op_idx = {j.job_id: 0 for j in self.problem.jobs}
# 각 기계의 가용 시간
machine_available = {m.machine_id: 0 for m in self.problem.machines}
# 각 작업의 이전 공정 완료 시간
job_available = {j.job_id: 0 for j in self.problem.jobs}
for job_id in chromosome:
job = self.problem.jobs[job_id]
op_idx = job_op_idx[job_id]
op = job.operations[op_idx]
# 시작 시간: 기계 가용 시간과 이전 공정 완료 시간 중 큰 값
start_time = max(
machine_available[op.machine_id],
job_available[job_id]
)
# 스케줄에 추가
schedule.add_assignment(op.machine_id, op, start_time)
# 업데이트
end_time = start_time + op.processing_time
machine_available[op.machine_id] = end_time
job_available[job_id] = end_time
job_op_idx[job_id] += 1
return schedule
def _fitness(self, chromosome: List[int]) -> float:
"""적합도 함수 (낮을수록 좋음)"""
schedule = self._decode_chromosome(chromosome)
makespan = self.problem.calculate_makespan(schedule)
tardiness = self.problem.calculate_tardiness(schedule)
return makespan + 0.5 * tardiness
def _crossover(self, parent1: List[int], parent2: List[int]) -> Tuple[List[int], List[int]]:
"""Job-based Order Crossover (JOX)"""
if random.random() > self.crossover_rate:
return parent1.copy(), parent2.copy()
# 랜덤 작업 선택
selected_jobs = random.sample(
range(self.problem.n_jobs),
self.problem.n_jobs // 2
)
child1, child2 = [], []
p1_remaining = parent1.copy()
p2_remaining = parent2.copy()
for i in range(len(parent1)):
if parent1[i] in selected_jobs:
child1.append(parent1[i])
if parent1[i] in p2_remaining:
p2_remaining.remove(parent1[i])
if parent2[i] in selected_jobs:
child2.append(parent2[i])
if parent2[i] in p1_remaining:
p1_remaining.remove(parent2[i])
# 나머지 작업 채우기
for gene in parent2:
if gene in p2_remaining:
child1.append(gene)
p2_remaining.remove(gene)
for gene in parent1:
if gene in p1_remaining:
child2.append(gene)
p1_remaining.remove(gene)
return child1, child2
def _mutate(self, chromosome: List[int]) -> List[int]:
"""Swap Mutation"""
if random.random() > self.mutation_rate:
return chromosome
mutated = chromosome.copy()
i, j = random.sample(range(len(chromosome)), 2)
mutated[i], mutated[j] = mutated[j], mutated[i]
return mutated
def optimize(self, n_generations: int = 100) -> Tuple[Schedule, List[float]]:
"""유전 알고리즘 실행"""
# 초기 population
population = [self._create_chromosome() for _ in range(self.population_size)]
history = []
best_chromosome = None
best_fitness = float('inf')
for gen in range(n_generations):
# 적합도 평가
fitness_scores = [self._fitness(c) for c in population]
# 최선 해 업데이트
min_idx = np.argmin(fitness_scores)
if fitness_scores[min_idx] < best_fitness:
best_fitness = fitness_scores[min_idx]
best_chromosome = population[min_idx].copy()
history.append(best_fitness)
# 선택 (토너먼트)
new_population = []
for _ in range(self.population_size):
tournament = random.sample(list(enumerate(population)), 3)
winner = min(tournament, key=lambda x: fitness_scores[x[0]])
new_population.append(winner[1].copy())
# 교차 및 변이
offspring = []
for i in range(0, len(new_population) - 1, 2):
c1, c2 = self._crossover(new_population[i], new_population[i+1])
offspring.append(self._mutate(c1))
offspring.append(self._mutate(c2))
population = offspring[:self.population_size]
if gen % 20 == 0:
print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
return self._decode_chromosome(best_chromosome), history
# 스케줄러 실행
ga_scheduler = GeneticScheduler(problem, population_size=50)
best_schedule, history = ga_scheduler.optimize(n_generations=100)
print(f"\nGA 최적화 결과: Makespan = {best_schedule.makespan}")
3강화학습 스케줄러
강화학습은 스케줄링을 순차적 의사결정 문제로 모델링합니다. 에이전트가 현재 상태에서 다음에 처리할 작업을 선택합니다.
Python"""
강화학습 기반 스케줄러 (DQN)
"""
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
class SchedulingEnvironment:
"""스케줄링 환경 (Gym 스타일)"""
def __init__(self, problem: JobShopProblem):
self.problem = problem
self.reset()
def reset(self):
"""환경 초기화"""
self.job_op_idx = {j.job_id: 0 for j in self.problem.jobs}
self.machine_available = {m.machine_id: 0 for m in self.problem.machines}
self.job_available = {j.job_id: 0 for j in self.problem.jobs}
self.schedule = Schedule(assignments={})
self.done = False
return self._get_state()
def _get_state(self) -> np.ndarray:
"""현재 상태 반환"""
state = []
# 각 작업의 진행률
for job in self.problem.jobs:
progress = self.job_op_idx[job.job_id] / len(job.operations)
state.append(progress)
# 각 기계의 가용 시간 (정규화)
max_time = max(self.machine_available.values()) + 1
for m in self.problem.machines:
state.append(self.machine_available[m.machine_id] / max_time)
return np.array(state, dtype=np.float32)
def _get_valid_actions(self) -> List[int]:
"""유효한 액션(작업) 반환"""
valid = []
for job in self.problem.jobs:
if self.job_op_idx[job.job_id] < len(job.operations):
valid.append(job.job_id)
return valid
def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
"""액션 실행"""
job = self.problem.jobs[action]
op_idx = self.job_op_idx[action]
if op_idx >= len(job.operations):
# 유효하지 않은 액션
return self._get_state(), -10.0, self.done, {}
op = job.operations[op_idx]
# 스케줄링
start_time = max(
self.machine_available[op.machine_id],
self.job_available[action]
)
self.schedule.add_assignment(op.machine_id, op, start_time)
end_time = start_time + op.processing_time
self.machine_available[op.machine_id] = end_time
self.job_available[action] = end_time
self.job_op_idx[action] += 1
# 보상 계산 (대기 시간 최소화)
wait_time = start_time - self.job_available.get(action, 0)
reward = -wait_time / 10.0 # 대기 시간 패널티
# 종료 조건
self.done = all(
self.job_op_idx[j.job_id] >= len(j.operations)
for j in self.problem.jobs
)
if self.done:
reward += 100 - self.schedule.makespan # Makespan 보너스
return self._get_state(), reward, self.done, {}
class DQNNetwork(nn.Module):
"""DQN 네트워크"""
def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 128):
super().__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim)
)
def forward(self, x):
return self.network(x)
class DQNScheduler:
"""DQN 기반 스케줄러"""
def __init__(
self,
problem: JobShopProblem,
lr: float = 0.001,
gamma: float = 0.99,
epsilon: float = 1.0,
epsilon_decay: float = 0.995,
buffer_size: int = 10000
):
self.problem = problem
self.env = SchedulingEnvironment(problem)
state_dim = problem.n_jobs + problem.n_machines
action_dim = problem.n_jobs
self.q_network = DQNNetwork(state_dim, action_dim)
self.target_network = DQNNetwork(state_dim, action_dim)
self.target_network.load_state_dict(self.q_network.state_dict())
self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.replay_buffer = deque(maxlen=buffer_size)
def select_action(self, state: np.ndarray, valid_actions: List[int]) -> int:
"""Epsilon-greedy 액션 선택"""
if random.random() < self.epsilon:
return random.choice(valid_actions)
with torch.no_grad():
q_values = self.q_network(torch.FloatTensor(state))
# 유효하지 않은 액션 마스킹
mask = torch.full(q_values.shape, float('-inf'))
for a in valid_actions:
mask[a] = 0
masked_q = q_values + mask
return masked_q.argmax().item()
def train_step(self, batch_size: int = 64):
"""학습 스텝"""
if len(self.replay_buffer) < batch_size:
return
batch = random.sample(self.replay_buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor(np.array(states))
actions = torch.LongTensor(actions)
rewards = torch.FloatTensor(rewards)
next_states = torch.FloatTensor(np.array(next_states))
dones = torch.FloatTensor(dones)
# Q-Learning 업데이트
current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
next_q = self.target_network(next_states).max(1)[0].detach()
target_q = rewards + self.gamma * next_q * (1 - dones)
loss = nn.MSELoss()(current_q.squeeze(), target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def train(self, n_episodes: int = 500):
"""에이전트 학습"""
best_makespan = float('inf')
for episode in range(n_episodes):
state = self.env.reset()
total_reward = 0
while not self.env.done:
valid_actions = self.env._get_valid_actions()
if not valid_actions:
break
action = self.select_action(state, valid_actions)
next_state, reward, done, _ = self.env.step(action)
self.replay_buffer.append((state, action, reward, next_state, done))
self.train_step()
state = next_state
total_reward += reward
# Epsilon decay
self.epsilon = max(0.01, self.epsilon * self.epsilon_decay)
# Target network 업데이트
if episode % 10 == 0:
self.target_network.load_state_dict(self.q_network.state_dict())
makespan = self.env.schedule.makespan
if makespan < best_makespan:
best_makespan = makespan
if episode % 50 == 0:
print(f"Episode {episode}: Makespan={makespan}, Best={best_makespan}, Epsilon={self.epsilon:.3f}")
return best_makespan
# DQN 스케줄러 학습
# dqn_scheduler = DQNScheduler(problem)
# best_makespan = dqn_scheduler.train(n_episodes=200)
print("DQN 스케줄러 모듈 로드 완료")
4스케줄링 알고리즘 비교
다양한 스케줄링 알고리즘의 특성을 비교합니다.
| 알고리즘 | 해 품질 | 계산 시간 | 적용 환경 |
|---|---|---|---|
| Priority Rule | 보통 | 매우 빠름 | 실시간 디스패칭 |
| 유전 알고리즘 | 좋음 | 중간 | 오프라인 계획 |
| 강화학습 | 좋음~매우 좋음 | 학습 느림, 추론 빠름 | 동적 환경 |
| 제약 프로그래밍 | 최적 | 느림 | 소규모 문제 |