1Active Learning 개념
Active Learning은 모델이 가장 불확실한 샘플을 선택하여 라벨링하는 전략입니다. 전체 데이터를 라벨링하는 대신 가장 정보량이 높은 샘플만 선별하여 라벨링 비용을 크게 절감합니다.
Active Learning Loop
반복: 성능 목표 달성까지
제조 AI에서의 가치: 불량 샘플은 희소하고, 전문가 라벨링 비용이 높습니다. Active Learning으로 가장 중요한 샘플만 선별하여 효율적으로 모델을 개선할 수 있습니다.
2Uncertainty Sampling 전략
모델의 예측 불확실성을 측정하여 라벨링 대상 샘플을 선택하는 방법입니다.
1 Least Confidence
최고 확률이 가장 낮은 샘플 선택. 모델이 가장 자신 없는 샘플
2 Margin Sampling
1위와 2위 확률 차이가 작은 샘플. 결정 경계에 가까운 샘플
3 Entropy Sampling
엔트로피가 높은 샘플. 예측 분포가 균일한 샘플
4 MC Dropout
Dropout 활성화 상태로 여러 번 추론. 예측 분산이 큰 샘플
import torch
import numpy as np
from typing import List, Tuple
class UncertaintySampler:
"""Uncertainty 기반 Active Learning Sampler"""
def __init__(self, model, device='cuda'):
self.model = model
self.device = device
def get_predictions(self, dataloader) -> Tuple[np.ndarray, np.ndarray]:
"""모든 샘플에 대해 예측 확률 계산"""
self.model.eval()
all_probs = []
all_indices = []
with torch.no_grad():
for idx, (images, _) in enumerate(dataloader):
images = images.to(self.device)
outputs = self.model(images)
probs = torch.softmax(outputs, dim=1)
all_probs.append(probs.cpu().numpy())
all_indices.extend(range(idx * len(images), (idx+1) * len(images)))
return np.vstack(all_probs), np.array(all_indices)
def least_confidence(self, probs: np.ndarray, n_samples: int) -> np.ndarray:
"""Least Confidence: 최고 확률이 낮은 샘플"""
confidence = probs.max(axis=1)
return np.argsort(confidence)[:n_samples]
def margin_sampling(self, probs: np.ndarray, n_samples: int) -> np.ndarray:
"""Margin: 1위-2위 확률 차이가 작은 샘플"""
sorted_probs = np.sort(probs, axis=1)
margins = sorted_probs[:, -1] - sorted_probs[:, -2]
return np.argsort(margins)[:n_samples]
def entropy_sampling(self, probs: np.ndarray, n_samples: int) -> np.ndarray:
"""Entropy: 예측 불확실성이 높은 샘플"""
entropy = -np.sum(probs * np.log(probs + 1e-8), axis=1)
return np.argsort(-entropy)[:n_samples]
def select_samples(self, dataloader, n_samples: int,
strategy: str = 'entropy') -> List[int]:
"""Active Learning 샘플 선택"""
probs, indices = self.get_predictions(dataloader)
if strategy == 'least_confidence':
selected = self.least_confidence(probs, n_samples)
elif strategy == 'margin':
selected = self.margin_sampling(probs, n_samples)
else:
selected = self.entropy_sampling(probs, n_samples)
return indices[selected].tolist()
# 사용 예시
sampler = UncertaintySampler(model, device='cuda')
uncertain_indices = sampler.select_samples(
unlabeled_loader, n_samples=100, strategy='entropy'
)
print(f"라벨링 대상 {len(uncertain_indices)}개 샘플 선택")
3Active Learning 파이프라인
Active Learning을 실제 프로젝트에 적용하는 전체 파이프라인입니다.
class ActiveLearningPipeline:
"""Active Learning 전체 파이프라인"""
def __init__(self, model, labeled_data, unlabeled_data,
val_data, budget_per_round: int = 50):
self.model = model
self.labeled_data = labeled_data
self.unlabeled_data = unlabeled_data
self.val_data = val_data
self.budget = budget_per_round
self.sampler = UncertaintySampler(model)
self.history = []
def run_round(self):
"""Active Learning 1 라운드 실행"""
# 1. 현재 모델로 평가
initial_acc = self.evaluate()
# 2. 불확실 샘플 선택
unlabeled_loader = DataLoader(self.unlabeled_data, batch_size=32)
selected_indices = self.sampler.select_samples(
unlabeled_loader, self.budget, strategy='entropy'
)
# 3. 선택된 샘플을 labeled로 이동 (실제로는 라벨링 수행)
for idx in selected_indices:
sample = self.unlabeled_data[idx]
self.labeled_data.append(sample)
# 4. 모델 재학습
self.retrain()
# 5. 재평가
final_acc = self.evaluate()
self.history.append({
'labeled_count': len(self.labeled_data),
'initial_acc': initial_acc,
'final_acc': final_acc
})
return final_acc
def run(self, n_rounds: int, target_acc: float = 0.95):
"""여러 라운드 실행"""
for round_num in range(n_rounds):
acc = self.run_round()
print(f"Round {round_num+1}: Acc={acc:.4f}, "
f"Labeled={len(self.labeled_data)}")
if acc >= target_acc:
print(f"목표 정확도 달성!")
break
return self.history
4합성 데이터 생성
불량 샘플이 부족할 때 인위적으로 결함을 합성하여 학습 데이터를 생성하는 방법입니다.
import cv2
import numpy as np
from PIL import Image
class DefectSynthesizer:
"""결함 합성 데이터 생성기"""
def add_scratch(self, image: np.ndarray,
length_range=(50, 200),
width_range=(1, 5)) -> np.ndarray:
"""스크래치 결함 합성"""
h, w = image.shape[:2]
result = image.copy()
# 랜덤 시작점과 각도
x1, y1 = np.random.randint(0, w), np.random.randint(0, h)
length = np.random.randint(*length_range)
angle = np.random.uniform(0, 2 * np.pi)
x2 = int(x1 + length * np.cos(angle))
y2 = int(y1 + length * np.sin(angle))
width = np.random.randint(*width_range)
# 스크래치 그리기 (어두운 색)
color = tuple(np.random.randint(20, 80, 3).tolist())
cv2.line(result, (x1, y1), (x2, y2), color, width)
return result
def add_stain(self, image: np.ndarray,
radius_range=(10, 50)) -> np.ndarray:
"""얼룩 결함 합성"""
h, w = image.shape[:2]
result = image.copy()
# 랜덤 위치와 크기
cx, cy = np.random.randint(50, w-50), np.random.randint(50, h-50)
radius = np.random.randint(*radius_range)
# 마스크 생성 (부드러운 엣지)
mask = np.zeros((h, w), dtype=np.float32)
cv2.circle(mask, (cx, cy), radius, 1.0, -1)
mask = cv2.GaussianBlur(mask, (21, 21), 0)
# 얼룩 색상 합성
stain_color = np.random.randint(30, 100, 3)
for c in range(3):
result[:,:,c] = (result[:,:,c] * (1 - mask * 0.5) +
stain_color[c] * mask * 0.5).astype(np.uint8)
return result
def add_dent(self, image: np.ndarray) -> np.ndarray:
"""찍힘 결함 합성 (명암 변화)"""
h, w = image.shape[:2]
result = image.copy()
# 랜덤 위치와 크기
cx, cy = np.random.randint(30, w-30), np.random.randint(30, h-30)
axes = (np.random.randint(5, 20), np.random.randint(5, 20))
# 그라데이션 마스크 (3D 효과)
mask = np.zeros((h, w), dtype=np.float32)
cv2.ellipse(mask, (cx, cy), axes, 0, 0, 360, 1.0, -1)
mask = cv2.GaussianBlur(mask, (11, 11), 0)
# 한쪽은 밝게, 반대쪽은 어둡게
light_mask = np.roll(mask, -3, axis=0) * 0.3
dark_mask = np.roll(mask, 3, axis=0) * 0.3
result = (result * (1 + light_mask[:,:,None] - dark_mask[:,:,None]))
return np.clip(result, 0, 255).astype(np.uint8)
# 사용 예시
synthesizer = DefectSynthesizer()
ok_image = cv2.imread('ok_sample.jpg')
# 각 결함 유형 합성
scratch_image = synthesizer.add_scratch(ok_image)
stain_image = synthesizer.add_stain(ok_image)
dent_image = synthesizer.add_dent(ok_image)
합성 데이터 주의: 합성 결함이 실제 결함과 다르면 모델이 실제 결함을 놓칠 수 있습니다. 합성 데이터는 실제 불량 샘플을 참조하여 제작하고, 전문가 검증을 거쳐야 합니다.
5Copy-Paste Augmentation
실제 결함 영역을 잘라서 다른 정상 이미지에 붙여넣는 증강 기법입니다. 결함의 사실성을 유지하면서 다양한 배경에 적용할 수 있습니다.
class CopyPasteAugmentor:
"""Copy-Paste 증강 (결함 영역 복사-붙여넣기)"""
def __init__(self, defect_masks_dir: str):
self.defect_patches = self.load_defect_patches(defect_masks_dir)
def load_defect_patches(self, dir_path: str) -> List[dict]:
"""결함 패치와 마스크 로드"""
patches = []
for file in os.listdir(dir_path):
if file.endswith('_patch.png'):
mask_file = file.replace('_patch.png', '_mask.png')
patch = cv2.imread(os.path.join(dir_path, file))
mask = cv2.imread(os.path.join(dir_path, mask_file), 0)
patches.append({'patch': patch, 'mask': mask})
return patches
def apply(self, image: np.ndarray, n_defects: int = 1) -> np.ndarray:
"""정상 이미지에 결함 붙여넣기"""
result = image.copy()
h, w = image.shape[:2]
for _ in range(n_defects):
# 랜덤 결함 패치 선택
defect = np.random.choice(self.defect_patches)
patch = defect['patch']
mask = defect['mask']
# 랜덤 스케일 및 회전
scale = np.random.uniform(0.5, 1.5)
angle = np.random.randint(0, 360)
# 변환 적용
ph, pw = patch.shape[:2]
M = cv2.getRotationMatrix2D((pw//2, ph//2), angle, scale)
patch = cv2.warpAffine(patch, M, (pw, ph))
mask = cv2.warpAffine(mask, M, (pw, ph))
# 랜덤 위치에 붙여넣기
x = np.random.randint(0, w - pw)
y = np.random.randint(0, h - ph)
# 블렌딩
mask_3ch = np.stack([mask/255.0]*3, axis=-1)
result[y:y+ph, x:x+pw] = (
result[y:y+ph, x:x+pw] * (1 - mask_3ch) +
patch * mask_3ch
).astype(np.uint8)
return result
6실전 적용 전략
Active Learning과 데이터 증강을 실제 프로젝트에 적용하는 전략입니다.
# Active Learning + Data Augmentation 통합 전략
Phase 1: 초기 데이터 수집
├── 정상 샘플: 1,000장 이상
├── 불량 샘플: 가능한 만큼 (최소 50장/클래스)
└── 합성 결함으로 불량 샘플 보강
Phase 2: 초기 모델 학습
├── Transfer Learning으로 빠른 모델 구축
├── 강한 데이터 증강 적용
└── Baseline 성능 측정
Phase 3: Active Learning 루프
├── 운영 이미지 중 불확실 샘플 선택 (주간 100장)
├── 전문가 라벨링
├── Copy-Paste로 불량 샘플 증강
├── 모델 재학습 (Fine-tuning)
└── 성능 모니터링 및 반복
Phase 4: 안정화
├── Active Learning 빈도 감소 (월간)
├── 신규 결함 유형 발생 시 즉시 대응
└── 모델 버전 관리 및 롤백 체계
[성과 지표]
├── 라벨링 비용: 전수 라벨링 대비 70% 절감
├── 검출률: 99%+ 달성
└── 허위 불량률: 1% 이하
성공 핵심: Active Learning은 단순히 불확실 샘플만 선택하는 것이 아니라, 모델의 실제 실패 사례를 분석하여 학습 데이터를 보완하는 전략적 접근이 필요합니다.