ROS 2, OPC UA, 엣지 배포를 통한 로봇 AI 시스템 구축
현대 로봇 시스템은 여러 컴포넌트가 유기적으로 연결된 분산 시스템입니다. 효과적인 통합을 위해 명확한 아키텍처 설계가 필요합니다.
계층 간 명확한 인터페이스 정의가 시스템의 확장성과 유지보수성을 결정합니다. 각 계층은 독립적으로 테스트하고 교체할 수 있어야 합니다.
ROS 2는 로봇 소프트웨어 개발의 사실상 표준입니다. DDS 기반의 실시간 통신과 풍부한 에코시스템을 제공합니다.
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, ActionClient
from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import Image, JointState
from std_srvs.srv import Trigger
from control_msgs.action import FollowJointTrajectory
import numpy as np
class RobotControlNode(Node):
"""ROS 2 로봇 제어 노드"""
def __init__(self):
super().__init__('robot_control_node')
# 파라미터 선언
self.declare_parameter('robot_name', 'robot_arm')
self.declare_parameter('control_rate', 100.0)
# 퍼블리셔
self.joint_state_pub = self.create_publisher(
JointState, 'joint_states', 10
)
self.tcp_pose_pub = self.create_publisher(
PoseStamped, 'tcp_pose', 10
)
# 서브스크라이버
self.target_sub = self.create_subscription(
PoseStamped, 'target_pose',
self.target_callback, 10
)
# 서비스
self.home_srv = self.create_service(
Trigger, 'go_home', self.home_callback
)
# 액션 서버
self.trajectory_action = ActionServer(
self,
FollowJointTrajectory,
'follow_joint_trajectory',
self.execute_trajectory_callback
)
# 제어 타이머
control_rate = self.get_parameter('control_rate').value
self.timer = self.create_timer(
1.0 / control_rate, self.control_loop
)
self.get_logger().info('Robot control node initialized')
def control_loop(self):
"""메인 제어 루프"""
# 현재 상태 읽기
joint_state = self._read_joint_state()
# 상태 퍼블리시
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['joint_1', 'joint_2', 'joint_3', 'joint_4', 'joint_5', 'joint_6']
msg.position = joint_state.tolist()
self.joint_state_pub.publish(msg)
def target_callback(self, msg: PoseStamped):
"""목표 포즈 수신"""
self.get_logger().info(f'Received target: {msg.pose.position}')
# 역기구학 및 모션 실행
async def execute_trajectory_callback(self, goal_handle):
"""궤적 추종 액션 실행"""
trajectory = goal_handle.request.trajectory
feedback = FollowJointTrajectory.Feedback()
for point in trajectory.points:
# 각 웨이포인트 실행
self._move_to_point(point)
# 피드백 발송
feedback.actual.positions = self._read_joint_state().tolist()
goal_handle.publish_feedback(feedback)
goal_handle.succeed()
result = FollowJointTrajectory.Result()
result.error_code = 0
return result
class VisionProcessingNode(Node):
"""비전 처리 노드"""
def __init__(self):
super().__init__('vision_node')
self.image_sub = self.create_subscription(
Image, 'camera/image_raw',
self.image_callback, 10
)
self.detection_pub = self.create_publisher(
PoseStamped, 'detected_objects', 10
)
# AI 모델 로드
self.detector = self._load_detection_model()
def image_callback(self, msg: Image):
"""이미지 처리 콜백"""
# ROS 이미지 → NumPy 변환
image = self._ros_to_numpy(msg)
# 객체 탐지
detections = self.detector.detect(image)
# 결과 퍼블리시
for det in detections:
pose_msg = PoseStamped()
pose_msg.header = msg.header
pose_msg.pose.position.x = det['x']
pose_msg.pose.position.y = det['y']
pose_msg.pose.position.z = det['z']
self.detection_pub.publish(pose_msg)OPC UA는 산업 자동화의 표준 통신 프로토콜입니다. ROS 2와 PLC/SCADA 시스템 간의 브릿지 역할을 합니다.
from asyncua import Server, ua, Client
import asyncio
class RobotOPCUAServer:
"""로봇 OPC UA 서버"""
def __init__(self, endpoint: str = "opc.tcp://localhost:4840"):
self.server = Server()
self.endpoint = endpoint
async def setup(self):
"""서버 설정"""
await self.server.init()
self.server.set_endpoint(self.endpoint)
# 네임스페이스 등록
uri = "urn:robot:opcua:server"
idx = await self.server.register_namespace(uri)
# 로봇 객체 생성
robot_obj = await self.server.nodes.objects.add_object(
idx, "Robot"
)
# 변수 추가
self.joint_positions = await robot_obj.add_variable(
idx, "JointPositions",
[0.0] * 6, ua.VariantType.Double
)
await self.joint_positions.set_writable()
self.tcp_pose = await robot_obj.add_variable(
idx, "TCPPose",
[0.0] * 6, ua.VariantType.Double
)
self.robot_status = await robot_obj.add_variable(
idx, "Status", "IDLE", ua.VariantType.String
)
# 메서드 추가
await robot_obj.add_method(
idx, "MoveToPosition",
self._move_to_position,
[ua.VariantType.Double] * 6,
[ua.VariantType.Boolean]
)
async def _move_to_position(self, parent, *args):
"""이동 명령 메서드"""
target_position = list(args)
# 로봇 이동 로직
success = await self._execute_motion(target_position)
return [ua.Variant(success, ua.VariantType.Boolean)]
async def update_state(self, joints, tcp, status):
"""상태 업데이트"""
await self.joint_positions.write_value(joints)
await self.tcp_pose.write_value(tcp)
await self.robot_status.write_value(status)
async def run(self):
"""서버 실행"""
async with self.server:
while True:
await asyncio.sleep(0.1)
class PLCBridge:
"""PLC-ROS 브릿지"""
def __init__(self, plc_address: str, ros_node):
self.plc_client = Client(plc_address)
self.ros_node = ros_node
self.tag_mapping = {}
async def connect(self):
"""PLC 연결"""
await self.plc_client.connect()
await self._discover_tags()
async def _discover_tags(self):
"""사용 가능한 태그 검색"""
root = self.plc_client.get_root_node()
objects = await root.get_child(["0:Objects"])
# 재귀적으로 태그 검색
await self._browse_node(objects)
async def sync_loop(self):
"""동기화 루프"""
while True:
# PLC → ROS 동기화
for tag_name, ros_topic in self.tag_mapping.items():
try:
node = self.plc_client.get_node(tag_name)
value = await node.read_value()
self.ros_node.publish_to_topic(ros_topic, value)
except Exception as e:
self.ros_node.get_logger().error(f"Sync error: {e}")
await asyncio.sleep(0.01) # 100HzOPC UA + ROS 2 조합은 IT/OT 융합의 핵심입니다. ros2_opcua 패키지를 사용하면 더 쉽게 통합할 수 있습니다.
로봇에서 AI 추론을 실행할 때 지연 시간과 대역폭을 최소화하기 위해 엣지 배포가 필요합니다.
import tensorrt as trt
import numpy as np
class EdgeAIInference:
"""엣지 AI 추론 엔진"""
def __init__(self, model_path: str, device_id: int = 0):
self.logger = trt.Logger(trt.Logger.WARNING)
self.runtime = trt.Runtime(self.logger)
# TensorRT 엔진 로드
with open(model_path, 'rb') as f:
self.engine = self.runtime.deserialize_cuda_engine(f.read())
self.context = self.engine.create_execution_context()
# CUDA 스트림 및 버퍼 설정
import pycuda.driver as cuda
import pycuda.autoinit
self.stream = cuda.Stream()
self.buffers = self._allocate_buffers()
def _allocate_buffers(self):
"""입출력 버퍼 할당"""
import pycuda.driver as cuda
inputs = []
outputs = []
bindings = []
for binding in self.engine:
size = trt.volume(self.engine.get_binding_shape(binding))
dtype = trt.nptype(self.engine.get_binding_dtype(binding))
# 호스트 메모리
host_mem = cuda.pagelocked_empty(size, dtype)
# 디바이스 메모리
device_mem = cuda.mem_alloc(host_mem.nbytes)
bindings.append(int(device_mem))
if self.engine.binding_is_input(binding):
inputs.append({'host': host_mem, 'device': device_mem})
else:
outputs.append({'host': host_mem, 'device': device_mem})
return {'inputs': inputs, 'outputs': outputs, 'bindings': bindings}
def infer(self, input_data: np.ndarray) -> np.ndarray:
"""추론 실행"""
import pycuda.driver as cuda
# 입력 데이터 복사
np.copyto(self.buffers['inputs'][0]['host'], input_data.ravel())
cuda.memcpy_htod_async(
self.buffers['inputs'][0]['device'],
self.buffers['inputs'][0]['host'],
self.stream
)
# 추론 실행
self.context.execute_async_v2(
bindings=self.buffers['bindings'],
stream_handle=self.stream.handle
)
# 출력 데이터 복사
cuda.memcpy_dtoh_async(
self.buffers['outputs'][0]['host'],
self.buffers['outputs'][0]['device'],
self.stream
)
self.stream.synchronize()
return self.buffers['outputs'][0]['host'].copy()
class RobotEdgeAINode:
"""엣지 AI 통합 로봇 노드"""
def __init__(self):
# 비전 모델 (TensorRT)
self.vision_model = EdgeAIInference('models/yolo_fp16.engine')
# 그래스프 모델
self.grasp_model = EdgeAIInference('models/grasp_fp16.engine')
# 성능 모니터링
self.inference_times = []
def process_frame(self, image: np.ndarray) -> dict:
"""프레임 처리 파이프라인"""
import time
start = time.perf_counter()
# 1. 객체 탐지
detections = self._detect_objects(image)
# 2. 탐지된 각 객체에 대해 그래스프 계획
grasp_poses = []
for det in detections:
cropped = self._crop_detection(image, det)
grasp = self._plan_grasp(cropped)
grasp_poses.append(grasp)
elapsed = (time.perf_counter() - start) * 1000
self.inference_times.append(elapsed)
return {
'detections': detections,
'grasp_poses': grasp_poses,
'inference_time_ms': elapsed
}
def get_performance_stats(self) -> dict:
"""성능 통계"""
times = np.array(self.inference_times[-100:])
return {
'mean_ms': np.mean(times),
'std_ms': np.std(times),
'p95_ms': np.percentile(times, 95),
'fps': 1000.0 / np.mean(times)
}엣지 디바이스의 열 관리가 중요합니다. NVIDIA Jetson의 경우 전력 모드 설정과 팬 제어로 성능을 최적화하세요.
복잡한 로봇 작업을 관리하기 위해 행동 트리(Behavior Tree)를 사용합니다.
from enum import Enum
class NodeStatus(Enum):
SUCCESS = "success"
FAILURE = "failure"
RUNNING = "running"
class BehaviorNode:
"""행동 트리 노드 기본 클래스"""
def __init__(self, name: str):
self.name = name
self.status = NodeStatus.RUNNING
def tick(self) -> NodeStatus:
raise NotImplementedError
class SequenceNode(BehaviorNode):
"""시퀀스 노드 (AND)"""
def __init__(self, name: str, children: list):
super().__init__(name)
self.children = children
self.current_index = 0
def tick(self) -> NodeStatus:
while self.current_index < len(self.children):
child = self.children[self.current_index]
status = child.tick()
if status == NodeStatus.RUNNING:
return NodeStatus.RUNNING
elif status == NodeStatus.FAILURE:
self.current_index = 0
return NodeStatus.FAILURE
self.current_index += 1
self.current_index = 0
return NodeStatus.SUCCESS
class SelectorNode(BehaviorNode):
"""셀렉터 노드 (OR)"""
def __init__(self, name: str, children: list):
super().__init__(name)
self.children = children
def tick(self) -> NodeStatus:
for child in self.children:
status = child.tick()
if status == NodeStatus.SUCCESS:
return NodeStatus.SUCCESS
elif status == NodeStatus.RUNNING:
return NodeStatus.RUNNING
return NodeStatus.FAILURE
class PickAndPlaceTree:
"""픽 앤 플레이스 행동 트리"""
def __init__(self, robot_controller, vision_system):
self.robot = robot_controller
self.vision = vision_system
# 행동 트리 구성
self.root = SequenceNode("PickAndPlace", [
self._create_detect_node(),
self._create_approach_node(),
self._create_pick_node(),
self._create_move_node(),
self._create_place_node()
])
def _create_detect_node(self):
return ActionNode("Detect", self._detect_object)
def _create_pick_node(self):
return SequenceNode("Pick", [
ActionNode("OpenGripper", self.robot.open_gripper),
ActionNode("Approach", lambda: self.robot.move_to(self.pick_pose)),
ActionNode("CloseGripper", self.robot.close_gripper),
ConditionNode("HasObject", self.robot.has_object)
])
async def execute(self):
"""행동 트리 실행"""
while True:
status = self.root.tick()
if status == NodeStatus.SUCCESS:
print("Task completed successfully")
break
elif status == NodeStatus.FAILURE:
print("Task failed")
break
await asyncio.sleep(0.01)| 영역 | 권장 사항 |
|---|---|
| 통신 | ROS 2 DDS (실시간), MQTT (비실시간), OPC UA (PLC 연동) |
| AI 배포 | ONNX → TensorRT 변환, INT8/FP16 양자화 |
| 안전 | 안전 PLC 분리, E-Stop 하드와이어 |
| 모니터링 | Prometheus + Grafana, ROS 2 bag 로깅 |
| 배포 | Docker 컨테이너, CI/CD 파이프라인 |
시스템 통합의 성공 요인은 명확한 인터페이스 정의, 철저한 테스트, 그리고 문서화입니다. 각 컴포넌트를 독립적으로 테스트할 수 있는 환경을 구축하세요.