diff --git a/example/cpp/main.cpp b/example/cpp/main.cpp index b4116d8..069e29c 100644 --- a/example/cpp/main.cpp +++ b/example/cpp/main.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -7,6 +8,7 @@ #include #include #include +#include #include "../../include/eskin_ffi.h" @@ -64,20 +66,34 @@ static void demo_streaming(EskinDeviceHandle device, double duration_sec = 5.0) // 线程安全队列(参考 ROS publisher 的 read_loop + publish_callback 分离模式) std::mutex mtx; std::queue queue; + std::queue> raw_queue; bool running = true; - // 读取线程:持续从设备读取 sample 放入队列 + // 读取线程:持续从设备读取 sample 和原始帧放入队列 std::thread read_thread([&]() { while (running) { CFingerSample sample; memset(&sample, 0, sizeof(sample)); auto e = eskin_read_sample(device, 50, &sample); if (e == ESkinSuccess) { + uint8_t raw_buf[512] = {}; + uint32_t raw_len = 0; + auto raw_err = eskin_read_stream_frame( + device, 1, raw_buf, sizeof(raw_buf), &raw_len); + std::vector raw_frame; + if (raw_err == ESkinSuccess) { + raw_frame.assign(raw_buf, raw_buf + std::min(raw_len, sizeof(raw_buf))); + } + std::lock_guard lock(mtx); queue.push(sample); + raw_queue.push(std::move(raw_frame)); while (queue.size() > 100) { queue.pop(); // 防止堆积 } + while (raw_queue.size() > 100) { + raw_queue.pop(); + } } // 超时等非致命错误忽略,继续读取 } @@ -96,11 +112,18 @@ static void demo_streaming(EskinDeviceHandle device, double duration_sec = 5.0) std::lock_guard lock(mtx); while (!queue.empty()) { const auto &s = queue.front(); - printf("[%5u] module=%u fx=%u fy=%u fz=%u\n", + size_t raw_len = 0; + if (!raw_queue.empty()) { + raw_len = raw_queue.front().size(); + raw_queue.pop(); + } + + printf("[%5u] module=%u fx=%u fy=%u fz=%u raw_len=%zu\n", s.sequence, s.combined_force.module, s.combined_force.force.fx, s.combined_force.force.fy, - s.combined_force.force.fz); + s.combined_force.force.fz, + raw_len); queue.pop(); count++; } @@ -145,4 +168,4 @@ int main(int argc, char *argv[]) { eskin_close(device); printf("Device closed\n"); return 0; -} \ No newline at end of file +} diff --git a/example/python/eskin_ffi.py b/example/python/eskin_ffi.py index d257725..4873fba 100644 --- a/example/python/eskin_ffi.py +++ b/example/python/eskin_ffi.py @@ -126,6 +126,11 @@ class EskinDevice: c_void_p, c_uint32, POINTER(CFingerSample) ] + lib.eskin_read_stream_frame.restype = c_uint32 + lib.eskin_read_stream_frame.argtypes = [ + c_void_p, c_uint32, POINTER(c_uint8), c_uint32, POINTER(c_uint32) + ] + lib.eskin_get_mode.restype = c_uint32 lib.eskin_get_mode.argtypes = [c_void_p, POINTER(c_uint32)] @@ -271,6 +276,17 @@ class EskinDevice: raise RuntimeError(f"read_sample failed: error={err}") return sample + def read_stream_frame(self, timeout_ms: int = 200, max_len: int = 512) -> bytes: + """读取一个 stream 原始完整协议帧(流模式下调用)""" + buf = (c_uint8 * max_len)() + actual = c_uint32(0) + err = self._lib.eskin_read_stream_frame( + self._handle, timeout_ms, buf, len(buf), ctypes.byref(actual) + ) + if err != 0: + raise RuntimeError(f"read_stream_frame failed: error={err}") + return bytes(buf[:min(actual.value, len(buf))]) + def get_mode(self) -> int: """查询当前设备模式(0=Command, 1=Streaming)""" out = c_uint32(0) @@ -283,4 +299,4 @@ class EskinDevice: return self def __exit__(self, *args): - self.close() \ No newline at end of file + self.close() diff --git a/example/python/example.py b/example/python/example.py index 5e5b7ee..c494b5d 100644 --- a/example/python/example.py +++ b/example/python/example.py @@ -33,14 +33,17 @@ def demo_streaming(dev: EskinDevice, duration_sec: float = 5.0): # 线程安全的队列(参考 ROS demo 的 read_loop + publish_callback 分离模式) queue: deque = deque(maxlen=100) + raw_queue: deque = deque(maxlen=100) running = True def read_loop(): - """独立读取线程:持续从设备读取 sample""" + """独立读取线程:持续从设备读取 sample 和原始帧""" while running: try: sample = dev.read_sample(timeout_ms=50) queue.append(sample) + raw_frame = dev.read_stream_frame(timeout_ms=1) + raw_queue.append(raw_frame) except RuntimeError: # 超时等非致命错误,继续读取 pass @@ -55,12 +58,14 @@ def demo_streaming(dev: EskinDevice, duration_sec: float = 5.0): while time.monotonic() - start < duration_sec: if queue: sample: CFingerSample = queue.popleft() + raw_frame = raw_queue.popleft() if raw_queue else b"" f = sample.combined_force.force mod = sample.combined_force.module print( f"[{sample.sequence:5d}] " f"module={mod} " - f"fx={f.fx} fy={f.fy} fz={f.fz}" + f"fx={f.fx} fy={f.fy} fz={f.fz} " + f"raw_len={len(raw_frame)}" ) count += 1 else: @@ -89,4 +94,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/include/eskin_ffi.h b/include/eskin_ffi.h index 87db4fa..71fe3a8 100644 --- a/include/eskin_ffi.h +++ b/include/eskin_ffi.h @@ -135,6 +135,13 @@ typedef struct { EskinSdkErrorCode eskin_start_stream(EskinDeviceHandle handle); EskinSdkErrorCode eskin_stop_stream(EskinDeviceHandle handle); EskinSdkErrorCode eskin_read_sample(EskinDeviceHandle handle, uint32_t timeout_ms, CFingerSample* out); +EskinSdkErrorCode eskin_read_stream_frame( + EskinDeviceHandle handle, + uint32_t timeout_ms, + uint8_t* buf, + uint32_t buf_len, + uint32_t* actual_len +); EskinSdkErrorCode eskin_get_mode(EskinDeviceHandle handle, uint32_t* out); #ifdef __cplusplus diff --git a/src/channel.rs b/src/channel.rs index 85274bb..00f90e9 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -37,6 +37,9 @@ pub struct ChannelManager { pub sample_tx: Sender, pub sample_rx: Receiver, + pub raw_frame_tx: Sender>, + pub raw_frame_rx: Receiver>, + pub cmd_tx: Sender, pub cmd_rx: Receiver, @@ -56,12 +59,15 @@ impl ChannelManager { drop_policy: DropPolicy, ) -> Self { let (sample_tx, sample_rx) = bounded(sample_capacity); + let (raw_frame_tx, raw_frame_rx) = bounded(sample_capacity); let (cmd_tx, cmd_rx) = bounded(cmd_capacity); let (event_tx, event_rx) = bounded(event_capacity); Self { sample_tx, sample_rx, + raw_frame_tx, + raw_frame_rx, cmd_tx, cmd_rx, event_tx, @@ -111,6 +117,46 @@ impl ChannelManager { }) } + pub fn send_raw_frame(&self, frame: Vec) -> Result<(), SdkError> { + match self.drop_policy { + DropPolicy::DropNewest => match self.raw_frame_tx.try_send(frame) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => { + self.record_sample_drop(); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed), + }, + DropPolicy::DropOldest => match self.raw_frame_tx.try_send(frame) { + Ok(()) => Ok(()), + Err(TrySendError::Full(frame)) => { + let _ = self.raw_frame_rx.try_recv(); + self.record_sample_drop(); + + match self.raw_frame_tx.try_send(frame) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => { + self.record_sample_drop(); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed), + } + } + Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed), + }, + } + } + + pub fn recv_raw_frame(&self, timeout_ms: u32) -> Result, SdkError> { + let timeout = std::time::Duration::from_millis(timeout_ms as u64); + self.raw_frame_rx + .recv_timeout(timeout) + .map_err(|err| match err { + RecvTimeoutError::Timeout => SdkError::Timeout, + RecvTimeoutError::Disconnected => SdkError::ChannelClosed, + }) + } + pub fn send_cmd(&self, cmd: DeviceCommand) -> Result<(), SdkError> { self.cmd_tx.try_send(cmd).map_err(|err| match err { TrySendError::Full(_) => SdkError::BufferOverflow(1), diff --git a/src/device.rs b/src/device.rs index 464e60d..dcd85cb 100644 --- a/src/device.rs +++ b/src/device.rs @@ -6,8 +6,7 @@ use crate::{ config::{DeviceConfig, DeviceInfo}, error::SdkError, protocol::{ - EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec, - ReadRequest, WriteRequest, + EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec, ReadRequest, WriteRequest, }, stream::{StreamConfig, StreamController, StreamRuntime}, transport::{SerialTransport, SharedSerialTransport}, @@ -55,7 +54,7 @@ impl EskinDeviceInner { mode: DeviceMode::Command, transport: Arc::new(Mutex::new(transport)), codec: Box::new(EskinProtocolCodec), - stream: None + stream: None, } } @@ -75,7 +74,7 @@ impl EskinDeviceInner { mode: DeviceMode::Command, transport, codec: Box::new(EskinProtocolCodec), - stream: None + stream: None, } } @@ -94,9 +93,18 @@ impl EskinDeviceInner { let remaining = deadline .checked_duration_since(std::time::Instant::now()) .unwrap_or(std::time::Duration::from_millis(1)); - debug_println!("[device] read_exact: need {} bytes, have {} so far, remaining timeout: {:?}", buf.len() - offset, offset, remaining); + debug_println!( + "[device] read_exact: need {} bytes, have {} so far, remaining timeout: {:?}", + buf.len() - offset, + offset, + remaining + ); let n = transport.read(&mut buf[offset..], Duration::from_std(remaining).unwrap())?; - debug_println!("[device] read_exact: got {} bytes: {:02X?}", n, &buf[offset..offset + n]); + debug_println!( + "[device] read_exact: got {} bytes: {:02X?}", + n, + &buf[offset..offset + n] + ); if n == 0 { return Err(SdkError::Timeout); @@ -155,7 +163,7 @@ impl EskinDeviceInner { fn ensure_command_mode(&self) -> Result<(), SdkError> { match self.mode { DeviceMode::Command => Ok(()), - DeviceMode::Streaming => Err(SdkError::StreamingBusy) + DeviceMode::Streaming => Err(SdkError::StreamingBusy), } } @@ -170,8 +178,6 @@ impl EskinDeviceInner { pub fn shared_transport(&self) -> SharedSerialTransport { Arc::clone(&self.transport) } - - } pub trait EskinDeviceFunc { @@ -183,12 +189,13 @@ pub trait EskinDeviceFunc { fn write_device_config1(&mut self, enable: bool) -> Result; fn write_device_config2(&mut self, enable: bool) -> Result; fn write_matrix_row(&mut self, row: u8) -> Result; - fn write_matrix_col(&mut self, col: u8) -> Result; + fn write_matrix_col(&mut self, col: u8) -> Result; } impl EskinDeviceFunc for EskinDeviceInner { fn read_hdw_version(&mut self) -> Result { - let hdw = self.read_register(0, 2) + let hdw = self + .read_register(0, 2) .map_err(|_| SdkError::FrameError("read hardware version failed".into()))?; let version = format!("{}.{}", hdw[0], hdw[1]); @@ -196,39 +203,45 @@ impl EskinDeviceFunc for EskinDeviceInner { } fn read_matrix_row(&mut self) -> Result { - let row = self.read_register(0x0015, 1) + let row = self + .read_register(0x0015, 1) .map_err(|_| SdkError::FrameError("read matrix row failed".into()))?; Ok(row[0]) } fn read_matrix_col(&mut self) -> Result { - let col = self.read_register(0x0014, 1) + let col = self + .read_register(0x0014, 1) .map_err(|_| SdkError::FrameError("read matrix col failed".into()))?; Ok(col[0]) } fn write_matrix_row(&mut self, row: u8) -> Result { - let res = self.write_register(0x0015, &[row]) + let res = self + .write_register(0x0015, &[row]) .map_err(|_| SdkError::FrameError("write matrix row failed".into()))?; Ok(res) } fn write_matrix_col(&mut self, col: u8) -> Result { - let res = self.write_register(0x0015, &[col]) + let res = self + .write_register(0x0015, &[col]) .map_err(|_| SdkError::FrameError("write matrix row failed".into()))?; Ok(res) } fn read_device_config1(&mut self) -> Result { - let enabled = self.read_register(0x0017, 1) + let enabled = self + .read_register(0x0017, 1) .map_err(|_| SdkError::FrameError("read device config1 failed".into()))?; Ok(enabled[0]) } fn read_device_config2(&mut self) -> Result { - let enabled = self.read_register(0x0018, 1) + let enabled = self + .read_register(0x0018, 1) .map_err(|_| SdkError::FrameError("read device config2 failed".into()))?; Ok(enabled[0]) } @@ -255,6 +268,7 @@ pub trait EskinDevice { fn start_stream(&mut self) -> Result<(), SdkError>; fn stop_stream(&mut self) -> Result<(), SdkError>; fn read_sample(&self, timeout_ms: u32) -> Result; + fn read_stream_frame(&self, timeout_ms: u32) -> Result, SdkError>; fn read_event(&self, timeout_ms: u32) -> Result; fn read_register(&mut self, addr: u32, length: u16) -> Result, SdkError>; fn write_register(&mut self, addr: u32, data: &[u8]) -> Result; @@ -348,6 +362,10 @@ impl EskinDevice for EskinDeviceInner { self.channels.recv_sample(timeout_ms) } + fn read_stream_frame(&self, timeout_ms: u32) -> Result, SdkError> { + self.channels.recv_raw_frame(timeout_ms) + } + fn read_event(&self, timeout_ms: u32) -> Result { self.channels.recv_event(timeout_ms) } diff --git a/src/error.rs b/src/error.rs index 3d5aaf3..8dcd3b8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -72,5 +72,5 @@ pub enum SdkError { DeviceError(u16), #[error("Device is in streaming mode, command not allowed")] - StreamingBusy + StreamingBusy, } diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index cb99b26..b425ef9 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -1,9 +1,9 @@ -use std::{ptr}; -use std::ffi::{CStr, c_char}; use crate::device::{DeviceMode, EskinDevice, EskinDeviceFunc}; use crate::transport::SerialPortTransport; use crate::types::{CombinedForce, FingerSample}; use crate::{config::DeviceConfig, device::EskinDeviceInner, error::SdkErrorCode}; +use std::ffi::{CStr, c_char}; +use std::ptr; pub type EskinDeviceHandle = *mut core::ffi::c_void; @@ -67,10 +67,13 @@ fn sdk_error_to_code(err: crate::error::SdkError) -> SdkErrorCode { #[unsafe(no_mangle)] pub extern "C" fn eskin_version() -> EskinSdkVersion { - EskinSdkVersion { major: 0, minor: 1, patch: 0 } + EskinSdkVersion { + major: 0, + minor: 1, + patch: 0, + } } - #[unsafe(no_mangle)] pub unsafe extern "C" fn eskin_open( path: *const c_char, @@ -80,11 +83,9 @@ pub unsafe extern "C" fn eskin_open( return ptr::null_mut(); } - let path_str = match unsafe { - CStr::from_ptr(path) - }.to_str() { + let path_str = match unsafe { CStr::from_ptr(path) }.to_str() { Ok(s) => s.to_string(), - Err(_) => return ptr::null_mut() + Err(_) => return ptr::null_mut(), }; let device_config = if config.is_null() { @@ -95,15 +96,13 @@ pub unsafe extern "C" fn eskin_open( let transport = SerialPortTransport::new(path_str, 921600); let mut device = EskinDeviceInner::new(device_config, Box::new(transport)); - + if device.open().is_err() { return ptr::null_mut(); } - let wrapper = Box::new(DeviceWrapper { - device, - }); - + let wrapper = Box::new(DeviceWrapper { device }); + Box::into_raw(wrapper) as EskinDeviceHandle } @@ -434,12 +433,38 @@ pub unsafe extern "C" fn eskin_read_sample( } } +/// 读取一个 stream 原始完整协议帧(流模式下调用) +#[unsafe(no_mangle)] +pub unsafe extern "C" fn eskin_read_stream_frame( + handle: EskinDeviceHandle, + timeout_ms: u32, + buf: *mut u8, + buf_len: u32, + actual_len: *mut u32, +) -> SdkErrorCode { + if handle.is_null() || buf.is_null() || actual_len.is_null() { + return SdkErrorCode::InvalidPointer; + } + + let wrapper = unsafe { &mut *(handle as *mut DeviceWrapper) }; + + let frame = match wrapper.device.read_stream_frame(timeout_ms) { + Ok(frame) => frame, + Err(e) => return sdk_error_to_code(e), + }; + + let copy_len = std::cmp::min(frame.len(), buf_len as usize); + unsafe { + ptr::copy_nonoverlapping(frame.as_ptr(), buf, copy_len); + *actual_len = frame.len() as u32; + } + + SdkErrorCode::Success +} + /// 查询当前设备模式(Command=0, Streaming=1) #[unsafe(no_mangle)] -pub unsafe extern "C" fn eskin_get_mode( - handle: EskinDeviceHandle, - out: *mut u32, -) -> SdkErrorCode { +pub unsafe extern "C" fn eskin_get_mode(handle: EskinDeviceHandle, out: *mut u32) -> SdkErrorCode { if handle.is_null() || out.is_null() { return SdkErrorCode::InvalidPointer; } diff --git a/src/main.rs b/src/main.rs index d15fed2..be02a55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ -use std::io::{self, BufRead}; use eskin_finger_sdk::{ config::DeviceConfig, device::{EskinDevice, EskinDeviceFunc, EskinDeviceInner}, error::SdkError, transport::SerialPortTransport, }; +use std::io::{self, BufRead}; fn main() { // let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600); // let config = DeviceConfig::default(); @@ -35,7 +35,6 @@ fn read_check_group(device: &mut EskinDeviceInner) { print_payload_data(&group); } - fn read_row(device: &mut EskinDeviceInner) { let row = device.read_register(0x0015, 1).unwrap(); print_payload_data(&row); @@ -92,9 +91,20 @@ fn stream_demo() { if count % 5 == 0 { println!( "[#{count} seq={}] combined_force={:?}", - sample.sequence, - sample.combined_forces + sample.sequence, sample.combined_forces ); + + match device.read_stream_frame(20) { + Ok(frame) => { + println!("raw_frame len={} [{}]", frame.len(), format_hex(&frame)); + } + Err(SdkError::Timeout) => { + println!("raw_frame timeout"); + } + Err(e) => { + eprintln!("read_stream_frame error: {e}"); + } + } } } Err(SdkError::Timeout) => continue, @@ -107,7 +117,10 @@ fn stream_demo() { // 回到 Command 模式 device.stop_stream().unwrap(); - println!("Stream stopped, total samples: {count}, mode: {:?}", device.mode()); + println!( + "Stream stopped, total samples: {count}, mode: {:?}", + device.mode() + ); // Stream 停止后,Command 操作恢复正常 println!("Row: {}", device.read_matrix_row().unwrap()); @@ -119,9 +132,19 @@ fn print_payload_data(data: &[u8]) { for (i, chunk) in data.chunks(2).enumerate() { if chunk.len() == 2 { let val = u16::from_le_bytes([chunk[0], chunk[1]]); - println!(" [{:3}] [{:02X}] [{:02X}] => {}", i, chunk[0], chunk[1], val); + println!( + " [{:3}] [{:02X}] [{:02X}] => {}", + i, chunk[0], chunk[1], val + ); } else { println!(" [{:3}] [{:02X}] (odd byte)", i, chunk[0]); } } } + +fn format_hex(data: &[u8]) -> String { + data.iter() + .map(|byte| format!("{byte:02X}")) + .collect::>() + .join(" ") +} diff --git a/src/protocol.rs b/src/protocol.rs index 14a8eb6..bca6365 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -409,7 +409,7 @@ mod tests { let req = ReadRequest { device_addr: 0x34, start_addr: 0x1C00, - read_byte_count: 168 + read_byte_count: 168, }; let frame = codec().encode_read_request(&req).unwrap(); @@ -438,4 +438,4 @@ mod tests { assert_eq!(frame.len(), 14); } -} \ No newline at end of file +} diff --git a/src/register.rs b/src/register.rs index 4667b75..acecdbc 100644 --- a/src/register.rs +++ b/src/register.rs @@ -1,7 +1,7 @@ use crate::{ config::DeviceInfo, error::SdkError, - types::{CombinedForce, DistributionForce, Force3D, ForcePoint, ModuleError, SensorModule}, + types::{CombinedForce, DistributionForce, Force3D, ForcePoint, ModuleError, SensorModule}, }; pub const REG_SERIAL_NUMBER: u32 = 0x0000; @@ -146,7 +146,6 @@ impl RegisterMap for EskinRegisterMap { } } - pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result { // println!("{:02X?}", raw); // const MODULE_COUNT: usize = 28; @@ -178,10 +177,13 @@ pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result Result, SdkError> { } Ok(errors) -} \ No newline at end of file +} diff --git a/src/stream.rs b/src/stream.rs index 360df89..e3a76e0 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -3,7 +3,6 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, }; -use crate::{config, register::{REG_COMBINED_FORCE, REG_MODULE_ERROR}}; use chrono::Duration; use crate::{ @@ -14,9 +13,9 @@ use crate::{ types::{FingerSample, SensorModule}, }; -use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ReadRequest}; +use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, ReadRequest}; use std::thread::{self, JoinHandle}; -use std::time::Duration as StdDuration; +use std::time::{Duration as StdDuration, Instant}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamMode { Polling, @@ -43,7 +42,7 @@ impl Default for StreamConfig { poll_interval_ms: 10, device_addr: 0x34, read_timeout_ms: 100, - finger_addr: 0x1C00 + finger_addr: 0x1C00, } } } @@ -53,6 +52,7 @@ pub trait StreamController: Send { fn stop(&mut self) -> Result<(), SdkError>; fn is_running(&self) -> bool; fn next_sample(&self, timeout_ms: u32) -> Result; + fn next_raw_frame(&self, timeout_ms: u32) -> Result, SdkError>; fn next_event(&self, timeout_ms: u32) -> Result; } @@ -94,12 +94,13 @@ impl StreamController for StreamRuntime { return Err(SdkError::AlreadyStreaming); } - let collector = make_collector(&config, Arc::clone(&self.transport)); + // Old synchronous collector path: + // let collector = make_collector(&config, Arc::clone(&self.transport)); let worker = StreamWorker::new( Arc::clone(&self.running), Arc::clone(&self.channels), + Arc::clone(&self.transport), config.clone(), - collector, ) .spawn(); @@ -133,6 +134,10 @@ impl StreamController for StreamRuntime { self.channels.recv_sample(timeout_ms) } + fn next_raw_frame(&self, timeout_ms: u32) -> Result, SdkError> { + self.channels.recv_raw_frame(timeout_ms) + } + fn next_event(&self, timeout_ms: u32) -> Result { self.channels.recv_event(timeout_ms) } @@ -141,22 +146,30 @@ impl StreamController for StreamRuntime { pub struct StreamWorker { running: Arc, channels: Arc, + transport: SharedSerialTransport, config: StreamConfig, - collector: Box, + codec: Box, + rx_buffer: Vec, + sequence: u32, + next_request_at: Instant, } impl StreamWorker { pub fn new( running: Arc, channels: Arc, + transport: SharedSerialTransport, config: StreamConfig, - collector: Box, ) -> Self { Self { running, channels, + transport, config, - collector, + codec: Box::new(EskinProtocolCodec), + rx_buffer: Vec::with_capacity(512), + sequence: 0, + next_request_at: Instant::now(), } } @@ -175,30 +188,124 @@ impl StreamWorker { break; } - thread::sleep(StdDuration::from_millis( - self.config.poll_interval_ms as u64, - )); + thread::sleep(StdDuration::from_millis(1)); } } fn tick(&mut self) -> Result<(), SdkError> { - // let _transport = self - // .transport - // .lock() - // .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?; + // Old synchronous polling path kept for reference: + // + // let Some(sample) = self.collector.collect_once()? else { + // return Ok(()); + // }; + // self.channels.send_sample(sample)?; - let Some(sample) = self.collector.collect_once()? else { - return Ok(()); - }; - self.channels.send_sample(sample)?; + self.send_request_if_due()?; + self.read_and_decode_available()?; Ok(()) - // TODO: - // 1. encode read request - // 2. transport.write() - // 3. transport.read() - // 4. protocol.decode() - // 5. register parse - // 6. channels.send_sample() + } + + fn send_request_if_due(&mut self) -> Result<(), SdkError> { + if Instant::now() < self.next_request_at { + return Ok(()); + } + + let request = ReadRequest { + device_addr: self.config.device_addr, + start_addr: self.config.finger_addr, + read_byte_count: 168, + }; + let request_frame = self.codec.encode_read_request(&request)?; + + { + let mut transport = self + .transport + .lock() + .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?; + transport.write(&request_frame)?; + } + + let interval = StdDuration::from_millis(self.config.poll_interval_ms as u64); + self.next_request_at = Instant::now() + interval.max(StdDuration::from_millis(1)); + Ok(()) + } + + fn read_and_decode_available(&mut self) -> Result<(), SdkError> { + let mut buf = [0u8; 256]; + let read_result = { + let mut transport = self + .transport + .lock() + .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?; + transport.read(&mut buf, Duration::milliseconds(1)) + }; + + match read_result { + Ok(0) | Err(SdkError::Timeout) => return Ok(()), + Ok(n) => self.rx_buffer.extend_from_slice(&buf[..n]), + Err(err) => return Err(err), + } + + while let Some(frame) = self.pop_next_frame()? { + self.channels.send_raw_frame(frame.clone())?; + + let response = self.codec.decode_read_response(&frame)?; + if response.start_addr != self.config.finger_addr { + continue; + } + + let combined_forces = + crate::register::parse_combined_forces(&response.data, response.start_addr)?; + let sample = FingerSample { + timestamp_us: chrono::Utc::now().timestamp_micros() as u64, + sequence: self.next_sequence(), + combined_forces, + }; + self.channels.send_sample(sample)?; + } + + Ok(()) + } + + fn pop_next_frame(&mut self) -> Result>, SdkError> { + loop { + let Some(start) = self + .rx_buffer + .windows(FRAME_START_RESPONSE.len()) + .position(|window| window == FRAME_START_RESPONSE) + else { + self.rx_buffer.clear(); + return Ok(None); + }; + + if start > 0 { + self.rx_buffer.drain(..start); + } + + if self.rx_buffer.len() < 4 { + return Ok(None); + } + + let data_len = u16::from_le_bytes([self.rx_buffer[2], self.rx_buffer[3]]) as usize; + let frame_len = 4 + data_len + FRAME_CRC_LEN; + + if frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN { + self.rx_buffer.drain(..FRAME_START_RESPONSE.len()); + continue; + } + + if self.rx_buffer.len() < frame_len { + return Ok(None); + } + + return Ok(Some(self.rx_buffer.drain(..frame_len).collect())); + } + } + + fn next_sequence(&mut self) -> u32 { + let sequence = self.sequence; + self.sequence = self.sequence.wrapping_add(1); + sequence } } @@ -320,7 +427,8 @@ impl SampleCollector for PollingSampleCollector { let combined_force_raw = self.read_register(self.config.finger_addr, 168)?; - let combined_forces = crate::register::parse_combined_forces(&combined_force_raw, self.config.finger_addr)?; + let combined_forces = + crate::register::parse_combined_forces(&combined_force_raw, self.config.finger_addr)?; let now = chrono::Utc::now().timestamp_micros() as u64; @@ -336,6 +444,7 @@ impl SampleCollector for PollingSampleCollector { } } +#[allow(dead_code)] fn make_collector( config: &StreamConfig, transport: SharedSerialTransport, diff --git a/src/types.rs b/src/types.rs index 7990fdf..34d922c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -118,7 +118,6 @@ impl From for SensorModule { } } - pub const SENSOR_MODULE_COUNT: usize = 28; #[repr(C)]