CHAPTER 6 6 / 10

AGV/AMR 자율 주행

SLAM, 동적 경로 계획, 다중 로봇 협업을 위한 AI 기술

1AGV vs AMR

물류 로봇은 주행 방식에 따라 AGV(Automated Guided Vehicle)와 AMR(Autonomous Mobile Robot)으로 구분됩니다.

AGV vs AMR Comparison Architecture
AGV
Automated Guided Vehicle
Navigation
Magnetic Tape / QR Code Path
Obstacle Response
X
→ Stop & Wait
Fixed Path Only
Infrastructure Dependent
AMR
Autonomous Mobile Robot
Navigation
*
SLAM + Localization
Map Building + Position Estimation
Obstacle Response
→ Dynamic Avoidance
Alternative Route Generation
Software Defined
AGV
Lower Initial Cost
vs
AMR
Higher Flexibility
특성 AGV AMR
경로 인식 마그네틱 테이프, QR 코드 SLAM, 자연 랜드마크
경로 변경 인프라 재설치 필요 소프트웨어 설정만으로 가능
장애물 대응 정지 후 대기 동적 회피 경로 생성
초기 비용 낮음 (인프라 별도) 높음 (센서 탑재)
유연성 낮음 높음

최신 스마트 팩토리에서는 SLAM 기반 AMR이 주류가 되고 있습니다. 레이아웃 변경이 잦은 환경에서 특히 유리합니다.

2SLAM (Simultaneous Localization and Mapping)

SLAM은 로봇이 미지의 환경을 탐색하면서 동시에 지도를 생성하고 자신의 위치를 추정하는 기술입니다.

import numpy as np from scipy.spatial.transform import Rotation from collections import deque class ParticleFilterSLAM: """파티클 필터 기반 SLAM (FastSLAM 변형)""" def __init__(self, num_particles: int = 100, map_resolution: float = 0.05): self.num_particles = num_particles self.map_resolution = map_resolution # 파티클 초기화 [x, y, theta, weight] self.particles = np.zeros((num_particles, 4)) self.particles[:, 3] = 1.0 / num_particles # 균등 가중치 # 각 파티클별 점유 격자 지도 self.particle_maps = [ OccupancyGrid(resolution=map_resolution) for _ in range(num_particles) ] # 모션 노이즈 파라미터 self.alpha = [0.1, 0.01, 0.01, 0.1] # 회전, 이동 노이즈 def predict(self, odometry: np.ndarray): """모션 모델 적용 (오도메트리 기반 예측)""" dx, dy, dtheta = odometry for i in range(self.num_particles): # 노이즈 추가된 이동 d_trans = np.sqrt(dx**2 + dy**2) d_rot1 = np.arctan2(dy, dx) - self.particles[i, 2] d_rot2 = dtheta - d_rot1 # 샘플링 노이즈 d_rot1 += np.random.normal(0, self.alpha[0] * abs(d_rot1) + self.alpha[1] * d_trans) d_trans += np.random.normal(0, self.alpha[2] * d_trans + self.alpha[3] * (abs(d_rot1) + abs(d_rot2))) d_rot2 += np.random.normal(0, self.alpha[0] * abs(d_rot2) + self.alpha[1] * d_trans) # 위치 업데이트 self.particles[i, 0] += d_trans * np.cos(self.particles[i, 2] + d_rot1) self.particles[i, 1] += d_trans * np.sin(self.particles[i, 2] + d_rot1) self.particles[i, 2] += d_rot1 + d_rot2 def update(self, scan: np.ndarray, scan_angles: np.ndarray): """측정 모델 적용 (LiDAR 스캔 기반 업데이트)""" for i in range(self.num_particles): # 예상 스캔과 실제 스캔 비교 expected_scan = self._ray_cast( self.particle_maps[i], self.particles[i, :3], scan_angles ) # 가중치 계산 (스캔 매칭 점수) likelihood = self._compute_likelihood(scan, expected_scan) self.particles[i, 3] *= likelihood # 지도 업데이트 self._update_map( self.particle_maps[i], self.particles[i, :3], scan, scan_angles ) # 가중치 정규화 self.particles[:, 3] /= np.sum(self.particles[:, 3]) # 리샘플링 (Effective Sample Size가 낮으면) if self._effective_sample_size() < self.num_particles / 2: self._resample() def _compute_likelihood(self, actual: np.ndarray, expected: np.ndarray) -> float: """스캔 일치도 계산""" valid = ~np.isnan(actual) & ~np.isnan(expected) if np.sum(valid) < 10: return 1e-10 diff = actual[valid] - expected[valid] sigma = 0.1 # 측정 노이즈 likelihood = np.exp(-0.5 * np.sum(diff**2) / sigma**2) return max(likelihood, 1e-10) def get_best_estimate(self): """최적 추정치 반환""" best_idx = np.argmax(self.particles[:, 3]) return self.particles[best_idx, :3], self.particle_maps[best_idx] class OccupancyGrid: """점유 격자 지도""" def __init__(self, resolution: float = 0.05, size: tuple = (100, 100)): self.resolution = resolution self.size = size self.origin = np.array([-size[0] * resolution / 2, -size[1] * resolution / 2]) # 로그 오즈 표현 (0 = unknown, + = occupied, - = free) self.grid = np.zeros(size) # 센서 모델 파라미터 self.l_occ = 0.4 # 장애물 관측 시 증가량 self.l_free = -0.2 # 빈 공간 관측 시 감소량 def update_cell(self, x: int, y: int, occupied: bool): """셀 업데이트""" if 0 <= x < self.size[0] and 0 <= y < self.size[1]: if occupied: self.grid[x, y] += self.l_occ else: self.grid[x, y] += self.l_free # 클리핑 self.grid[x, y] = np.clip(self.grid[x, y], -5, 5) def get_probability(self, x: int, y: int) -> float: """점유 확률 반환""" if 0 <= x < self.size[0] and 0 <= y < self.size[1]: return 1 / (1 + np.exp(-self.grid[x, y])) return 0.5

3동적 경로 계획

AMR은 정적 장애물뿐만 아니라 사람, 다른 로봇 등 동적 장애물도 회피해야 합니다.

import heapq from typing import List, Tuple, Optional class DynamicWindowApproach: """DWA (Dynamic Window Approach) - 실시간 장애물 회피""" def __init__(self, max_speed: float = 1.0, # m/s max_yawrate: float = 1.0, # rad/s max_accel: float = 0.5, # m/s² max_dyawrate: float = 0.5, # rad/s² dt: float = 0.1): # 제어 주기 self.max_speed = max_speed self.max_yawrate = max_yawrate self.max_accel = max_accel self.max_dyawrate = max_dyawrate self.dt = dt # 평가 가중치 self.heading_weight = 1.0 self.distance_weight = 1.0 self.velocity_weight = 1.0 def compute_velocity(self, current_state: np.ndarray, # [x, y, theta, v, omega] goal: np.ndarray, # [x, y] obstacles: np.ndarray # Nx2 장애물 좌표 ) -> Tuple[float, float]: """ 최적 선속도, 각속도 계산 Returns: (linear_velocity, angular_velocity) """ # 동적 윈도우 계산 v_min = max(0, current_state[3] - self.max_accel * self.dt) v_max = min(self.max_speed, current_state[3] + self.max_accel * self.dt) w_min = max(-self.max_yawrate, current_state[4] - self.max_dyawrate * self.dt) w_max = min(self.max_yawrate, current_state[4] + self.max_dyawrate * self.dt) best_score = -float('inf') best_v, best_w = 0, 0 # 속도 공간 샘플링 for v in np.linspace(v_min, v_max, 20): for w in np.linspace(w_min, w_max, 20): # 궤적 시뮬레이션 trajectory = self._simulate_trajectory(current_state, v, w) # 장애물 충돌 검사 min_dist = self._calc_obstacle_distance(trajectory, obstacles) if min_dist < 0.3: # 충돌 회피 마진 continue # 목표 지향성 점수 heading_score = self._calc_heading_score(trajectory[-1], goal) # 장애물 거리 점수 distance_score = min_dist # 속도 점수 (빠를수록 좋음) velocity_score = v # 종합 점수 score = (self.heading_weight * heading_score + self.distance_weight * distance_score + self.velocity_weight * velocity_score) if score > best_score: best_score = score best_v, best_w = v, w return best_v, best_w def _simulate_trajectory(self, state: np.ndarray, v: float, w: float, predict_time: float = 2.0) -> np.ndarray: """궤적 시뮬레이션""" trajectory = [state.copy()] current = state.copy() t = 0 while t < predict_time: current[0] += v * np.cos(current[2]) * self.dt current[1] += v * np.sin(current[2]) * self.dt current[2] += w * self.dt current[3] = v current[4] = w trajectory.append(current.copy()) t += self.dt return np.array(trajectory) class HybridAStar: """Hybrid A* - 비홀로노믹 경로 계획""" def __init__(self, resolution: float = 0.5, heading_resolution: float = np.pi / 12, wheelbase: float = 0.5): self.resolution = resolution self.heading_resolution = heading_resolution self.wheelbase = wheelbase # 이동 프리미티브 (조향각) self.steering_angles = np.linspace(-np.pi/4, np.pi/4, 5) def plan(self, start: np.ndarray, # [x, y, theta] goal: np.ndarray, # [x, y, theta] occupancy_map: np.ndarray) -> Optional[List[np.ndarray]]: """ 비홀로노믹 제약을 고려한 경로 계획 Returns: 경로 웨이포인트 리스트 or None """ open_set = [] closed_set = set() start_node = self._create_node(start, None, 0, goal) heapq.heappush(open_set, (start_node['f'], id(start_node), start_node)) while open_set: _, _, current = heapq.heappop(open_set) state = current['state'] # 목표 도달 검사 if self._reached_goal(state, goal): return self._reconstruct_path(current) # 이산화된 상태를 closed set에 추가 discrete_state = self._discretize(state) if discrete_state in closed_set: continue closed_set.add(discrete_state) # 이웃 노드 확장 for steering in self.steering_angles: for direction in [1, -1]: # 전진/후진 new_state = self._kinematic_model(state, steering, direction) # 충돌 검사 if self._check_collision(new_state, occupancy_map): continue discrete_new = self._discretize(new_state) if discrete_new in closed_set: continue new_cost = current['g'] + self._motion_cost(state, new_state, direction) new_node = self._create_node(new_state, current, new_cost, goal) heapq.heappush(open_set, (new_node['f'], id(new_node), new_node)) return None # 경로 없음 def _kinematic_model(self, state: np.ndarray, steering: float, direction: int, step: float = 0.5) -> np.ndarray: """자전거 모델 기반 상태 전이""" x, y, theta = state distance = direction * step if abs(steering) < 1e-6: # 직진 new_x = x + distance * np.cos(theta) new_y = y + distance * np.sin(theta) new_theta = theta else: # 호 이동 radius = self.wheelbase / np.tan(steering) beta = distance / radius new_x = x + radius * (np.sin(theta + beta) - np.sin(theta)) new_y = y + radius * (np.cos(theta) - np.cos(theta + beta)) new_theta = theta + beta return np.array([new_x, new_y, new_theta]) def _create_node(self, state, parent, g_cost, goal): """노드 생성""" h_cost = self._heuristic(state, goal) return { 'state': state, 'parent': parent, 'g': g_cost, 'h': h_cost, 'f': g_cost + h_cost } def _heuristic(self, state: np.ndarray, goal: np.ndarray) -> float: """Reed-Shepp 거리 기반 휴리스틱""" # 간단화: 유클리드 거리 + 방향 차이 dist = np.linalg.norm(state[:2] - goal[:2]) angle_diff = abs(self._normalize_angle(state[2] - goal[2])) return dist + 0.5 * angle_diff * self.wheelbase

4다중 로봇 협업

여러 AMR이 동시에 운영될 때 충돌 방지와 작업 효율화를 위한 중앙 조정이 필요합니다.

from enum import Enum from dataclasses import dataclass import asyncio class TaskPriority(Enum): EMERGENCY = 0 HIGH = 1 NORMAL = 2 LOW = 3 @dataclass class TransportTask: """운송 작업 정의""" id: str source: np.ndarray # 픽업 위치 destination: np.ndarray # 드롭오프 위치 priority: TaskPriority deadline: float = None payload_type: str = "general" class FleetManager: """다중 AMR 플릿 관리자""" def __init__(self, num_robots: int): self.robots = {} self.task_queue = [] self.active_tasks = {} self.conflict_zones = {} # 구역별 점유 상태 def register_robot(self, robot_id: str, robot): """로봇 등록""" self.robots[robot_id] = { 'instance': robot, 'status': 'idle', 'position': robot.get_position(), 'current_task': None, 'battery': 100 } async def dispatch_task(self, task: TransportTask): """작업 할당""" # 가용 로봇 찾기 available = [ (rid, r) for rid, r in self.robots.items() if r['status'] == 'idle' and r['battery'] > 20 ] if not available: self.task_queue.append(task) return None # 최적 로봇 선택 (거리 + 배터리 기준) best_robot = None best_score = float('inf') for robot_id, robot_info in available: dist = np.linalg.norm(robot_info['position'] - task.source) score = dist - robot_info['battery'] * 0.1 # 배터리 보너스 if score < best_score: best_score = score best_robot = robot_id # 작업 할당 self.robots[best_robot]['status'] = 'assigned' self.robots[best_robot]['current_task'] = task.id self.active_tasks[task.id] = { 'task': task, 'robot': best_robot, 'status': 'picking_up' } # 로봇에게 명령 전송 await self.robots[best_robot]['instance'].execute_task(task) return best_robot def request_zone_lock(self, robot_id: str, zone_id: str) -> bool: """구역 진입 요청 (충돌 방지)""" if zone_id not in self.conflict_zones: self.conflict_zones[zone_id] = robot_id return True if self.conflict_zones[zone_id] == robot_id: return True # 우선순위 기반 양보 current_owner = self.conflict_zones[zone_id] requester_task = self.robots[robot_id]['current_task'] owner_task = self.robots[current_owner]['current_task'] if requester_task and owner_task: req_priority = self.active_tasks.get(requester_task, {}).get('task', {}) own_priority = self.active_tasks.get(owner_task, {}).get('task', {}) if hasattr(req_priority, 'priority') and hasattr(own_priority, 'priority'): if req_priority.priority.value < own_priority.priority.value: # 높은 우선순위 로봇에게 양보 self.conflict_zones[zone_id] = robot_id return True return False def release_zone_lock(self, robot_id: str, zone_id: str): """구역 잠금 해제""" if self.conflict_zones.get(zone_id) == robot_id: del self.conflict_zones[zone_id] class TrafficController: """교통 제어 - 교차로 관리""" def __init__(self, intersection_points: List[np.ndarray]): self.intersections = { tuple(p): {'queue': [], 'current': None} for p in intersection_points } async def request_passage(self, robot_id: str, intersection: np.ndarray, arrival_time: float) -> float: """ 교차로 통과 요청 Returns: 허용된 통과 시각 """ key = tuple(intersection) if key not in self.intersections: return arrival_time # 교차로가 아님 intersection_info = self.intersections[key] # 대기열에 추가 heapq.heappush( intersection_info['queue'], (arrival_time, robot_id) ) # 자신의 차례가 될 때까지 대기 while intersection_info['queue'][0][1] != robot_id: await asyncio.sleep(0.1) # 통과 시작 intersection_info['current'] = robot_id return arrival_time def complete_passage(self, robot_id: str, intersection: np.ndarray): """교차로 통과 완료""" key = tuple(intersection) if key not in self.intersections: return intersection_info = self.intersections[key] if intersection_info['current'] == robot_id: intersection_info['current'] = None if intersection_info['queue'] and intersection_info['queue'][0][1] == robot_id: heapq.heappop(intersection_info['queue'])

대규모 AMR 플릿에서는 분산 알고리즘(예: Conflict-Based Search)이 중앙집중식보다 확장성이 좋습니다.

5센서 융합과 위치 추정

AMR은 여러 센서를 융합하여 정확한 위치를 추정합니다.

센서 장점 단점 용도
2D LiDAR 정밀, 넓은 범위 유리/투명체 취약 SLAM, 장애물 감지
3D LiDAR 입체 인식 고가, 데이터량 多 3D 매핑
IMU 고속, 드리프트 없음(단기) 누적 오차 오도메트리 보정
휠 엔코더 간단, 저렴 슬립 오차 이동 거리 측정
카메라 (Visual SLAM) 풍부한 특징점 조명 영향, 계산량 시각적 랜드마크
UWB 고정밀 (~10cm) 인프라 필요 절대 위치 보정

단일 센서에 의존하면 안 됩니다. 확장 칼만 필터(EKF) 또는 파티클 필터로 다중 센서를 융합하세요.

6AMR 시스템 아키텍처

완전한 AMR 시스템의 소프트웨어 아키텍처입니다.

ROS 2의 Navigation2 스택은 AMR 개발을 위한 산업 표준으로 자리잡았습니다. DWA, TEB, SLAM Toolbox 등이 포함되어 있습니다.