Chapter 8 7 / 10

실시간 비전 AI

산업용 카메라 연동, GStreamer 파이프라인, 실시간 객체 탐지, 결함 검출 시스템 구현

1실시간 비전 파이프라인 아키텍처

제조 현장의 실시간 비전 AI는 카메라 캡처, 전처리, 추론, 후처리가 지연 없이 연속적으로 처리되어야 합니다. 각 단계의 병렬화와 비동기 처리가 성능의 핵심입니다.

Real-time Vision AI Pipeline
Camera Capture
GigE/USB
~3ms
Decoder (HW)
NVDEC/VAAPI
~2ms
Preproc (GPU)
CUDA Resize
~1ms
Inference (TRT)
INT8/FP16
~5ms
Postproc
NMS/Track
~2ms
Total Latency: ~13ms (75+ FPS)
Results Queue
Overlay (OpenGL)
Display (HMI)

실시간 비전 시스템에서 각 단계는 독립적인 스레드/프로세스로 분리하고, 큐 기반 비동기 통신으로 연결하여 파이프라인 병렬성을 확보합니다.

2산업용 카메라 연동

산업용 카메라는 GigE Vision, USB3 Vision, CoaXPress 등의 프로토콜을 사용합니다. GenICam 표준을 통해 다양한 제조사의 카메라를 일관되게 제어할 수 있습니다.

import cv2
import numpy as np
from typing import Optional, Callable
from threading import Thread, Event
from queue import Queue
import time

class IndustrialCamera:
    """산업용 카메라 제어 (GigE Vision/USB3 Vision)"""

    def __init__(
        self,
        camera_id: int = 0,
        width: int = 1920,
        height: int = 1080,
        fps: int = 60,
        exposure_us: int = 10000
    ):
        self.camera_id = camera_id
        self.width = width
        self.height = height
        self.target_fps = fps
        self.exposure_us = exposure_us

        self.cap: Optional[cv2.VideoCapture] = None
        self.frame_queue: Queue = Queue(maxsize=3)
        self.stop_event = Event()
        self.capture_thread: Optional[Thread] = None

    def open(self) -> bool:
        """카메라 연결 및 설정"""
        # GStreamer 파이프라인 (Jetson용)
        gst_pipeline = (
            f"v4l2src device=/dev/video{self.camera_id} ! "
            f"video/x-raw,width={self.width},height={self.height},"
            f"framerate={self.target_fps}/1 ! "
            f"videoconvert ! video/x-raw,format=BGR ! appsink"
        )

        try:
            # GStreamer 백엔드 시도
            self.cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER)
            if not self.cap.isOpened():
                # 표준 V4L2 폴백
                self.cap = cv2.VideoCapture(self.camera_id)
                self._configure_v4l2()
            return self.cap.isOpened()
        except Exception as e:
            print(f"Camera open failed: {e}")
            return False

    def _configure_v4l2(self):
        """V4L2 카메라 설정"""
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
        self.cap.set(cv2.CAP_PROP_FPS, self.target_fps)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # 최소 버퍼
        # 노출 수동 설정
        self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1)  # Manual
        self.cap.set(cv2.CAP_PROP_EXPOSURE, self.exposure_us)

    def start_capture(self):
        """비동기 캡처 시작"""
        self.stop_event.clear()
        self.capture_thread = Thread(target=self._capture_loop, daemon=True)
        self.capture_thread.start()

    def _capture_loop(self):
        """프레임 캡처 루프"""
        while not self.stop_event.is_set():
            ret, frame = self.cap.read()
            if ret:
                timestamp = time.time()
                # 큐가 가득 차면 오래된 프레임 버림
                if self.frame_queue.full():
                    try:
                        self.frame_queue.get_nowait()
                    except:
                        pass
                self.frame_queue.put((frame, timestamp))

    def get_frame(self, timeout: float = 0.1) -> tuple:
        """최신 프레임 획득"""
        try:
            return self.frame_queue.get(timeout=timeout)
        except:
            return None, None

    def stop(self):
        """캡처 중지"""
        self.stop_event.set()
        if self.capture_thread:
            self.capture_thread.join(timeout=1.0)
        if self.cap:
            self.cap.release()

3GStreamer 하드웨어 가속 파이프라인

NVIDIA Jetson에서 GStreamer를 활용하면 하드웨어 비디오 디코더(NVDEC)와 GPU 리사이즈를 통해 CPU 부하 없이 고속 영상 처리가 가능합니다.

class GStreamerPipeline:
    """GStreamer 하드웨어 가속 파이프라인"""

    PIPELINE_TEMPLATES = {
        "usb_camera": (
            "v4l2src device={device} ! "
            "video/x-raw,width={width},height={height},framerate={fps}/1 ! "
            "nvvidconv ! video/x-raw(memory:NVMM) ! "
            "nvvidconv ! video/x-raw,format=BGRx ! "
            "videoconvert ! video/x-raw,format=BGR ! appsink"
        ),
        "rtsp_camera": (
            "rtspsrc location={rtsp_url} latency=0 ! "
            "rtph264depay ! h264parse ! nvv4l2decoder ! "
            "nvvidconv ! video/x-raw,format=BGRx ! "
            "videoconvert ! video/x-raw,format=BGR ! appsink"
        ),
        "csi_camera": (
            "nvarguscamerasrc sensor-id={sensor_id} ! "
            "video/x-raw(memory:NVMM),width={width},height={height},"
            "framerate={fps}/1,format=NV12 ! "
            "nvvidconv flip-method=0 ! video/x-raw,format=BGRx ! "
            "videoconvert ! video/x-raw,format=BGR ! appsink"
        ),
        "gige_camera": (
            "aabornsrc device={ip_address} ! "
            "video/x-bayer,width={width},height={height} ! "
            "bayer2rgb ! videoconvert ! video/x-raw,format=BGR ! appsink"
        )
    }

    def __init__(self, pipeline_type: str, **kwargs):
        self.pipeline_str = self.PIPELINE_TEMPLATES[pipeline_type].format(**kwargs)
        self.cap = None

    def start(self) -> bool:
        """파이프라인 시작"""
        self.cap = cv2.VideoCapture(self.pipeline_str, cv2.CAP_GSTREAMER)
        return self.cap.isOpened()

    def read(self) -> tuple:
        """프레임 읽기"""
        return self.cap.read() if self.cap else (False, None)

    @staticmethod
    def create_multi_stream(camera_configs: list) -> "MultiStreamPipeline":
        """멀티 스트림 파이프라인 생성"""
        return MultiStreamPipeline(camera_configs)


class MultiStreamPipeline:
    """멀티 카메라 동시 처리 파이프라인"""

    def __init__(self, camera_configs: list):
        self.pipelines = []
        self.frame_queues = []

        for config in camera_configs:
            pipeline = GStreamerPipeline(**config)
            queue = Queue(maxsize=2)
            self.pipelines.append(pipeline)
            self.frame_queues.append(queue)

    def start_all(self):
        """모든 스트림 시작"""
        for i, pipeline in enumerate(self.pipelines):
            pipeline.start()
            Thread(
                target=self._stream_loop,
                args=(pipeline, self.frame_queues[i]),
                daemon=True
            ).start()

    def _stream_loop(self, pipeline, queue):
        """개별 스트림 루프"""
        while True:
            ret, frame = pipeline.read()
            if ret:
                if queue.full():
                    try:
                        queue.get_nowait()
                    except:
                        pass
                queue.put((frame, time.time()))

    def get_all_frames(self) -> list:
        """모든 스트림의 최신 프레임 획득"""
        frames = []
        for queue in self.frame_queues:
            try:
                frames.append(queue.get_nowait())
            except:
                frames.append((None, None))
        return frames

4실시간 객체 탐지

YOLO, SSD 등의 객체 탐지 모델을 TensorRT로 가속하여 실시간 탐지를 수행합니다. 배치 처리와 비동기 추론으로 처리량을 극대화합니다.

import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class Detection:
    """탐지 결과"""
    class_id: int
    confidence: float
    bbox: tuple  # (x1, y1, x2, y2)
    class_name: str = ""

class TensorRTDetector:
    """TensorRT 기반 실시간 객체 탐지"""

    def __init__(
        self,
        engine_path: str,
        input_size: tuple = (640, 640),
        conf_threshold: float = 0.5,
        nms_threshold: float = 0.4
    ):
        self.input_size = input_size
        self.conf_threshold = conf_threshold
        self.nms_threshold = nms_threshold

        # TensorRT 엔진 로드
        self.logger = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, "rb") as f:
            self.engine = trt.Runtime(self.logger).deserialize_cuda_engine(f.read())
        self.context = self.engine.create_execution_context()

        # CUDA 메모리 할당
        self._allocate_buffers()

        # CUDA 스트림 (비동기 처리용)
        self.stream = cuda.Stream()

    def _allocate_buffers(self):
        """입출력 버퍼 할당"""
        self.inputs = []
        self.outputs = []
        self.bindings = []

        for binding in self.engine:
            shape = self.engine.get_binding_shape(binding)
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            size = trt.volume(shape)

            # 호스트/디바이스 메모리
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self.bindings.append(int(device_mem))

            if self.engine.binding_is_input(binding):
                self.inputs.append({"host": host_mem, "device": device_mem})
            else:
                self.outputs.append({"host": host_mem, "device": device_mem})

    def preprocess(self, image: np.ndarray) -> np.ndarray:
        """이미지 전처리"""
        # 리사이즈
        resized = cv2.resize(image, self.input_size)
        # BGR -> RGB, HWC -> CHW
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        chw = rgb.transpose((2, 0, 1))
        # 정규화
        normalized = chw.astype(np.float32) / 255.0
        # 배치 차원 추가
        return np.expand_dims(normalized, axis=0)

    def detect(self, image: np.ndarray) -> List[Detection]:
        """객체 탐지 실행"""
        orig_h, orig_w = image.shape[:2]

        # 전처리
        input_data = self.preprocess(image)
        np.copyto(self.inputs[0]["host"], input_data.ravel())

        # GPU 전송
        cuda.memcpy_htod_async(
            self.inputs[0]["device"],
            self.inputs[0]["host"],
            self.stream
        )

        # 추론
        self.context.execute_async_v2(
            bindings=self.bindings,
            stream_handle=self.stream.handle
        )

        # 결과 전송
        cuda.memcpy_dtoh_async(
            self.outputs[0]["host"],
            self.outputs[0]["device"],
            self.stream
        )
        self.stream.synchronize()

        # 후처리
        return self.postprocess(
            self.outputs[0]["host"], orig_w, orig_h
        )

    def postprocess(
        self, output: np.ndarray, orig_w: int, orig_h: int
    ) -> List[Detection]:
        """탐지 결과 후처리 (NMS 포함)"""
        detections = []
        # YOLO 출력 형태: [batch, num_boxes, 5+num_classes]
        output = output.reshape((-1, 85))  # COCO 80 classes

        # 신뢰도 필터링
        scores = output[:, 4]
        mask = scores > self.conf_threshold
        boxes = output[mask]

        if len(boxes) == 0:
            return detections

        # 좌표 변환 (center to corner)
        scale_x = orig_w / self.input_size[0]
        scale_y = orig_h / self.input_size[1]

        for box in boxes:
            cx, cy, w, h = box[:4]
            x1 = int((cx - w/2) * scale_x)
            y1 = int((cy - h/2) * scale_y)
            x2 = int((cx + w/2) * scale_x)
            y2 = int((cy + h/2) * scale_y)

            class_scores = box[5:]
            class_id = np.argmax(class_scores)
            confidence = box[4] * class_scores[class_id]

            if confidence > self.conf_threshold:
                detections.append(Detection(
                    class_id=class_id,
                    confidence=confidence,
                    bbox=(x1, y1, x2, y2)
                ))

        # NMS 적용
        return self._nms(detections)

5결함 검출 파이프라인

제조 라인의 결함 검출은 객체 탐지와 분류를 결합한 2단계 파이프라인으로 구현됩니다. 먼저 ROI를 탐지한 후, 해당 영역을 결함 분류 모델에 전달합니다.

1

프레임 캡처

트리거 신호 수신 또는 연속 캡처

~3ms
2

ROI 탐지

검사 대상 영역 검출 (YOLO)

~5ms
3

결함 분류

각 ROI에 대해 OK/NG 및 결함 유형 분류

~3ms
4

결과 전송

PLC 신호 출력, DB 기록

~2ms
from dataclasses import dataclass
from enum import Enum
from typing import List, Tuple

class DefectType(Enum):
    OK = 0
    SCRATCH = 1
    CRACK = 2
    STAIN = 3
    DENT = 4
    MISSING = 5

@dataclass
class InspectionResult:
    """검사 결과"""
    is_ok: bool
    defect_type: DefectType
    confidence: float
    bbox: Tuple[int, int, int, int]
    roi_image: np.ndarray

class DefectInspectionPipeline:
    """2단계 결함 검출 파이프라인"""

    def __init__(
        self,
        roi_detector: TensorRTDetector,
        defect_classifier: "TensorRTClassifier",
        ok_threshold: float = 0.95
    ):
        self.roi_detector = roi_detector
        self.defect_classifier = defect_classifier
        self.ok_threshold = ok_threshold

        # 통계 추적
        self.total_inspected = 0
        self.total_defects = 0

    def inspect(self, frame: np.ndarray) -> List[InspectionResult]:
        """프레임 검사"""
        results = []

        # 1. ROI 탐지
        detections = self.roi_detector.detect(frame)

        for det in detections:
            # 2. ROI 추출
            x1, y1, x2, y2 = det.bbox
            roi = frame[y1:y2, x1:x2]

            if roi.size == 0:
                continue

            # 3. 결함 분류
            class_id, confidence = self.defect_classifier.classify(roi)
            defect_type = DefectType(class_id)

            # 4. 판정
            is_ok = (defect_type == DefectType.OK and
                     confidence >= self.ok_threshold)

            results.append(InspectionResult(
                is_ok=is_ok,
                defect_type=defect_type,
                confidence=confidence,
                bbox=det.bbox,
                roi_image=roi
            ))

            # 통계 업데이트
            self.total_inspected += 1
            if not is_ok:
                self.total_defects += 1

        return results

    def get_defect_rate(self) -> float:
        """불량률 계산"""
        if self.total_inspected == 0:
            return 0.0
        return self.total_defects / self.total_inspected * 100

6실시간 성능 최적화

비전 AI 시스템의 실시간 성능을 확보하기 위한 최적화 기법들입니다.

75+
목표 FPS
<15ms
E2E 지연시간
<5%
프레임 드롭률
99.9%
검출 안정성
import threading
from collections import deque
import time

class RealtimeVisionSystem:
    """최적화된 실시간 비전 시스템"""

    def __init__(
        self,
        camera: IndustrialCamera,
        pipeline: DefectInspectionPipeline,
        target_fps: int = 60
    ):
        self.camera = camera
        self.pipeline = pipeline
        self.target_fps = target_fps
        self.frame_interval = 1.0 / target_fps

        # 이중 버퍼링
        self.frame_buffer = [None, None]
        self.buffer_index = 0
        self.buffer_lock = threading.Lock()

        # 성능 모니터링
        self.latency_history = deque(maxlen=100)
        self.fps_history = deque(maxlen=100)

        # 제어 플래그
        self.running = False

    def start(self):
        """시스템 시작"""
        self.running = True
        self.camera.open()
        self.camera.start_capture()

        # 처리 스레드 시작
        self.process_thread = threading.Thread(
            target=self._process_loop, daemon=True
        )
        self.process_thread.start()

    def _process_loop(self):
        """메인 처리 루프"""
        last_time = time.perf_counter()

        while self.running:
            loop_start = time.perf_counter()

            # 프레임 획득
            frame, timestamp = self.camera.get_frame()
            if frame is None:
                continue

            # 검사 실행
            infer_start = time.perf_counter()
            results = self.pipeline.inspect(frame)
            infer_time = (time.perf_counter() - infer_start) * 1000

            # 결과 처리 (비동기)
            self._handle_results(results, frame)

            # 성능 기록
            loop_time = time.perf_counter() - loop_start
            self.latency_history.append(infer_time)
            self.fps_history.append(1.0 / (time.perf_counter() - last_time))
            last_time = time.perf_counter()

            # 프레임 속도 조절
            sleep_time = self.frame_interval - loop_time
            if sleep_time > 0:
                time.sleep(sleep_time)

    def _handle_results(self, results: List[InspectionResult], frame):
        """결과 처리 (콜백/PLC 신호)"""
        for result in results:
            if not result.is_ok:
                # 불량 알림 (비동기)
                threading.Thread(
                    target=self._report_defect,
                    args=(result,),
                    daemon=True
                ).start()

    def get_stats(self) -> dict:
        """성능 통계"""
        return {
            "avg_fps": np.mean(self.fps_history) if self.fps_history else 0,
            "avg_latency_ms": np.mean(self.latency_history) if self.latency_history else 0,
            "p99_latency_ms": np.percentile(self.latency_history, 99) if self.latency_history else 0,
            "defect_rate": self.pipeline.get_defect_rate()
        }

실시간 비전 시스템에서는 평균 지연시간보다 P99(99번째 백분위) 지연시간이 더 중요합니다. 간헐적인 지연 스파이크가 라인 정지로 이어질 수 있기 때문입니다.