feat: 切向力算法更新 + AD反解x模块

This commit is contained in:
lenn
2026-05-25 14:44:31 +08:00
parent e52c86ea1a
commit 011bfe2450
35 changed files with 31833 additions and 79 deletions

127
devkit/test_pzt.py Normal file
View File

@@ -0,0 +1,127 @@
"""
独立测试脚本读取84个原始ADC数据传入CoP算法计算角度终端打印结果。
用法:
python test_pzt.py # 从 stdin 逐行读取每行84个逗号分隔数值
python test_pzt.py data.csv # 从 CSV 文件逐行读取
python test_pzt.py --random # 生成随机测试数据(调试用)
"""
import sys
import csv
import numpy as np
# ── 从 sensor_server.py 导入算法 ──
sys.path.insert(0, ".")
from sensor_server import (
get_pzt_angle,
reset_baseline,
subtract_baseline,
compute_pressure_direction,
compute_PZT_angle,
)
def print_result(data_label: str, pzt_angle: float, magnitude: float, state: int, cop_x: float, cop_y: float, base_x: float, base_y: float):
dx = cop_x - base_x
dy = base_y - cop_y
print(
f"devkit: angle={pzt_angle:.2f}, magnitude={magnitude:.4f}, state={state}, "
f"cop_x={cop_x:.4f}, cop_y={cop_y:.4f}, dx={dx:.4f}, dy={dy:.4f}"
)
def process_values(values: list[int | float]):
"""处理一帧84个值并打印结果"""
if len(values) != 84:
print(f"[ERROR] 期望84个值实际收到 {len(values)}", file=sys.stderr)
return
try:
pzt_angle, magnitude, state, cop_x, cop_y, base_x, base_y = get_pzt_angle(values)
print_result("", pzt_angle, magnitude, state, cop_x, cop_y, base_x, base_y)
except Exception as e:
print(f"[ERROR] 计算失败: {e}", file=sys.stderr)
def run_random_test():
"""生成随机数据测试算法"""
reset_baseline()
print("[TEST] 使用随机数据测试 CoP 算法")
print("[TEST] 先用全零帧建立基线...")
process_values([0] * 84)
print("[TEST] 模拟右侧偏移按压...")
# 模拟row 5-7, col 4-6 区域有压力
data = [0.0] * 84
for r in range(5, 8):
for c in range(4, 7):
idx = r * 7 + c
data[idx] = 100.0 + (c - 4) * 50 # 右侧更强
process_values(data)
print("[TEST] 模拟下方偏移按压...")
data2 = [0.0] * 84
for r in range(8, 11):
for c in range(2, 5):
idx = r * 7 + c
data2[idx] = 150.0 + (r - 8) * 30
process_values(data2)
print("[TEST] 完成")
def run_csv_mode(filepath: str):
"""从 CSV 文件逐行读取并处理"""
reset_baseline()
print(f"[CSV] 读取文件: {filepath}")
with open(filepath, "r", encoding="utf-8-sig", newline="") as f:
reader = csv.reader(f)
for i, row in enumerate(reader):
if not row:
continue
# 跳过 header
if row[0].strip() in ("seq", "timestamp_ms"):
print(f"[CSV] 跳过 header: {row[:5]}...")
continue
try:
values = [float(v) for v in row]
if len(values) == 84:
process_values(values)
elif len(values) > 84:
process_values(values[:84])
except ValueError:
continue
def run_stdin_mode():
"""从 stdin 逐行读取"""
reset_baseline()
print("[STDIN] 等待输入每行84个逗号分隔数值Ctrl+C 退出)...")
try:
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
values = [float(v) for v in line.split(",")]
if len(values) >= 84:
process_values(values[:84])
except ValueError:
continue
except KeyboardInterrupt:
print("\n[STDIN] 已退出")
def main():
if len(sys.argv) > 1:
arg = sys.argv[1]
if arg == "--random":
run_random_test()
elif arg == "--help" or arg == "-h":
print(__doc__)
else:
run_csv_mode(arg)
else:
run_stdin_mode()
if __name__ == "__main__":
main()