반응형
"""
Road Vertical Contour Detection (stabilized coordinate frame) – core algorithm skeleton
- 전체 Activity Diagram
- 단일 프레임 처리 Activity
- Multi-frame 정렬 + 1D 샘플링 Activity
GUI / 실제 영상 / 실제 차량 CAN 연동은 제외.
수학/비전 알고리즘 부분은 최소 동작 가능한 수준 + TODO 로 표시.
"""
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Dict
import numpy as np
# ============================================================
# 0. 공통 데이터 구조
# ============================================================
@dataclass
class FrameData:
"""입력 프레임 및 차량 상태"""
frame_id: int
timestamp: float
image: np.ndarray # H x W x 3
vehicle_speed: float # m/s
yaw_rate: float # rad/s
@dataclass
class PoseEstimate:
"""도메인 상 pose / 평면 / 호모그래피 정보"""
R: np.ndarray # 3x3
T: np.ndarray # 3x1
N_curr: np.ndarray # 3x1 (도로 평면 법선)
D_curr: float # 평면 거리
N_stab: np.ndarray # 안정화된 법선
D_stab: float # 안정화된 거리
H: np.ndarray # 3x3 (undistorted)
H_dist: np.ndarray # 3x3 (distorted)
H_far: Optional[np.ndarray] = None # 3x3
H_far_dist: Optional[np.ndarray] = None # 3x3
@dataclass
class RoadProfile1Frame:
"""단일 프레임 기준 거리–높이 프로파일"""
frame_id: int
z: np.ndarray # shape (N,)
h: np.ndarray # shape (N,)
confidence: np.ndarray # shape (N,) in [0,1]
@dataclass
class RoadProfileMultiFrame:
"""K프레임 정렬 + median 기반 multi-frame profile"""
z: np.ndarray # shape (N,)
h_multi: np.ndarray # shape (N,)
h_all_frames: List[np.ndarray] # 각 프레임별 정렬 후 h
confidence: np.ndarray # shape (N,)
@dataclass
class SampleBurst1D:
"""1D 샘플 버스트 (Control / CAN 에 보내는 용도)"""
timestamp: float
Z_ref: float
z_samples: np.ndarray # shape (M,)
h_samples: np.ndarray # shape (M,)
conf_samples: np.ndarray # shape (M,)
# ============================================================
# 1. 유틸리티 함수 (placeholder 수준)
# ============================================================
def undistort_image(image: np.ndarray) -> np.ndarray:
"""렌즈 왜곡 보정 (radial, tangential) – 여기서는 단순 패스."""
# TODO: 실제 카메라 매트릭스 / 왜곡계수 이용
return image.copy()
def compensate_rolling_shutter(
image: np.ndarray,
vehicle_speed: float,
yaw_rate: float,
row_reference: int
) -> np.ndarray:
"""
롤링셔터 보정.
특허에서는 각 row가 다른 시점에 캡처되므로
row(y)별 ego-motion 보정 수행.
여기서는 스텁으로 구현.
"""
# TODO: 실제 motion model 기반 warp
return image.copy()
def estimate_homography(
prev_points: np.ndarray,
curr_points: np.ndarray
) -> np.ndarray:
"""
RANSAC/IRLS 기반 호모그래피 추정 (3x3).
여기서는 numpy 의 최소 구현만 placeholder.
"""
# TODO: cv2.findHomography + RANSAC 등 사용
H = np.eye(3, dtype=np.float32)
return H
def decompose_homography(
H: np.ndarray,
K: np.ndarray,
plane_normal_guess: np.ndarray,
camera_height_guess: float
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
"""
호모그래피를 R,T, N,D 로 분해.
- H = K(R + 1/d * T * N^T) K^{-1} 형태를 가정.
여기서는 간단한 placeholder.
"""
# TODO: 실제 분해 알고리즘 구현
R = np.eye(3, dtype=np.float32)
T = np.array([[0.0], [0.0], [1.0]], dtype=np.float32)
N_curr = plane_normal_guess / (np.linalg.norm(plane_normal_guess) + 1e-6)
D_curr = camera_height_guess
return R, T, N_curr, D_curr
def stabilize_plane(
N_hist: List[np.ndarray],
D_hist: List[float],
N_curr: np.ndarray,
D_curr: float,
hist_len: int = 10
) -> Tuple[np.ndarray, float, List[np.ndarray], List[float]]:
"""
N, D 히스토리를 이용해 안정화된 평면값 계산 (moving average 정도).
"""
N_hist.append(N_curr)
D_hist.append(D_curr)
if len(N_hist) > hist_len:
N_hist.pop(0)
D_hist.pop(0)
N_arr = np.stack(N_hist, axis=0) # (T,3)
D_arr = np.array(D_hist)
N_stab = np.mean(N_arr, axis=0)
N_stab /= (np.linalg.norm(N_stab) + 1e-6)
D_stab = float(np.mean(D_arr))
return N_stab, D_stab, N_hist, D_hist
def project_vehicle_path_world(
R_far: np.ndarray,
T_far: np.ndarray,
N_stab: np.ndarray,
D_stab: float,
num_points: int = 50,
max_distance: float = 30.0
) -> np.ndarray:
"""
세계 좌표계 상에서 차량 경로를 샘플링.
간단하게 x-축 진행, y=0, z=0 으로 가정.
반환: points_world (N, 3)
"""
z = np.linspace(0.0, max_distance, num_points)
x = np.zeros_like(z)
y = np.zeros_like(z)
points_world = np.stack([x, y, z], axis=-1)
# TODO: 실제 R_far, T_far, 평면 교차 등 반영
return points_world
def world_to_image(
K: np.ndarray,
R: np.ndarray,
T: np.ndarray,
points_world: np.ndarray
) -> np.ndarray:
"""
세계 좌표계 -> 카메라 좌표 -> 이미지 좌표 (u,v).
"""
# World -> Camera
R_wc = R
T_wc = T
pts_cam = (R_wc @ points_world.T + T_wc).T # (N,3)
# Camera -> Image
uvw = (K @ pts_cam.T).T
u = uvw[:, 0] / (uvw[:, 2] + 1e-6)
v = uvw[:, 1] / (uvw[:, 2] + 1e-6)
return np.stack([u, v], axis=-1)
def compute_residual_profile(
image_far: np.ndarray,
image_cur: np.ndarray,
path_pixels: np.ndarray,
strip_half_width_px: int = 5
) -> Tuple[np.ndarray, np.ndarray]:
"""
road 상 차량 경로 주변 strip 을 warping 하고 잔차 motion dy 계산.
여기서는 단순히 z 축은 index, h는 random noise 로 placeholder.
"""
num_points = path_pixels.shape[0]
z = np.linspace(0, 30, num_points)
# TODO: 실제 strip warp + optical flow / NCC 기반 dy 추정
dy = np.random.randn(num_points) * 0.01 # 작은 noise
# dy를 height 로 변환하는 scale factor (카메라 pitch/height 기반)
scale = 0.5
h = dy * scale
return z, h
def compute_confidence_from_residual(
h: np.ndarray
) -> np.ndarray:
"""
residual 기반 confidence 계산. 여기서는 단순히 |h| 크기에 반비례.
"""
mag = np.abs(h)
conf = np.exp(-mag / (np.std(mag) + 1e-6))
conf = np.clip(conf, 0.0, 1.0)
return conf
# ============================================================
# 2. 단일 프레임 파이프라인
# ============================================================
class SingleFrameProcessor:
"""
- Frame k, Frame far(= k - delta) 이용
- H 추정, R,T,N,D 분해
- 안정화된 평면 기반 경로 residual -> single-frame profile 생성
"""
def __init__(
self,
K: np.ndarray,
plane_normal_guess: np.ndarray,
camera_height_guess: float,
row_reference: int = 0
):
self.K = K
self.plane_normal_guess = plane_normal_guess
self.camera_height_guess = camera_height_guess
self.row_reference = row_reference
# 평면 안정화를 위한 히스토리
self.N_hist: List[np.ndarray] = []
self.D_hist: List[float] = []
# 이전 프레임 저장용
self.prev_frame: Optional[FrameData] = None
def process_frame(
self,
frame: FrameData
) -> Tuple[Optional[PoseEstimate], Optional[RoadProfile1Frame]]:
"""
Frame 단위 처리.
prev_frame 이 없다면 pose/profile 계산은 skip.
"""
if self.prev_frame is None:
self.prev_frame = frame
return None, None
prev = self.prev_frame
cur = frame
# 1) why: 두 이미지 undistort
img_prev_und = undistort_image(prev.image)
img_cur_und = undistort_image(cur.image)
# 2) rolling shutter 보정 (distorted domain에서)
img_prev_rs = compensate_rolling_shutter(
prev.image, prev.vehicle_speed, prev.yaw_rate, self.row_reference
)
img_cur_rs = compensate_rolling_shutter(
cur.image, cur.vehicle_speed, cur.yaw_rate, self.row_reference
)
# 3) 도로 feature point tracking (여기서는 임의 좌표 생성)
# TODO: 실제 optical flow / feature detection 사용
num_points = 100
h, w = cur.image.shape[:2]
# road 하부 부분에 랜덤 점
prev_pts = np.stack([
np.random.uniform(0, w, num_points),
np.random.uniform(h * 0.5, h, num_points)
], axis=-1)
curr_pts = prev_pts + np.random.randn(*prev_pts.shape) * 0.5
# 4) RANSAC 기반 호모그래피 추정
H = estimate_homography(prev_pts, curr_pts)
# 5) H -> R,T,N_curr,D_curr 분해
R, T, N_curr, D_curr = decompose_homography(
H, self.K, self.plane_normal_guess, self.camera_height_guess
)
# 6) 평면 안정화
N_stab, D_stab, self.N_hist, self.D_hist = stabilize_plane(
self.N_hist, self.D_hist, N_curr, D_curr
)
# 7) far frame 선정 (여기서는 직전 프레임 = prev 를 far 로 사용)
# 실제 구현에서는 몇 프레임 전(거리상 멀어지게) 프레임을 선택.
H_far = H.copy()
H_far_dist = H_far.copy() # 단순 placeholder
pose = PoseEstimate(
R=R,
T=T,
N_curr=N_curr,
D_curr=D_curr,
N_stab=N_stab,
D_stab=D_stab,
H=H,
H_dist=H, # 실제로는 undist, dist 분리
H_far=H_far,
H_far_dist=H_far_dist
)
# 8) 세계좌표 상 차량 경로 샘플링
points_world = project_vehicle_path_world(
R_far=R,
T_far=T,
N_stab=N_stab,
D_stab=D_stab,
num_points=80,
max_distance=30.0
)
# 9) world -> image (undistorted) -> distorted 로 변환
path_und = world_to_image(
self.K, R, T, points_world
)
# 여기서는 dist/undist 차이를 무시
path_pixels = path_und
# 10) strip warp + residual profile 계산
z, h = compute_residual_profile(
image_far=img_prev_rs,
image_cur=img_cur_rs,
path_pixels=path_pixels,
strip_half_width_px=5
)
# 11) residual -> height profile + confidence
conf = compute_confidence_from_residual(h)
profile = RoadProfile1Frame(
frame_id=cur.frame_id,
z=z,
h=h,
confidence=conf
)
self.prev_frame = frame
return pose, profile
# ============================================================
# 3. Multi-frame 정렬 + 최적화 (slope a, offset b)
# ============================================================
@dataclass
class ProfileHistoryEntry:
profile: RoadProfile1Frame
a: float = 1.0 # slope
b: float = 0.0 # offset
class MultiFrameAligner:
"""
K개의 최근 단일 프레임 프로파일을 정렬 (slope, offset) 후
median 기반 multi-frame profile 생성.
"""
def __init__(self, K: int = 5):
self.K = K
self.history: List[ProfileHistoryEntry] = []
def add_profile(self, profile: RoadProfile1Frame) -> None:
"""히스토리에 프로파일 추가."""
self.history.append(ProfileHistoryEntry(profile=profile))
if len(self.history) > self.K:
self.history.pop(0)
def _interpolate_to_common_grid(
self,
profiles: List[RoadProfile1Frame],
num_samples: int = 100
) -> Tuple[np.ndarray, np.ndarray]:
"""
여러 profile (각각 z,h 길이가 다를 수 있음)을
공통 z_grid 에 인터폴레이션.
반환:
z_grid: (N,)
h_matrix: (M,N) -> M = #profiles
"""
# 공통 z 범위: 모든 profile 의 공통구간 교집합
z_min = max(p.z.min() for p in profiles)
z_max = min(p.z.max() for p in profiles)
if z_max <= z_min:
# 교집합이 거의 없는 경우, 임의로 첫 profile 기준 사용
base = profiles[0]
return base.z.copy(), np.stack([p.h for p in profiles], axis=0)
z_grid = np.linspace(z_min, z_max, num_samples)
h_matrix = []
for p in profiles:
h_interp = np.interp(z_grid, p.z, p.h)
h_matrix.append(h_interp)
h_matrix = np.stack(h_matrix, axis=0)
return z_grid, h_matrix
def _compute_cost_and_grad(
self,
base_grid: np.ndarray,
base_h: np.ndarray,
target_h: np.ndarray,
a: float,
b: float,
lam_smooth: float = 1e-3,
lam_reg: float = 1e-3
) -> Tuple[float, float, float]:
"""
간단한 비용함수 + gradient:
E(a,b) = || (a * base_h + b) - target_h ||^2
+ lam_smooth * ||grad_z (a * base_h + b)||^2
+ lam_reg * (a-1)^2
"""
h_adj = a * base_h + b
diff = h_adj - target_h
# data term
E_data = float(np.mean(diff ** 2))
# smoothness term (1차 차분)
grad_adj = np.diff(h_adj)
E_smooth = float(np.mean(grad_adj ** 2))
# reg term (a 근처에서)
E_reg = float((a - 1.0) ** 2)
E_total = E_data + lam_smooth * E_smooth + lam_reg * E_reg
# gradient w.r.t. a,b (대략적인 도함수)
dE_da = 2.0 * np.mean(diff * base_h) + 2.0 * lam_reg * (a - 1.0)
dE_db = 2.0 * np.mean(diff)
# smoothness term contribution (대략적인 finite difference)
# grad_adj = (a*h[i+1]+b - (a*h[i]+b)) = a*(h[i+1]-h[i])
# dE/da ~= 2 * mean(grad_adj * (h[i+1]-h[i]))
dh = np.diff(base_h)
dE_da += 2.0 * lam_smooth * np.mean(grad_adj * dh)
return E_total, float(dE_da), float(dE_db)
def _align_single_profile(
self,
ref_h: np.ndarray,
cur_h: np.ndarray,
init_a: float = 1.0,
init_b: float = 0.0,
n_iter: int = 20,
lr: float = 0.1
) -> Tuple[float, float]:
"""SGD 비슷하게 slope a, offset b 최적화."""
a = init_a
b = init_b
for _ in range(n_iter):
E, dE_da, dE_db = self._compute_cost_and_grad(
base_grid=None,
base_h=cur_h,
target_h=ref_h,
a=a,
b=b
)
a -= lr * dE_da
b -= lr * dE_db
return a, b
def build_multi_frame_profile(self) -> Optional[RoadProfileMultiFrame]:
"""
현재 history를 이용해 multi-frame profile 생성.
"""
if len(self.history) == 0:
return None
if len(self.history) == 1:
p = self.history[0].profile
return RoadProfileMultiFrame(
z=p.z.copy(),
h_multi=p.h.copy(),
h_all_frames=[p.h.copy()],
confidence=p.confidence.copy()
)
# 1) profile 리스트
profiles = [h.profile for h in self.history]
# 2) 공통 z_grid 및 h_matrix
z_grid, h_matrix = self._interpolate_to_common_grid(profiles)
M, N = h_matrix.shape
# 3) 기준 profile: 가장 최근 profile 을 기준으로 사용
ref_idx = M - 1
ref_h = h_matrix[ref_idx]
# 4) 각 profile 마다 (a,b) 최적화
aligned_h_all = []
for i in range(M):
cur_h = h_matrix[i]
if i == ref_idx:
# 기준 프로파일: (a,b) = (1,0)
a_i, b_i = 1.0, 0.0
else:
a_i, b_i = self._align_single_profile(ref_h, cur_h)
# 정렬 적용
aligned_h = a_i * cur_h + b_i
aligned_h_all.append(aligned_h)
aligned_h_all = np.stack(aligned_h_all, axis=0) # (M,N)
# 5) median 기반 multi-frame profile
h_multi = np.median(aligned_h_all, axis=0)
# 6) confidence: 간단히 평균값 사용
conf_multi = np.ones_like(h_multi) # placeholder
return RoadProfileMultiFrame(
z=z_grid,
h_multi=h_multi,
h_all_frames=[aligned_h_all[i] for i in range(M)],
confidence=conf_multi
)
# ============================================================
# 4. 1D 샘플링 + 버스트 생성
# ============================================================
class ProfileSampler1D:
"""
- multi-frame profile 로부터 특정 거리 Z_ref 앞의 샘플 버스트 생성
- 버스트 내에서는 Z_ref 근처를 dZ/10 간격으로 샘플
"""
def __init__(
self,
burst_period: float = 0.05, # 20Hz 등
latency: float = 0.0
):
self.burst_period = burst_period
self.latency = latency
self.prev_burst: Optional[SampleBurst1D] = None
def sample_burst(
self,
profile_multi: RoadProfileMultiFrame,
vehicle_speed: float,
timestamp: float,
Z_ref: float = 7.0,
num_samples: int = 20
) -> SampleBurst1D:
"""
- dZ = v * burst_period
- Z_i = Z_ref + (i - num_samples/2) * (dZ/10) + latency 보정
"""
z_grid = profile_multi.z
h_multi = profile_multi.h_multi
dZ = vehicle_speed * self.burst_period
offsets = (np.arange(num_samples) - num_samples / 2.0) * (dZ / 10.0)
Z_i = Z_ref + offsets + vehicle_speed * self.latency
# z 범위를 벗어나면 클리핑
Z_i_clipped = np.clip(Z_i, z_grid.min(), z_grid.max())
# 보간
h_i = np.interp(Z_i_clipped, z_grid, h_multi)
# confidence: multi-frame confidence 를 동일하게 적용 (간단화)
conf_i = np.ones_like(h_i)
# 부드러운 연결 (이전 버스트와 blending) – placeholder
if self.prev_burst is not None:
alpha = 0.5
h_i = alpha * h_i + (1 - alpha) * self.prev_burst.h_samples
burst = SampleBurst1D(
timestamp=timestamp,
Z_ref=Z_ref,
z_samples=Z_i_clipped,
h_samples=h_i,
conf_samples=conf_i
)
self.prev_burst = burst
return burst
# ============================================================
# 5. 전체 파이프라인 (Activity Diagram 통합)
# ============================================================
class RoadProfilePipeline:
"""
- 전체 Activity Diagram 의 핵심 알고리즘 파이프라인
- GUI 없음, 영상/프레임만 들어온다고 가정
"""
def __init__(
self,
K: np.ndarray,
plane_normal_guess: np.ndarray,
camera_height_guess: float,
history_K: int = 5,
burst_period: float = 0.05
):
self.single_frame_proc = SingleFrameProcessor(
K=K,
plane_normal_guess=plane_normal_guess,
camera_height_guess=camera_height_guess
)
self.aligner = MultiFrameAligner(K=history_K)
self.sampler = ProfileSampler1D(burst_period=burst_period)
def process_frame(
self,
frame: FrameData
) -> Dict[str, Optional[object]]:
"""
전체 흐름:
- 단일 프레임 처리
- history 업데이트 + multi-frame profile 생성
- 1D 버스트 샘플링
"""
pose, profile_1f = self.single_frame_proc.process_frame(frame)
if profile_1f is not None:
self.aligner.add_profile(profile_1f)
profile_multi: Optional[RoadProfileMultiFrame] = None
if len(self.aligner.history) > 0:
profile_multi = self.aligner.build_multi_frame_profile()
burst: Optional[SampleBurst1D] = None
if profile_multi is not None:
burst = self.sampler.sample_burst(
profile_multi=profile_multi,
vehicle_speed=frame.vehicle_speed,
timestamp=frame.timestamp
)
return {
"pose": pose,
"profile_1f": profile_1f,
"profile_multi": profile_multi,
"burst": burst
}
# ============================================================
# 6. 사용 예시 (dummy 데이터로 파이프라인 흐름만 확인)
# ============================================================
if __name__ == "__main__":
# 카메라 내참수, 도로 평면 초기 guess
K = np.array([[1200, 0, 640],
[0, 1200, 360],
[0, 0, 1]], dtype=np.float32)
plane_normal_guess = np.array([0, 1, 0], dtype=np.float32) # 위쪽
camera_height_guess = 1.5
pipeline = RoadProfilePipeline(
K=K,
plane_normal_guess=plane_normal_guess,
camera_height_guess=camera_height_guess,
history_K=5,
burst_period=0.05
)
# dummy 프레임 생성
H_img, W_img = 720, 1280
for i in range(20):
img = np.zeros((H_img, W_img, 3), dtype=np.uint8)
# 간단한 패턴 추가
img[:, :] = (i * 5) % 255
frame = FrameData(
frame_id=i,
timestamp=0.05 * i,
image=img,
vehicle_speed=15.0, # m/s
yaw_rate=0.0
)
out = pipeline.process_frame(frame)
print(
f"Frame {i:02d} -> "
f"pose: {out['pose'] is not None}, "
f"1f_profile: {out['profile_1f'] is not None}, "
f"multi_profile: {out['profile_multi'] is not None}, "
f"burst: {out['burst'] is not None}"
)
반응형
'개발자 > Visual Odometry' 카테고리의 다른 글
| RoadVertical estimation Dynamic Programming (0) | 2026.01.11 |
|---|---|
| ROAD VERTICAL CONTOUR DETECTIONUSING A STABILIZED COORDINATEFRAME #1 (1) | 2026.01.02 |
| 초기 Homography 구하는 방식, Homography Refinement 작업까지 상세하게 (0) | 2025.11.26 |
| 제미나이 구현 (0) | 2025.11.26 |
| Homography refinement 잘 됐는지 확인하기 (0) | 2025.11.24 |