Files
JE-Skin/src-tauri/src/serial_core/serial.rs

587 lines
19 KiB
Rust

use crate::serial_core::calibration_session::*;
use crate::serial_core::codec::Codec;
use crate::serial_core::codecs::tactile_a::TactileACodec;
use crate::serial_core::frame::{FrameHandler, TactileAFrame, TestFrame};
use crate::serial_core::model::{HudChartState, HudPacket};
use crate::serial_core::record::Recording;
use crate::serial_core::record::{FrameTiming, RecordedFrame};
use crate::serial_core::sensor_runtime::SensorRuntimeFilter;
use anyhow::Result;
use log::info;
use std::fs::File;
use std::future::pending;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tauri::{AppHandle, Emitter};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::{self, Duration, MissedTickBehavior};
use tokio_serial::SerialStream;
use tokio_util::sync::CancellationToken;
/// Column count handed to the tactile codec and poll requester, and (multiplied by the
/// row count) to the CSV exporter, in the calibration path.
const DEFAULT_TACTILE_COLS: usize = 7;
/// Row count handed to the tactile codec and poll requester, and (multiplied by the
/// column count) to the CSV exporter, in the calibration path.
const DEFAULT_TACTILE_ROWS: usize = 12;
/// Period, in milliseconds, of the poll timer used by the calibration loop.
const DEFAULT_TACTILE_POLL_INTERVAL_MS: u64 = 10;
/// Time, in milliseconds, after which the calibration poll requester stops waiting
/// for a reply and allows a new request to be sent.
const DEFAULT_TACTILE_REPLY_TIMEOUT_MS: u64 = 140;
use crate::serial_core::codecs::tactile_a::TactileAHandler;
/// Selects whether `run_serial_with_poll` sends requests to the device.
pub enum PollMode<F> {
/// No requester is installed, so the loop never writes to the port.
Disable,
/// Requests are produced by the boxed [`PollRequester`]; its `poll_interval`
/// decides whether a poll timer is created.
Enabled(Box<dyn PollRequester<F>>),
}
/// A decoded frame that the serial loops can timestamp, record and turn into HUD output.
pub trait SerialFrame: Clone + Send + 'static {
/// Returns the frame's `dts_ms` timestamp (milliseconds, going by the `_ms` suffix).
///
/// The serial loops store this value in `FrameTiming::dts_ms` for each recorded frame.
fn dts_ms(&self) -> u64;
/// Builds the HUD packet for this frame, updating `chart_state` as needed.
///
/// `display_values` carries the values the caller wants shown, if any. Returning
/// `None` means the caller emits nothing for this frame.
fn to_hud_packet(
&self,
chart_state: &mut HudChartState,
display_values: Option<&[i32]>,
) -> Option<HudPacket>;
}
/// [`SerialFrame`] for [`TestFrame`]: the frame carries its own timestamp and always
/// produces a HUD packet.
impl SerialFrame for TestFrame {
    /// Returns the `dts_ms` field stored on the frame.
    fn dts_ms(&self) -> u64 {
        self.dts_ms
    }

    /// Applies the frame to `chart_state` and returns the resulting packet.
    ///
    /// Never returns `None`.
    fn to_hud_packet(
        &self,
        chart_state: &mut HudChartState,
        display_values: Option<&[i32]>,
    ) -> Option<HudPacket> {
        let packet = chart_state.apply_frame(self, display_values);
        Some(packet)
    }
}
/// [`SerialFrame`] for [`TactileAFrame`]: only reply frames carry a timestamp and
/// produce HUD output; request frames are inert.
impl SerialFrame for TactileAFrame {
    /// Returns the reply's `dts_ms`, or `0` for a request frame.
    fn dts_ms(&self) -> u64 {
        if let TactileAFrame::Rep(reply) = self {
            reply.dts_ms
        } else {
            0
        }
    }

    /// Builds a HUD packet from a reply frame; request frames yield `None`.
    ///
    /// The reply is copied into a temporary [`TestFrame`] so it can go through the
    /// same `HudChartState::apply_frame` call that `TestFrame` uses.
    fn to_hud_packet(
        &self,
        chart_state: &mut HudChartState,
        display_values: Option<&[i32]>,
    ) -> Option<HudPacket> {
        let TactileAFrame::Rep(reply) = self else {
            return None;
        };
        let meta = &reply.meta;
        let proxy = TestFrame {
            header: meta.header,
            cmd: meta.func_code,
            length: meta.except_data_len,
            payload: reply.payload.clone(),
            checksum: meta.checksum,
            dts_ms: reply.dts_ms,
        };
        let packet = chart_state.apply_frame(&proxy, display_values);
        Some(packet)
    }
}
/// Strategy that decides when the serial loop sends a request frame and what it sends.
///
/// Every method has a default, so an implementor only overrides what it needs; the
/// defaults describe a requester that never polls.
pub trait PollRequester<F>: Send {
/// Period of the poll timer. `None` (the default) means no timer is created.
fn poll_interval(&self) -> Option<Duration> {
None
}
/// Called on each poll tick; `next_request` is only consulted when this returns `true`.
fn should_request(&mut self) -> bool {
true
}
/// Produces the frame to send, or `Ok(None)` (the default) to send nothing this tick.
fn next_request(&mut self) -> Result<Option<F>> {
Ok(None)
}
/// Called for every frame decoded from the port, before it is handled further.
fn on_rx_frame(&mut self, _frame: &F) {}
}
/// A [`PollRequester`] that relies entirely on the trait defaults: it reports no poll
/// interval and never produces a request.
#[derive(Default)]
pub struct NoopPollRequester;
impl<F> PollRequester<F> for NoopPollRequester {}
/// Poll requester for the tactile-A protocol that keeps at most one request in
/// flight and gives up on a reply after `reply_timeout`.
pub struct TactileAPollRequester {
    /// Value reported as the poll interval.
    period: Duration,
    /// Column count passed to the request-frame builder.
    cols: usize,
    /// Row count passed to the request-frame builder.
    rows: usize,
    /// `true` while a request has been issued and no reply has been seen yet.
    awaiting_reply: bool,
    /// When the outstanding request was issued; `None` when nothing is outstanding.
    last_request_at: Option<Instant>,
    /// How long to wait for a reply before allowing a new request.
    reply_timeout: Duration,
}

impl TactileAPollRequester {
    /// Creates a requester with no request outstanding.
    ///
    /// `period` is the poll interval, `cols`/`rows` are forwarded to the request
    /// builder, and `reply_timeout` bounds how long an unanswered request blocks
    /// the next one.
    pub fn new(period: Duration, cols: usize, rows: usize, reply_timeout: Duration) -> Self {
        // Nothing has been sent yet, so there is neither a pending reply nor a send time.
        let awaiting_reply = false;
        let last_request_at = None;
        Self {
            period,
            cols,
            rows,
            awaiting_reply,
            last_request_at,
            reply_timeout,
        }
    }
}
/// Request/reply pacing for the tactile-A protocol: one request at a time, with a
/// timeout so a lost reply cannot stall polling.
impl PollRequester<TactileAFrame> for TactileAPollRequester {
    /// Always polls, at the configured `period`.
    fn poll_interval(&self) -> Option<Duration> {
        Some(self.period)
    }

    /// Returns `true` when no request is outstanding, or when the outstanding one
    /// has been waiting for at least `reply_timeout` (in which case the pending
    /// state is cleared).
    fn should_request(&mut self) -> bool {
        if self.awaiting_reply {
            let expired = match self.last_request_at {
                Some(sent_at) => sent_at.elapsed() >= self.reply_timeout,
                // No send time recorded: nothing to measure, so keep waiting.
                None => false,
            };
            if !expired {
                return false;
            }
            self.awaiting_reply = false;
            self.last_request_at = None;
        }
        true
    }

    /// Builds a request frame for the configured grid and marks it as outstanding.
    ///
    /// The pending state is only updated after the frame was built successfully.
    fn next_request(&mut self) -> Result<Option<TactileAFrame>> {
        let request = TactileACodec::build_req_frame(self.cols, self.rows)?;
        self.awaiting_reply = true;
        self.last_request_at = Some(Instant::now());
        Ok(Some(request))
    }

    /// Clears the pending state when a reply frame arrives; other frames are ignored.
    fn on_rx_frame(&mut self, frame: &TactileAFrame) {
        if let TactileAFrame::Rep(_) = frame {
            self.awaiting_reply = false;
            self.last_request_at = None;
        }
    }
}
pub async fn run_serial<C, H, T, F>(
app: AppHandle,
port: SerialStream,
codec: C,
handler: H,
session_started_at: Instant,
recording: Arc<Mutex<Recording<F>>>,
cancel: CancellationToken,
) -> Result<()>
where
F: SerialFrame,
C: Codec<F> + Send + 'static,
H: FrameHandler<F, T> + Send + 'static,
T: Into<i32>,
{
run_serial_with_poll(
app,
port,
codec,
handler,
session_started_at,
recording,
cancel,
PollMode::Disable,
)
.await
}
/// Drives one serial session until `cancel` fires or an error occurs.
///
/// Each loop iteration waits on whichever of these happens first:
///
/// * cancellation — leaves the loop and returns `Ok(())`;
/// * a poll tick (only when `poll_mode` supplies a requester with an interval) —
///   asks the requester for a frame, encodes it and writes it to `port`;
/// * a prune tick (every 450 ms) — lets the chart state drop stale data and emits
///   the resulting packet on `"hud_stream"` if there is one;
/// * bytes read from `port` — decodes them into frames and, per frame, notifies the
///   requester, runs `handler`, appends the frame to `recording`, feeds the summed
///   values through the runtime filter, and emits a HUD packet.
///
/// # Errors
///
/// Returns the first error from the filter constructor, the port, the codec, the
/// handler, the requester, event emission, or a poisoned `recording` mutex.
pub async fn run_serial_with_poll<C, H, T, F>(
    app: AppHandle,
    mut port: SerialStream,
    mut codec: C,
    mut handler: H,
    session_started_at: Instant,
    recording: Arc<Mutex<Recording<F>>>,
    cancel: CancellationToken,
    poll_mode: PollMode<F>,
) -> Result<()>
where
    F: SerialFrame,
    C: Codec<F> + Send + 'static,
    H: FrameHandler<F, T> + Send + 'static,
    T: Into<i32>,
{
    let mut sensor_runtime =
        SensorRuntimeFilter::new().map_err(|error| anyhow::anyhow!(error))?;
    let mut requester = match poll_mode {
        PollMode::Disable => None,
        PollMode::Enabled(r) => Some(r),
    };
    // A timer only exists when a requester is installed AND it reports an interval.
    let mut poll_interval = requester.as_ref().and_then(|r| r.poll_interval()).map(|d| {
        let mut it = time::interval(d);
        it.set_missed_tick_behavior(MissedTickBehavior::Skip);
        it
    });
    let mut chart_state = HudChartState::new();
    let mut buffer = [0u8; 1024];
    let mut prune_interval = time::interval(Duration::from_millis(450));
    prune_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            _ = cancel.cancelled() => break,
            _ = async {
                match poll_interval.as_mut() {
                    Some(it) => {
                        it.tick().await;
                    }
                    // Without a timer this branch must never complete.
                    None => pending::<()>().await,
                }
            } => {
                if let Some(r) = requester.as_mut() {
                    if r.should_request() {
                        if let Some(req) = r.next_request()? {
                            let bytes = codec.encode(&req)?;
                            port.write_all(&bytes).await?;
                        }
                    }
                }
            }
            _ = prune_interval.tick() => {
                if let Some(packet) = chart_state.prune_stale() {
                    app.emit("hud_stream", packet)?;
                }
            }
            read_result = port.read(&mut buffer) => {
                let n = read_result?;
                if n == 0 {
                    // Some serial drivers can resolve reads with 0 bytes repeatedly.
                    // Yield here so timer-driven poll requests are not starved by a busy loop.
                    tokio::task::yield_now().await;
                    continue;
                }
                let frames = codec.decode(&buffer[..n], session_started_at)?;
                for frame in frames {
                    if let Some(r) = requester.as_mut() {
                        r.on_rx_frame(&frame);
                    }
                    let decode_res = handler
                        .on_frame(&frame)
                        .await?
                        .map(|vals| vals.into_iter().map(Into::into).collect::<Vec<i32>>());
                    {
                        // Hold the recording lock only for the push so other users of
                        // `recording` are not blocked while we filter, log and emit.
                        let mut record = recording
                            .lock()
                            .map_err(|_| anyhow::anyhow!("recording state poisoned"))?;
                        record.push(RecordedFrame {
                            timing: FrameTiming { pts_ms: None, dts_ms: frame.dts_ms() },
                            frame: frame.clone(),
                        });
                    }
                    let display_values = if let Some(vals) = decode_res.as_ref() {
                        let raw_summary = vals.iter().copied().sum::<i32>();
                        // Clamp before the unsigned cast: a negative sum would otherwise
                        // wrap to a huge value and be reported as maximum force.
                        let raw_force_g = raw_to_g1(raw_summary.max(0) as u32);
                        let stable_force_g =
                            sensor_runtime.process_sample_with_dts(raw_summary as f64, frame.dts_ms());
                        info!("raw force(g) = {raw_force_g:.3}, stable force(g) = {stable_force_g:.3}");
                        chart_state.record_summary(stable_force_g as f32);
                        chart_state.record_pressure_matrix(vals.as_slice());
                        Some(vec![stable_force_g.round() as i32])
                    } else {
                        None
                    };
                    if let Some(packet) = frame.to_hud_packet(&mut chart_state, display_values.as_deref()) {
                        app.emit("hud_stream", packet)?;
                    }
                }
            }
        }
    }
    Ok(())
}
/// Maps a summed raw sensor reading to a force value by piecewise-linear
/// interpolation over a fixed 11-point table (`X` raw knots, `Y` force values).
///
/// Readings at or below the first knot return `Y[0]`; readings at or above the last
/// knot return the last `Y` value, so the curve saturates continuously instead of
/// jumping at the ends. Between knots the result is interpolated linearly.
///
/// NOTE(review): the saturating branches previously divided by 100 while the
/// interpolated branch did not, producing a ~100x discontinuity at the top knot.
/// The table values are now returned unscaled everywhere; confirm the intended unit.
fn raw_to_g1(raw: u32) -> f64 {
    const X: [u32; 11] = [
        0, 74602, 105503, 131459, 153512, 172041, 193794, 218947, 240580, 295118, 332346,
    ];
    const Y: [f64; 11] = [
        0.0, 160.0, 260.0, 360.0, 460.0, 560.0, 660.0, 860.0, 1060.0, 1560.0, 2060.0,
    ];
    let last = X.len() - 1;
    if raw <= X[0] {
        return Y[0];
    }
    if raw >= X[last] {
        return Y[last];
    }
    // Index of the first knot strictly greater than `raw`. The guards above ensure
    // X[0] < raw < X[last], so `right` is in 1..=last and `right - 1` cannot underflow.
    let right = X.partition_point(|&knot| knot <= raw);
    let left = right - 1;
    let ratio = f64::from(raw - X[left]) / f64::from(X[right] - X[left]);
    Y[left] + ratio * (Y[right] - Y[left])
}
/// Runs the calibration capture loop on `port` until the calibration session reports
/// `Completed`, `cancel` fires, or an error occurs.
///
/// The loop polls the device with a [`TactileAPollRequester`] while the session is in
/// `CollectingData`, records accepted reply frames, and feeds them to the session.
/// When the session signals that a round is full, the recorded frames are exported
/// to CSV, the recording is cleared, and the next round is started either
/// immediately or after the session's `round_interval_ms`.
///
/// The current calibration status is emitted once at start. If `cancel` is set when
/// the loop ends, the session is stopped (unless already completed) before the
/// loop's result is returned.
///
/// # Errors
///
/// Returns the first error from the port, codec, handler, event emission, CSV
/// export, or a poisoned recording/session mutex.
pub async fn run_serial_with_calibration(
    app: AppHandle,
    mut port: SerialStream,
    session_started_at: Instant,
    cancel: CancellationToken,
    calibration_session: SharedCalibrationSession,
) -> Result<()> {
    info!("run_serial_with_calibration begin");
    emit_calibration_status(&app, &calibration_session)?;
    let run_result = async {
        let mut codec = TactileACodec::new(DEFAULT_TACTILE_COLS, DEFAULT_TACTILE_ROWS);
        let mut handler = TactileAHandler;
        let mut requester = TactileAPollRequester::new(
            Duration::from_millis(DEFAULT_TACTILE_POLL_INTERVAL_MS),
            DEFAULT_TACTILE_COLS,
            DEFAULT_TACTILE_ROWS,
            Duration::from_millis(DEFAULT_TACTILE_REPLY_TIMEOUT_MS),
        );
        let mut poll_interval =
            time::interval(Duration::from_millis(DEFAULT_TACTILE_POLL_INTERVAL_MS));
        poll_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let mut buffer = [0u8; 1024];
        let recording = Arc::new(Mutex::new(Recording::new()));
        let mut chart_state = HudChartState::new();
        let mut prune_interval = time::interval(Duration::from_millis(450));
        prune_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        // Set after an export when the next round should start after a delay.
        let mut next_round_at: Option<Instant> = None;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => break,
                _ = poll_interval.tick() => {
                    // The poll tick doubles as the clock for delayed round starts.
                    if let Some(deadline) = next_round_at {
                        if Instant::now() >= deadline {
                            next_round_at = None;
                            begin_next_calibration_round(&app, &calibration_session)?;
                        }
                    }
                    // Only poll the device while a round is actively collecting.
                    if calibration_is_collecting(&calibration_session)?
                        && requester.should_request()
                    {
                        if let Some(req) = requester.next_request()? {
                            let bytes = codec.encode(&req)?;
                            port.write_all(&bytes).await?;
                        }
                    }
                }
                _ = prune_interval.tick() => {
                    if let Some(packet) = chart_state.prune_stale() {
                        app.emit("hud_stream", packet)?;
                    }
                }
                read_result = port.read(&mut buffer) => {
                    let n = read_result?;
                    if n == 0 {
                        // A zero-length read carries no data; yield so the timers can run.
                        tokio::task::yield_now().await;
                        continue;
                    }
                    let frames = codec.decode(&buffer[..n], session_started_at)?;
                    for frame in frames {
                        requester.on_rx_frame(&frame);
                        let decode_res = handler
                            .on_frame(&frame)
                            .await?
                            .map(|vals| vals.into_iter().map(Into::into).collect::<Vec<i32>>());
                        let display_values = if let Some(vals) = decode_res.as_ref() {
                            let summary = vals.iter().copied().sum::<i32>();
                            // The threshold ignores the final element. `last()` keeps an
                            // empty vector from panicking; it then falls below the
                            // threshold and the frame is skipped.
                            let val_summary = summary - vals.last().copied().unwrap_or(0);
                            // NOTE(review): 8400 looks like a minimum-load gate; frames
                            // below it are neither recorded nor shown. Confirm the value.
                            if val_summary < 8400 {
                                continue;
                            }
                            chart_state.record_summary(summary as f32);
                            chart_state.record_pressure_matrix(vals.as_slice());
                            Some(vec![summary])
                        } else {
                            None
                        };
                        let recorded_frame = RecordedFrame {
                            timing: FrameTiming { pts_ms: None, dts_ms: frame.dts_ms() },
                            frame: frame.clone(),
                        };
                        {
                            let mut record = recording
                                .lock()
                                .map_err(|_| anyhow::anyhow!("recording state poisoned"))?;
                            record.push(recorded_frame.clone());
                        }
                        if let Some(packet) = frame.to_hud_packet(&mut chart_state, display_values.as_deref()) {
                            app.emit("hud_stream", packet)?;
                        }
                        // `add_frame` reports whether the current round should be exported now.
                        let should_export = {
                            let mut session = calibration_session
                                .lock()
                                .map_err(|_| anyhow::anyhow!("calibration session poisoned"))?;
                            session.add_frame(recorded_frame)
                        };
                        if should_export {
                            let current_round = {
                                let session = calibration_session
                                    .lock()
                                    .map_err(|_| anyhow::anyhow!("calibration session poisoned"))?;
                                session.current_round
                            };
                            export_calibration_data(&app, current_round, &recording).await?;
                            let (progress, round_interval_ms) = {
                                let mut session = calibration_session
                                    .lock()
                                    .map_err(|_| anyhow::anyhow!("calibration session poisoned"))?;
                                session.export_completed();
                                (session.get_progress(), session.round_interval_ms)
                            };
                            app.emit("calibration_status", progress.clone())?;
                            // Start the next round with an empty recording. This is
                            // best-effort: a poisoned lock is ignored here and will
                            // surface as an error on the next push.
                            if let Ok(mut record) = recording.lock() {
                                record.frames.clear();
                            }
                            if progress.state == CalibrationState::Completed {
                                return Ok(());
                            }
                            if round_interval_ms == 0 {
                                begin_next_calibration_round(&app, &calibration_session)?;
                            } else {
                                next_round_at =
                                    Some(Instant::now() + Duration::from_millis(round_interval_ms));
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
    .await;
    // On cancellation, mark the session stopped and tell the frontend before returning.
    if cancel.is_cancelled() {
        stop_calibration_session(&app, &calibration_session)?;
    }
    run_result
}
/// Reports whether the calibration session is currently in `CollectingData`.
///
/// Fails with "calibration session poisoned" if the session mutex is poisoned.
fn calibration_is_collecting(calibration_session: &SharedCalibrationSession) -> Result<bool> {
    calibration_session
        .lock()
        .map_err(|_| anyhow::anyhow!("calibration session poisoned"))
        .map(|session| session.state == CalibrationState::CollectingData)
}
/// Starts the next calibration round if the session is waiting for one, then emits
/// the current calibration status.
///
/// The status is emitted whether or not a round was started. Fails if the session
/// mutex is poisoned, if `begin_next_round` reports an error, or if emission fails.
fn begin_next_calibration_round(
    app: &AppHandle,
    calibration_session: &SharedCalibrationSession,
) -> Result<()> {
    let mut session = calibration_session
        .lock()
        .map_err(|_| anyhow::anyhow!("calibration session poisoned"))?;
    let waiting = session.state == CalibrationState::WaitingForNextRound;
    if waiting {
        session
            .begin_next_round()
            .map_err(|error| anyhow::anyhow!(error))?;
    }
    // Release the lock first: emitting the status locks the session again.
    drop(session);
    emit_calibration_status(app, calibration_session)
}
/// Stops the calibration session unless it has already completed, then emits the
/// current calibration status.
///
/// Fails if the session mutex is poisoned or if emission fails.
fn stop_calibration_session(
    app: &AppHandle,
    calibration_session: &SharedCalibrationSession,
) -> Result<()> {
    let mut session = calibration_session
        .lock()
        .map_err(|_| anyhow::anyhow!("calibration session poisoned"))?;
    let already_completed = session.state == CalibrationState::Completed;
    if !already_completed {
        session.stop();
    }
    // Release the lock first: emitting the status locks the session again.
    drop(session);
    emit_calibration_status(app, calibration_session)
}
/// Emits the session's current progress on the `"calibration_status"` event.
///
/// Fails if the session mutex is poisoned or if emission fails.
fn emit_calibration_status(
    app: &AppHandle,
    calibration_session: &SharedCalibrationSession,
) -> Result<()> {
    // The guard is a temporary of this statement, so the lock is released before
    // the event is emitted.
    let snapshot = calibration_session
        .lock()
        .map_err(|_| anyhow::anyhow!("calibration session poisoned"))?
        .get_progress();
    app.emit("calibration_status", snapshot)?;
    Ok(())
}
use crate::serial_core::codecs::tactile_a::TactileACsvExporter;
use crate::serial_core::record::write_csv;
use std::time::{SystemTime, UNIX_EPOCH};
use tauri::Manager;
/// Writes the frames currently held in `recording` to a CSV file for one
/// calibration round.
///
/// The file is named `calibration_round{round}_{unix_millis}.csv` and is placed in a
/// `calibration_data` directory under the desktop directory, falling back to the
/// current working directory when the desktop directory cannot be resolved. The
/// directory is created if missing.
///
/// Fails on any filesystem error, on a poisoned `recording` mutex, or if the CSV
/// writer reports an error.
async fn export_calibration_data(
    app: &AppHandle,
    current_round: usize,
    recording: &Arc<Mutex<Recording<TactileAFrame>>>,
) -> Result<()> {
    // A clock set before the Unix epoch yields 0 rather than an error.
    let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    let filename = format!("calibration_round{}_{}.csv", current_round, timestamp);

    let base_dir = if let Ok(desktop) = app.path().desktop_dir() {
        desktop
    } else {
        std::env::current_dir()?
    };
    let output_dir = base_dir.join("calibration_data");
    std::fs::create_dir_all(&output_dir)?;
    let file = File::create(output_dir.join(&filename))?;

    let frames = recording
        .lock()
        .map_err(|_| anyhow::anyhow!("Recording poisoned"))?;
    let exporter = TactileACsvExporter::with_coarse_calibration(
        DEFAULT_TACTILE_COLS * DEFAULT_TACTILE_ROWS,
        7 * 12 * 10,
    );
    write_csv(&frames, &exporter, file)?;
    Ok(())
}