{panelCode}
-{resolvedTitle}
-{panelCode}
+{resolvedTitle}
+{i18n.heading}
- {#if showMetrics} - - - {:else} - - - {/if} -diff --git a/devkit/__pycache__/sensor_server.cpython-313.pyc b/devkit/__pycache__/sensor_server.cpython-313.pyc index b94a2bb..35e9c36 100644 Binary files a/devkit/__pycache__/sensor_server.cpython-313.pyc and b/devkit/__pycache__/sensor_server.cpython-313.pyc differ diff --git a/devkit/sensor_server.py b/devkit/sensor_server.py index e138e2f..cff62d1 100644 --- a/devkit/sensor_server.py +++ b/devkit/sensor_server.py @@ -906,8 +906,361 @@ class PressureDirectionEstimator: return angle, mag +@dataclass +class LocalForceResult: + angle: float + magnitude: float + planar_x: float + planar_y: float + confidence: float + contact_active: bool + reportable: bool + total_pressure: float + peak: float + cop_x: float + cop_y: float + threshold: float -_estimator = PressureDirectionEstimator() + +class LocalTangentialForceEstimator: + CONTACT_ENTER_TOTAL_THRESHOLD = 520.0 + CONTACT_ENTER_PEAK_THRESHOLD = 50.0 + CONTACT_EXIT_TOTAL_THRESHOLD = 260.0 + CONTACT_EXIT_PEAK_THRESHOLD = 28.0 + CONTACT_ENTER_FRAMES_REQUIRED = 2 + CONTACT_EXIT_FRAMES_REQUIRED = 8 + + BASELINE_IDLE_ALPHA = 0.035 + BASELINE_BOOTSTRAP_ALPHA = 1.0 + BASELINE_NOISE_FLOOR = 5.0 + + ACTIVE_CELL_MIN_VALUE = 18.0 + ACTIVE_CELL_PEAK_RATIO = 0.14 + MIN_ACTIVE_CELLS = 3 + + VECTOR_SMOOTHING_ALPHA = 0.16 + REPORT_MAGNITUDE_ENTER = 0.12 + REPORT_MAGNITUDE_EXIT = 0.045 + REPORT_CONFIDENCE_ENTER = 0.14 + REPORT_CONFIDENCE_EXIT = 0.06 + REPORT_HOLD_FRAMES = 10 + + ASYMMETRY_WEIGHT = 1.1 + DRIFT_WEIGHT = 0.65 + MOTION_WEIGHT = 0.25 + EDGE_ASYMMETRY_DAMPING = 0.35 + EDGE_INWARD_ROLLING_BIAS = 0.55 + EDGE_START_COP_THRESHOLD = 0.45 + EDGE_START_BIAS_WEIGHT = 1.1 + ROLLING_FRICTION_ALPHA = 0.68 + ROLLING_FRICTION_MIN_MAGNITUDE = 0.05 + + def __init__(self): + self.reset_all() + + def reset_all(self): + self.baseline_frame = None + self.reset_contact_state() + + def reset_contact_state(self): + self.contact_active = False + self.contact_enter_counter = 0 + self.contact_exit_counter = 0 + self.anchor_cop_x = None + self.anchor_cop_y = None + self.last_cop_x = None + self.last_cop_y = None + self.edge_start_bias_x = 0.0 + self.edge_start_bias_y = 0.0 + self.smoothed_x = 0.0 + self.smoothed_y = 0.0 + self.report_active = False + self.report_hold_counter = 0 + self.held_report = None + + def update(self, adc_data, timestamp_ms: float) -> LocalForceResult: + raw = np.asarray(adc_data, dtype=np.float32).flatten() + if len(raw) != ADC_LEN: + raise ValueError(f"ADC data length must be {ADC_LEN}") + + baseline_subtracted = self._subtract_baseline(raw) + if not self._update_contact_state(raw, baseline_subtracted): + return self._inactive_result(float(np.sum(baseline_subtracted)), float(np.max(baseline_subtracted, initial=0.0))) + + stats = self._compute_contact_stats(baseline_subtracted) + if stats is None: + return self._stabilize_report(self._weak_contact_result(float(np.sum(baseline_subtracted)), float(np.max(baseline_subtracted, initial=0.0)))) + + if self.anchor_cop_x is None: + self.anchor_cop_x = stats["cop_x"] + self.anchor_cop_y = stats["cop_y"] + self.last_cop_x = stats["cop_x"] + self.last_cop_y = stats["cop_y"] + self.edge_start_bias_x, self.edge_start_bias_y = self._edge_start_bias(stats) + return self._stabilize_report(self._weak_contact_result(stats["total"], stats["peak"], stats["cop_x"], stats["cop_y"])) + + anchor_x = self.anchor_cop_x + anchor_y = self.anchor_cop_y if self.anchor_cop_y is not None else stats["cop_y"] + last_x = self.last_cop_x if self.last_cop_x is not None else stats["cop_x"] + last_y = self.last_cop_y if self.last_cop_y is not None else stats["cop_y"] + + drift_x = stats["cop_x"] - anchor_x + drift_y = stats["cop_y"] - anchor_y + motion_x = stats["cop_x"] - last_x + motion_y = stats["cop_y"] - last_y + + kinematic_x = drift_x * self.DRIFT_WEIGHT + motion_x * self.MOTION_WEIGHT + kinematic_y = drift_y * self.DRIFT_WEIGHT + motion_y * self.MOTION_WEIGHT + asymmetry_x, asymmetry_y = self._damp_edge_asymmetry( + stats, + kinematic_x + self.edge_start_bias_x, + kinematic_y + self.edge_start_bias_y, + ) + + combined_x = asymmetry_x + kinematic_x + self.edge_start_bias_x + combined_y = asymmetry_y + kinematic_y + self.edge_start_bias_y + combined_x, combined_y = self._apply_rolling_friction( + self.smoothed_x, + self.smoothed_y, + combined_x, + combined_y, + ) + + self.smoothed_x += (combined_x - self.smoothed_x) * self.VECTOR_SMOOTHING_ALPHA + self.smoothed_y += (combined_y - self.smoothed_y) * self.VECTOR_SMOOTHING_ALPHA + self.last_cop_x = stats["cop_x"] + self.last_cop_y = stats["cop_y"] + + planar_x = self.smoothed_x + planar_y = -self.smoothed_y + angle, magnitude = self.compute_vector_angle(planar_x, planar_y) + + active_span_rows = (stats["max_row"] - stats["min_row"] + 1) / SENSOR_ROWS + active_span_cols = (stats["max_col"] - stats["min_col"] + 1) / SENSOR_COLS + activity = min(max(stats["active_cells"] / ADC_LEN, 0.0), 1.0) + span = min(max((active_span_rows + active_span_cols) * 0.5, 0.0), 1.0) + pressure_ratio = min(max(stats["active_total"] / max(stats["total"], 1.0), 0.0), 1.0) + peak_ratio = min(max(stats["peak"] / (stats["total"] / stats["active_cells"] + 1.0), 0.0), 1.0) + confidence = min(max(activity * 0.35 + span * 0.2 + pressure_ratio * 0.3 + peak_ratio * 0.15, 0.0), 1.0) + + return self._stabilize_report(LocalForceResult( + angle=angle, + magnitude=magnitude, + planar_x=planar_x, + planar_y=planar_y, + confidence=confidence, + contact_active=True, + reportable=False, + total_pressure=stats["total"], + peak=stats["peak"], + cop_x=stats["cop_x"], + cop_y=stats["cop_y"], + threshold=self.CONTACT_ENTER_TOTAL_THRESHOLD, + )) + + def _update_idle_baseline(self, raw_frame, alpha: float): + if self.baseline_frame is None: + self.baseline_frame = np.array(raw_frame, dtype=np.float32).copy() + return + self.baseline_frame += (raw_frame - self.baseline_frame) * alpha + + def _subtract_baseline(self, raw_frame): + if self.baseline_frame is None: + self._update_idle_baseline(raw_frame, self.BASELINE_BOOTSTRAP_ALPHA) + diff = raw_frame - self.baseline_frame - self.BASELINE_NOISE_FLOOR + return np.clip(diff, 0, None) + + def _pressure_metrics(self, frame): + return float(np.sum(frame)), float(np.max(frame, initial=0.0)) + + def _update_contact_state(self, raw_frame, frame) -> bool: + total, peak = self._pressure_metrics(frame) + enter = total >= self.CONTACT_ENTER_TOTAL_THRESHOLD and peak >= self.CONTACT_ENTER_PEAK_THRESHOLD + exit_frame = total <= self.CONTACT_EXIT_TOTAL_THRESHOLD or peak <= self.CONTACT_EXIT_PEAK_THRESHOLD + + if self.contact_active: + if exit_frame: + self.contact_exit_counter += 1 + if self.contact_exit_counter >= self.CONTACT_EXIT_FRAMES_REQUIRED: + self._update_idle_baseline(raw_frame, self.BASELINE_IDLE_ALPHA) + self.reset_contact_state() + return False + else: + self.contact_exit_counter = 0 + return True + + if enter: + self.contact_enter_counter += 1 + if self.contact_enter_counter >= self.CONTACT_ENTER_FRAMES_REQUIRED: + self.contact_active = True + self.contact_enter_counter = 0 + self.contact_exit_counter = 0 + return True + return False + + self.contact_enter_counter = 0 + self._update_idle_baseline(raw_frame, self.BASELINE_IDLE_ALPHA) + return False + + def _compute_contact_stats(self, frame): + total, peak = self._pressure_metrics(frame) + if total <= 0.0 or peak <= 0.0: + return None + + active_threshold = max(peak * self.ACTIVE_CELL_PEAK_RATIO, self.ACTIVE_CELL_MIN_VALUE) + frame2d = np.asarray(frame, dtype=np.float32).reshape(SENSOR_ROWS, SENSOR_COLS) + active_mask = frame2d >= active_threshold + active_cells = int(np.count_nonzero(active_mask)) + if active_cells < self.MIN_ACTIVE_CELLS: + return None + + active_values = frame2d[active_mask] + active_total = float(np.sum(active_values)) + if active_total <= 0.0: + return None + + rows, cols = np.nonzero(active_mask) + cop_x = float(np.sum(active_values * cols) / active_total) + cop_y = float(np.sum(active_values * rows) / active_total) + min_row, max_row = int(np.min(rows)), int(np.max(rows)) + min_col, max_col = int(np.min(cols)), int(np.max(cols)) + bbox_center_x = (min_col + max_col) * 0.5 + bbox_center_y = (min_row + max_row) * 0.5 + half_width = max(max_col - min_col, 1) * 0.5 + half_height = max(max_row - min_row, 1) * 0.5 + asymmetry_x = float(np.sum(active_values * ((cols - bbox_center_x) / half_width)) / active_total) + asymmetry_y = float(np.sum(active_values * ((rows - bbox_center_y) / half_height)) / active_total) + + return { + "total": total, + "peak": peak, + "active_total": active_total, + "active_cells": active_cells, + "min_row": min_row, + "max_row": max_row, + "min_col": min_col, + "max_col": max_col, + "cop_x": cop_x, + "cop_y": cop_y, + "asymmetry_x": asymmetry_x, + "asymmetry_y": asymmetry_y, + } + + def _contact_touches_edge(self, stats) -> bool: + return ( + stats["min_row"] == 0 + or stats["max_row"] == SENSOR_ROWS - 1 + or stats["min_col"] == 0 + or stats["max_col"] == SENSOR_COLS - 1 + ) + + def _damp_edge_asymmetry(self, stats, kinematic_x: float, kinematic_y: float): + asymmetry_x = stats["asymmetry_x"] * self.ASYMMETRY_WEIGHT + asymmetry_y = stats["asymmetry_y"] * self.ASYMMETRY_WEIGHT + + if stats["min_col"] == 0 and asymmetry_x < 0.0: + asymmetry_x = -asymmetry_x * self.EDGE_INWARD_ROLLING_BIAS + if stats["max_col"] == SENSOR_COLS - 1 and asymmetry_x > 0.0: + asymmetry_x = -asymmetry_x * self.EDGE_INWARD_ROLLING_BIAS + if stats["min_row"] == 0 and asymmetry_y < 0.0: + asymmetry_y = -asymmetry_y * self.EDGE_INWARD_ROLLING_BIAS + if stats["max_row"] == SENSOR_ROWS - 1 and asymmetry_y > 0.0: + asymmetry_y = -asymmetry_y * self.EDGE_INWARD_ROLLING_BIAS + + opposing_dot = asymmetry_x * kinematic_x + asymmetry_y * kinematic_y + kinematic_mag = float(np.hypot(kinematic_x, kinematic_y)) + if self._contact_touches_edge(stats) and opposing_dot < 0.0 and kinematic_mag >= self.ROLLING_FRICTION_MIN_MAGNITUDE: + asymmetry_x *= self.EDGE_ASYMMETRY_DAMPING + asymmetry_y *= self.EDGE_ASYMMETRY_DAMPING + + return asymmetry_x, asymmetry_y + + def _edge_start_bias(self, stats): + center_x = (SENSOR_COLS - 1) * 0.5 + center_y = (SENSOR_ROWS - 1) * 0.5 + normalized_x = min(max((stats["cop_x"] - center_x) / max(center_x, 1.0), -1.0), 1.0) + normalized_y = min(max((stats["cop_y"] - center_y) / max(center_y, 1.0), -1.0), 1.0) + bias_x = self._edge_start_axis_bias(normalized_x) if stats["min_col"] == 0 or stats["max_col"] == SENSOR_COLS - 1 else 0.0 + bias_y = self._edge_start_axis_bias(normalized_y) if stats["min_row"] == 0 or stats["max_row"] == SENSOR_ROWS - 1 else 0.0 + return bias_x, bias_y + + def _edge_start_axis_bias(self, normalized_axis: float) -> float: + distance = abs(normalized_axis) + if distance <= self.EDGE_START_COP_THRESHOLD: + return 0.0 + strength = min(max((distance - self.EDGE_START_COP_THRESHOLD) / (1.0 - self.EDGE_START_COP_THRESHOLD), 0.0), 1.0) + return -np.sign(normalized_axis) * strength * self.EDGE_START_BIAS_WEIGHT + + def _apply_rolling_friction(self, previous_x: float, previous_y: float, current_x: float, current_y: float): + previous_mag = float(np.hypot(previous_x, previous_y)) + current_mag = float(np.hypot(current_x, current_y)) + if previous_mag < self.ROLLING_FRICTION_MIN_MAGNITUDE or current_mag < self.ROLLING_FRICTION_MIN_MAGNITUDE: + return current_x, current_y + + dot = previous_x * current_x + previous_y * current_y + if dot >= 0.0: + return current_x, current_y + + mixed_x = current_x * (1.0 - self.ROLLING_FRICTION_ALPHA) + previous_x * self.ROLLING_FRICTION_ALPHA + mixed_y = current_y * (1.0 - self.ROLLING_FRICTION_ALPHA) + previous_y * self.ROLLING_FRICTION_ALPHA + if mixed_x * previous_x + mixed_y * previous_y >= 0.0: + return mixed_x, mixed_y + + keep_mag = min(previous_mag, current_mag) * 0.5 + return previous_x / previous_mag * keep_mag, previous_y / previous_mag * keep_mag + + def _inactive_result(self, total_pressure=0.0, peak=0.0): + return LocalForceResult(0.0, 0.0, 0.0, 0.0, 0.0, False, False, total_pressure, peak, 0.0, 0.0, self.CONTACT_ENTER_TOTAL_THRESHOLD) + + def _weak_contact_result(self, total_pressure=0.0, peak=0.0, cop_x=0.0, cop_y=0.0): + return LocalForceResult(0.0, 0.0, 0.0, 0.0, 0.0, True, False, total_pressure, peak, cop_x, cop_y, self.CONTACT_ENTER_TOTAL_THRESHOLD) + + def _store_report(self, result: LocalForceResult): + result.reportable = True + self.report_active = True + self.report_hold_counter = 0 + self.held_report = result + return result + + def _hold_or_drop_report(self): + if self.report_active and self.report_hold_counter < self.REPORT_HOLD_FRAMES and self.held_report is not None: + self.report_hold_counter += 1 + held = self.held_report + held.reportable = True + return held + self.report_active = False + self.report_hold_counter = 0 + self.held_report = None + return self._weak_contact_result() + + def _stabilize_report(self, result: LocalForceResult): + if not result.contact_active: + self.report_active = False + self.report_hold_counter = 0 + self.held_report = None + return result + + can_enter = result.magnitude >= self.REPORT_MAGNITUDE_ENTER and result.confidence >= self.REPORT_CONFIDENCE_ENTER + can_stay = result.magnitude >= self.REPORT_MAGNITUDE_EXIT and result.confidence >= self.REPORT_CONFIDENCE_EXIT + if self.report_active: + if can_stay: + return self._store_report(result) + return self._hold_or_drop_report() + if can_enter: + return self._store_report(result) + return result + + def compute_vector_angle(self, x: float, y: float) -> Tuple[float, float]: + magnitude = float(np.hypot(x, y)) + if magnitude <= np.finfo(np.float32).eps: + return 0.0, 0.0 + angle = float(np.degrees(np.arctan2(y, x))) + if angle < 0.0: + angle += 360.0 + return angle, magnitude + + +_estimator = LocalTangentialForceEstimator() def reset_cop_state(): @@ -924,16 +1277,16 @@ def compute_pressure_direction(adc_data, timestamp_ms: float): return ( result.cop_x, result.cop_y, - result.row_min, - result.row_max, - result.col_min, - result.col_max, - result.dx, - result.dy, - result.base_x, - result.base_y, + 0, + SENSOR_ROWS - 1, + 0, + SENSOR_COLS - 1, + result.planar_x, + result.planar_y, + 0.0, + 0.0, result.magnitude, - result.state, + 1 if result.reportable else 0, result.total_pressure, result.threshold, ) @@ -956,11 +1309,11 @@ def get_pzt_angle(adc_data, timestamp_ms: float): return ( result.angle, result.magnitude, - result.state, + 1 if result.reportable else 0, result.cop_x, result.cop_y, - result.base_x, - result.base_y, + 0.0, + 0.0, result.total_pressure, result.threshold, ) diff --git a/src/lib/components/CenterStage.svelte b/src/lib/components/CenterStage.svelte index 2a2df49..285c49c 100644 --- a/src/lib/components/CenterStage.svelte +++ b/src/lib/components/CenterStage.svelte @@ -29,7 +29,6 @@ export let summary: HudSummary; export let pressureMatrix: number[] | null = null; export let spatialForce: HudSpatialForce | null = null; - export let devkitSpatialForce: HudSpatialForce | null = null; export let showConfigPanel = false; export let configPanelTitle = ""; export let configPanelHint = ""; @@ -278,41 +277,25 @@ {/each} -
{panelCode}
-{resolvedTitle}
-{panelCode}
+{resolvedTitle}
+{i18n.heading}
- {#if showMetrics} - - - {:else} - - - {/if} -