728x90
from abc import ABC, abstractmethod # type: ignore
import cv2
import numpy as np
# ---------------------------
# 추상 클래스: 객체 검출
# ---------------------------
class ObjectDetector(ABC):
    """Interface for components that find objects in a single video frame."""

    @abstractmethod
    def detect_objects(self, frame):
        """Return the detections found in ``frame``.

        Subclasses must override this method. The base implementation does
        nothing and returns ``None``.
        """
        return None
# ---------------------------
# YOLO 기반 객체 검출 구현
# ---------------------------
class YOLOProcessor(ObjectDetector):
    """Single-class object detector built on a YOLO network run via ``cv2.dnn``.

    Only detections whose best-scoring class equals ``target_class`` and whose
    score exceeds ``score_threshold`` are kept; overlapping candidates are then
    reduced with non-maximum suppression.
    """

    def __init__(self, config_path, weights_path, classes_path,
                 score_threshold=0.5, nms_threshold=0.4,
                 target_class="person", input_size=(416, 416)):
        """Load the network and remember the filtering parameters.

        :param config_path: Path of the network configuration file.
        :param weights_path: Path of the network weights file.
        :param classes_path: Text file with one class name per line.
        :param score_threshold: Minimum class score for a detection to be kept.
        :param nms_threshold: Threshold passed to ``cv2.dnn.NMSBoxes``.
        :param target_class: Name of the only class that is reported.
        :param input_size: ``(width, height)`` the frame is resized to for the network.
        """
        self.net, self.classes = self._initialize_yolo(config_path, weights_path, classes_path)
        self.score_threshold = score_threshold
        self.nms_threshold = nms_threshold
        self.target_class = target_class
        self.input_size = input_size

    def _initialize_yolo(self, config_path, weights_path, classes_path):
        """Read the class names and build the network (OpenCV backend, CPU target).

        :return: ``(net, classes)`` where ``classes`` is the list of stripped lines
            of ``classes_path``.
        """
        with open(classes_path, 'r') as f:
            classes = [line.strip() for line in f]
        net = cv2.dnn.readNet(weights_path, config_path)
        net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        return net, classes

    def detect_objects(self, frame):
        """Return the boxes of ``target_class`` objects found in ``frame``.

        :param frame: Image array; ``frame.shape[0]`` is used as its height and
            ``frame.shape[1]`` as its width.
        :return: List of ``[x, y, w, h]`` integer boxes in frame pixels that
            survived non-maximum suppression. ``x``/``y`` are derived from the
            predicted centre and size without clamping, so they may be negative.
        """
        frame_h, frame_w = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, self.input_size, swapRB=True, crop=False)
        self.net.setInput(blob)
        layer_outputs = self.net.forward(self.net.getUnconnectedOutLayersNames())

        boxes, confidences = [], []
        num_classes = len(self.classes)
        for output in layer_outputs:
            for detection in output:
                # Entries from index 5 onwards are treated as per-class scores.
                scores = detection[5:]
                class_id = int(np.argmax(scores))
                # A names file shorter than the score vector must not crash the lookup.
                if class_id >= num_classes:
                    continue
                confidence = float(scores[class_id])
                if self.classes[class_id] == self.target_class and confidence > self.score_threshold:
                    # The first four values are scaled by the frame size to get
                    # a centre/size box, then converted to a top-left corner.
                    center_x = int(detection[0] * frame_w)
                    center_y = int(detection[1] * frame_h)
                    w = int(detection[2] * frame_w)
                    h = int(detection[3] * frame_h)
                    x = int(center_x - w / 2)
                    y = int(center_y - h / 2)
                    boxes.append([x, y, w, h])
                    confidences.append(confidence)

        # Nothing to suppress: skip the NMS call entirely.
        if not boxes:
            return []

        indices = cv2.dnn.NMSBoxes(boxes, confidences, self.score_threshold, self.nms_threshold)
        return self._select_boxes(boxes, indices)

    @staticmethod
    def _select_boxes(boxes, indices):
        """Pick ``boxes`` entries named by ``indices`` in whatever container NMS returned.

        Handles ``None``, an empty tuple/list, a flat or nested list/array and a
        plain or NumPy integer scalar.
        """
        if indices is None:
            return []
        flat_indices = np.asarray(indices, dtype=int).reshape(-1)
        return [boxes[i] for i in flat_indices]
# ---------------------------
# 추상 클래스: 옵티컬 플로우
# ---------------------------
class FlowProcessor(ABC):
    """Interface for components that measure motion inside a box between two frames."""

    @abstractmethod
    def calculate_flow(self, prev_gray, gray, box):
        """Return the motion measured inside ``box`` between ``prev_gray`` and ``gray``.

        Subclasses must override this method. The base implementation does
        nothing and returns ``None``.
        """
        return None
# ---------------------------
# 옵티컬 플로우 계산 구현
# ---------------------------
class OpticalFlowProcessor(FlowProcessor):
    """Estimate the dominant motion inside a box with Farneback dense optical flow."""

    def __init__(self, frame_rate):
        """:param frame_rate: Factor that converts per-frame displacement into a per-second speed."""
        self.frame_rate = frame_rate

    def calculate_flow(self, prev_gray, gray, box):
        """Return the mean flow of the fastest-moving pixels inside ``box``.

        :param prev_gray: Previous single-channel frame.
        :param gray: Current single-channel frame.
        :param box: ``(x, y, w, h)`` region of interest; it may extend past the
            frame and is clipped to it before use.
        :return: ``(avg_fx, avg_fy, avg_speed)``. ``(None, None, None)`` when
            the clipped region is empty, ``(0.0, 0.0, 0.0)`` when no pixel moves
            more than the 80th-percentile magnitude (for example a static region).
        """
        x, y, w, h = box
        img_h, img_w = gray.shape[:2]

        # Clip to the frame. A negative start index would otherwise be read by
        # NumPy as "count from the end" and select the wrong (or no) pixels.
        x0, y0 = max(x, 0), max(y, 0)
        x1, y1 = min(x + w, img_w), min(y + h, img_h)
        if x1 <= x0 or y1 <= y0:
            return None, None, None

        roi_prev_gray = prev_gray[y0:y1, x0:x1]
        roi_gray = gray[y0:y1, x0:x1]
        if roi_prev_gray.size == 0 or roi_gray.size == 0 or roi_prev_gray.shape != roi_gray.shape:
            return None, None, None

        flow = cv2.calcOpticalFlowFarneback(roi_prev_gray, roi_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        flow_x, flow_y = flow[..., 0], flow[..., 1]
        magnitude = np.sqrt(flow_x**2 + flow_y**2)

        # Keep only pixels moving faster than the 80th percentile of the region.
        threshold = np.percentile(magnitude, 80)
        moving = magnitude > threshold
        if not moving.any():
            # Averaging an empty selection would yield NaN; report "no motion".
            return 0.0, 0.0, 0.0

        avg_fx = float(np.mean(flow_x[moving]))
        avg_fy = float(np.mean(flow_y[moving]))
        avg_speed = float(np.mean(magnitude[moving])) * self.frame_rate
        return avg_fx, avg_fy, avg_speed
# ---------------------------
# 비즈니스 로직: 동영상 처리
# ---------------------------
class VideoProcessor:
    """Play a video in a loop, detect objects per frame and overlay their flow direction and speed."""

    def __init__(self, video_path, detector: ObjectDetector, flow_processor: FlowProcessor, frame_width=640, frame_height=360):
        """Store the collaborators and the size every frame is resized to.

        :param video_path: Source handed to ``cv2.VideoCapture``.
        :param detector: Object whose ``detect_objects(frame)`` returns ``(x, y, w, h)`` boxes.
        :param flow_processor: Object whose ``calculate_flow(prev_gray, gray, box)``
            returns ``(avg_fx, avg_fy, avg_speed)``.
        :param frame_width: Width frames are resized to before processing.
        :param frame_height: Height frames are resized to before processing.
        """
        self.video_path = video_path
        self.detector = detector
        self.flow_processor = flow_processor
        self.frame_width = frame_width
        self.frame_height = frame_height
        # Smoothed (fx, fy) keyed by a detection's position in the per-frame list.
        # NOTE(review): that index is not a stable object identity across
        # frames; confirm this is acceptable for the smoothing.
        self.direction_buffer = {}

    def process(self):
        """Run the detect / flow / draw loop until ``q`` is pressed or the video cannot be read.

        When the stream ends it is rewound to frame 0 and playback continues.
        The capture is always released, and windows are closed once the
        display loop has started, even if an exception is raised.
        """
        cap = cv2.VideoCapture(self.video_path)
        try:
            ret, first_frame = cap.read()
            if not ret:
                print("동영상을 열 수 없습니다.")
                return
            _, prev_gray = self._prepare_frame(first_frame)

            try:
                while True:
                    ret, raw_frame = cap.read()
                    restarted = False
                    if not ret:
                        # End of stream: rewind and try once more. If that
                        # also fails the source cannot be looped, so stop
                        # instead of spinning forever.
                        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                        ret, raw_frame = cap.read()
                        if not ret:
                            break
                        restarted = True

                    frame, gray = self._prepare_frame(raw_frame)
                    # Flow between the last and the first frame of the video
                    # is meaningless, so the first frame after a rewind is
                    # shown without overlays.
                    if not restarted:
                        self._annotate_frame(frame, prev_gray, gray)

                    cv2.imshow('YOLO + NMS + Optical Flow', frame)
                    prev_gray = gray
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
            finally:
                cv2.destroyAllWindows()
        finally:
            cap.release()

    def _prepare_frame(self, raw_frame):
        """Resize ``raw_frame`` to the configured size and return ``(frame, gray)``."""
        frame = cv2.resize(raw_frame, (self.frame_width, self.frame_height))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame, gray

    def _annotate_frame(self, frame, prev_gray, gray):
        """Detect objects in ``frame`` and draw box, direction arrow and speed for each."""
        for i, box in enumerate(self.detector.detect_objects(frame)):
            avg_fx, avg_fy, avg_speed = self.flow_processor.calculate_flow(prev_gray, gray, box)
            if avg_fx is None or avg_fy is None:
                continue

            # A NaN here would be stored in the smoothing buffer for good and
            # make the int() conversions in the drawing code raise.
            if not (np.isfinite(avg_fx) and np.isfinite(avg_fy)):
                avg_fx, avg_fy = 0.0, 0.0
            if avg_speed is None or not np.isfinite(avg_speed):
                avg_speed = 0.0

            # The horizontal component is forced to be non-positive and any
            # vertical component larger than 0.2 in magnitude is discarded.
            # NOTE(review): looks scene-specific; confirm.
            if avg_fx > 0:
                avg_fx = -np.abs(avg_fx)
            if np.abs(avg_fy) > 0.2:
                avg_fy = 0

            # Exponential smoothing: 80 % previous value, 20 % new measurement.
            if i in self.direction_buffer:
                prev_fx, prev_fy = self.direction_buffer[i]
                avg_fx = 0.8 * prev_fx + 0.2 * avg_fx
                avg_fy = 0.8 * prev_fy + 0.2 * avg_fy
            self.direction_buffer[i] = (avg_fx, avg_fy)

            self._draw_arrow_and_text(frame, box, avg_fx, avg_fy, avg_speed)

    def _draw_arrow_and_text(self, frame, box, avg_fx, avg_fy, avg_speed, arrow_width=3, x_scale_factor=0.25):
        """
        Draws an arrow indicating the movement direction, a text showing the speed
        and the bounding box.
        :param frame: The frame to draw on (modified in place).
        :param box: The bounding box coordinates (x, y, w, h).
        :param avg_fx: Average flow in the x-direction.
        :param avg_fy: Average flow in the y-direction.
        :param avg_speed: The calculated speed of the object.
        :param arrow_width: The line thickness of the arrow (default: 3).
        :param x_scale_factor: Scaling factor for the arrow's x-axis length (default: 0.25).
        """
        x, y, w, h = box
        text = f"Speed: {avg_speed:.2f} px/s"
        text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        vector_length = text_size[0]  # The text width is the base length of the arrow

        # Normalize the direction vector; the epsilon avoids division by zero
        magnitude = np.sqrt(avg_fx**2 + avg_fy**2) + 1e-6
        direction_x = (avg_fx / magnitude) * vector_length * x_scale_factor  # Apply x-axis scale
        direction_y = (avg_fy / magnitude) * vector_length  # Keep y-axis length unchanged

        # The arrow starts slightly above the top centre of the box
        arrow_start = (x + w // 2, y - 10)
        arrow_end = (x + w // 2 + int(direction_x), arrow_start[1] - int(direction_y))
        text_position = (arrow_start[0] - text_size[0] // 2, arrow_start[1] - 15)

        # Draw arrow
        cv2.arrowedLine(frame, arrow_start, arrow_end, (0, 0, 255), arrow_width, tipLength=0.2)
        # Draw text
        cv2.putText(frame, text, text_position, cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        # Draw bounding box
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# ---------------------------
# 실행 코드
# ---------------------------
if __name__ == "__main__":
    # Build the detector from the YOLOv4-tiny files, pair it with a flow
    # processor that assumes 30 frames per second, and run the video loop.
    detector = YOLOProcessor(
        config_path='D:\\yolov4-tiny.cfg',
        weights_path='D:\\yolov4-tiny.weights',
        classes_path='D:\\coco.names',
    )
    flow = OpticalFlowProcessor(frame_rate=30)
    VideoProcessor('D:\\video.mp4', detector, flow).process()

728x90
'PY(Python Image Processing)' 카테고리의 다른 글
| optical flow - python, opencv 추상화 및 디자인패턴 적용 - 화살표 방향 수정 (0) | 2025.02.25 |
|---|---|
| optical flow - python, opencv 추상화 및 디자인패턴 적용 (0) | 2024.12.24 |
| optical flow - opencv & python (0) | 2024.12.24 |
| python pip install (2) | 2024.09.02 |
| kakaotalk PC APP setText test (0) | 2024.08.15 |