1스케줄링 문제 정의

생산 스케줄링은 제한된 자원(기계, 작업자, 시간)을 활용하여 작업들을 최적의 순서로 배치하는 조합 최적화 문제입니다. Job Shop Scheduling Problem(JSSP)은 NP-hard 문제로 알려져 있으며, AI 기반 접근법이 효과적입니다.

Job Shop 스케줄링 문제 구조
작업 (Job)
Job 1
O1 → O2 → O3
Job 2
O1 → O2 → O3
기계 (Machine)
Machine 1
Machine 2
Machine 3
간트 차트
M1
M2
M3
AI 스케줄링 알고리즘 비교
유전 알고리즘
  • 염색체 인코딩
  • 교차 & 돌연변이
  • Population 탐색
강화학습
  • 상태-행동 학습
  • MDP 모델링
  • Online 적응
제약 프로그래밍
  • 수학적 정형화
  • CP/MIP 솔버
  • 최적해 보장
스케줄링 문제의 분류

- Flow Shop: 모든 작업이 동일한 기계 순서
- Job Shop: 작업마다 다른 기계 순서
- Flexible Job Shop: 작업이 여러 대안 기계에서 처리 가능
- Open Shop: 작업 순서에 제약 없음

Python
"""
생산 스케줄링 문제 정의
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional
from enum import Enum

@dataclass
class Operation:
    """단위 작업"""
    job_id: int
    op_idx: int
    machine_id: int
    processing_time: int

    def __repr__(self):
        return f"J{self.job_id}-O{self.op_idx}(M{self.machine_id},{self.processing_time})"

@dataclass
class Job:
    """작업(여러 공정의 시퀀스)"""
    job_id: int
    operations: List[Operation]
    due_date: Optional[int] = None
    priority: int = 1

@dataclass
class Machine:
    """기계"""
    machine_id: int
    name: str
    efficiency: float = 1.0

@dataclass
class Schedule:
    """스케줄 결과"""
    assignments: Dict[int, List[Tuple[Operation, int, int]]]  # machine_id -> [(op, start, end)]
    makespan: int = 0
    total_tardiness: int = 0

    def add_assignment(self, machine_id: int, op: Operation, start: int):
        if machine_id not in self.assignments:
            self.assignments[machine_id] = []
        end = start + op.processing_time
        self.assignments[machine_id].append((op, start, end))
        self.makespan = max(self.makespan, end)


class JobShopProblem:
    """Job Shop Scheduling Problem"""
    def __init__(
        self,
        jobs: List[Job],
        machines: List[Machine]
    ):
        self.jobs = jobs
        self.machines = machines
        self.n_jobs = len(jobs)
        self.n_machines = len(machines)

    def calculate_makespan(self, schedule: Schedule) -> int:
        """Makespan (총 완료 시간) 계산"""
        return schedule.makespan

    def calculate_tardiness(self, schedule: Schedule) -> int:
        """납기 지연 총합 계산"""
        total_tardiness = 0
        for job in self.jobs:
            if job.due_date:
                # 작업 완료 시간 찾기
                last_op = job.operations[-1]
                for machine_id, assignments in schedule.assignments.items():
                    for op, start, end in assignments:
                        if op.job_id == job.job_id and op.op_idx == len(job.operations) - 1:
                            tardiness = max(0, end - job.due_date)
                            total_tardiness += tardiness
        return total_tardiness


# 예시 문제 생성
def create_sample_problem(n_jobs: int = 5, n_machines: int = 3) -> JobShopProblem:
    """샘플 JSSP 생성"""
    np.random.seed(42)

    machines = [Machine(i, f"Machine-{i}") for i in range(n_machines)]

    jobs = []
    for j in range(n_jobs):
        # 각 작업은 모든 기계를 한 번씩 방문 (랜덤 순서)
        machine_order = np.random.permutation(n_machines)
        operations = [
            Operation(
                job_id=j,
                op_idx=i,
                machine_id=int(machine_order[i]),
                processing_time=np.random.randint(5, 20)
            )
            for i in range(n_machines)
        ]
        due_date = sum(op.processing_time for op in operations) + np.random.randint(10, 50)
        jobs.append(Job(j, operations, due_date))

    return JobShopProblem(jobs, machines)


problem = create_sample_problem(5, 3)
print(f"Job Shop Problem: {problem.n_jobs} jobs, {problem.n_machines} machines")

2유전 알고리즘 스케줄러

유전 알고리즘(Genetic Algorithm)은 스케줄링 문제에 널리 사용되는 메타휴리스틱입니다. 염색체는 작업 순서를 인코딩하고, 교차와 돌연변이로 탐색합니다.

Python
"""
유전 알고리즘 기반 Job Shop 스케줄러
"""
import random
from typing import List, Tuple

class GeneticScheduler:
    """유전 알고리즘 스케줄러"""

    def __init__(
        self,
        problem: JobShopProblem,
        population_size: int = 50,
        mutation_rate: float = 0.1,
        crossover_rate: float = 0.8
    ):
        self.problem = problem
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

        # 총 작업 수 (각 작업의 모든 공정)
        self.total_ops = sum(len(j.operations) for j in problem.jobs)

    def _create_chromosome(self) -> List[int]:
        """
        염색체 생성: Job-based representation
        각 작업이 n_machines번 나타나는 순열
        """
        chromosome = []
        for job in self.problem.jobs:
            chromosome.extend([job.job_id] * len(job.operations))
        random.shuffle(chromosome)
        return chromosome

    def _decode_chromosome(self, chromosome: List[int]) -> Schedule:
        """염색체 → 스케줄 디코딩"""
        schedule = Schedule(assignments={})

        # 각 작업의 현재 공정 인덱스
        job_op_idx = {j.job_id: 0 for j in self.problem.jobs}

        # 각 기계의 가용 시간
        machine_available = {m.machine_id: 0 for m in self.problem.machines}

        # 각 작업의 이전 공정 완료 시간
        job_available = {j.job_id: 0 for j in self.problem.jobs}

        for job_id in chromosome:
            job = self.problem.jobs[job_id]
            op_idx = job_op_idx[job_id]
            op = job.operations[op_idx]

            # 시작 시간: 기계 가용 시간과 이전 공정 완료 시간 중 큰 값
            start_time = max(
                machine_available[op.machine_id],
                job_available[job_id]
            )

            # 스케줄에 추가
            schedule.add_assignment(op.machine_id, op, start_time)

            # 업데이트
            end_time = start_time + op.processing_time
            machine_available[op.machine_id] = end_time
            job_available[job_id] = end_time
            job_op_idx[job_id] += 1

        return schedule

    def _fitness(self, chromosome: List[int]) -> float:
        """적합도 함수 (낮을수록 좋음)"""
        schedule = self._decode_chromosome(chromosome)
        makespan = self.problem.calculate_makespan(schedule)
        tardiness = self.problem.calculate_tardiness(schedule)
        return makespan + 0.5 * tardiness

    def _crossover(self, parent1: List[int], parent2: List[int]) -> Tuple[List[int], List[int]]:
        """Job-based Order Crossover (JOX)"""
        if random.random() > self.crossover_rate:
            return parent1.copy(), parent2.copy()

        # 랜덤 작업 선택
        selected_jobs = random.sample(
            range(self.problem.n_jobs),
            self.problem.n_jobs // 2
        )

        child1, child2 = [], []
        p1_remaining = parent1.copy()
        p2_remaining = parent2.copy()

        for i in range(len(parent1)):
            if parent1[i] in selected_jobs:
                child1.append(parent1[i])
                if parent1[i] in p2_remaining:
                    p2_remaining.remove(parent1[i])
            if parent2[i] in selected_jobs:
                child2.append(parent2[i])
                if parent2[i] in p1_remaining:
                    p1_remaining.remove(parent2[i])

        # 나머지 작업 채우기
        for gene in parent2:
            if gene in p2_remaining:
                child1.append(gene)
                p2_remaining.remove(gene)

        for gene in parent1:
            if gene in p1_remaining:
                child2.append(gene)
                p1_remaining.remove(gene)

        return child1, child2

    def _mutate(self, chromosome: List[int]) -> List[int]:
        """Swap Mutation"""
        if random.random() > self.mutation_rate:
            return chromosome

        mutated = chromosome.copy()
        i, j = random.sample(range(len(chromosome)), 2)
        mutated[i], mutated[j] = mutated[j], mutated[i]
        return mutated

    def optimize(self, n_generations: int = 100) -> Tuple[Schedule, List[float]]:
        """유전 알고리즘 실행"""
        # 초기 population
        population = [self._create_chromosome() for _ in range(self.population_size)]

        history = []
        best_chromosome = None
        best_fitness = float('inf')

        for gen in range(n_generations):
            # 적합도 평가
            fitness_scores = [self._fitness(c) for c in population]

            # 최선 해 업데이트
            min_idx = np.argmin(fitness_scores)
            if fitness_scores[min_idx] < best_fitness:
                best_fitness = fitness_scores[min_idx]
                best_chromosome = population[min_idx].copy()

            history.append(best_fitness)

            # 선택 (토너먼트)
            new_population = []
            for _ in range(self.population_size):
                tournament = random.sample(list(enumerate(population)), 3)
                winner = min(tournament, key=lambda x: fitness_scores[x[0]])
                new_population.append(winner[1].copy())

            # 교차 및 변이
            offspring = []
            for i in range(0, len(new_population) - 1, 2):
                c1, c2 = self._crossover(new_population[i], new_population[i+1])
                offspring.append(self._mutate(c1))
                offspring.append(self._mutate(c2))

            population = offspring[:self.population_size]

            if gen % 20 == 0:
                print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")

        return self._decode_chromosome(best_chromosome), history


# 스케줄러 실행
ga_scheduler = GeneticScheduler(problem, population_size=50)
best_schedule, history = ga_scheduler.optimize(n_generations=100)
print(f"\nGA 최적화 결과: Makespan = {best_schedule.makespan}")

3강화학습 스케줄러

강화학습은 스케줄링을 순차적 의사결정 문제로 모델링합니다. 에이전트가 현재 상태에서 다음에 처리할 작업을 선택합니다.

Python
"""
강화학습 기반 스케줄러 (DQN)
"""
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

class SchedulingEnvironment:
    """스케줄링 환경 (Gym 스타일)"""

    def __init__(self, problem: JobShopProblem):
        self.problem = problem
        self.reset()

    def reset(self):
        """환경 초기화"""
        self.job_op_idx = {j.job_id: 0 for j in self.problem.jobs}
        self.machine_available = {m.machine_id: 0 for m in self.problem.machines}
        self.job_available = {j.job_id: 0 for j in self.problem.jobs}
        self.schedule = Schedule(assignments={})
        self.done = False
        return self._get_state()

    def _get_state(self) -> np.ndarray:
        """현재 상태 반환"""
        state = []

        # 각 작업의 진행률
        for job in self.problem.jobs:
            progress = self.job_op_idx[job.job_id] / len(job.operations)
            state.append(progress)

        # 각 기계의 가용 시간 (정규화)
        max_time = max(self.machine_available.values()) + 1
        for m in self.problem.machines:
            state.append(self.machine_available[m.machine_id] / max_time)

        return np.array(state, dtype=np.float32)

    def _get_valid_actions(self) -> List[int]:
        """유효한 액션(작업) 반환"""
        valid = []
        for job in self.problem.jobs:
            if self.job_op_idx[job.job_id] < len(job.operations):
                valid.append(job.job_id)
        return valid

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
        """액션 실행"""
        job = self.problem.jobs[action]
        op_idx = self.job_op_idx[action]

        if op_idx >= len(job.operations):
            # 유효하지 않은 액션
            return self._get_state(), -10.0, self.done, {}

        op = job.operations[op_idx]

        # 스케줄링
        start_time = max(
            self.machine_available[op.machine_id],
            self.job_available[action]
        )
        self.schedule.add_assignment(op.machine_id, op, start_time)

        end_time = start_time + op.processing_time
        self.machine_available[op.machine_id] = end_time
        self.job_available[action] = end_time
        self.job_op_idx[action] += 1

        # 보상 계산 (대기 시간 최소화)
        wait_time = start_time - self.job_available.get(action, 0)
        reward = -wait_time / 10.0  # 대기 시간 패널티

        # 종료 조건
        self.done = all(
            self.job_op_idx[j.job_id] >= len(j.operations)
            for j in self.problem.jobs
        )

        if self.done:
            reward += 100 - self.schedule.makespan  # Makespan 보너스

        return self._get_state(), reward, self.done, {}


class DQNNetwork(nn.Module):
    """DQN 네트워크"""

    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 128):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        return self.network(x)


class DQNScheduler:
    """DQN 기반 스케줄러"""

    def __init__(
        self,
        problem: JobShopProblem,
        lr: float = 0.001,
        gamma: float = 0.99,
        epsilon: float = 1.0,
        epsilon_decay: float = 0.995,
        buffer_size: int = 10000
    ):
        self.problem = problem
        self.env = SchedulingEnvironment(problem)

        state_dim = problem.n_jobs + problem.n_machines
        action_dim = problem.n_jobs

        self.q_network = DQNNetwork(state_dim, action_dim)
        self.target_network = DQNNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay

        self.replay_buffer = deque(maxlen=buffer_size)

    def select_action(self, state: np.ndarray, valid_actions: List[int]) -> int:
        """Epsilon-greedy 액션 선택"""
        if random.random() < self.epsilon:
            return random.choice(valid_actions)

        with torch.no_grad():
            q_values = self.q_network(torch.FloatTensor(state))
            # 유효하지 않은 액션 마스킹
            mask = torch.full(q_values.shape, float('-inf'))
            for a in valid_actions:
                mask[a] = 0
            masked_q = q_values + mask
            return masked_q.argmax().item()

    def train_step(self, batch_size: int = 64):
        """학습 스텝"""
        if len(self.replay_buffer) < batch_size:
            return

        batch = random.sample(self.replay_buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)

        # Q-Learning 업데이트
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
        next_q = self.target_network(next_states).max(1)[0].detach()
        target_q = rewards + self.gamma * next_q * (1 - dones)

        loss = nn.MSELoss()(current_q.squeeze(), target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, n_episodes: int = 500):
        """에이전트 학습"""
        best_makespan = float('inf')

        for episode in range(n_episodes):
            state = self.env.reset()
            total_reward = 0

            while not self.env.done:
                valid_actions = self.env._get_valid_actions()
                if not valid_actions:
                    break

                action = self.select_action(state, valid_actions)
                next_state, reward, done, _ = self.env.step(action)

                self.replay_buffer.append((state, action, reward, next_state, done))
                self.train_step()

                state = next_state
                total_reward += reward

            # Epsilon decay
            self.epsilon = max(0.01, self.epsilon * self.epsilon_decay)

            # Target network 업데이트
            if episode % 10 == 0:
                self.target_network.load_state_dict(self.q_network.state_dict())

            makespan = self.env.schedule.makespan
            if makespan < best_makespan:
                best_makespan = makespan

            if episode % 50 == 0:
                print(f"Episode {episode}: Makespan={makespan}, Best={best_makespan}, Epsilon={self.epsilon:.3f}")

        return best_makespan


# DQN 스케줄러 학습
# dqn_scheduler = DQNScheduler(problem)
# best_makespan = dqn_scheduler.train(n_episodes=200)
print("DQN 스케줄러 모듈 로드 완료")

4스케줄링 알고리즘 비교

다양한 스케줄링 알고리즘의 특성을 비교합니다.

알고리즘 해 품질 계산 시간 적용 환경
Priority Rule 보통 매우 빠름 실시간 디스패칭
유전 알고리즘 좋음 중간 오프라인 계획
강화학습 좋음~매우 좋음 학습 느림, 추론 빠름 동적 환경
제약 프로그래밍 최적 느림 소규모 문제