산업용 카메라 연동, GStreamer 파이프라인, 실시간 객체 탐지, 결함 검출 시스템 구현
제조 현장의 실시간 비전 AI는 카메라 캡처, 전처리, 추론, 후처리가 지연 없이 연속적으로 처리되어야 합니다. 각 단계의 병렬화와 비동기 처리가 성능의 핵심입니다.
실시간 비전 시스템에서 각 단계는 독립적인 스레드/프로세스로 분리하고, 큐 기반 비동기 통신으로 연결하여 파이프라인 병렬성을 확보합니다.
산업용 카메라는 GigE Vision, USB3 Vision, CoaXPress 등의 프로토콜을 사용합니다. GenICam 표준을 통해 다양한 제조사의 카메라를 일관되게 제어할 수 있습니다.
import cv2 import numpy as np from typing import Optional, Callable from threading import Thread, Event from queue import Queue import time class IndustrialCamera: """산업용 카메라 제어 (GigE Vision/USB3 Vision)""" def __init__( self, camera_id: int = 0, width: int = 1920, height: int = 1080, fps: int = 60, exposure_us: int = 10000 ): self.camera_id = camera_id self.width = width self.height = height self.target_fps = fps self.exposure_us = exposure_us self.cap: Optional[cv2.VideoCapture] = None self.frame_queue: Queue = Queue(maxsize=3) self.stop_event = Event() self.capture_thread: Optional[Thread] = None def open(self) -> bool: """카메라 연결 및 설정""" # GStreamer 파이프라인 (Jetson용) gst_pipeline = ( f"v4l2src device=/dev/video{self.camera_id} ! " f"video/x-raw,width={self.width},height={self.height}," f"framerate={self.target_fps}/1 ! " f"videoconvert ! video/x-raw,format=BGR ! appsink" ) try: # GStreamer 백엔드 시도 self.cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): # 표준 V4L2 폴백 self.cap = cv2.VideoCapture(self.camera_id) self._configure_v4l2() return self.cap.isOpened() except Exception as e: print(f"Camera open failed: {e}") return False def _configure_v4l2(self): """V4L2 카메라 설정""" self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) self.cap.set(cv2.CAP_PROP_FPS, self.target_fps) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 최소 버퍼 # 노출 수동 설정 self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1) # Manual self.cap.set(cv2.CAP_PROP_EXPOSURE, self.exposure_us) def start_capture(self): """비동기 캡처 시작""" self.stop_event.clear() self.capture_thread = Thread(target=self._capture_loop, daemon=True) self.capture_thread.start() def _capture_loop(self): """프레임 캡처 루프""" while not self.stop_event.is_set(): ret, frame = self.cap.read() if ret: timestamp = time.time() # 큐가 가득 차면 오래된 프레임 버림 if self.frame_queue.full(): try: self.frame_queue.get_nowait() except: pass self.frame_queue.put((frame, timestamp)) def get_frame(self, timeout: float = 0.1) -> tuple: """최신 프레임 획득""" try: return self.frame_queue.get(timeout=timeout) except: return None, None def stop(self): """캡처 중지""" self.stop_event.set() if self.capture_thread: self.capture_thread.join(timeout=1.0) if self.cap: self.cap.release()
NVIDIA Jetson에서 GStreamer를 활용하면 하드웨어 비디오 디코더(NVDEC)와 GPU 리사이즈를 통해 CPU 부하 없이 고속 영상 처리가 가능합니다.
class GStreamerPipeline: """GStreamer 하드웨어 가속 파이프라인""" PIPELINE_TEMPLATES = { "usb_camera": ( "v4l2src device={device} ! " "video/x-raw,width={width},height={height},framerate={fps}/1 ! " "nvvidconv ! video/x-raw(memory:NVMM) ! " "nvvidconv ! video/x-raw,format=BGRx ! " "videoconvert ! video/x-raw,format=BGR ! appsink" ), "rtsp_camera": ( "rtspsrc location={rtsp_url} latency=0 ! " "rtph264depay ! h264parse ! nvv4l2decoder ! " "nvvidconv ! video/x-raw,format=BGRx ! " "videoconvert ! video/x-raw,format=BGR ! appsink" ), "csi_camera": ( "nvarguscamerasrc sensor-id={sensor_id} ! " "video/x-raw(memory:NVMM),width={width},height={height}," "framerate={fps}/1,format=NV12 ! " "nvvidconv flip-method=0 ! video/x-raw,format=BGRx ! " "videoconvert ! video/x-raw,format=BGR ! appsink" ), "gige_camera": ( "aabornsrc device={ip_address} ! " "video/x-bayer,width={width},height={height} ! " "bayer2rgb ! videoconvert ! video/x-raw,format=BGR ! appsink" ) } def __init__(self, pipeline_type: str, **kwargs): self.pipeline_str = self.PIPELINE_TEMPLATES[pipeline_type].format(**kwargs) self.cap = None def start(self) -> bool: """파이프라인 시작""" self.cap = cv2.VideoCapture(self.pipeline_str, cv2.CAP_GSTREAMER) return self.cap.isOpened() def read(self) -> tuple: """프레임 읽기""" return self.cap.read() if self.cap else (False, None) @staticmethod def create_multi_stream(camera_configs: list) -> "MultiStreamPipeline": """멀티 스트림 파이프라인 생성""" return MultiStreamPipeline(camera_configs) class MultiStreamPipeline: """멀티 카메라 동시 처리 파이프라인""" def __init__(self, camera_configs: list): self.pipelines = [] self.frame_queues = [] for config in camera_configs: pipeline = GStreamerPipeline(**config) queue = Queue(maxsize=2) self.pipelines.append(pipeline) self.frame_queues.append(queue) def start_all(self): """모든 스트림 시작""" for i, pipeline in enumerate(self.pipelines): pipeline.start() Thread( target=self._stream_loop, args=(pipeline, self.frame_queues[i]), daemon=True ).start() def _stream_loop(self, pipeline, queue): """개별 스트림 루프""" while True: ret, frame = pipeline.read() if ret: if queue.full(): try: queue.get_nowait() except: pass queue.put((frame, time.time())) def get_all_frames(self) -> list: """모든 스트림의 최신 프레임 획득""" frames = [] for queue in self.frame_queues: try: frames.append(queue.get_nowait()) except: frames.append((None, None)) return frames
YOLO, SSD 등의 객체 탐지 모델을 TensorRT로 가속하여 실시간 탐지를 수행합니다. 배치 처리와 비동기 추론으로 처리량을 극대화합니다.
import tensorrt as trt import pycuda.driver as cuda import pycuda.autoinit import numpy as np from dataclasses import dataclass from typing import List @dataclass class Detection: """탐지 결과""" class_id: int confidence: float bbox: tuple # (x1, y1, x2, y2) class_name: str = "" class TensorRTDetector: """TensorRT 기반 실시간 객체 탐지""" def __init__( self, engine_path: str, input_size: tuple = (640, 640), conf_threshold: float = 0.5, nms_threshold: float = 0.4 ): self.input_size = input_size self.conf_threshold = conf_threshold self.nms_threshold = nms_threshold # TensorRT 엔진 로드 self.logger = trt.Logger(trt.Logger.WARNING) with open(engine_path, "rb") as f: self.engine = trt.Runtime(self.logger).deserialize_cuda_engine(f.read()) self.context = self.engine.create_execution_context() # CUDA 메모리 할당 self._allocate_buffers() # CUDA 스트림 (비동기 처리용) self.stream = cuda.Stream() def _allocate_buffers(self): """입출력 버퍼 할당""" self.inputs = [] self.outputs = [] self.bindings = [] for binding in self.engine: shape = self.engine.get_binding_shape(binding) dtype = trt.nptype(self.engine.get_binding_dtype(binding)) size = trt.volume(shape) # 호스트/디바이스 메모리 host_mem = cuda.pagelocked_empty(size, dtype) device_mem = cuda.mem_alloc(host_mem.nbytes) self.bindings.append(int(device_mem)) if self.engine.binding_is_input(binding): self.inputs.append({"host": host_mem, "device": device_mem}) else: self.outputs.append({"host": host_mem, "device": device_mem}) def preprocess(self, image: np.ndarray) -> np.ndarray: """이미지 전처리""" # 리사이즈 resized = cv2.resize(image, self.input_size) # BGR -> RGB, HWC -> CHW rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) chw = rgb.transpose((2, 0, 1)) # 정규화 normalized = chw.astype(np.float32) / 255.0 # 배치 차원 추가 return np.expand_dims(normalized, axis=0) def detect(self, image: np.ndarray) -> List[Detection]: """객체 탐지 실행""" orig_h, orig_w = image.shape[:2] # 전처리 input_data = self.preprocess(image) np.copyto(self.inputs[0]["host"], input_data.ravel()) # GPU 전송 cuda.memcpy_htod_async( self.inputs[0]["device"], self.inputs[0]["host"], self.stream ) # 추론 self.context.execute_async_v2( bindings=self.bindings, stream_handle=self.stream.handle ) # 결과 전송 cuda.memcpy_dtoh_async( self.outputs[0]["host"], self.outputs[0]["device"], self.stream ) self.stream.synchronize() # 후처리 return self.postprocess( self.outputs[0]["host"], orig_w, orig_h ) def postprocess( self, output: np.ndarray, orig_w: int, orig_h: int ) -> List[Detection]: """탐지 결과 후처리 (NMS 포함)""" detections = [] # YOLO 출력 형태: [batch, num_boxes, 5+num_classes] output = output.reshape((-1, 85)) # COCO 80 classes # 신뢰도 필터링 scores = output[:, 4] mask = scores > self.conf_threshold boxes = output[mask] if len(boxes) == 0: return detections # 좌표 변환 (center to corner) scale_x = orig_w / self.input_size[0] scale_y = orig_h / self.input_size[1] for box in boxes: cx, cy, w, h = box[:4] x1 = int((cx - w/2) * scale_x) y1 = int((cy - h/2) * scale_y) x2 = int((cx + w/2) * scale_x) y2 = int((cy + h/2) * scale_y) class_scores = box[5:] class_id = np.argmax(class_scores) confidence = box[4] * class_scores[class_id] if confidence > self.conf_threshold: detections.append(Detection( class_id=class_id, confidence=confidence, bbox=(x1, y1, x2, y2) )) # NMS 적용 return self._nms(detections)
제조 라인의 결함 검출은 객체 탐지와 분류를 결합한 2단계 파이프라인으로 구현됩니다. 먼저 ROI를 탐지한 후, 해당 영역을 결함 분류 모델에 전달합니다.
트리거 신호 수신 또는 연속 캡처
검사 대상 영역 검출 (YOLO)
각 ROI에 대해 OK/NG 및 결함 유형 분류
PLC 신호 출력, DB 기록
from dataclasses import dataclass from enum import Enum from typing import List, Tuple class DefectType(Enum): OK = 0 SCRATCH = 1 CRACK = 2 STAIN = 3 DENT = 4 MISSING = 5 @dataclass class InspectionResult: """검사 결과""" is_ok: bool defect_type: DefectType confidence: float bbox: Tuple[int, int, int, int] roi_image: np.ndarray class DefectInspectionPipeline: """2단계 결함 검출 파이프라인""" def __init__( self, roi_detector: TensorRTDetector, defect_classifier: "TensorRTClassifier", ok_threshold: float = 0.95 ): self.roi_detector = roi_detector self.defect_classifier = defect_classifier self.ok_threshold = ok_threshold # 통계 추적 self.total_inspected = 0 self.total_defects = 0 def inspect(self, frame: np.ndarray) -> List[InspectionResult]: """프레임 검사""" results = [] # 1. ROI 탐지 detections = self.roi_detector.detect(frame) for det in detections: # 2. ROI 추출 x1, y1, x2, y2 = det.bbox roi = frame[y1:y2, x1:x2] if roi.size == 0: continue # 3. 결함 분류 class_id, confidence = self.defect_classifier.classify(roi) defect_type = DefectType(class_id) # 4. 판정 is_ok = (defect_type == DefectType.OK and confidence >= self.ok_threshold) results.append(InspectionResult( is_ok=is_ok, defect_type=defect_type, confidence=confidence, bbox=det.bbox, roi_image=roi )) # 통계 업데이트 self.total_inspected += 1 if not is_ok: self.total_defects += 1 return results def get_defect_rate(self) -> float: """불량률 계산""" if self.total_inspected == 0: return 0.0 return self.total_defects / self.total_inspected * 100
비전 AI 시스템의 실시간 성능을 확보하기 위한 최적화 기법들입니다.
import threading from collections import deque import time class RealtimeVisionSystem: """최적화된 실시간 비전 시스템""" def __init__( self, camera: IndustrialCamera, pipeline: DefectInspectionPipeline, target_fps: int = 60 ): self.camera = camera self.pipeline = pipeline self.target_fps = target_fps self.frame_interval = 1.0 / target_fps # 이중 버퍼링 self.frame_buffer = [None, None] self.buffer_index = 0 self.buffer_lock = threading.Lock() # 성능 모니터링 self.latency_history = deque(maxlen=100) self.fps_history = deque(maxlen=100) # 제어 플래그 self.running = False def start(self): """시스템 시작""" self.running = True self.camera.open() self.camera.start_capture() # 처리 스레드 시작 self.process_thread = threading.Thread( target=self._process_loop, daemon=True ) self.process_thread.start() def _process_loop(self): """메인 처리 루프""" last_time = time.perf_counter() while self.running: loop_start = time.perf_counter() # 프레임 획득 frame, timestamp = self.camera.get_frame() if frame is None: continue # 검사 실행 infer_start = time.perf_counter() results = self.pipeline.inspect(frame) infer_time = (time.perf_counter() - infer_start) * 1000 # 결과 처리 (비동기) self._handle_results(results, frame) # 성능 기록 loop_time = time.perf_counter() - loop_start self.latency_history.append(infer_time) self.fps_history.append(1.0 / (time.perf_counter() - last_time)) last_time = time.perf_counter() # 프레임 속도 조절 sleep_time = self.frame_interval - loop_time if sleep_time > 0: time.sleep(sleep_time) def _handle_results(self, results: List[InspectionResult], frame): """결과 처리 (콜백/PLC 신호)""" for result in results: if not result.is_ok: # 불량 알림 (비동기) threading.Thread( target=self._report_defect, args=(result,), daemon=True ).start() def get_stats(self) -> dict: """성능 통계""" return { "avg_fps": np.mean(self.fps_history) if self.fps_history else 0, "avg_latency_ms": np.mean(self.latency_history) if self.latency_history else 0, "p99_latency_ms": np.percentile(self.latency_history, 99) if self.latency_history else 0, "defect_rate": self.pipeline.get_defect_rate() }
실시간 비전 시스템에서는 평균 지연시간보다 P99(99번째 백분위) 지연시간이 더 중요합니다. 간헐적인 지연 스파이크가 라인 정지로 이어질 수 있기 때문입니다.