SLAM, 동적 경로 계획, 다중 로봇 협업을 위한 AI 기술
물류 로봇은 주행 방식에 따라 AGV(Automated Guided Vehicle)와 AMR(Autonomous Mobile Robot)으로 구분됩니다.
| 특성 | AGV | AMR |
|---|---|---|
| 경로 인식 | 마그네틱 테이프, QR 코드 | SLAM, 자연 랜드마크 |
| 경로 변경 | 인프라 재설치 필요 | 소프트웨어 설정만으로 가능 |
| 장애물 대응 | 정지 후 대기 | 동적 회피 경로 생성 |
| 초기 비용 | 낮음 (인프라 별도) | 높음 (센서 탑재) |
| 유연성 | 낮음 | 높음 |
최신 스마트 팩토리에서는 SLAM 기반 AMR이 주류가 되고 있습니다. 레이아웃 변경이 잦은 환경에서 특히 유리합니다.
SLAM은 로봇이 미지의 환경을 탐색하면서 동시에 지도를 생성하고 자신의 위치를 추정하는 기술입니다.
import numpy as np
from scipy.spatial.transform import Rotation
from collections import deque
class ParticleFilterSLAM:
"""파티클 필터 기반 SLAM (FastSLAM 변형)"""
def __init__(self, num_particles: int = 100,
map_resolution: float = 0.05):
self.num_particles = num_particles
self.map_resolution = map_resolution
# 파티클 초기화 [x, y, theta, weight]
self.particles = np.zeros((num_particles, 4))
self.particles[:, 3] = 1.0 / num_particles # 균등 가중치
# 각 파티클별 점유 격자 지도
self.particle_maps = [
OccupancyGrid(resolution=map_resolution)
for _ in range(num_particles)
]
# 모션 노이즈 파라미터
self.alpha = [0.1, 0.01, 0.01, 0.1] # 회전, 이동 노이즈
def predict(self, odometry: np.ndarray):
"""모션 모델 적용 (오도메트리 기반 예측)"""
dx, dy, dtheta = odometry
for i in range(self.num_particles):
# 노이즈 추가된 이동
d_trans = np.sqrt(dx**2 + dy**2)
d_rot1 = np.arctan2(dy, dx) - self.particles[i, 2]
d_rot2 = dtheta - d_rot1
# 샘플링 노이즈
d_rot1 += np.random.normal(0, self.alpha[0] * abs(d_rot1) + self.alpha[1] * d_trans)
d_trans += np.random.normal(0, self.alpha[2] * d_trans + self.alpha[3] * (abs(d_rot1) + abs(d_rot2)))
d_rot2 += np.random.normal(0, self.alpha[0] * abs(d_rot2) + self.alpha[1] * d_trans)
# 위치 업데이트
self.particles[i, 0] += d_trans * np.cos(self.particles[i, 2] + d_rot1)
self.particles[i, 1] += d_trans * np.sin(self.particles[i, 2] + d_rot1)
self.particles[i, 2] += d_rot1 + d_rot2
def update(self, scan: np.ndarray, scan_angles: np.ndarray):
"""측정 모델 적용 (LiDAR 스캔 기반 업데이트)"""
for i in range(self.num_particles):
# 예상 스캔과 실제 스캔 비교
expected_scan = self._ray_cast(
self.particle_maps[i],
self.particles[i, :3],
scan_angles
)
# 가중치 계산 (스캔 매칭 점수)
likelihood = self._compute_likelihood(scan, expected_scan)
self.particles[i, 3] *= likelihood
# 지도 업데이트
self._update_map(
self.particle_maps[i],
self.particles[i, :3],
scan, scan_angles
)
# 가중치 정규화
self.particles[:, 3] /= np.sum(self.particles[:, 3])
# 리샘플링 (Effective Sample Size가 낮으면)
if self._effective_sample_size() < self.num_particles / 2:
self._resample()
def _compute_likelihood(self, actual: np.ndarray,
expected: np.ndarray) -> float:
"""스캔 일치도 계산"""
valid = ~np.isnan(actual) & ~np.isnan(expected)
if np.sum(valid) < 10:
return 1e-10
diff = actual[valid] - expected[valid]
sigma = 0.1 # 측정 노이즈
likelihood = np.exp(-0.5 * np.sum(diff**2) / sigma**2)
return max(likelihood, 1e-10)
def get_best_estimate(self):
"""최적 추정치 반환"""
best_idx = np.argmax(self.particles[:, 3])
return self.particles[best_idx, :3], self.particle_maps[best_idx]
class OccupancyGrid:
"""점유 격자 지도"""
def __init__(self, resolution: float = 0.05,
size: tuple = (100, 100)):
self.resolution = resolution
self.size = size
self.origin = np.array([-size[0] * resolution / 2,
-size[1] * resolution / 2])
# 로그 오즈 표현 (0 = unknown, + = occupied, - = free)
self.grid = np.zeros(size)
# 센서 모델 파라미터
self.l_occ = 0.4 # 장애물 관측 시 증가량
self.l_free = -0.2 # 빈 공간 관측 시 감소량
def update_cell(self, x: int, y: int, occupied: bool):
"""셀 업데이트"""
if 0 <= x < self.size[0] and 0 <= y < self.size[1]:
if occupied:
self.grid[x, y] += self.l_occ
else:
self.grid[x, y] += self.l_free
# 클리핑
self.grid[x, y] = np.clip(self.grid[x, y], -5, 5)
def get_probability(self, x: int, y: int) -> float:
"""점유 확률 반환"""
if 0 <= x < self.size[0] and 0 <= y < self.size[1]:
return 1 / (1 + np.exp(-self.grid[x, y]))
return 0.5AMR은 정적 장애물뿐만 아니라 사람, 다른 로봇 등 동적 장애물도 회피해야 합니다.
import heapq
from typing import List, Tuple, Optional
class DynamicWindowApproach:
"""DWA (Dynamic Window Approach) - 실시간 장애물 회피"""
def __init__(self,
max_speed: float = 1.0, # m/s
max_yawrate: float = 1.0, # rad/s
max_accel: float = 0.5, # m/s²
max_dyawrate: float = 0.5, # rad/s²
dt: float = 0.1): # 제어 주기
self.max_speed = max_speed
self.max_yawrate = max_yawrate
self.max_accel = max_accel
self.max_dyawrate = max_dyawrate
self.dt = dt
# 평가 가중치
self.heading_weight = 1.0
self.distance_weight = 1.0
self.velocity_weight = 1.0
def compute_velocity(self,
current_state: np.ndarray, # [x, y, theta, v, omega]
goal: np.ndarray, # [x, y]
obstacles: np.ndarray # Nx2 장애물 좌표
) -> Tuple[float, float]:
"""
최적 선속도, 각속도 계산
Returns:
(linear_velocity, angular_velocity)
"""
# 동적 윈도우 계산
v_min = max(0, current_state[3] - self.max_accel * self.dt)
v_max = min(self.max_speed, current_state[3] + self.max_accel * self.dt)
w_min = max(-self.max_yawrate, current_state[4] - self.max_dyawrate * self.dt)
w_max = min(self.max_yawrate, current_state[4] + self.max_dyawrate * self.dt)
best_score = -float('inf')
best_v, best_w = 0, 0
# 속도 공간 샘플링
for v in np.linspace(v_min, v_max, 20):
for w in np.linspace(w_min, w_max, 20):
# 궤적 시뮬레이션
trajectory = self._simulate_trajectory(current_state, v, w)
# 장애물 충돌 검사
min_dist = self._calc_obstacle_distance(trajectory, obstacles)
if min_dist < 0.3: # 충돌 회피 마진
continue
# 목표 지향성 점수
heading_score = self._calc_heading_score(trajectory[-1], goal)
# 장애물 거리 점수
distance_score = min_dist
# 속도 점수 (빠를수록 좋음)
velocity_score = v
# 종합 점수
score = (self.heading_weight * heading_score +
self.distance_weight * distance_score +
self.velocity_weight * velocity_score)
if score > best_score:
best_score = score
best_v, best_w = v, w
return best_v, best_w
def _simulate_trajectory(self, state: np.ndarray,
v: float, w: float,
predict_time: float = 2.0) -> np.ndarray:
"""궤적 시뮬레이션"""
trajectory = [state.copy()]
current = state.copy()
t = 0
while t < predict_time:
current[0] += v * np.cos(current[2]) * self.dt
current[1] += v * np.sin(current[2]) * self.dt
current[2] += w * self.dt
current[3] = v
current[4] = w
trajectory.append(current.copy())
t += self.dt
return np.array(trajectory)
class HybridAStar:
"""Hybrid A* - 비홀로노믹 경로 계획"""
def __init__(self,
resolution: float = 0.5,
heading_resolution: float = np.pi / 12,
wheelbase: float = 0.5):
self.resolution = resolution
self.heading_resolution = heading_resolution
self.wheelbase = wheelbase
# 이동 프리미티브 (조향각)
self.steering_angles = np.linspace(-np.pi/4, np.pi/4, 5)
def plan(self,
start: np.ndarray, # [x, y, theta]
goal: np.ndarray, # [x, y, theta]
occupancy_map: np.ndarray) -> Optional[List[np.ndarray]]:
"""
비홀로노믹 제약을 고려한 경로 계획
Returns:
경로 웨이포인트 리스트 or None
"""
open_set = []
closed_set = set()
start_node = self._create_node(start, None, 0, goal)
heapq.heappush(open_set, (start_node['f'], id(start_node), start_node))
while open_set:
_, _, current = heapq.heappop(open_set)
state = current['state']
# 목표 도달 검사
if self._reached_goal(state, goal):
return self._reconstruct_path(current)
# 이산화된 상태를 closed set에 추가
discrete_state = self._discretize(state)
if discrete_state in closed_set:
continue
closed_set.add(discrete_state)
# 이웃 노드 확장
for steering in self.steering_angles:
for direction in [1, -1]: # 전진/후진
new_state = self._kinematic_model(state, steering, direction)
# 충돌 검사
if self._check_collision(new_state, occupancy_map):
continue
discrete_new = self._discretize(new_state)
if discrete_new in closed_set:
continue
new_cost = current['g'] + self._motion_cost(state, new_state, direction)
new_node = self._create_node(new_state, current, new_cost, goal)
heapq.heappush(open_set, (new_node['f'], id(new_node), new_node))
return None # 경로 없음
def _kinematic_model(self, state: np.ndarray,
steering: float, direction: int,
step: float = 0.5) -> np.ndarray:
"""자전거 모델 기반 상태 전이"""
x, y, theta = state
distance = direction * step
if abs(steering) < 1e-6:
# 직진
new_x = x + distance * np.cos(theta)
new_y = y + distance * np.sin(theta)
new_theta = theta
else:
# 호 이동
radius = self.wheelbase / np.tan(steering)
beta = distance / radius
new_x = x + radius * (np.sin(theta + beta) - np.sin(theta))
new_y = y + radius * (np.cos(theta) - np.cos(theta + beta))
new_theta = theta + beta
return np.array([new_x, new_y, new_theta])
def _create_node(self, state, parent, g_cost, goal):
"""노드 생성"""
h_cost = self._heuristic(state, goal)
return {
'state': state,
'parent': parent,
'g': g_cost,
'h': h_cost,
'f': g_cost + h_cost
}
def _heuristic(self, state: np.ndarray, goal: np.ndarray) -> float:
"""Reed-Shepp 거리 기반 휴리스틱"""
# 간단화: 유클리드 거리 + 방향 차이
dist = np.linalg.norm(state[:2] - goal[:2])
angle_diff = abs(self._normalize_angle(state[2] - goal[2]))
return dist + 0.5 * angle_diff * self.wheelbase여러 AMR이 동시에 운영될 때 충돌 방지와 작업 효율화를 위한 중앙 조정이 필요합니다.
from enum import Enum
from dataclasses import dataclass
import asyncio
class TaskPriority(Enum):
EMERGENCY = 0
HIGH = 1
NORMAL = 2
LOW = 3
@dataclass
class TransportTask:
"""운송 작업 정의"""
id: str
source: np.ndarray # 픽업 위치
destination: np.ndarray # 드롭오프 위치
priority: TaskPriority
deadline: float = None
payload_type: str = "general"
class FleetManager:
"""다중 AMR 플릿 관리자"""
def __init__(self, num_robots: int):
self.robots = {}
self.task_queue = []
self.active_tasks = {}
self.conflict_zones = {} # 구역별 점유 상태
def register_robot(self, robot_id: str, robot):
"""로봇 등록"""
self.robots[robot_id] = {
'instance': robot,
'status': 'idle',
'position': robot.get_position(),
'current_task': None,
'battery': 100
}
async def dispatch_task(self, task: TransportTask):
"""작업 할당"""
# 가용 로봇 찾기
available = [
(rid, r) for rid, r in self.robots.items()
if r['status'] == 'idle' and r['battery'] > 20
]
if not available:
self.task_queue.append(task)
return None
# 최적 로봇 선택 (거리 + 배터리 기준)
best_robot = None
best_score = float('inf')
for robot_id, robot_info in available:
dist = np.linalg.norm(robot_info['position'] - task.source)
score = dist - robot_info['battery'] * 0.1 # 배터리 보너스
if score < best_score:
best_score = score
best_robot = robot_id
# 작업 할당
self.robots[best_robot]['status'] = 'assigned'
self.robots[best_robot]['current_task'] = task.id
self.active_tasks[task.id] = {
'task': task,
'robot': best_robot,
'status': 'picking_up'
}
# 로봇에게 명령 전송
await self.robots[best_robot]['instance'].execute_task(task)
return best_robot
def request_zone_lock(self, robot_id: str, zone_id: str) -> bool:
"""구역 진입 요청 (충돌 방지)"""
if zone_id not in self.conflict_zones:
self.conflict_zones[zone_id] = robot_id
return True
if self.conflict_zones[zone_id] == robot_id:
return True
# 우선순위 기반 양보
current_owner = self.conflict_zones[zone_id]
requester_task = self.robots[robot_id]['current_task']
owner_task = self.robots[current_owner]['current_task']
if requester_task and owner_task:
req_priority = self.active_tasks.get(requester_task, {}).get('task', {})
own_priority = self.active_tasks.get(owner_task, {}).get('task', {})
if hasattr(req_priority, 'priority') and hasattr(own_priority, 'priority'):
if req_priority.priority.value < own_priority.priority.value:
# 높은 우선순위 로봇에게 양보
self.conflict_zones[zone_id] = robot_id
return True
return False
def release_zone_lock(self, robot_id: str, zone_id: str):
"""구역 잠금 해제"""
if self.conflict_zones.get(zone_id) == robot_id:
del self.conflict_zones[zone_id]
class TrafficController:
"""교통 제어 - 교차로 관리"""
def __init__(self, intersection_points: List[np.ndarray]):
self.intersections = {
tuple(p): {'queue': [], 'current': None}
for p in intersection_points
}
async def request_passage(self,
robot_id: str,
intersection: np.ndarray,
arrival_time: float) -> float:
"""
교차로 통과 요청
Returns:
허용된 통과 시각
"""
key = tuple(intersection)
if key not in self.intersections:
return arrival_time # 교차로가 아님
intersection_info = self.intersections[key]
# 대기열에 추가
heapq.heappush(
intersection_info['queue'],
(arrival_time, robot_id)
)
# 자신의 차례가 될 때까지 대기
while intersection_info['queue'][0][1] != robot_id:
await asyncio.sleep(0.1)
# 통과 시작
intersection_info['current'] = robot_id
return arrival_time
def complete_passage(self, robot_id: str, intersection: np.ndarray):
"""교차로 통과 완료"""
key = tuple(intersection)
if key not in self.intersections:
return
intersection_info = self.intersections[key]
if intersection_info['current'] == robot_id:
intersection_info['current'] = None
if intersection_info['queue'] and intersection_info['queue'][0][1] == robot_id:
heapq.heappop(intersection_info['queue'])대규모 AMR 플릿에서는 분산 알고리즘(예: Conflict-Based Search)이 중앙집중식보다 확장성이 좋습니다.
AMR은 여러 센서를 융합하여 정확한 위치를 추정합니다.
| 센서 | 장점 | 단점 | 용도 |
|---|---|---|---|
| 2D LiDAR | 정밀, 넓은 범위 | 유리/투명체 취약 | SLAM, 장애물 감지 |
| 3D LiDAR | 입체 인식 | 고가, 데이터량 多 | 3D 매핑 |
| IMU | 고속, 드리프트 없음(단기) | 누적 오차 | 오도메트리 보정 |
| 휠 엔코더 | 간단, 저렴 | 슬립 오차 | 이동 거리 측정 |
| 카메라 (Visual SLAM) | 풍부한 특징점 | 조명 영향, 계산량 | 시각적 랜드마크 |
| UWB | 고정밀 (~10cm) | 인프라 필요 | 절대 위치 보정 |
단일 센서에 의존하면 안 됩니다. 확장 칼만 필터(EKF) 또는 파티클 필터로 다중 센서를 융합하세요.
완전한 AMR 시스템의 소프트웨어 아키텍처입니다.
ROS 2의 Navigation2 스택은 AMR 개발을 위한 산업 표준으로 자리잡았습니다. DWA, TEB, SLAM Toolbox 등이 포함되어 있습니다.