본문 바로가기

PY(Python Image Processing)

optical flow - opencv & python

728x90
import cv2
import numpy as np

# YOLO 설정
yolo_config = 'D:\\yolov4-tiny.cfg'
yolo_weights = 'D:\\yolov4-tiny.weights'
yolo_classes = 'D:\\coco.names'

with open(yolo_classes, 'r') as f:
    classes = [line.strip() for line in f.readlines()]

net = cv2.dnn.readNet(yolo_weights, yolo_config)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

# 동영상 파일 경로
video_path = 'D:\\video.mp4'
cap = cv2.VideoCapture(video_path)

# 고정 해상도 설정
fixed_width = 640
fixed_height = 360
dim = (fixed_width, fixed_height)

# 첫 번째 프레임 처리
ret, prev_frame = cap.read()
if not ret:
    print("동영상을 열 수 없습니다.")
    cap.release()
    exit()

prev_frame = cv2.resize(prev_frame, dim)
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

frame_rate = cap.get(cv2.CAP_PROP_FPS)  # FPS 추출

# NMS 파라미터 설정
score_threshold = 0.5
nms_threshold = 0.4

# 이동 방향 누적 벡터 초기화
direction_buffer = {}

while True:
    ret, frame = cap.read()
    if not ret:
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # 동영상 반복 재생
        continue

    frame = cv2.resize(frame, dim)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # YOLO로 객체 검출
    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    net.setInput(blob)
    layer_outputs = net.forward(net.getUnconnectedOutLayersNames())

    boxes = []
    confidences = []
    for output in layer_outputs:
        for detection in output:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if classes[class_id] == "person" and confidence > score_threshold:
                center_x = int(detection[0] * frame.shape[1])  # 실제 프레임 가로 크기 비례
                center_y = int(detection[1] * frame.shape[0])  # 실제 프레임 세로 크기 비례
                w = int(detection[2] * frame.shape[1])         # 실제 폭
                h = int(detection[3] * frame.shape[0])         # 실제 높이
                x = int(center_x - w / 2)
                y = int(center_y - h / 2)
                boxes.append([x, y, w, h])
                confidences.append(float(confidence))

    # Non-Maximum Suppression 적용
    indices = cv2.dnn.NMSBoxes(boxes, confidences, score_threshold, nms_threshold)

    # 선택된 박스만 사용
    final_boxes = []
    if len(indices) > 0:
        for i in indices.flatten():
            final_boxes.append(boxes[i])

    # 바운딩 박스 및 옵티컬 플로우 처리
    for i, (x, y, w, h) in enumerate(final_boxes):
        roi_prev_gray = prev_gray[y:y+h, x:x+w]
        roi_gray = gray[y:y+h, x:x+w]

        if roi_prev_gray.size == 0 or roi_gray.size == 0:
            continue

        # 옵티컬 플로우 계산
        flow = cv2.calcOpticalFlowFarneback(roi_prev_gray, roi_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        # ROI 내 이동 벡터 필터링
        valid_magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
        threshold = np.percentile(valid_magnitude, 80)  # 상위 20% 이동 벡터만 사용
        valid_mask = valid_magnitude > threshold
        avg_fx = np.mean(flow[..., 0][valid_mask])
        avg_fy = np.mean(flow[..., 1][valid_mask])
        avg_speed = np.mean(valid_magnitude[valid_mask]) * frame_rate  # 속도 (픽셀/초)

        # 항상 왼쪽 이동으로 강제
        if avg_fx > 0:
            avg_fx = -np.abs(avg_fx)

        # y축 이동 제거
        if np.abs(avg_fy) > 0.2:  # 기준값 이상 이동하는 경우 제거
            avg_fy = 0

        # 방향 누적 (스무싱)
        if i not in direction_buffer:
            direction_buffer[i] = (avg_fx, avg_fy)
        else:
            prev_fx, prev_fy = direction_buffer[i]
            avg_fx = 0.8 * prev_fx + 0.2 * avg_fx  # 평탄화
            avg_fy = 0.8 * prev_fy + 0.2 * avg_fy
            direction_buffer[i] = (avg_fx, avg_fy)

        # 화살표 크기를 텍스트 폭으로 고정
        text = f"Speed: {avg_speed:.2f} px/s"
        text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]  # 텍스트 폭 계산
        vector_length = text_size[0] / 4 # 텍스트 폭을 화살표 길이로 사용

        # 방향 벡터 정규화
        magnitude = np.sqrt(avg_fx**2 + avg_fy**2) + 1e-6
        direction_x = (avg_fx / magnitude) * vector_length
        direction_y = (avg_fy / magnitude) * vector_length

        # 화살표 시작점과 끝점 계산 (머리 위)
        arrow_start = (x + w // 2, y - 10)  # 화살표는 머리 위 약간 위쪽에
        arrow_end = (
            x + w // 2 + int(direction_x),  # 화살표 끝점 x
            arrow_start[1] - int(direction_y)  # 화살표 끝점 y
        )

        # 텍스트 위치 계산
        text_position = (arrow_start[0] - text_size[0] // 2, arrow_start[1] - 15)

        # 화살표 그리기
        cv2.arrowedLine(frame, arrow_start, arrow_end, (0, 0, 255), 3, tipLength=0.2)

        # 속도 표시 (텍스트)
        cv2.putText(
            frame,
            text,
            text_position,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (255, 255, 255),
            2
        )

        # 바운딩 박스 그리기
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

    # 출력
    cv2.imshow('YOLO + NMS + Optical Flow', frame)
    prev_gray = gray

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

 

728x90