1비지도 이상 탐지의 필요성
비지도 이상 탐지(Unsupervised Anomaly Detection)는 불량 샘플 없이 정상 데이터만으로 이상을 탐지하는 기법입니다. 제조 현장에서는 불량 데이터가 희소하고 새로운 불량 유형이 지속 발생하므로, 지도학습의 한계를 극복하는 접근법입니다.
핵심 원리: "정상을 학습하고, 정상에서 벗어난 것을 이상으로 탐지한다." 모델이 정상 이미지의 분포를 학습한 후, 새로운 이미지가 이 분포에서 얼마나 벗어나는지 측정하여 이상을 판단합니다.
비지도 방식이 필요한 상황:
- 불량 데이터 부족: 신규 라인, 신제품에서 불량 샘플 수집 어려움
- 미지의 불량: 학습하지 않은 새로운 유형의 결함 탐지 필요
- 높은 레이블링 비용: 전문가의 수작업 라벨링 비용 회피
- 빠른 배포: 정상 이미지만으로 즉시 시스템 구축
| 구분 | 지도학습 | 비지도 이상 탐지 |
| 필요 데이터 |
정상 + 불량 (레이블) |
정상만 (레이블 불필요) |
| 탐지 범위 |
학습한 불량 유형만 |
모든 비정상 패턴 |
| 새 불량 대응 |
재학습 필요 |
즉시 탐지 가능 |
| 정확도 |
높음 (특정 불량) |
중간 (전반적) |
| False Positive |
낮음 |
임계값 튜닝 필요 |
2Autoencoder 기반 이상 탐지
Autoencoder는 입력을 압축(Encoding)했다가 복원(Decoding)하는 신경망입니다. 정상 이미지로만 학습하면, 이상 이미지는 복원 오차(Reconstruction Error)가 크게 발생합니다.
Encoder
이미지를 저차원 잠재 공간(Latent Space)으로 압축
Decoder
잠재 벡터에서 원본 이미지를 복원
Anomaly Score
입력과 복원 이미지의 MSE 또는 SSIM 차이
# PyTorch: Convolutional Autoencoder for Anomaly Detection
import torch
import torch.nn as nn
class ConvAutoencoder(nn.Module):
"""이미지 이상 탐지용 Convolutional Autoencoder"""
def __init__(self, latent_dim: int = 128):
super().__init__()
# Encoder: 이미지 → 잠재 벡터
self.encoder = nn.Sequential(
nn.Conv2d(3, 32, kernel_size=4, stride=2, padding=1), # 224→112
nn.BatchNorm2d(32),
nn.LeakyReLU(0.2),
nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1), # 112→56
nn.BatchNorm2d(64),
nn.LeakyReLU(0.2),
nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1), # 56→28
nn.BatchNorm2d(128),
nn.LeakyReLU(0.2),
nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1), # 28→14
nn.BatchNorm2d(256),
nn.LeakyReLU(0.2),
nn.Conv2d(256, latent_dim, kernel_size=4, stride=2, padding=1), # 14→7
nn.BatchNorm2d(latent_dim),
nn.LeakyReLU(0.2),
)
# Decoder: 잠재 벡터 → 이미지
self.decoder = nn.Sequential(
nn.ConvTranspose2d(latent_dim, 256, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(),
nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1),
nn.Sigmoid(), # 출력 범위 [0, 1]
)
def forward(self, x):
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed, latent
def compute_anomaly_score(self, x):
"""이상 점수 계산 (Reconstruction Error)"""
with torch.no_grad():
reconstructed, _ = self.forward(x)
# 픽셀 단위 MSE
mse = torch.mean((x - reconstructed) ** 2, dim=[1, 2, 3])
return mse
class AnomalyDetector:
"""Autoencoder 기반 이상 탐지기"""
def __init__(self, model: ConvAutoencoder, threshold: float = None):
self.model = model
self.threshold = threshold
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.model.to(self.device)
self.model.eval()
def fit_threshold(self, normal_loader, percentile: float = 95):
"""정상 데이터로 임계값 결정"""
scores = []
with torch.no_grad():
for images, _ in normal_loader:
images = images.to(self.device)
score = self.model.compute_anomaly_score(images)
scores.extend(score.cpu().numpy())
# 상위 percentile을 임계값으로 설정
self.threshold = np.percentile(scores, percentile)
return self.threshold
def predict(self, image: torch.Tensor) -> dict:
"""이상 여부 판단"""
image = image.unsqueeze(0).to(self.device) if image.dim() == 3 else image.to(self.device)
score = self.model.compute_anomaly_score(image).item()
return {
'anomaly_score': score,
'threshold': self.threshold,
'is_anomaly': score > self.threshold if self.threshold else None
}
3Variational Autoencoder (VAE)
VAE는 잠재 공간을 확률 분포(정규분포)로 모델링하여 더 일반화된 표현을 학습합니다. 복원 오차와 함께 잠재 벡터의 분포 이탈도 이상 점수에 반영할 수 있습니다.
# PyTorch: VAE for Anomaly Detection
class VAE(nn.Module):
"""Variational Autoencoder"""
def __init__(self, latent_dim: int = 128):
super().__init__()
self.latent_dim = latent_dim
# Encoder
self.encoder_conv = nn.Sequential(
nn.Conv2d(3, 32, 4, 2, 1),
nn.BatchNorm2d(32),
nn.LeakyReLU(0.2),
nn.Conv2d(32, 64, 4, 2, 1),
nn.BatchNorm2d(64),
nn.LeakyReLU(0.2),
nn.Conv2d(64, 128, 4, 2, 1),
nn.BatchNorm2d(128),
nn.LeakyReLU(0.2),
nn.Conv2d(128, 256, 4, 2, 1),
nn.BatchNorm2d(256),
nn.LeakyReLU(0.2),
)
# VAE의 핵심: 평균(mu)과 분산(logvar)으로 분기
self.fc_mu = nn.Linear(256 * 14 * 14, latent_dim)
self.fc_logvar = nn.Linear(256 * 14 * 14, latent_dim)
# Decoder
self.fc_decode = nn.Linear(latent_dim, 256 * 14 * 14)
self.decoder_conv = nn.Sequential(
nn.ConvTranspose2d(256, 128, 4, 2, 1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.ConvTranspose2d(128, 64, 4, 2, 1),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.ConvTranspose2d(64, 32, 4, 2, 1),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.ConvTranspose2d(32, 3, 4, 2, 1),
nn.Sigmoid(),
)
def encode(self, x):
h = self.encoder_conv(x)
h = h.view(h.size(0), -1)
mu = self.fc_mu(h)
logvar = self.fc_logvar(h)
return mu, logvar
def reparameterize(self, mu, logvar):
"""Reparameterization Trick: z = mu + sigma * epsilon"""
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
def decode(self, z):
h = self.fc_decode(z)
h = h.view(h.size(0), 256, 14, 14)
return self.decoder_conv(h)
def forward(self, x):
mu, logvar = self.encode(x)
z = self.reparameterize(mu, logvar)
reconstructed = self.decode(z)
return reconstructed, mu, logvar
def loss_function(self, x, reconstructed, mu, logvar):
"""VAE Loss = Reconstruction Loss + KL Divergence"""
# 복원 손실 (MSE 또는 BCE)
recon_loss = nn.functional.mse_loss(reconstructed, x, reduction='sum')
# KL Divergence: 잠재 분포가 표준 정규분포에서 얼마나 벗어나는지
kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return recon_loss + kl_loss
def compute_anomaly_score(self, x, num_samples: int = 10):
"""이상 점수: 복원 오차 + 분포 이탈"""
with torch.no_grad():
mu, logvar = self.encode(x)
# 여러 번 샘플링하여 평균 복원 오차 계산
recon_errors = []
for _ in range(num_samples):
z = self.reparameterize(mu, logvar)
reconstructed = self.decode(z)
error = torch.mean((x - reconstructed) ** 2, dim=[1, 2, 3])
recon_errors.append(error)
recon_score = torch.stack(recon_errors).mean(dim=0)
# 잠재 공간에서의 이상 (마할라노비스 거리 근사)
latent_score = torch.sum(mu ** 2 + logvar.exp(), dim=1)
# 복합 점수 (가중 평균)
return recon_score + 0.1 * latent_score
4PatchCore: SOTA 이상 탐지
PatchCore는 사전 학습된 CNN(ImageNet)에서 추출한 패치 특징의 메모리 뱅크를 구축하고, 새 이미지의 패치가 메모리에서 얼마나 떨어져 있는지로 이상을 탐지합니다. 현재 MVTec AD 벤치마크에서 SOTA 성능을 보입니다.
PatchCore 장점: 학습 불필요(Feature extraction만), 빠른 추론, 높은 정확도, 이상 영역 위치 특정(Localization) 가능
# PatchCore 구현 (anomalib 스타일)
import torch
import torch.nn as nn
from torchvision.models import wide_resnet50_2, Wide_ResNet50_2_Weights
from sklearn.neighbors import NearestNeighbors
import numpy as np
class PatchCore:
"""PatchCore 이상 탐지 모델"""
def __init__(self, backbone: str = 'wide_resnet50',
layers: list = ['layer2', 'layer3'],
k_neighbors: int = 9,
sampling_ratio: float = 0.1):
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 사전 학습된 백본
self.backbone = wide_resnet50_2(weights=Wide_ResNet50_2_Weights.IMAGENET1K_V1)
self.backbone = self.backbone.to(self.device)
self.backbone.eval()
self.layers = layers
self.k_neighbors = k_neighbors
self.sampling_ratio = sampling_ratio
# 특징 추출을 위한 Hook 등록
self.features = {}
self._register_hooks()
# 메모리 뱅크
self.memory_bank = None
self.nn_model = None
def _register_hooks(self):
"""중간 레이어 특징 추출 Hook"""
def hook_fn(name):
def hook(module, input, output):
self.features[name] = output
return hook
for name, module in self.backbone.named_children():
if name in self.layers:
module.register_forward_hook(hook_fn(name))
def _extract_features(self, images: torch.Tensor) -> torch.Tensor:
"""이미지에서 패치 특징 추출"""
self.features = {}
with torch.no_grad():
_ = self.backbone(images)
# 여러 레이어 특징 결합
features_list = []
for layer_name in self.layers:
feat = self.features[layer_name]
# 공간 해상도 통일 (Adaptive Average Pooling)
if feat.shape[2:] != (28, 28):
feat = nn.functional.adaptive_avg_pool2d(feat, (28, 28))
features_list.append(feat)
# 채널 방향 결합: [B, C1+C2, H, W]
combined = torch.cat(features_list, dim=1)
# [B, C, H, W] → [B*H*W, C] (패치 단위)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def fit(self, train_loader):
"""정상 이미지로 메모리 뱅크 구축"""
all_patches = []
for images, _ in train_loader:
images = images.to(self.device)
patches = self._extract_features(images)
all_patches.append(patches.cpu().numpy())
all_patches = np.concatenate(all_patches, axis=0)
# Coreset Sampling: 대표 패치만 선택 (메모리 효율)
n_samples = int(len(all_patches) * self.sampling_ratio)
indices = np.random.choice(len(all_patches), n_samples, replace=False)
self.memory_bank = all_patches[indices]
# KNN 모델 학습
self.nn_model = NearestNeighbors(n_neighbors=self.k_neighbors, metric='euclidean')
self.nn_model.fit(self.memory_bank)
print(f"Memory bank size: {self.memory_bank.shape}")
def predict(self, image: torch.Tensor) -> dict:
"""이상 점수 및 히트맵 계산"""
if image.dim() == 3:
image = image.unsqueeze(0)
image = image.to(self.device)
# 패치 특징 추출
patches = self._extract_features(image).cpu().numpy()
# 각 패치의 메모리 뱅크 최근접 거리
distances, _ = self.nn_model.kneighbors(patches)
patch_scores = distances.mean(axis=1) # k개 이웃 평균 거리
# 이미지 레벨 점수: 최대 패치 점수
image_score = patch_scores.max()
# 히트맵 생성 (28x28 → 원본 크기로 업샘플)
heatmap = patch_scores.reshape(28, 28)
return {
'anomaly_score': float(image_score),
'heatmap': heatmap,
'patch_scores': patch_scores
}
# 사용 예시
def run_patchcore_detection():
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# 정상 이미지로 학습
train_dataset = datasets.ImageFolder('data/train/good', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
model = PatchCore(sampling_ratio=0.1)
model.fit(train_loader)
# 테스트 이미지 추론
test_image = transform(Image.open('data/test/scratch_001.png'))
result = model.predict(test_image)
print(f"Anomaly Score: {result['anomaly_score']:.4f}")
# 히트맵 시각화
import matplotlib.pyplot as plt
plt.imshow(result['heatmap'], cmap='hot')
plt.colorbar()
plt.title('Anomaly Heatmap')
plt.savefig('heatmap.png')
5GANomaly: GAN 기반 접근
GANomaly는 GAN 구조를 활용하여 정상 이미지의 잠재 표현을 학습합니다. Encoder-Decoder-Encoder 구조로 잠재 벡터 간 차이를 이상 점수로 사용합니다.
# GANomaly 핵심 구조
class GANomaly(nn.Module):
"""GANomaly: Encoder-Decoder-Encoder 구조"""
def __init__(self, input_size: int = 224, latent_dim: int = 100):
super().__init__()
# Generator: Encoder_1 → Decoder → Encoder_2
self.encoder1 = self._make_encoder(3, latent_dim)
self.decoder = self._make_decoder(latent_dim, 3)
self.encoder2 = self._make_encoder(3, latent_dim)
# Discriminator
self.discriminator = self._make_discriminator(3)
def _make_encoder(self, in_channels, latent_dim):
return nn.Sequential(
nn.Conv2d(in_channels, 64, 4, 2, 1),
nn.LeakyReLU(0.2),
nn.Conv2d(64, 128, 4, 2, 1),
nn.BatchNorm2d(128),
nn.LeakyReLU(0.2),
nn.Conv2d(128, 256, 4, 2, 1),
nn.BatchNorm2d(256),
nn.LeakyReLU(0.2),
nn.Conv2d(256, latent_dim, 4, 2, 1),
)
def _make_decoder(self, latent_dim, out_channels):
return nn.Sequential(
nn.ConvTranspose2d(latent_dim, 256, 4, 2, 1),
nn.BatchNorm2d(256),
nn.ReLU(),
nn.ConvTranspose2d(256, 128, 4, 2, 1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.ConvTranspose2d(128, 64, 4, 2, 1),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.ConvTranspose2d(64, out_channels, 4, 2, 1),
nn.Tanh(),
)
def _make_discriminator(self, in_channels):
return nn.Sequential(
nn.Conv2d(in_channels, 64, 4, 2, 1),
nn.LeakyReLU(0.2),
nn.Conv2d(64, 128, 4, 2, 1),
nn.BatchNorm2d(128),
nn.LeakyReLU(0.2),
nn.Conv2d(128, 256, 4, 2, 1),
nn.BatchNorm2d(256),
nn.LeakyReLU(0.2),
nn.Conv2d(256, 1, 4, 1, 0),
nn.Sigmoid(),
)
def forward(self, x):
# Encoder 1: 입력 → 잠재 벡터 z1
z1 = self.encoder1(x)
# Decoder: z1 → 복원 이미지
x_hat = self.decoder(z1)
# Encoder 2: 복원 이미지 → 잠재 벡터 z2
z2 = self.encoder2(x_hat)
return x_hat, z1, z2
def compute_anomaly_score(self, x):
"""이상 점수: |z1 - z2| (잠재 벡터 차이)"""
with torch.no_grad():
_, z1, z2 = self.forward(x)
# L1 거리
score = torch.mean(torch.abs(z1 - z2), dim=[1, 2, 3])
return score
6임계값 설정과 성능 평가
비지도 이상 탐지의 핵심 과제는 적절한 임계값(Threshold) 설정입니다. 정상 데이터만으로 결정해야 하므로 통계적 접근이 필요합니다.
# 임계값 결정 전략
import numpy as np
from scipy import stats
class ThresholdCalibrator:
"""비지도 이상 탐지 임계값 캘리브레이션"""
def __init__(self, normal_scores: np.ndarray):
self.normal_scores = normal_scores
self.mean = np.mean(normal_scores)
self.std = np.std(normal_scores)
def percentile_threshold(self, percentile: float = 99) -> float:
"""상위 N% 백분위수를 임계값으로"""
return np.percentile(self.normal_scores, percentile)
def sigma_threshold(self, n_sigma: float = 3) -> float:
"""Mean + N*Sigma를 임계값으로 (정규분포 가정)"""
return self.mean + n_sigma * self.std
def kde_threshold(self, false_positive_rate: float = 0.01) -> float:
"""KDE 기반 임계값 (밀도 추정)"""
kde = stats.gaussian_kde(self.normal_scores)
x_range = np.linspace(
self.normal_scores.min(),
self.normal_scores.max() * 2,
1000
)
# CDF 계산
cdf = np.cumsum(kde(x_range)) * (x_range[1] - x_range[0])
# FPR에 해당하는 임계값 찾기
idx = np.argmin(np.abs(cdf - (1 - false_positive_rate)))
return x_range[idx]
def contamination_aware(self, expected_anomaly_ratio: float = 0.01) -> float:
"""예상 불량률 기반 임계값"""
# 정상 데이터에 일부 불량이 섞여있다고 가정
clean_scores = self.normal_scores[
self.normal_scores < np.percentile(self.normal_scores, 100 - expected_anomaly_ratio * 100)
]
return np.mean(clean_scores) + 3 * np.std(clean_scores)
# 성능 평가 (테스트 데이터에 레이블이 있는 경우)
from sklearn.metrics import roc_auc_score, precision_recall_curve, f1_score
def evaluate_anomaly_detection(y_true: np.ndarray, scores: np.ndarray):
"""이상 탐지 성능 평가"""
# AUROC: 임계값 독립적 평가
auroc = roc_auc_score(y_true, scores)
# 최적 임계값 (F1 기준)
precisions, recalls, thresholds = precision_recall_curve(y_true, scores)
f1_scores = 2 * precisions * recalls / (precisions + recalls + 1e-8)
best_idx = np.argmax(f1_scores)
best_threshold = thresholds[best_idx] if best_idx < len(thresholds) else thresholds[-1]
best_f1 = f1_scores[best_idx]
# 최적 임계값으로 예측
y_pred = (scores >= best_threshold).astype(int)
# 혼동 행렬 기반 지표
tp = np.sum((y_true == 1) & (y_pred == 1))
fp = np.sum((y_true == 0) & (y_pred == 1))
fn = np.sum((y_true == 1) & (y_pred == 0))
tn = np.sum((y_true == 0) & (y_pred == 0))
return {
'auroc': auroc,
'best_threshold': best_threshold,
'best_f1': best_f1,
'precision': tp / (tp + fp) if (tp + fp) > 0 else 0,
'recall': tp / (tp + fn) if (tp + fn) > 0 else 0, # = Detection Rate
'false_positive_rate': fp / (fp + tn) if (fp + tn) > 0 else 0,
}
| 임계값 전략 | 특징 | 적합 상황 |
| Percentile (99%) |
상위 1%를 이상으로 분류 |
불량률 약 1% 예상 시 |
| 3-Sigma |
평균 + 3σ (정규분포 기반) |
점수가 정규분포일 때 |
| KDE + FPR |
목표 FPR로 임계값 결정 |
False Alarm 비용 중요 |
| Validation Set |
소량 불량 샘플로 튜닝 |
불량 샘플 확보 가능 시 |
실무 주의: 비지도 방식은 False Positive가 높을 수 있습니다. 초기에는 보수적 임계값으로 시작하고, 운영 데이터를 축적하며 점진적으로 조정하세요. Human-in-the-loop 검증 프로세스를 병행하는 것이 좋습니다.