CHAPTER 6 9 / 10

로봇 시스템 통합

ROS 2, OPC UA, 엣지 배포를 통한 로봇 AI 시스템 구축

1로봇 시스템 아키텍처

현대 로봇 시스템은 여러 컴포넌트가 유기적으로 연결된 분산 시스템입니다. 효과적인 통합을 위해 명확한 아키텍처 설계가 필요합니다.

Robot System Layer Architecture
Perception Layer
Vision
Processing
SLAM
Localization
Force/Torque
Sensing
LiDAR
Processing
Planning Layer
Motion
Planning
Task
Planning
Behavior
Tree
Control Layer
Trajectory
Following
Force/Position
Control
Safety
Monitoring
Hardware Layer
Robot
Driver
Gripper
Control
Digital
I/O
Sensor
Interface
Communication Protocols
ROS 2 DDS OPC UA MQTT

계층 간 명확한 인터페이스 정의가 시스템의 확장성과 유지보수성을 결정합니다. 각 계층은 독립적으로 테스트하고 교체할 수 있어야 합니다.

2ROS 2 기반 통합

ROS 2는 로봇 소프트웨어 개발의 사실상 표준입니다. DDS 기반의 실시간 통신과 풍부한 에코시스템을 제공합니다.

import rclpy from rclpy.node import Node from rclpy.action import ActionServer, ActionClient from geometry_msgs.msg import PoseStamped from sensor_msgs.msg import Image, JointState from std_srvs.srv import Trigger from control_msgs.action import FollowJointTrajectory import numpy as np class RobotControlNode(Node): """ROS 2 로봇 제어 노드""" def __init__(self): super().__init__('robot_control_node') # 파라미터 선언 self.declare_parameter('robot_name', 'robot_arm') self.declare_parameter('control_rate', 100.0) # 퍼블리셔 self.joint_state_pub = self.create_publisher( JointState, 'joint_states', 10 ) self.tcp_pose_pub = self.create_publisher( PoseStamped, 'tcp_pose', 10 ) # 서브스크라이버 self.target_sub = self.create_subscription( PoseStamped, 'target_pose', self.target_callback, 10 ) # 서비스 self.home_srv = self.create_service( Trigger, 'go_home', self.home_callback ) # 액션 서버 self.trajectory_action = ActionServer( self, FollowJointTrajectory, 'follow_joint_trajectory', self.execute_trajectory_callback ) # 제어 타이머 control_rate = self.get_parameter('control_rate').value self.timer = self.create_timer( 1.0 / control_rate, self.control_loop ) self.get_logger().info('Robot control node initialized') def control_loop(self): """메인 제어 루프""" # 현재 상태 읽기 joint_state = self._read_joint_state() # 상태 퍼블리시 msg = JointState() msg.header.stamp = self.get_clock().now().to_msg() msg.name = ['joint_1', 'joint_2', 'joint_3', 'joint_4', 'joint_5', 'joint_6'] msg.position = joint_state.tolist() self.joint_state_pub.publish(msg) def target_callback(self, msg: PoseStamped): """목표 포즈 수신""" self.get_logger().info(f'Received target: {msg.pose.position}') # 역기구학 및 모션 실행 async def execute_trajectory_callback(self, goal_handle): """궤적 추종 액션 실행""" trajectory = goal_handle.request.trajectory feedback = FollowJointTrajectory.Feedback() for point in trajectory.points: # 각 웨이포인트 실행 self._move_to_point(point) # 피드백 발송 feedback.actual.positions = self._read_joint_state().tolist() goal_handle.publish_feedback(feedback) goal_handle.succeed() result = FollowJointTrajectory.Result() result.error_code = 0 return result class VisionProcessingNode(Node): """비전 처리 노드""" def __init__(self): super().__init__('vision_node') self.image_sub = self.create_subscription( Image, 'camera/image_raw', self.image_callback, 10 ) self.detection_pub = self.create_publisher( PoseStamped, 'detected_objects', 10 ) # AI 모델 로드 self.detector = self._load_detection_model() def image_callback(self, msg: Image): """이미지 처리 콜백""" # ROS 이미지 → NumPy 변환 image = self._ros_to_numpy(msg) # 객체 탐지 detections = self.detector.detect(image) # 결과 퍼블리시 for det in detections: pose_msg = PoseStamped() pose_msg.header = msg.header pose_msg.pose.position.x = det['x'] pose_msg.pose.position.y = det['y'] pose_msg.pose.position.z = det['z'] self.detection_pub.publish(pose_msg)

3OPC UA 통합

OPC UA는 산업 자동화의 표준 통신 프로토콜입니다. ROS 2와 PLC/SCADA 시스템 간의 브릿지 역할을 합니다.

from asyncua import Server, ua, Client import asyncio class RobotOPCUAServer: """로봇 OPC UA 서버""" def __init__(self, endpoint: str = "opc.tcp://localhost:4840"): self.server = Server() self.endpoint = endpoint async def setup(self): """서버 설정""" await self.server.init() self.server.set_endpoint(self.endpoint) # 네임스페이스 등록 uri = "urn:robot:opcua:server" idx = await self.server.register_namespace(uri) # 로봇 객체 생성 robot_obj = await self.server.nodes.objects.add_object( idx, "Robot" ) # 변수 추가 self.joint_positions = await robot_obj.add_variable( idx, "JointPositions", [0.0] * 6, ua.VariantType.Double ) await self.joint_positions.set_writable() self.tcp_pose = await robot_obj.add_variable( idx, "TCPPose", [0.0] * 6, ua.VariantType.Double ) self.robot_status = await robot_obj.add_variable( idx, "Status", "IDLE", ua.VariantType.String ) # 메서드 추가 await robot_obj.add_method( idx, "MoveToPosition", self._move_to_position, [ua.VariantType.Double] * 6, [ua.VariantType.Boolean] ) async def _move_to_position(self, parent, *args): """이동 명령 메서드""" target_position = list(args) # 로봇 이동 로직 success = await self._execute_motion(target_position) return [ua.Variant(success, ua.VariantType.Boolean)] async def update_state(self, joints, tcp, status): """상태 업데이트""" await self.joint_positions.write_value(joints) await self.tcp_pose.write_value(tcp) await self.robot_status.write_value(status) async def run(self): """서버 실행""" async with self.server: while True: await asyncio.sleep(0.1) class PLCBridge: """PLC-ROS 브릿지""" def __init__(self, plc_address: str, ros_node): self.plc_client = Client(plc_address) self.ros_node = ros_node self.tag_mapping = {} async def connect(self): """PLC 연결""" await self.plc_client.connect() await self._discover_tags() async def _discover_tags(self): """사용 가능한 태그 검색""" root = self.plc_client.get_root_node() objects = await root.get_child(["0:Objects"]) # 재귀적으로 태그 검색 await self._browse_node(objects) async def sync_loop(self): """동기화 루프""" while True: # PLC → ROS 동기화 for tag_name, ros_topic in self.tag_mapping.items(): try: node = self.plc_client.get_node(tag_name) value = await node.read_value() self.ros_node.publish_to_topic(ros_topic, value) except Exception as e: self.ros_node.get_logger().error(f"Sync error: {e}") await asyncio.sleep(0.01) # 100Hz

OPC UA + ROS 2 조합은 IT/OT 융합의 핵심입니다. ros2_opcua 패키지를 사용하면 더 쉽게 통합할 수 있습니다.

4엣지 AI 배포

로봇에서 AI 추론을 실행할 때 지연 시간과 대역폭을 최소화하기 위해 엣지 배포가 필요합니다.

import tensorrt as trt import numpy as np class EdgeAIInference: """엣지 AI 추론 엔진""" def __init__(self, model_path: str, device_id: int = 0): self.logger = trt.Logger(trt.Logger.WARNING) self.runtime = trt.Runtime(self.logger) # TensorRT 엔진 로드 with open(model_path, 'rb') as f: self.engine = self.runtime.deserialize_cuda_engine(f.read()) self.context = self.engine.create_execution_context() # CUDA 스트림 및 버퍼 설정 import pycuda.driver as cuda import pycuda.autoinit self.stream = cuda.Stream() self.buffers = self._allocate_buffers() def _allocate_buffers(self): """입출력 버퍼 할당""" import pycuda.driver as cuda inputs = [] outputs = [] bindings = [] for binding in self.engine: size = trt.volume(self.engine.get_binding_shape(binding)) dtype = trt.nptype(self.engine.get_binding_dtype(binding)) # 호스트 메모리 host_mem = cuda.pagelocked_empty(size, dtype) # 디바이스 메모리 device_mem = cuda.mem_alloc(host_mem.nbytes) bindings.append(int(device_mem)) if self.engine.binding_is_input(binding): inputs.append({'host': host_mem, 'device': device_mem}) else: outputs.append({'host': host_mem, 'device': device_mem}) return {'inputs': inputs, 'outputs': outputs, 'bindings': bindings} def infer(self, input_data: np.ndarray) -> np.ndarray: """추론 실행""" import pycuda.driver as cuda # 입력 데이터 복사 np.copyto(self.buffers['inputs'][0]['host'], input_data.ravel()) cuda.memcpy_htod_async( self.buffers['inputs'][0]['device'], self.buffers['inputs'][0]['host'], self.stream ) # 추론 실행 self.context.execute_async_v2( bindings=self.buffers['bindings'], stream_handle=self.stream.handle ) # 출력 데이터 복사 cuda.memcpy_dtoh_async( self.buffers['outputs'][0]['host'], self.buffers['outputs'][0]['device'], self.stream ) self.stream.synchronize() return self.buffers['outputs'][0]['host'].copy() class RobotEdgeAINode: """엣지 AI 통합 로봇 노드""" def __init__(self): # 비전 모델 (TensorRT) self.vision_model = EdgeAIInference('models/yolo_fp16.engine') # 그래스프 모델 self.grasp_model = EdgeAIInference('models/grasp_fp16.engine') # 성능 모니터링 self.inference_times = [] def process_frame(self, image: np.ndarray) -> dict: """프레임 처리 파이프라인""" import time start = time.perf_counter() # 1. 객체 탐지 detections = self._detect_objects(image) # 2. 탐지된 각 객체에 대해 그래스프 계획 grasp_poses = [] for det in detections: cropped = self._crop_detection(image, det) grasp = self._plan_grasp(cropped) grasp_poses.append(grasp) elapsed = (time.perf_counter() - start) * 1000 self.inference_times.append(elapsed) return { 'detections': detections, 'grasp_poses': grasp_poses, 'inference_time_ms': elapsed } def get_performance_stats(self) -> dict: """성능 통계""" times = np.array(self.inference_times[-100:]) return { 'mean_ms': np.mean(times), 'std_ms': np.std(times), 'p95_ms': np.percentile(times, 95), 'fps': 1000.0 / np.mean(times) }

엣지 디바이스의 열 관리가 중요합니다. NVIDIA Jetson의 경우 전력 모드 설정과 팬 제어로 성능을 최적화하세요.

5행동 트리와 작업 계획

복잡한 로봇 작업을 관리하기 위해 행동 트리(Behavior Tree)를 사용합니다.

from enum import Enum class NodeStatus(Enum): SUCCESS = "success" FAILURE = "failure" RUNNING = "running" class BehaviorNode: """행동 트리 노드 기본 클래스""" def __init__(self, name: str): self.name = name self.status = NodeStatus.RUNNING def tick(self) -> NodeStatus: raise NotImplementedError class SequenceNode(BehaviorNode): """시퀀스 노드 (AND)""" def __init__(self, name: str, children: list): super().__init__(name) self.children = children self.current_index = 0 def tick(self) -> NodeStatus: while self.current_index < len(self.children): child = self.children[self.current_index] status = child.tick() if status == NodeStatus.RUNNING: return NodeStatus.RUNNING elif status == NodeStatus.FAILURE: self.current_index = 0 return NodeStatus.FAILURE self.current_index += 1 self.current_index = 0 return NodeStatus.SUCCESS class SelectorNode(BehaviorNode): """셀렉터 노드 (OR)""" def __init__(self, name: str, children: list): super().__init__(name) self.children = children def tick(self) -> NodeStatus: for child in self.children: status = child.tick() if status == NodeStatus.SUCCESS: return NodeStatus.SUCCESS elif status == NodeStatus.RUNNING: return NodeStatus.RUNNING return NodeStatus.FAILURE class PickAndPlaceTree: """픽 앤 플레이스 행동 트리""" def __init__(self, robot_controller, vision_system): self.robot = robot_controller self.vision = vision_system # 행동 트리 구성 self.root = SequenceNode("PickAndPlace", [ self._create_detect_node(), self._create_approach_node(), self._create_pick_node(), self._create_move_node(), self._create_place_node() ]) def _create_detect_node(self): return ActionNode("Detect", self._detect_object) def _create_pick_node(self): return SequenceNode("Pick", [ ActionNode("OpenGripper", self.robot.open_gripper), ActionNode("Approach", lambda: self.robot.move_to(self.pick_pose)), ActionNode("CloseGripper", self.robot.close_gripper), ConditionNode("HasObject", self.robot.has_object) ]) async def execute(self): """행동 트리 실행""" while True: status = self.root.tick() if status == NodeStatus.SUCCESS: print("Task completed successfully") break elif status == NodeStatus.FAILURE: print("Task failed") break await asyncio.sleep(0.01)

6통합 아키텍처 모범 사례

영역 권장 사항
통신 ROS 2 DDS (실시간), MQTT (비실시간), OPC UA (PLC 연동)
AI 배포 ONNX → TensorRT 변환, INT8/FP16 양자화
안전 안전 PLC 분리, E-Stop 하드와이어
모니터링 Prometheus + Grafana, ROS 2 bag 로깅
배포 Docker 컨테이너, CI/CD 파이프라인

시스템 통합의 성공 요인은 명확한 인터페이스 정의, 철저한 테스트, 그리고 문서화입니다. 각 컴포넌트를 독립적으로 테스트할 수 있는 환경을 구축하세요.