update devkit server and model files
This commit is contained in:
@@ -235,25 +235,45 @@ class SensorPushServicer(sensor_stream_pb2_grpc.SensorPushServicer):
|
||||
self.frame_count = 0
|
||||
self.last_report_time = time.time()
|
||||
self.last_angle = None
|
||||
self.last_state = 0
|
||||
self.last_magnitude = 0.0
|
||||
|
||||
def Upload(self, request_iterator, context):
|
||||
print("[SensorPush] Client connected, waiting for frames...")
|
||||
reset_baseline()
|
||||
self.last_angle = None
|
||||
self.last_state = 0
|
||||
self.last_magnitude = 0.0
|
||||
|
||||
for frame in request_iterator:
|
||||
self.frame_count += 1
|
||||
angle = 0.0
|
||||
magnitude = 0.0
|
||||
state = 0
|
||||
cop_x = 0.0
|
||||
cop_y = 0.0
|
||||
base_x = 0.0
|
||||
base_y = 0.0
|
||||
total_press = 0.0
|
||||
threshold = 0.0
|
||||
ok = True
|
||||
message = "OK"
|
||||
if len(frame.matrix) == SENSOR_ROWS * SENSOR_COLS:
|
||||
try:
|
||||
angle = get_pzt_angle(frame.matrix)
|
||||
result = get_pzt_angle(frame.matrix, float(frame.dts_ms))
|
||||
angle, magnitude, state, cop_x, cop_y, base_x, base_y, total_press, threshold = result
|
||||
threshold = threshold or 0.0
|
||||
self.last_angle = angle
|
||||
self.last_state = state
|
||||
self.last_magnitude = magnitude
|
||||
if self.frame_count <= 10 or self.frame_count % 30 == 0:
|
||||
print(
|
||||
f"[SensorPush] PZT angle frame #{frame.seq} "
|
||||
f"dts={frame.dts_ms} angle={angle:.2f}"
|
||||
f"dts={frame.dts_ms} angle={angle:.2f} "
|
||||
f"mag={magnitude:.2f} state={state} "
|
||||
f"cop=({cop_x:.2f},{cop_y:.2f}) "
|
||||
f"base=({base_x:.2f},{base_y:.2f}) "
|
||||
f"total={total_press:.2f} threshold={threshold:.2f}"
|
||||
)
|
||||
except Exception as e:
|
||||
ok = False
|
||||
@@ -270,6 +290,14 @@ class SensorPushServicer(sensor_stream_pb2_grpc.SensorPushServicer):
|
||||
dts_ms=frame.dts_ms,
|
||||
ok=ok,
|
||||
message=message,
|
||||
magnitude=magnitude,
|
||||
state=state,
|
||||
cop_x=cop_x,
|
||||
cop_y=cop_y,
|
||||
base_x=base_x,
|
||||
base_y=base_y,
|
||||
total_press=total_press,
|
||||
threshold=threshold or 0.0,
|
||||
)
|
||||
|
||||
if self.frame_count % 100 == 0:
|
||||
@@ -286,6 +314,8 @@ class SensorPushServicer(sensor_stream_pb2_grpc.SensorPushServicer):
|
||||
f"[SensorPush] Frame #{frame.seq} | "
|
||||
f"{frame.rows}x{frame.cols} | "
|
||||
f"angle={angle_text} | "
|
||||
f"mag={self.last_magnitude:.2f} | "
|
||||
f"state={self.last_state} | "
|
||||
f"force={frame.resultant_force:.1f} | "
|
||||
f"total={self.frame_count} | ~{fps:.1f} fps"
|
||||
)
|
||||
@@ -351,22 +381,42 @@ def serve(port: int):
|
||||
|
||||
import numpy as np
|
||||
import threading
|
||||
from collections import deque
|
||||
|
||||
# ===================== 算法参数=====================
|
||||
TOTAL_PRESSURE_LOW_THRESHOLD = 500
|
||||
COP_STABILITY_FRAMES_REQUIRED = 5
|
||||
SENSOR_ROWS = 12
|
||||
SENSOR_COLS = 7
|
||||
# ===================== 切向力算法参数 ======================
|
||||
COP_INIT_MEDIAN_FRAMES = 1
|
||||
NOISE_COLLECT_FRAMES = 20
|
||||
THRESH_K = 5
|
||||
SENSOR_ROWS = 12
|
||||
SENSOR_COLS = 7
|
||||
|
||||
# ===================== 线程安全全局状态 =====================
|
||||
first_frame = None
|
||||
first_frame_lock = threading.Lock()
|
||||
SNAP_CENTER_X, SNAP_CENTER_Y = 3.0, 5.5
|
||||
SNAP_RANGE_X = 0.0
|
||||
SNAP_RANGE_Y = 0.0
|
||||
|
||||
first_contact_CoP_x = None
|
||||
first_contact_CoP_y = None
|
||||
contact_initialized = False
|
||||
POST_INIT_WINDOW_CNT = 600000
|
||||
POST_INIT_STABLE_CNT = 500
|
||||
POST_INIT_STABLE_THRESH = 0.1
|
||||
|
||||
total_pressure_low_counter = 0
|
||||
# ===================== 线程安全全局状态 ======================
|
||||
first_frame = None
|
||||
first_frame_lock = threading.Lock()
|
||||
|
||||
first_contact_CoP_x = None
|
||||
first_contact_CoP_y = None
|
||||
contact_initialized = False
|
||||
|
||||
cop_init_x_buf = deque(maxlen=COP_INIT_MEDIAN_FRAMES)
|
||||
cop_init_y_buf = deque(maxlen=COP_INIT_MEDIAN_FRAMES)
|
||||
|
||||
noise_sum_buf = deque(maxlen=NOISE_COLLECT_FRAMES)
|
||||
dynamic_thresh = None
|
||||
|
||||
post_init_frame_cnt = 0
|
||||
post_stable_cnt = 0
|
||||
post_refined_flag = False
|
||||
post_cand_x = None
|
||||
post_cand_y = None
|
||||
|
||||
# ===================== 基线减除 =====================
|
||||
def subtract_baseline(current_frame):
|
||||
@@ -378,39 +428,49 @@ def subtract_baseline(current_frame):
|
||||
first_frame = current_frame.copy()
|
||||
|
||||
diff = current_frame - first_frame
|
||||
return np.clip(diff, 0, None)
|
||||
return np.clip(diff, 0, None)
|
||||
|
||||
# ===================== 重置CoP状态 =====================
|
||||
def reset_cop_state():
|
||||
def _legacy_reset_cop_state():
|
||||
global first_contact_CoP_x, first_contact_CoP_y, contact_initialized
|
||||
global total_pressure_low_counter
|
||||
global post_init_frame_cnt, post_stable_cnt, post_refined_flag
|
||||
global post_cand_x, post_cand_y
|
||||
|
||||
first_contact_CoP_x = None
|
||||
first_contact_CoP_y = None
|
||||
contact_initialized = False
|
||||
total_pressure_low_counter = 0
|
||||
cop_init_x_buf.clear()
|
||||
cop_init_y_buf.clear()
|
||||
post_init_frame_cnt = 0
|
||||
post_stable_cnt = 0
|
||||
post_refined_flag = False
|
||||
post_cand_x = None
|
||||
post_cand_y = None
|
||||
|
||||
# ===================== CoP压力中心计算 =====================
|
||||
def compute_pressure_direction(baseline_subtracted_frame):
|
||||
|
||||
# ===================== CoP压力中心计算(新算法) =====================
|
||||
def _legacy_compute_pressure_direction(baseline_subtracted_frame):
|
||||
global first_contact_CoP_x, first_contact_CoP_y, contact_initialized
|
||||
global total_pressure_low_counter
|
||||
global post_init_frame_cnt, post_stable_cnt, post_refined_flag
|
||||
global post_cand_x, post_cand_y
|
||||
global noise_sum_buf, dynamic_thresh
|
||||
|
||||
rows, cols = SENSOR_ROWS, SENSOR_COLS
|
||||
frame_flat = np.asarray(baseline_subtracted_frame, dtype=np.float32).flatten()
|
||||
frame2d = frame_flat.reshape(rows, cols)
|
||||
|
||||
total_pressure = np.sum(frame2d)
|
||||
if total_pressure < TOTAL_PRESSURE_LOW_THRESHOLD:
|
||||
total_pressure_low_counter += 1
|
||||
else:
|
||||
total_pressure_low_counter = 0
|
||||
|
||||
if total_pressure_low_counter >= COP_STABILITY_FRAMES_REQUIRED:
|
||||
reset_cop_state()
|
||||
return 0.0, 0.0
|
||||
if dynamic_thresh is None:
|
||||
noise_sum_buf.append(total_pressure)
|
||||
if len(noise_sum_buf) >= NOISE_COLLECT_FRAMES:
|
||||
sums = np.array(noise_sum_buf)
|
||||
dynamic_thresh = THRESH_K * float(np.mean(sums))
|
||||
|
||||
if total_pressure == 0:
|
||||
return 0.0, 0.0
|
||||
if total_pressure == 0 or (dynamic_thresh is not None and total_pressure < dynamic_thresh):
|
||||
if contact_initialized and dynamic_thresh is not None:
|
||||
_legacy_reset_cop_state()
|
||||
return 0.0, 0.0, 0, rows-1, 0, cols-1, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 0.0, dynamic_thresh
|
||||
|
||||
x_grid = np.tile(np.arange(cols), (rows, 1))
|
||||
y_grid = np.repeat(np.arange(rows), cols).reshape(rows, cols)
|
||||
@@ -419,45 +479,495 @@ def compute_pressure_direction(baseline_subtracted_frame):
|
||||
|
||||
delta_CoP_x = 0.0
|
||||
delta_CoP_y = 0.0
|
||||
base_x = cop_x
|
||||
base_y = cop_y
|
||||
|
||||
if not contact_initialized:
|
||||
first_contact_CoP_x = cop_x
|
||||
first_contact_CoP_y = cop_y
|
||||
contact_initialized = True
|
||||
else:
|
||||
delta_CoP_x = cop_x - first_contact_CoP_x
|
||||
delta_CoP_y = cop_y - first_contact_CoP_y
|
||||
cop_init_x_buf.append(cop_x)
|
||||
cop_init_y_buf.append(cop_y)
|
||||
|
||||
return delta_CoP_x, delta_CoP_y
|
||||
if len(cop_init_x_buf) >= COP_INIT_MEDIAN_FRAMES:
|
||||
first_contact_CoP_x = float(np.median(cop_init_x_buf))
|
||||
first_contact_CoP_y = float(np.median(cop_init_y_buf))
|
||||
contact_initialized = True
|
||||
cop_init_x_buf.clear()
|
||||
cop_init_y_buf.clear()
|
||||
if (abs(first_contact_CoP_x - SNAP_CENTER_X) <= SNAP_RANGE_X and
|
||||
abs(first_contact_CoP_y - SNAP_CENTER_Y) <= SNAP_RANGE_Y):
|
||||
first_contact_CoP_x = SNAP_CENTER_X
|
||||
first_contact_CoP_y = SNAP_CENTER_Y
|
||||
|
||||
else:
|
||||
post_init_frame_cnt += 1
|
||||
if not post_refined_flag and post_init_frame_cnt <= POST_INIT_WINDOW_CNT:
|
||||
if post_cand_x is not None:
|
||||
dist_val = np.hypot(cop_x - post_cand_x, cop_y - post_cand_y)
|
||||
if dist_val <= POST_INIT_STABLE_THRESH:
|
||||
post_stable_cnt += 1
|
||||
else:
|
||||
post_cand_x = cop_x
|
||||
post_cand_y = cop_y
|
||||
post_stable_cnt = 1
|
||||
else:
|
||||
post_cand_x = cop_x
|
||||
post_cand_y = cop_y
|
||||
post_stable_cnt = 1
|
||||
|
||||
if post_stable_cnt >= POST_INIT_STABLE_CNT:
|
||||
first_contact_CoP_x = post_cand_x
|
||||
first_contact_CoP_y = post_cand_y
|
||||
post_refined_flag = True
|
||||
else:
|
||||
post_refined_flag = True
|
||||
|
||||
delta_CoP_x = cop_x - first_contact_CoP_x
|
||||
delta_CoP_y = first_contact_CoP_y - cop_y
|
||||
base_x = first_contact_CoP_x
|
||||
base_y = first_contact_CoP_y
|
||||
|
||||
magnitude = np.hypot(delta_CoP_x, delta_CoP_y)
|
||||
if not contact_initialized:
|
||||
state = 0
|
||||
elif not post_refined_flag:
|
||||
state = 1
|
||||
else:
|
||||
state = 2
|
||||
|
||||
return (cop_x, cop_y,
|
||||
0, rows-1, 0, cols-1,
|
||||
delta_CoP_x, delta_CoP_y,
|
||||
base_x, base_y,
|
||||
magnitude, state,
|
||||
total_pressure, dynamic_thresh)
|
||||
|
||||
# ===================== 角度计算核心 =====================
|
||||
def compute_vector_angle(x: float, y: float) -> tuple[float, float]:
|
||||
def _legacy_compute_vector_angle(x: float, y: float) -> tuple[float, float]:
|
||||
epsilon = 1e-8
|
||||
mag = np.hypot(x, y)
|
||||
angle = np.degrees(np.arctan2(y, x + epsilon))
|
||||
mag = np.hypot(x, y)
|
||||
angle = np.degrees(np.arctan2(y, x + epsilon))
|
||||
if angle < 0:
|
||||
angle += 360
|
||||
return angle, mag
|
||||
|
||||
def compute_PZT_angle(Px: float, Py: float) -> tuple[float, float]:
|
||||
return compute_vector_angle(Px, -Py)
|
||||
def _legacy_compute_PZT_angle(Px: float, Py: float) -> tuple[float, float]:
|
||||
return _legacy_compute_vector_angle(Px, Py)
|
||||
|
||||
# ===================== 核心入口函数 =====================
|
||||
def get_pzt_angle(adc_data):
|
||||
def _legacy_get_pzt_angle(adc_data):
|
||||
if len(adc_data) != 84:
|
||||
raise ValueError("ADC数据长度必须为84")
|
||||
baseline_subtracted = subtract_baseline(adc_data)
|
||||
dx, dy = compute_pressure_direction(baseline_subtracted)
|
||||
pzt_angle, _ = compute_PZT_angle(dx, dy)
|
||||
|
||||
return pzt_angle
|
||||
result = _legacy_compute_pressure_direction(adc_data)
|
||||
cop_x, cop_y = result[0], result[1]
|
||||
dx, dy = result[6], result[7]
|
||||
base_x, base_y = result[8], result[9]
|
||||
magnitude = result[10]
|
||||
state = int(result[11])
|
||||
total_press = result[12]
|
||||
threshold = result[13]
|
||||
pzt_angle, _ = _legacy_compute_PZT_angle(dx, dy)
|
||||
return pzt_angle, magnitude, state, cop_x, cop_y, base_x, base_y, total_press, threshold
|
||||
|
||||
# ===================== 重置基线(校准用) =====================
|
||||
def reset_baseline():
|
||||
global first_frame
|
||||
def _legacy_reset_baseline():
|
||||
global first_frame, noise_sum_buf, dynamic_thresh
|
||||
with first_frame_lock:
|
||||
first_frame = None
|
||||
reset_cop_state()
|
||||
noise_sum_buf.clear()
|
||||
dynamic_thresh = None
|
||||
_legacy_reset_cop_state()
|
||||
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
ADC_LEN = SENSOR_ROWS * SENSOR_COLS
|
||||
|
||||
|
||||
class CoPState(IntEnum):
|
||||
NO_CONTACT = 0
|
||||
INIT_COLLECTING = 1
|
||||
POST_REFINING = 2
|
||||
READY = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class CoPResult:
|
||||
cop_x: float
|
||||
cop_y: float
|
||||
row_min: int
|
||||
row_max: int
|
||||
col_min: int
|
||||
col_max: int
|
||||
dx: float
|
||||
dy: float
|
||||
base_x: float
|
||||
base_y: float
|
||||
magnitude: float
|
||||
state: int
|
||||
total_pressure: float
|
||||
threshold: Optional[float]
|
||||
angle: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class CoPConfig:
|
||||
rows: int = SENSOR_ROWS
|
||||
cols: int = SENSOR_COLS
|
||||
noise_collect_ms: float = 300.0
|
||||
thresh_k: float = 5.0
|
||||
min_threshold: float = 50.0
|
||||
contact_confirm_ms: float = 20.0
|
||||
release_confirm_ms: float = 50.0
|
||||
init_collect_ms: float = 80.0
|
||||
snap_enable: bool = True
|
||||
snap_center_x: float = 3.0
|
||||
snap_center_y: float = 5.5
|
||||
snap_range_x: float = 0.25
|
||||
snap_range_y: float = 0.25
|
||||
post_refine_enable: bool = True
|
||||
post_refine_window_ms: float = 800.0
|
||||
post_stable_ms: float = 200.0
|
||||
post_stable_thresh: float = 0.1
|
||||
cop_lpf_alpha: float = 0.25
|
||||
epsilon: float = 1e-8
|
||||
|
||||
|
||||
class PressureDirectionEstimator:
|
||||
def __init__(self, config: CoPConfig = CoPConfig()):
|
||||
self.cfg = config
|
||||
self.reset_all()
|
||||
|
||||
def reset_all(self):
|
||||
self.dynamic_thresh: Optional[float] = None
|
||||
self.noise_samples = []
|
||||
self.noise_start_ms: Optional[float] = None
|
||||
self.reset_contact_state()
|
||||
|
||||
def reset_contact_state(self):
|
||||
self.first_contact_cop_x: Optional[float] = None
|
||||
self.first_contact_cop_y: Optional[float] = None
|
||||
self.state = CoPState.NO_CONTACT
|
||||
self.init_x_buf = []
|
||||
self.init_y_buf = []
|
||||
self.init_start_ms: Optional[float] = None
|
||||
self.post_start_ms: Optional[float] = None
|
||||
self.post_stable_start_ms: Optional[float] = None
|
||||
self.post_cand_x: Optional[float] = None
|
||||
self.post_cand_y: Optional[float] = None
|
||||
self.post_refined = False
|
||||
self.contact_candidate_start_ms: Optional[float] = None
|
||||
self.release_candidate_start_ms: Optional[float] = None
|
||||
self.filtered_cop_x: Optional[float] = None
|
||||
self.filtered_cop_y: Optional[float] = None
|
||||
|
||||
def update(self, adc_data, timestamp_ms: float) -> CoPResult:
|
||||
frame2d = self._prepare_frame(adc_data)
|
||||
total_pressure = float(np.sum(frame2d))
|
||||
|
||||
self._update_dynamic_threshold(total_pressure, timestamp_ms)
|
||||
|
||||
raw_contact = self._is_raw_contact(total_pressure)
|
||||
contact_valid = self._debounce_contact(raw_contact, timestamp_ms)
|
||||
|
||||
if not contact_valid:
|
||||
self._handle_no_contact()
|
||||
return self._make_empty_result(total_pressure)
|
||||
|
||||
cop_x, cop_y = self._compute_cop(frame2d, total_pressure)
|
||||
cop_x, cop_y = self._filter_cop(cop_x, cop_y)
|
||||
|
||||
self._update_state_machine(cop_x, cop_y, timestamp_ms)
|
||||
|
||||
if self.first_contact_cop_x is None or self.first_contact_cop_y is None:
|
||||
dx = 0.0
|
||||
dy = 0.0
|
||||
base_x = cop_x
|
||||
base_y = cop_y
|
||||
else:
|
||||
base_x = self.first_contact_cop_x
|
||||
base_y = self.first_contact_cop_y
|
||||
dx = cop_x - base_x
|
||||
dy = base_y - cop_y
|
||||
|
||||
magnitude = float(np.hypot(dx, dy))
|
||||
angle = self.compute_vector_angle(dx, dy)[0]
|
||||
|
||||
return CoPResult(
|
||||
cop_x=float(cop_x),
|
||||
cop_y=float(cop_y),
|
||||
row_min=0,
|
||||
row_max=self.cfg.rows - 1,
|
||||
col_min=0,
|
||||
col_max=self.cfg.cols - 1,
|
||||
dx=float(dx),
|
||||
dy=float(dy),
|
||||
base_x=float(base_x),
|
||||
base_y=float(base_y),
|
||||
magnitude=magnitude,
|
||||
state=int(self.state),
|
||||
total_pressure=total_pressure,
|
||||
threshold=self.dynamic_thresh,
|
||||
angle=float(angle),
|
||||
)
|
||||
|
||||
def _prepare_frame(self, adc_data) -> np.ndarray:
|
||||
arr = np.asarray(adc_data, dtype=np.float32).flatten()
|
||||
expected_len = self.cfg.rows * self.cfg.cols
|
||||
|
||||
if len(arr) != expected_len:
|
||||
raise ValueError(f"ADC数据长度必须为{expected_len},当前为{len(arr)}")
|
||||
|
||||
arr = np.clip(arr, 0, None)
|
||||
return arr.reshape(self.cfg.rows, self.cfg.cols)
|
||||
|
||||
def _update_dynamic_threshold(self, total_pressure: float, timestamp_ms: float):
|
||||
if self.dynamic_thresh is not None:
|
||||
return
|
||||
|
||||
if self.noise_start_ms is None:
|
||||
self.noise_start_ms = timestamp_ms
|
||||
|
||||
self.noise_samples.append(total_pressure)
|
||||
|
||||
if timestamp_ms - self.noise_start_ms >= self.cfg.noise_collect_ms:
|
||||
samples = np.asarray(self.noise_samples, dtype=np.float32)
|
||||
mean_val = float(np.mean(samples))
|
||||
std_val = float(np.std(samples))
|
||||
thresh = mean_val + self.cfg.thresh_k * std_val
|
||||
self.dynamic_thresh = max(thresh, self.cfg.min_threshold)
|
||||
|
||||
def _is_raw_contact(self, total_pressure: float) -> bool:
|
||||
if self.dynamic_thresh is None:
|
||||
return False
|
||||
return total_pressure >= self.dynamic_thresh
|
||||
|
||||
def _debounce_contact(self, raw_contact: bool, timestamp_ms: float) -> bool:
|
||||
currently_in_contact = self.state != CoPState.NO_CONTACT
|
||||
|
||||
if raw_contact:
|
||||
self.release_candidate_start_ms = None
|
||||
|
||||
if currently_in_contact:
|
||||
return True
|
||||
|
||||
if self.contact_candidate_start_ms is None:
|
||||
self.contact_candidate_start_ms = timestamp_ms
|
||||
|
||||
return timestamp_ms - self.contact_candidate_start_ms >= self.cfg.contact_confirm_ms
|
||||
|
||||
self.contact_candidate_start_ms = None
|
||||
|
||||
if not currently_in_contact:
|
||||
return False
|
||||
|
||||
if self.release_candidate_start_ms is None:
|
||||
self.release_candidate_start_ms = timestamp_ms
|
||||
|
||||
if timestamp_ms - self.release_candidate_start_ms >= self.cfg.release_confirm_ms:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _handle_no_contact(self):
|
||||
if self.state != CoPState.NO_CONTACT:
|
||||
self.reset_contact_state()
|
||||
|
||||
def _compute_cop(self, frame2d: np.ndarray, total_pressure: float) -> Tuple[float, float]:
|
||||
rows, cols = self.cfg.rows, self.cfg.cols
|
||||
x_grid = np.tile(np.arange(cols, dtype=np.float32), (rows, 1))
|
||||
y_grid = np.repeat(np.arange(rows, dtype=np.float32), cols).reshape(rows, cols)
|
||||
cop_x = float(np.sum(frame2d * x_grid) / total_pressure)
|
||||
cop_y = float(np.sum(frame2d * y_grid) / total_pressure)
|
||||
return cop_x, cop_y
|
||||
|
||||
def _filter_cop(self, cop_x: float, cop_y: float) -> Tuple[float, float]:
|
||||
alpha = self.cfg.cop_lpf_alpha
|
||||
|
||||
if alpha <= 0.0:
|
||||
return cop_x, cop_y
|
||||
|
||||
if self.filtered_cop_x is None or self.filtered_cop_y is None:
|
||||
self.filtered_cop_x = cop_x
|
||||
self.filtered_cop_y = cop_y
|
||||
else:
|
||||
self.filtered_cop_x = alpha * cop_x + (1.0 - alpha) * self.filtered_cop_x
|
||||
self.filtered_cop_y = alpha * cop_y + (1.0 - alpha) * self.filtered_cop_y
|
||||
|
||||
return self.filtered_cop_x, self.filtered_cop_y
|
||||
|
||||
def _update_state_machine(self, cop_x: float, cop_y: float, timestamp_ms: float):
|
||||
if self.state == CoPState.NO_CONTACT:
|
||||
self.state = CoPState.INIT_COLLECTING
|
||||
self.init_start_ms = timestamp_ms
|
||||
self.init_x_buf.clear()
|
||||
self.init_y_buf.clear()
|
||||
|
||||
if self.state == CoPState.INIT_COLLECTING:
|
||||
self.init_x_buf.append(cop_x)
|
||||
self.init_y_buf.append(cop_y)
|
||||
|
||||
if self.init_start_ms is None:
|
||||
self.init_start_ms = timestamp_ms
|
||||
|
||||
if timestamp_ms - self.init_start_ms >= self.cfg.init_collect_ms:
|
||||
base_x = float(np.median(self.init_x_buf))
|
||||
base_y = float(np.median(self.init_y_buf))
|
||||
base_x, base_y = self._apply_center_snap(base_x, base_y)
|
||||
|
||||
self.first_contact_cop_x = base_x
|
||||
self.first_contact_cop_y = base_y
|
||||
self.post_start_ms = timestamp_ms
|
||||
self.post_cand_x = None
|
||||
self.post_cand_y = None
|
||||
self.post_stable_start_ms = None
|
||||
|
||||
if self.cfg.post_refine_enable:
|
||||
self.state = CoPState.POST_REFINING
|
||||
else:
|
||||
self.post_refined = True
|
||||
self.state = CoPState.READY
|
||||
|
||||
return
|
||||
|
||||
if self.state == CoPState.POST_REFINING:
|
||||
self._post_refine(cop_x, cop_y, timestamp_ms)
|
||||
|
||||
def _apply_center_snap(self, base_x: float, base_y: float) -> Tuple[float, float]:
|
||||
if not self.cfg.snap_enable:
|
||||
return base_x, base_y
|
||||
|
||||
if (
|
||||
abs(base_x - self.cfg.snap_center_x) <= self.cfg.snap_range_x
|
||||
and abs(base_y - self.cfg.snap_center_y) <= self.cfg.snap_range_y
|
||||
):
|
||||
return self.cfg.snap_center_x, self.cfg.snap_center_y
|
||||
|
||||
return base_x, base_y
|
||||
|
||||
def _post_refine(self, cop_x: float, cop_y: float, timestamp_ms: float):
|
||||
if self.post_start_ms is None:
|
||||
self.post_start_ms = timestamp_ms
|
||||
|
||||
if timestamp_ms - self.post_start_ms >= self.cfg.post_refine_window_ms:
|
||||
self.post_refined = True
|
||||
self.state = CoPState.READY
|
||||
return
|
||||
|
||||
if self.post_cand_x is None or self.post_cand_y is None:
|
||||
self.post_cand_x = cop_x
|
||||
self.post_cand_y = cop_y
|
||||
self.post_stable_start_ms = timestamp_ms
|
||||
return
|
||||
|
||||
dist = float(np.hypot(cop_x - self.post_cand_x, cop_y - self.post_cand_y))
|
||||
|
||||
if dist <= self.cfg.post_stable_thresh:
|
||||
if self.post_stable_start_ms is None:
|
||||
self.post_stable_start_ms = timestamp_ms
|
||||
|
||||
if timestamp_ms - self.post_stable_start_ms >= self.cfg.post_stable_ms:
|
||||
refined_x, refined_y = self._apply_center_snap(self.post_cand_x, self.post_cand_y)
|
||||
self.first_contact_cop_x = float(refined_x)
|
||||
self.first_contact_cop_y = float(refined_y)
|
||||
self.post_refined = True
|
||||
self.state = CoPState.READY
|
||||
else:
|
||||
self.post_cand_x = cop_x
|
||||
self.post_cand_y = cop_y
|
||||
self.post_stable_start_ms = timestamp_ms
|
||||
|
||||
def _make_empty_result(self, total_pressure: float) -> CoPResult:
|
||||
return CoPResult(
|
||||
cop_x=0.0,
|
||||
cop_y=0.0,
|
||||
row_min=0,
|
||||
row_max=self.cfg.rows - 1,
|
||||
col_min=0,
|
||||
col_max=self.cfg.cols - 1,
|
||||
dx=0.0,
|
||||
dy=0.0,
|
||||
base_x=0.0,
|
||||
base_y=0.0,
|
||||
magnitude=0.0,
|
||||
state=int(CoPState.NO_CONTACT),
|
||||
total_pressure=float(total_pressure),
|
||||
threshold=self.dynamic_thresh,
|
||||
angle=0.0,
|
||||
)
|
||||
|
||||
def compute_vector_angle(self, x: float, y: float) -> Tuple[float, float]:
|
||||
mag = float(np.hypot(x, y))
|
||||
angle = float(np.degrees(np.arctan2(y, x + self.cfg.epsilon)))
|
||||
|
||||
if angle < 0:
|
||||
angle += 360.0
|
||||
|
||||
return angle, mag
|
||||
|
||||
|
||||
_estimator = PressureDirectionEstimator()
|
||||
|
||||
|
||||
def reset_cop_state():
|
||||
_estimator.reset_contact_state()
|
||||
|
||||
|
||||
def reset_all_state():
|
||||
_estimator.reset_all()
|
||||
|
||||
|
||||
def compute_pressure_direction(adc_data, timestamp_ms: float):
|
||||
result = _estimator.update(adc_data, timestamp_ms)
|
||||
|
||||
return (
|
||||
result.cop_x,
|
||||
result.cop_y,
|
||||
result.row_min,
|
||||
result.row_max,
|
||||
result.col_min,
|
||||
result.col_max,
|
||||
result.dx,
|
||||
result.dy,
|
||||
result.base_x,
|
||||
result.base_y,
|
||||
result.magnitude,
|
||||
result.state,
|
||||
result.total_pressure,
|
||||
result.threshold,
|
||||
)
|
||||
|
||||
|
||||
def compute_vector_angle(x: float, y: float) -> Tuple[float, float]:
|
||||
return _estimator.compute_vector_angle(x, y)
|
||||
|
||||
|
||||
def compute_PZT_angle(Px: float, Py: float) -> Tuple[float, float]:
|
||||
return compute_vector_angle(Px, Py)
|
||||
|
||||
|
||||
def get_pzt_angle(adc_data, timestamp_ms: float):
|
||||
if len(adc_data) != ADC_LEN:
|
||||
raise ValueError(f"ADC数据长度必须为{ADC_LEN}")
|
||||
|
||||
result = _estimator.update(adc_data, timestamp_ms)
|
||||
|
||||
return (
|
||||
result.angle,
|
||||
result.magnitude,
|
||||
result.state,
|
||||
result.cop_x,
|
||||
result.cop_y,
|
||||
result.base_x,
|
||||
result.base_y,
|
||||
result.total_pressure,
|
||||
result.threshold,
|
||||
)
|
||||
|
||||
|
||||
def reset_baseline():
|
||||
reset_all_state()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user