Compare commits

...

1 Commits

Author SHA1 Message Date
lenn
47722bb383 add stream raw frame access 2026-05-26 22:22:28 +08:00
13 changed files with 358 additions and 85 deletions

View File

@@ -1,3 +1,4 @@
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
@@ -7,6 +8,7 @@
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include "../../include/eskin_ffi.h"
@@ -64,20 +66,34 @@ static void demo_streaming(EskinDeviceHandle device, double duration_sec = 5.0)
// 线程安全队列(参考 ROS publisher 的 read_loop + publish_callback 分离模式)
std::mutex mtx;
std::queue<CFingerSample> queue;
std::queue<std::vector<uint8_t>> raw_queue;
bool running = true;
// 读取线程:持续从设备读取 sample 放入队列
// 读取线程:持续从设备读取 sample 和原始帧放入队列
std::thread read_thread([&]() {
while (running) {
CFingerSample sample;
memset(&sample, 0, sizeof(sample));
auto e = eskin_read_sample(device, 50, &sample);
if (e == ESkinSuccess) {
uint8_t raw_buf[512] = {};
uint32_t raw_len = 0;
auto raw_err = eskin_read_stream_frame(
device, 1, raw_buf, sizeof(raw_buf), &raw_len);
std::vector<uint8_t> raw_frame;
if (raw_err == ESkinSuccess) {
raw_frame.assign(raw_buf, raw_buf + std::min<uint32_t>(raw_len, sizeof(raw_buf)));
}
std::lock_guard<std::mutex> lock(mtx);
queue.push(sample);
raw_queue.push(std::move(raw_frame));
while (queue.size() > 100) {
queue.pop(); // 防止堆积
}
while (raw_queue.size() > 100) {
raw_queue.pop();
}
}
// 超时等非致命错误忽略,继续读取
}
@@ -96,11 +112,18 @@ static void demo_streaming(EskinDeviceHandle device, double duration_sec = 5.0)
std::lock_guard<std::mutex> lock(mtx);
while (!queue.empty()) {
const auto &s = queue.front();
printf("[%5u] module=%u fx=%u fy=%u fz=%u\n",
size_t raw_len = 0;
if (!raw_queue.empty()) {
raw_len = raw_queue.front().size();
raw_queue.pop();
}
printf("[%5u] module=%u fx=%u fy=%u fz=%u raw_len=%zu\n",
s.sequence, s.combined_force.module,
s.combined_force.force.fx,
s.combined_force.force.fy,
s.combined_force.force.fz);
s.combined_force.force.fz,
raw_len);
queue.pop();
count++;
}
@@ -145,4 +168,4 @@ int main(int argc, char *argv[]) {
eskin_close(device);
printf("Device closed\n");
return 0;
}
}

View File

@@ -126,6 +126,11 @@ class EskinDevice:
c_void_p, c_uint32, POINTER(CFingerSample)
]
lib.eskin_read_stream_frame.restype = c_uint32
lib.eskin_read_stream_frame.argtypes = [
c_void_p, c_uint32, POINTER(c_uint8), c_uint32, POINTER(c_uint32)
]
lib.eskin_get_mode.restype = c_uint32
lib.eskin_get_mode.argtypes = [c_void_p, POINTER(c_uint32)]
@@ -271,6 +276,17 @@ class EskinDevice:
raise RuntimeError(f"read_sample failed: error={err}")
return sample
def read_stream_frame(self, timeout_ms: int = 200, max_len: int = 512) -> bytes:
"""读取一个 stream 原始完整协议帧(流模式下调用)"""
buf = (c_uint8 * max_len)()
actual = c_uint32(0)
err = self._lib.eskin_read_stream_frame(
self._handle, timeout_ms, buf, len(buf), ctypes.byref(actual)
)
if err != 0:
raise RuntimeError(f"read_stream_frame failed: error={err}")
return bytes(buf[:min(actual.value, len(buf))])
def get_mode(self) -> int:
"""查询当前设备模式0=Command, 1=Streaming"""
out = c_uint32(0)
@@ -283,4 +299,4 @@ class EskinDevice:
return self
def __exit__(self, *args):
self.close()
self.close()

View File

@@ -33,14 +33,17 @@ def demo_streaming(dev: EskinDevice, duration_sec: float = 5.0):
# 线程安全的队列(参考 ROS demo 的 read_loop + publish_callback 分离模式)
queue: deque = deque(maxlen=100)
raw_queue: deque = deque(maxlen=100)
running = True
def read_loop():
"""独立读取线程:持续从设备读取 sample"""
"""独立读取线程:持续从设备读取 sample 和原始帧"""
while running:
try:
sample = dev.read_sample(timeout_ms=50)
queue.append(sample)
raw_frame = dev.read_stream_frame(timeout_ms=1)
raw_queue.append(raw_frame)
except RuntimeError:
# 超时等非致命错误,继续读取
pass
@@ -55,12 +58,14 @@ def demo_streaming(dev: EskinDevice, duration_sec: float = 5.0):
while time.monotonic() - start < duration_sec:
if queue:
sample: CFingerSample = queue.popleft()
raw_frame = raw_queue.popleft() if raw_queue else b""
f = sample.combined_force.force
mod = sample.combined_force.module
print(
f"[{sample.sequence:5d}] "
f"module={mod} "
f"fx={f.fx} fy={f.fy} fz={f.fz}"
f"fx={f.fx} fy={f.fy} fz={f.fz} "
f"raw_len={len(raw_frame)}"
)
count += 1
else:
@@ -89,4 +94,4 @@ def main():
if __name__ == "__main__":
main()
main()

View File

@@ -135,6 +135,13 @@ typedef struct {
EskinSdkErrorCode eskin_start_stream(EskinDeviceHandle handle);
EskinSdkErrorCode eskin_stop_stream(EskinDeviceHandle handle);
EskinSdkErrorCode eskin_read_sample(EskinDeviceHandle handle, uint32_t timeout_ms, CFingerSample* out);
EskinSdkErrorCode eskin_read_stream_frame(
EskinDeviceHandle handle,
uint32_t timeout_ms,
uint8_t* buf,
uint32_t buf_len,
uint32_t* actual_len
);
EskinSdkErrorCode eskin_get_mode(EskinDeviceHandle handle, uint32_t* out);
#ifdef __cplusplus

View File

@@ -37,6 +37,9 @@ pub struct ChannelManager {
pub sample_tx: Sender<FingerSample>,
pub sample_rx: Receiver<FingerSample>,
pub raw_frame_tx: Sender<Vec<u8>>,
pub raw_frame_rx: Receiver<Vec<u8>>,
pub cmd_tx: Sender<DeviceCommand>,
pub cmd_rx: Receiver<DeviceCommand>,
@@ -56,12 +59,15 @@ impl ChannelManager {
drop_policy: DropPolicy,
) -> Self {
let (sample_tx, sample_rx) = bounded(sample_capacity);
let (raw_frame_tx, raw_frame_rx) = bounded(sample_capacity);
let (cmd_tx, cmd_rx) = bounded(cmd_capacity);
let (event_tx, event_rx) = bounded(event_capacity);
Self {
sample_tx,
sample_rx,
raw_frame_tx,
raw_frame_rx,
cmd_tx,
cmd_rx,
event_tx,
@@ -111,6 +117,46 @@ impl ChannelManager {
})
}
pub fn send_raw_frame(&self, frame: Vec<u8>) -> Result<(), SdkError> {
match self.drop_policy {
DropPolicy::DropNewest => match self.raw_frame_tx.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.record_sample_drop();
Ok(())
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
},
DropPolicy::DropOldest => match self.raw_frame_tx.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(frame)) => {
let _ = self.raw_frame_rx.try_recv();
self.record_sample_drop();
match self.raw_frame_tx.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.record_sample_drop();
Ok(())
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
}
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
},
}
}
pub fn recv_raw_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError> {
let timeout = std::time::Duration::from_millis(timeout_ms as u64);
self.raw_frame_rx
.recv_timeout(timeout)
.map_err(|err| match err {
RecvTimeoutError::Timeout => SdkError::Timeout,
RecvTimeoutError::Disconnected => SdkError::ChannelClosed,
})
}
pub fn send_cmd(&self, cmd: DeviceCommand) -> Result<(), SdkError> {
self.cmd_tx.try_send(cmd).map_err(|err| match err {
TrySendError::Full(_) => SdkError::BufferOverflow(1),

View File

@@ -6,8 +6,7 @@ use crate::{
config::{DeviceConfig, DeviceInfo},
error::SdkError,
protocol::{
EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec,
ReadRequest, WriteRequest,
EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec, ReadRequest, WriteRequest,
},
stream::{StreamConfig, StreamController, StreamRuntime},
transport::{SerialTransport, SharedSerialTransport},
@@ -55,7 +54,7 @@ impl EskinDeviceInner {
mode: DeviceMode::Command,
transport: Arc::new(Mutex::new(transport)),
codec: Box::new(EskinProtocolCodec),
stream: None
stream: None,
}
}
@@ -75,7 +74,7 @@ impl EskinDeviceInner {
mode: DeviceMode::Command,
transport,
codec: Box::new(EskinProtocolCodec),
stream: None
stream: None,
}
}
@@ -94,9 +93,18 @@ impl EskinDeviceInner {
let remaining = deadline
.checked_duration_since(std::time::Instant::now())
.unwrap_or(std::time::Duration::from_millis(1));
debug_println!("[device] read_exact: need {} bytes, have {} so far, remaining timeout: {:?}", buf.len() - offset, offset, remaining);
debug_println!(
"[device] read_exact: need {} bytes, have {} so far, remaining timeout: {:?}",
buf.len() - offset,
offset,
remaining
);
let n = transport.read(&mut buf[offset..], Duration::from_std(remaining).unwrap())?;
debug_println!("[device] read_exact: got {} bytes: {:02X?}", n, &buf[offset..offset + n]);
debug_println!(
"[device] read_exact: got {} bytes: {:02X?}",
n,
&buf[offset..offset + n]
);
if n == 0 {
return Err(SdkError::Timeout);
@@ -155,7 +163,7 @@ impl EskinDeviceInner {
fn ensure_command_mode(&self) -> Result<(), SdkError> {
match self.mode {
DeviceMode::Command => Ok(()),
DeviceMode::Streaming => Err(SdkError::StreamingBusy)
DeviceMode::Streaming => Err(SdkError::StreamingBusy),
}
}
@@ -170,8 +178,6 @@ impl EskinDeviceInner {
pub fn shared_transport(&self) -> SharedSerialTransport {
Arc::clone(&self.transport)
}
}
pub trait EskinDeviceFunc {
@@ -183,12 +189,13 @@ pub trait EskinDeviceFunc {
fn write_device_config1(&mut self, enable: bool) -> Result<u16, SdkError>;
fn write_device_config2(&mut self, enable: bool) -> Result<u16, SdkError>;
fn write_matrix_row(&mut self, row: u8) -> Result<u16, SdkError>;
fn write_matrix_col(&mut self, col: u8) -> Result<u16, SdkError>;
fn write_matrix_col(&mut self, col: u8) -> Result<u16, SdkError>;
}
impl EskinDeviceFunc for EskinDeviceInner {
fn read_hdw_version(&mut self) -> Result<String, SdkError> {
let hdw = self.read_register(0, 2)
let hdw = self
.read_register(0, 2)
.map_err(|_| SdkError::FrameError("read hardware version failed".into()))?;
let version = format!("{}.{}", hdw[0], hdw[1]);
@@ -196,39 +203,45 @@ impl EskinDeviceFunc for EskinDeviceInner {
}
fn read_matrix_row(&mut self) -> Result<u8, SdkError> {
let row = self.read_register(0x0015, 1)
let row = self
.read_register(0x0015, 1)
.map_err(|_| SdkError::FrameError("read matrix row failed".into()))?;
Ok(row[0])
}
fn read_matrix_col(&mut self) -> Result<u8, SdkError> {
let col = self.read_register(0x0014, 1)
let col = self
.read_register(0x0014, 1)
.map_err(|_| SdkError::FrameError("read matrix col failed".into()))?;
Ok(col[0])
}
fn write_matrix_row(&mut self, row: u8) -> Result<u16, SdkError> {
let res = self.write_register(0x0015, &[row])
let res = self
.write_register(0x0015, &[row])
.map_err(|_| SdkError::FrameError("write matrix row failed".into()))?;
Ok(res)
}
fn write_matrix_col(&mut self, col: u8) -> Result<u16, SdkError> {
let res = self.write_register(0x0015, &[col])
let res = self
.write_register(0x0015, &[col])
.map_err(|_| SdkError::FrameError("write matrix row failed".into()))?;
Ok(res)
}
fn read_device_config1(&mut self) -> Result<u8, SdkError> {
let enabled = self.read_register(0x0017, 1)
let enabled = self
.read_register(0x0017, 1)
.map_err(|_| SdkError::FrameError("read device config1 failed".into()))?;
Ok(enabled[0])
}
fn read_device_config2(&mut self) -> Result<u8, SdkError> {
let enabled = self.read_register(0x0018, 1)
let enabled = self
.read_register(0x0018, 1)
.map_err(|_| SdkError::FrameError("read device config2 failed".into()))?;
Ok(enabled[0])
}
@@ -255,6 +268,7 @@ pub trait EskinDevice {
fn start_stream(&mut self) -> Result<(), SdkError>;
fn stop_stream(&mut self) -> Result<(), SdkError>;
fn read_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>;
fn read_stream_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError>;
fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>;
fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError>;
fn write_register(&mut self, addr: u32, data: &[u8]) -> Result<u16, SdkError>;
@@ -348,6 +362,10 @@ impl EskinDevice for EskinDeviceInner {
self.channels.recv_sample(timeout_ms)
}
fn read_stream_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError> {
self.channels.recv_raw_frame(timeout_ms)
}
fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
self.channels.recv_event(timeout_ms)
}

View File

@@ -72,5 +72,5 @@ pub enum SdkError {
DeviceError(u16),
#[error("Device is in streaming mode, command not allowed")]
StreamingBusy
StreamingBusy,
}

View File

@@ -1,9 +1,9 @@
use std::{ptr};
use std::ffi::{CStr, c_char};
use crate::device::{DeviceMode, EskinDevice, EskinDeviceFunc};
use crate::transport::SerialPortTransport;
use crate::types::{CombinedForce, FingerSample};
use crate::{config::DeviceConfig, device::EskinDeviceInner, error::SdkErrorCode};
use std::ffi::{CStr, c_char};
use std::ptr;
pub type EskinDeviceHandle = *mut core::ffi::c_void;
@@ -67,10 +67,13 @@ fn sdk_error_to_code(err: crate::error::SdkError) -> SdkErrorCode {
#[unsafe(no_mangle)]
pub extern "C" fn eskin_version() -> EskinSdkVersion {
EskinSdkVersion { major: 0, minor: 1, patch: 0 }
EskinSdkVersion {
major: 0,
minor: 1,
patch: 0,
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn eskin_open(
path: *const c_char,
@@ -80,11 +83,9 @@ pub unsafe extern "C" fn eskin_open(
return ptr::null_mut();
}
let path_str = match unsafe {
CStr::from_ptr(path)
}.to_str() {
let path_str = match unsafe { CStr::from_ptr(path) }.to_str() {
Ok(s) => s.to_string(),
Err(_) => return ptr::null_mut()
Err(_) => return ptr::null_mut(),
};
let device_config = if config.is_null() {
@@ -95,15 +96,13 @@ pub unsafe extern "C" fn eskin_open(
let transport = SerialPortTransport::new(path_str, 921600);
let mut device = EskinDeviceInner::new(device_config, Box::new(transport));
if device.open().is_err() {
return ptr::null_mut();
}
let wrapper = Box::new(DeviceWrapper {
device,
});
let wrapper = Box::new(DeviceWrapper { device });
Box::into_raw(wrapper) as EskinDeviceHandle
}
@@ -434,12 +433,38 @@ pub unsafe extern "C" fn eskin_read_sample(
}
}
/// 读取一个 stream 原始完整协议帧(流模式下调用)
#[unsafe(no_mangle)]
pub unsafe extern "C" fn eskin_read_stream_frame(
handle: EskinDeviceHandle,
timeout_ms: u32,
buf: *mut u8,
buf_len: u32,
actual_len: *mut u32,
) -> SdkErrorCode {
if handle.is_null() || buf.is_null() || actual_len.is_null() {
return SdkErrorCode::InvalidPointer;
}
let wrapper = unsafe { &mut *(handle as *mut DeviceWrapper) };
let frame = match wrapper.device.read_stream_frame(timeout_ms) {
Ok(frame) => frame,
Err(e) => return sdk_error_to_code(e),
};
let copy_len = std::cmp::min(frame.len(), buf_len as usize);
unsafe {
ptr::copy_nonoverlapping(frame.as_ptr(), buf, copy_len);
*actual_len = frame.len() as u32;
}
SdkErrorCode::Success
}
/// 查询当前设备模式Command=0, Streaming=1
#[unsafe(no_mangle)]
pub unsafe extern "C" fn eskin_get_mode(
handle: EskinDeviceHandle,
out: *mut u32,
) -> SdkErrorCode {
pub unsafe extern "C" fn eskin_get_mode(handle: EskinDeviceHandle, out: *mut u32) -> SdkErrorCode {
if handle.is_null() || out.is_null() {
return SdkErrorCode::InvalidPointer;
}

View File

@@ -1,10 +1,10 @@
use std::io::{self, BufRead};
use eskin_finger_sdk::{
config::DeviceConfig,
device::{EskinDevice, EskinDeviceFunc, EskinDeviceInner},
error::SdkError,
transport::SerialPortTransport,
};
use std::io::{self, BufRead};
fn main() {
// let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600);
// let config = DeviceConfig::default();
@@ -35,7 +35,6 @@ fn read_check_group(device: &mut EskinDeviceInner) {
print_payload_data(&group);
}
fn read_row(device: &mut EskinDeviceInner) {
let row = device.read_register(0x0015, 1).unwrap();
print_payload_data(&row);
@@ -92,9 +91,20 @@ fn stream_demo() {
if count % 5 == 0 {
println!(
"[#{count} seq={}] combined_force={:?}",
sample.sequence,
sample.combined_forces
sample.sequence, sample.combined_forces
);
match device.read_stream_frame(20) {
Ok(frame) => {
println!("raw_frame len={} [{}]", frame.len(), format_hex(&frame));
}
Err(SdkError::Timeout) => {
println!("raw_frame timeout");
}
Err(e) => {
eprintln!("read_stream_frame error: {e}");
}
}
}
}
Err(SdkError::Timeout) => continue,
@@ -107,7 +117,10 @@ fn stream_demo() {
// 回到 Command 模式
device.stop_stream().unwrap();
println!("Stream stopped, total samples: {count}, mode: {:?}", device.mode());
println!(
"Stream stopped, total samples: {count}, mode: {:?}",
device.mode()
);
// Stream 停止后Command 操作恢复正常
println!("Row: {}", device.read_matrix_row().unwrap());
@@ -119,9 +132,19 @@ fn print_payload_data(data: &[u8]) {
for (i, chunk) in data.chunks(2).enumerate() {
if chunk.len() == 2 {
let val = u16::from_le_bytes([chunk[0], chunk[1]]);
println!(" [{:3}] [{:02X}] [{:02X}] => {}", i, chunk[0], chunk[1], val);
println!(
" [{:3}] [{:02X}] [{:02X}] => {}",
i, chunk[0], chunk[1], val
);
} else {
println!(" [{:3}] [{:02X}] (odd byte)", i, chunk[0]);
}
}
}
fn format_hex(data: &[u8]) -> String {
data.iter()
.map(|byte| format!("{byte:02X}"))
.collect::<Vec<_>>()
.join(" ")
}

View File

@@ -409,7 +409,7 @@ mod tests {
let req = ReadRequest {
device_addr: 0x34,
start_addr: 0x1C00,
read_byte_count: 168
read_byte_count: 168,
};
let frame = codec().encode_read_request(&req).unwrap();
@@ -438,4 +438,4 @@ mod tests {
assert_eq!(frame.len(), 14);
}
}
}

View File

@@ -1,7 +1,7 @@
use crate::{
config::DeviceInfo,
error::SdkError,
types::{CombinedForce, DistributionForce, Force3D, ForcePoint, ModuleError, SensorModule},
types::{CombinedForce, DistributionForce, Force3D, ForcePoint, ModuleError, SensorModule},
};
pub const REG_SERIAL_NUMBER: u32 = 0x0000;
@@ -146,7 +146,6 @@ impl RegisterMap for EskinRegisterMap {
}
}
pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result<CombinedForce, SdkError> {
// println!("{:02X?}", raw);
// const MODULE_COUNT: usize = 28;
@@ -178,10 +177,13 @@ pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result<CombinedForce, Sdk
.sum();
let force = CombinedForce {
module: addr.into(),
force: Force3D { fx: 0, fy: 0, fz: comb_force }
force: Force3D {
fx: 0,
fy: 0,
fz: comb_force,
},
};
Ok(force)
}
@@ -211,4 +213,4 @@ pub fn parse_module_errors(raw: &[u8]) -> Result<Vec<ModuleError>, SdkError> {
}
Ok(errors)
}
}

View File

@@ -3,7 +3,6 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
};
use crate::{config, register::{REG_COMBINED_FORCE, REG_MODULE_ERROR}};
use chrono::Duration;
use crate::{
@@ -14,9 +13,9 @@ use crate::{
types::{FingerSample, SensorModule},
};
use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ReadRequest};
use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, ReadRequest};
use std::thread::{self, JoinHandle};
use std::time::Duration as StdDuration;
use std::time::{Duration as StdDuration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode {
Polling,
@@ -43,7 +42,7 @@ impl Default for StreamConfig {
poll_interval_ms: 10,
device_addr: 0x34,
read_timeout_ms: 100,
finger_addr: 0x1C00
finger_addr: 0x1C00,
}
}
}
@@ -53,6 +52,7 @@ pub trait StreamController: Send {
fn stop(&mut self) -> Result<(), SdkError>;
fn is_running(&self) -> bool;
fn next_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>;
fn next_raw_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError>;
fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>;
}
@@ -94,12 +94,13 @@ impl StreamController for StreamRuntime {
return Err(SdkError::AlreadyStreaming);
}
let collector = make_collector(&config, Arc::clone(&self.transport));
// Old synchronous collector path:
// let collector = make_collector(&config, Arc::clone(&self.transport));
let worker = StreamWorker::new(
Arc::clone(&self.running),
Arc::clone(&self.channels),
Arc::clone(&self.transport),
config.clone(),
collector,
)
.spawn();
@@ -133,6 +134,10 @@ impl StreamController for StreamRuntime {
self.channels.recv_sample(timeout_ms)
}
fn next_raw_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError> {
self.channels.recv_raw_frame(timeout_ms)
}
fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
self.channels.recv_event(timeout_ms)
}
@@ -141,22 +146,30 @@ impl StreamController for StreamRuntime {
pub struct StreamWorker {
running: Arc<AtomicBool>,
channels: Arc<ChannelManager>,
transport: SharedSerialTransport,
config: StreamConfig,
collector: Box<dyn SampleCollector>,
codec: Box<dyn ProtocolCodec>,
rx_buffer: Vec<u8>,
sequence: u32,
next_request_at: Instant,
}
impl StreamWorker {
pub fn new(
running: Arc<AtomicBool>,
channels: Arc<ChannelManager>,
transport: SharedSerialTransport,
config: StreamConfig,
collector: Box<dyn SampleCollector>,
) -> Self {
Self {
running,
channels,
transport,
config,
collector,
codec: Box::new(EskinProtocolCodec),
rx_buffer: Vec::with_capacity(512),
sequence: 0,
next_request_at: Instant::now(),
}
}
@@ -175,30 +188,124 @@ impl StreamWorker {
break;
}
thread::sleep(StdDuration::from_millis(
self.config.poll_interval_ms as u64,
));
thread::sleep(StdDuration::from_millis(1));
}
}
fn tick(&mut self) -> Result<(), SdkError> {
// let _transport = self
// .transport
// .lock()
// .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
// Old synchronous polling path kept for reference:
//
// let Some(sample) = self.collector.collect_once()? else {
// return Ok(());
// };
// self.channels.send_sample(sample)?;
let Some(sample) = self.collector.collect_once()? else {
return Ok(());
};
self.channels.send_sample(sample)?;
self.send_request_if_due()?;
self.read_and_decode_available()?;
Ok(())
// TODO:
// 1. encode read request
// 2. transport.write()
// 3. transport.read()
// 4. protocol.decode()
// 5. register parse
// 6. channels.send_sample()
}
fn send_request_if_due(&mut self) -> Result<(), SdkError> {
if Instant::now() < self.next_request_at {
return Ok(());
}
let request = ReadRequest {
device_addr: self.config.device_addr,
start_addr: self.config.finger_addr,
read_byte_count: 168,
};
let request_frame = self.codec.encode_read_request(&request)?;
{
let mut transport = self
.transport
.lock()
.map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
transport.write(&request_frame)?;
}
let interval = StdDuration::from_millis(self.config.poll_interval_ms as u64);
self.next_request_at = Instant::now() + interval.max(StdDuration::from_millis(1));
Ok(())
}
fn read_and_decode_available(&mut self) -> Result<(), SdkError> {
let mut buf = [0u8; 256];
let read_result = {
let mut transport = self
.transport
.lock()
.map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
transport.read(&mut buf, Duration::milliseconds(1))
};
match read_result {
Ok(0) | Err(SdkError::Timeout) => return Ok(()),
Ok(n) => self.rx_buffer.extend_from_slice(&buf[..n]),
Err(err) => return Err(err),
}
while let Some(frame) = self.pop_next_frame()? {
self.channels.send_raw_frame(frame.clone())?;
let response = self.codec.decode_read_response(&frame)?;
if response.start_addr != self.config.finger_addr {
continue;
}
let combined_forces =
crate::register::parse_combined_forces(&response.data, response.start_addr)?;
let sample = FingerSample {
timestamp_us: chrono::Utc::now().timestamp_micros() as u64,
sequence: self.next_sequence(),
combined_forces,
};
self.channels.send_sample(sample)?;
}
Ok(())
}
fn pop_next_frame(&mut self) -> Result<Option<Vec<u8>>, SdkError> {
loop {
let Some(start) = self
.rx_buffer
.windows(FRAME_START_RESPONSE.len())
.position(|window| window == FRAME_START_RESPONSE)
else {
self.rx_buffer.clear();
return Ok(None);
};
if start > 0 {
self.rx_buffer.drain(..start);
}
if self.rx_buffer.len() < 4 {
return Ok(None);
}
let data_len = u16::from_le_bytes([self.rx_buffer[2], self.rx_buffer[3]]) as usize;
let frame_len = 4 + data_len + FRAME_CRC_LEN;
if frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN {
self.rx_buffer.drain(..FRAME_START_RESPONSE.len());
continue;
}
if self.rx_buffer.len() < frame_len {
return Ok(None);
}
return Ok(Some(self.rx_buffer.drain(..frame_len).collect()));
}
}
fn next_sequence(&mut self) -> u32 {
let sequence = self.sequence;
self.sequence = self.sequence.wrapping_add(1);
sequence
}
}
@@ -320,7 +427,8 @@ impl SampleCollector for PollingSampleCollector {
let combined_force_raw = self.read_register(self.config.finger_addr, 168)?;
let combined_forces = crate::register::parse_combined_forces(&combined_force_raw, self.config.finger_addr)?;
let combined_forces =
crate::register::parse_combined_forces(&combined_force_raw, self.config.finger_addr)?;
let now = chrono::Utc::now().timestamp_micros() as u64;
@@ -336,6 +444,7 @@ impl SampleCollector for PollingSampleCollector {
}
}
#[allow(dead_code)]
fn make_collector(
config: &StreamConfig,
transport: SharedSerialTransport,

View File

@@ -118,7 +118,6 @@ impl From<u32> for SensorModule {
}
}
pub const SENSOR_MODULE_COUNT: usize = 28;
#[repr(C)]