Sim-to-Real, PPO/SAC 알고리즘, 조작 학습
강화학습(Reinforcement Learning)은 로봇이 환경과 상호작용하면서 시행착오를 통해 최적의 행동을 학습하는 방법입니다. 복잡한 동작이나 명시적으로 프로그래밍하기 어려운 작업에 효과적입니다.
| 학습 방식 | 특징 | 로봇 적용 예 |
|---|---|---|
| 지도 학습 | 레이블된 데이터 필요 | 비전 인식, 분류 |
| 모방 학습 | 전문가 시연 모방 | 조립 동작, 복잡한 궤적 |
| 강화학습 | 보상 최대화 탐색 | 최적 제어, 적응 동작 |
로봇 강화학습의 핵심 과제는 '샘플 효율성'입니다. 실제 로봇에서 수만 번의 시도가 필요하면 비현실적이므로 시뮬레이션에서 학습 후 전이(Sim-to-Real)하는 방법이 주로 사용됩니다.
강화학습 문제는 MDP(Markov Decision Process)로 정형화됩니다.
import numpy as np
import torch
import torch.nn as nn
from typing import Tuple, Dict
class RobotEnvironment:
"""로봇 강화학습 환경 인터페이스 (Gym 스타일)"""
def __init__(self):
# 상태 공간: 관절 각도, 속도, 목표 등
self.observation_dim = 24 # 예: 6 관절 x 2 (각도, 속도) + 그리퍼 + 목표
self.action_dim = 7 # 예: 6 관절 토크 + 그리퍼
# 액션 범위
self.action_low = -1.0
self.action_high = 1.0
def reset(self) -> np.ndarray:
"""에피소드 시작, 초기 상태 반환"""
# 로봇 초기화, 목표 설정
initial_state = np.zeros(self.observation_dim)
return initial_state
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, Dict]:
"""
행동 실행 후 결과 반환
Returns:
next_state: 다음 상태
reward: 보상
done: 에피소드 종료 여부
info: 추가 정보
"""
# 액션 실행
next_state = self._execute_action(action)
# 보상 계산
reward = self._compute_reward(next_state)
# 종료 조건 검사
done = self._check_termination(next_state)
return next_state, reward, done, {}
def _compute_reward(self, state: np.ndarray) -> float:
"""
보상 함수 설계 예시 (Reach 작업)
보상 = -거리 + 성공 보너스 - 에너지 페널티
"""
gripper_pos = state[:3]
target_pos = state[-3:]
# 거리 기반 보상
distance = np.linalg.norm(gripper_pos - target_pos)
distance_reward = -distance
# 성공 보너스
success_bonus = 10.0 if distance < 0.05 else 0.0
# 에너지 효율 (액션 크기 페널티)
# action_penalty = -0.01 * np.sum(self.last_action ** 2)
return distance_reward + success_bonus
class ReplayBuffer:
"""경험 재생 버퍼"""
def __init__(self, capacity: int, obs_dim: int, action_dim: int):
self.capacity = capacity
self.ptr = 0
self.size = 0
self.observations = np.zeros((capacity, obs_dim))
self.actions = np.zeros((capacity, action_dim))
self.rewards = np.zeros(capacity)
self.next_observations = np.zeros((capacity, obs_dim))
self.dones = np.zeros(capacity)
def store(self, obs, action, reward, next_obs, done):
"""전이 저장"""
self.observations[self.ptr] = obs
self.actions[self.ptr] = action
self.rewards[self.ptr] = reward
self.next_observations[self.ptr] = next_obs
self.dones[self.ptr] = done
self.ptr = (self.ptr + 1) % self.capacity
self.size = min(self.size + 1, self.capacity)
def sample(self, batch_size: int) -> Dict[str, torch.Tensor]:
"""미니배치 샘플링"""
indices = np.random.choice(self.size, batch_size, replace=False)
return {
'obs': torch.FloatTensor(self.observations[indices]),
'action': torch.FloatTensor(self.actions[indices]),
'reward': torch.FloatTensor(self.rewards[indices]),
'next_obs': torch.FloatTensor(self.next_observations[indices]),
'done': torch.FloatTensor(self.dones[indices])
}SAC는 엔트로피 정규화를 포함한 Off-Policy 알고리즘으로, 로봇 학습에서 가장 널리 사용됩니다.
import torch.nn.functional as F
from torch.distributions import Normal
class GaussianPolicy(nn.Module):
"""가우시안 정책 네트워크 (SAC용)"""
def __init__(self, obs_dim: int, action_dim: int, hidden_dim: int = 256):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
self.mean_head = nn.Linear(hidden_dim, action_dim)
self.log_std_head = nn.Linear(hidden_dim, action_dim)
self.log_std_min = -20
self.log_std_max = 2
def forward(self, obs):
"""평균과 표준편차 출력"""
h = self.net(obs)
mean = self.mean_head(h)
log_std = self.log_std_head(h)
log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
return mean, log_std
def sample(self, obs):
"""재매개변수화 샘플링"""
mean, log_std = self.forward(obs)
std = log_std.exp()
# 재매개변수화 트릭
normal = Normal(mean, std)
x_t = normal.rsample() # 미분 가능 샘플링
# tanh 스쿼싱
action = torch.tanh(x_t)
# 로그 확률 계산 (tanh 변환 보정)
log_prob = normal.log_prob(x_t)
log_prob -= torch.log(1 - action.pow(2) + 1e-6)
log_prob = log_prob.sum(-1, keepdim=True)
return action, log_prob, mean
class TwinQNetwork(nn.Module):
"""쌍둥이 Q 네트워크 (SAC용)"""
def __init__(self, obs_dim: int, action_dim: int, hidden_dim: int = 256):
super().__init__()
# Q1 네트워크
self.q1 = nn.Sequential(
nn.Linear(obs_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
# Q2 네트워크
self.q2 = nn.Sequential(
nn.Linear(obs_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, obs, action):
x = torch.cat([obs, action], dim=-1)
return self.q1(x), self.q2(x)
class SAC:
"""Soft Actor-Critic 알고리즘"""
def __init__(self,
obs_dim: int,
action_dim: int,
gamma: float = 0.99,
tau: float = 0.005,
alpha: float = 0.2,
lr: float = 3e-4):
self.gamma = gamma
self.tau = tau
self.alpha = alpha # 엔트로피 계수
# 네트워크 초기화
self.policy = GaussianPolicy(obs_dim, action_dim)
self.q_network = TwinQNetwork(obs_dim, action_dim)
self.target_q = TwinQNetwork(obs_dim, action_dim)
# 타겟 네트워크 복사
self.target_q.load_state_dict(self.q_network.state_dict())
# 옵티마이저
self.policy_optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
self.q_optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
# 자동 엔트로피 튜닝
self.target_entropy = -action_dim
self.log_alpha = torch.zeros(1, requires_grad=True)
self.alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=lr)
def update(self, batch: Dict[str, torch.Tensor]):
"""SAC 업데이트"""
obs = batch['obs']
action = batch['action']
reward = batch['reward'].unsqueeze(-1)
next_obs = batch['next_obs']
done = batch['done'].unsqueeze(-1)
# Q 타겟 계산
with torch.no_grad():
next_action, next_log_prob, _ = self.policy.sample(next_obs)
target_q1, target_q2 = self.target_q(next_obs, next_action)
target_q = torch.min(target_q1, target_q2)
target_q = reward + self.gamma * (1 - done) * (target_q - self.alpha * next_log_prob)
# Q 네트워크 업데이트
q1, q2 = self.q_network(obs, action)
q_loss = F.mse_loss(q1, target_q) + F.mse_loss(q2, target_q)
self.q_optimizer.zero_grad()
q_loss.backward()
self.q_optimizer.step()
# 정책 업데이트
new_action, log_prob, _ = self.policy.sample(obs)
q1_new, q2_new = self.q_network(obs, new_action)
q_new = torch.min(q1_new, q2_new)
policy_loss = (self.alpha * log_prob - q_new).mean()
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
# 엔트로피 계수 업데이트
alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
self.alpha_optimizer.zero_grad()
alpha_loss.backward()
self.alpha_optimizer.step()
self.alpha = self.log_alpha.exp().item()
# 타겟 네트워크 소프트 업데이트
for param, target_param in zip(self.q_network.parameters(),
self.target_q.parameters()):
target_param.data.copy_(
self.tau * param.data + (1 - self.tau) * target_param.data
)
return {
'q_loss': q_loss.item(),
'policy_loss': policy_loss.item(),
'alpha': self.alpha
}시뮬레이션에서 학습한 정책을 실제 로봇에 전이하는 것은 강화학습의 핵심 도전입니다. Reality Gap을 극복하기 위한 다양한 기법이 사용됩니다.
class DomainRandomization:
"""도메인 랜덤화 - Reality Gap 극복"""
def __init__(self):
# 물리 파라미터 범위
self.mass_range = (0.8, 1.2) # 질량 변화율
self.friction_range = (0.5, 1.5) # 마찰 계수 변화율
self.damping_range = (0.9, 1.1) # 댐핑 변화율
# 센서 노이즈 범위
self.joint_noise_std = 0.01 # 관절 위치 노이즈 (rad)
self.force_noise_std = 1.0 # 힘 센서 노이즈 (N)
# 시각 랜덤화
self.lighting_range = (0.5, 2.0) # 조명 변화율
self.color_jitter = 0.2 # 색상 변화
def randomize_physics(self, simulator):
"""물리 파라미터 랜덤화"""
# 링크 질량 변화
for link in simulator.get_links():
original_mass = link.get_original_mass()
scale = np.random.uniform(*self.mass_range)
link.set_mass(original_mass * scale)
# 마찰 계수 변화
for surface in simulator.get_surfaces():
original_friction = surface.get_original_friction()
scale = np.random.uniform(*self.friction_range)
surface.set_friction(original_friction * scale)
# 관절 댐핑 변화
for joint in simulator.get_joints():
original_damping = joint.get_original_damping()
scale = np.random.uniform(*self.damping_range)
joint.set_damping(original_damping * scale)
def add_observation_noise(self, obs: np.ndarray) -> np.ndarray:
"""관측값에 노이즈 추가"""
noisy_obs = obs.copy()
# 관절 위치 노이즈
joint_indices = slice(0, 6) # 예: 처음 6개가 관절 위치
noisy_obs[joint_indices] += np.random.normal(
0, self.joint_noise_std, size=6
)
return noisy_obs
def randomize_action(self, action: np.ndarray,
action_delay_prob: float = 0.1) -> np.ndarray:
"""액션 랜덤화 (통신 지연 시뮬레이션)"""
if np.random.random() < action_delay_prob:
# 이전 액션 반복 (지연 시뮬레이션)
return self.prev_action if hasattr(self, 'prev_action') else action
self.prev_action = action.copy()
return action
class TeacherStudentDistillation:
"""Teacher-Student 학습 (Sim-to-Real)"""
def __init__(self, teacher_policy, student_policy):
"""
Teacher: 시뮬레이션 전용 정보(접촉력, 정확한 상태 등) 사용
Student: 실제 로봇에서 사용 가능한 정보만 사용
"""
self.teacher = teacher_policy # 특권 정보 사용
self.student = student_policy # 제한된 관측만 사용
self.student_optimizer = torch.optim.Adam(
self.student.parameters(), lr=1e-4
)
def collect_demonstrations(self, env, num_episodes: int = 100):
"""Teacher 정책으로 시연 데이터 수집"""
demonstrations = []
for _ in range(num_episodes):
obs_full = env.reset() # 전체 관측
obs_partial = self._extract_partial_obs(obs_full) # 부분 관측
done = False
while not done:
# Teacher 액션 (전체 정보 사용)
with torch.no_grad():
teacher_action = self.teacher(
torch.FloatTensor(obs_full)
).numpy()
demonstrations.append({
'partial_obs': obs_partial.copy(),
'teacher_action': teacher_action.copy()
})
obs_full, _, done, _ = env.step(teacher_action)
obs_partial = self._extract_partial_obs(obs_full)
return demonstrations
def train_student(self, demonstrations, epochs: int = 100):
"""Student 네트워크 학습 (행동 복제)"""
dataset = torch.utils.data.TensorDataset(
torch.FloatTensor([d['partial_obs'] for d in demonstrations]),
torch.FloatTensor([d['teacher_action'] for d in demonstrations])
)
loader = torch.utils.data.DataLoader(dataset, batch_size=256, shuffle=True)
for epoch in range(epochs):
total_loss = 0
for obs, target_action in loader:
student_action = self.student(obs)
loss = F.mse_loss(student_action, target_action)
self.student_optimizer.zero_grad()
loss.backward()
self.student_optimizer.step()
total_loss += loss.item()
if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {total_loss / len(loader):.4f}")
def _extract_partial_obs(self, full_obs: np.ndarray) -> np.ndarray:
"""실제 로봇에서 측정 가능한 부분만 추출"""
# 예: 접촉력, GT 물체 위치 등 제거
partial_indices = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] # 관절 상태만
return full_obs[partial_indices]Domain Randomization의 핵심은 '충분히 다양한' 환경에서 학습하여 실제 환경도 그 범위 내에 포함되게 하는 것입니다. 너무 과도한 랜덤화는 오히려 성능을 저하시킵니다.
강화학습이 적용되는 대표적인 로봇 조작 작업들입니다.
| 작업 | 관측 공간 | 액션 공간 | 보상 설계 |
|---|---|---|---|
| Reach (도달) | 관절 위치/속도, 목표 | 관절 토크 | 거리 기반 |
| Push (밀기) | + 물체 위치 | 엔드이펙터 속도 | 물체-목표 거리 |
| Pick & Place | + 그리퍼 상태 | + 그리퍼 개폐 | 단계별 보상 |
| Peg Insertion | + 힘/토크 센서 | 임피던스 파라미터 | 삽입 깊이 |
| Dexterous Manipulation | 손가락 위치, 촉각 | 20+ DoF 위치 | 물체 자세 |
보상 함수 설계(Reward Engineering)가 강화학습 성공의 80%를 좌우합니다. 희소 보상(성공 시만 +1)보다 조밀한 보상(진행 상황 반영)이 학습에 유리합니다.
로봇 강화학습에 사용되는 주요 도구들입니다.
| 시뮬레이터 | 특징 | 장점 |
|---|---|---|
| IsaacGym (NVIDIA) | GPU 병렬화 | 수천 환경 동시 실행 |
| MuJoCo | 정확한 접촉 역학 | 연구 표준, 빠른 속도 |
| PyBullet | 오픈소스, 쉬운 설정 | 무료, ROS 통합 |
| Isaac Sim (Omniverse) | 포토리얼리스틱 | 시각 기반 학습에 적합 |
Isaac Gym은 GPU에서 병렬로 수천 개의 환경을 실행할 수 있어 학습 속도가 획기적으로 향상됩니다. PPO 기준 수백만 스텝을 수 분 내에 처리합니다.