add stream raw frame access

This commit is contained in:
lenn
2026-05-26 22:22:28 +08:00
parent 705375085f
commit 47722bb383
13 changed files with 358 additions and 85 deletions

View File

@@ -1,3 +1,4 @@
#include <algorithm>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <cstdint> #include <cstdint>
@@ -7,6 +8,7 @@
#include <queue> #include <queue>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector>
#include "../../include/eskin_ffi.h" #include "../../include/eskin_ffi.h"
@@ -64,20 +66,34 @@ static void demo_streaming(EskinDeviceHandle device, double duration_sec = 5.0)
// 线程安全队列(参考 ROS publisher 的 read_loop + publish_callback 分离模式) // 线程安全队列(参考 ROS publisher 的 read_loop + publish_callback 分离模式)
std::mutex mtx; std::mutex mtx;
std::queue<CFingerSample> queue; std::queue<CFingerSample> queue;
std::queue<std::vector<uint8_t>> raw_queue;
bool running = true; bool running = true;
// 读取线程:持续从设备读取 sample 放入队列 // 读取线程:持续从设备读取 sample 和原始帧放入队列
std::thread read_thread([&]() { std::thread read_thread([&]() {
while (running) { while (running) {
CFingerSample sample; CFingerSample sample;
memset(&sample, 0, sizeof(sample)); memset(&sample, 0, sizeof(sample));
auto e = eskin_read_sample(device, 50, &sample); auto e = eskin_read_sample(device, 50, &sample);
if (e == ESkinSuccess) { if (e == ESkinSuccess) {
uint8_t raw_buf[512] = {};
uint32_t raw_len = 0;
auto raw_err = eskin_read_stream_frame(
device, 1, raw_buf, sizeof(raw_buf), &raw_len);
std::vector<uint8_t> raw_frame;
if (raw_err == ESkinSuccess) {
raw_frame.assign(raw_buf, raw_buf + std::min<uint32_t>(raw_len, sizeof(raw_buf)));
}
std::lock_guard<std::mutex> lock(mtx); std::lock_guard<std::mutex> lock(mtx);
queue.push(sample); queue.push(sample);
raw_queue.push(std::move(raw_frame));
while (queue.size() > 100) { while (queue.size() > 100) {
queue.pop(); // 防止堆积 queue.pop(); // 防止堆积
} }
while (raw_queue.size() > 100) {
raw_queue.pop();
}
} }
// 超时等非致命错误忽略,继续读取 // 超时等非致命错误忽略,继续读取
} }
@@ -96,11 +112,18 @@ static void demo_streaming(EskinDeviceHandle device, double duration_sec = 5.0)
std::lock_guard<std::mutex> lock(mtx); std::lock_guard<std::mutex> lock(mtx);
while (!queue.empty()) { while (!queue.empty()) {
const auto &s = queue.front(); const auto &s = queue.front();
printf("[%5u] module=%u fx=%u fy=%u fz=%u\n", size_t raw_len = 0;
if (!raw_queue.empty()) {
raw_len = raw_queue.front().size();
raw_queue.pop();
}
printf("[%5u] module=%u fx=%u fy=%u fz=%u raw_len=%zu\n",
s.sequence, s.combined_force.module, s.sequence, s.combined_force.module,
s.combined_force.force.fx, s.combined_force.force.fx,
s.combined_force.force.fy, s.combined_force.force.fy,
s.combined_force.force.fz); s.combined_force.force.fz,
raw_len);
queue.pop(); queue.pop();
count++; count++;
} }

View File

@@ -126,6 +126,11 @@ class EskinDevice:
c_void_p, c_uint32, POINTER(CFingerSample) c_void_p, c_uint32, POINTER(CFingerSample)
] ]
lib.eskin_read_stream_frame.restype = c_uint32
lib.eskin_read_stream_frame.argtypes = [
c_void_p, c_uint32, POINTER(c_uint8), c_uint32, POINTER(c_uint32)
]
lib.eskin_get_mode.restype = c_uint32 lib.eskin_get_mode.restype = c_uint32
lib.eskin_get_mode.argtypes = [c_void_p, POINTER(c_uint32)] lib.eskin_get_mode.argtypes = [c_void_p, POINTER(c_uint32)]
@@ -271,6 +276,17 @@ class EskinDevice:
raise RuntimeError(f"read_sample failed: error={err}") raise RuntimeError(f"read_sample failed: error={err}")
return sample return sample
def read_stream_frame(self, timeout_ms: int = 200, max_len: int = 512) -> bytes:
"""读取一个 stream 原始完整协议帧(流模式下调用)"""
buf = (c_uint8 * max_len)()
actual = c_uint32(0)
err = self._lib.eskin_read_stream_frame(
self._handle, timeout_ms, buf, len(buf), ctypes.byref(actual)
)
if err != 0:
raise RuntimeError(f"read_stream_frame failed: error={err}")
return bytes(buf[:min(actual.value, len(buf))])
def get_mode(self) -> int: def get_mode(self) -> int:
"""查询当前设备模式0=Command, 1=Streaming""" """查询当前设备模式0=Command, 1=Streaming"""
out = c_uint32(0) out = c_uint32(0)

View File

@@ -33,14 +33,17 @@ def demo_streaming(dev: EskinDevice, duration_sec: float = 5.0):
# 线程安全的队列(参考 ROS demo 的 read_loop + publish_callback 分离模式) # 线程安全的队列(参考 ROS demo 的 read_loop + publish_callback 分离模式)
queue: deque = deque(maxlen=100) queue: deque = deque(maxlen=100)
raw_queue: deque = deque(maxlen=100)
running = True running = True
def read_loop(): def read_loop():
"""独立读取线程:持续从设备读取 sample""" """独立读取线程:持续从设备读取 sample 和原始帧"""
while running: while running:
try: try:
sample = dev.read_sample(timeout_ms=50) sample = dev.read_sample(timeout_ms=50)
queue.append(sample) queue.append(sample)
raw_frame = dev.read_stream_frame(timeout_ms=1)
raw_queue.append(raw_frame)
except RuntimeError: except RuntimeError:
# 超时等非致命错误,继续读取 # 超时等非致命错误,继续读取
pass pass
@@ -55,12 +58,14 @@ def demo_streaming(dev: EskinDevice, duration_sec: float = 5.0):
while time.monotonic() - start < duration_sec: while time.monotonic() - start < duration_sec:
if queue: if queue:
sample: CFingerSample = queue.popleft() sample: CFingerSample = queue.popleft()
raw_frame = raw_queue.popleft() if raw_queue else b""
f = sample.combined_force.force f = sample.combined_force.force
mod = sample.combined_force.module mod = sample.combined_force.module
print( print(
f"[{sample.sequence:5d}] " f"[{sample.sequence:5d}] "
f"module={mod} " f"module={mod} "
f"fx={f.fx} fy={f.fy} fz={f.fz}" f"fx={f.fx} fy={f.fy} fz={f.fz} "
f"raw_len={len(raw_frame)}"
) )
count += 1 count += 1
else: else:

View File

@@ -135,6 +135,13 @@ typedef struct {
EskinSdkErrorCode eskin_start_stream(EskinDeviceHandle handle); EskinSdkErrorCode eskin_start_stream(EskinDeviceHandle handle);
EskinSdkErrorCode eskin_stop_stream(EskinDeviceHandle handle); EskinSdkErrorCode eskin_stop_stream(EskinDeviceHandle handle);
EskinSdkErrorCode eskin_read_sample(EskinDeviceHandle handle, uint32_t timeout_ms, CFingerSample* out); EskinSdkErrorCode eskin_read_sample(EskinDeviceHandle handle, uint32_t timeout_ms, CFingerSample* out);
EskinSdkErrorCode eskin_read_stream_frame(
EskinDeviceHandle handle,
uint32_t timeout_ms,
uint8_t* buf,
uint32_t buf_len,
uint32_t* actual_len
);
EskinSdkErrorCode eskin_get_mode(EskinDeviceHandle handle, uint32_t* out); EskinSdkErrorCode eskin_get_mode(EskinDeviceHandle handle, uint32_t* out);
#ifdef __cplusplus #ifdef __cplusplus

View File

@@ -37,6 +37,9 @@ pub struct ChannelManager {
pub sample_tx: Sender<FingerSample>, pub sample_tx: Sender<FingerSample>,
pub sample_rx: Receiver<FingerSample>, pub sample_rx: Receiver<FingerSample>,
pub raw_frame_tx: Sender<Vec<u8>>,
pub raw_frame_rx: Receiver<Vec<u8>>,
pub cmd_tx: Sender<DeviceCommand>, pub cmd_tx: Sender<DeviceCommand>,
pub cmd_rx: Receiver<DeviceCommand>, pub cmd_rx: Receiver<DeviceCommand>,
@@ -56,12 +59,15 @@ impl ChannelManager {
drop_policy: DropPolicy, drop_policy: DropPolicy,
) -> Self { ) -> Self {
let (sample_tx, sample_rx) = bounded(sample_capacity); let (sample_tx, sample_rx) = bounded(sample_capacity);
let (raw_frame_tx, raw_frame_rx) = bounded(sample_capacity);
let (cmd_tx, cmd_rx) = bounded(cmd_capacity); let (cmd_tx, cmd_rx) = bounded(cmd_capacity);
let (event_tx, event_rx) = bounded(event_capacity); let (event_tx, event_rx) = bounded(event_capacity);
Self { Self {
sample_tx, sample_tx,
sample_rx, sample_rx,
raw_frame_tx,
raw_frame_rx,
cmd_tx, cmd_tx,
cmd_rx, cmd_rx,
event_tx, event_tx,
@@ -111,6 +117,46 @@ impl ChannelManager {
}) })
} }
pub fn send_raw_frame(&self, frame: Vec<u8>) -> Result<(), SdkError> {
match self.drop_policy {
DropPolicy::DropNewest => match self.raw_frame_tx.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.record_sample_drop();
Ok(())
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
},
DropPolicy::DropOldest => match self.raw_frame_tx.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(frame)) => {
let _ = self.raw_frame_rx.try_recv();
self.record_sample_drop();
match self.raw_frame_tx.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.record_sample_drop();
Ok(())
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
}
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
},
}
}
pub fn recv_raw_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError> {
let timeout = std::time::Duration::from_millis(timeout_ms as u64);
self.raw_frame_rx
.recv_timeout(timeout)
.map_err(|err| match err {
RecvTimeoutError::Timeout => SdkError::Timeout,
RecvTimeoutError::Disconnected => SdkError::ChannelClosed,
})
}
pub fn send_cmd(&self, cmd: DeviceCommand) -> Result<(), SdkError> { pub fn send_cmd(&self, cmd: DeviceCommand) -> Result<(), SdkError> {
self.cmd_tx.try_send(cmd).map_err(|err| match err { self.cmd_tx.try_send(cmd).map_err(|err| match err {
TrySendError::Full(_) => SdkError::BufferOverflow(1), TrySendError::Full(_) => SdkError::BufferOverflow(1),

View File

@@ -6,8 +6,7 @@ use crate::{
config::{DeviceConfig, DeviceInfo}, config::{DeviceConfig, DeviceInfo},
error::SdkError, error::SdkError,
protocol::{ protocol::{
EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec, EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec, ReadRequest, WriteRequest,
ReadRequest, WriteRequest,
}, },
stream::{StreamConfig, StreamController, StreamRuntime}, stream::{StreamConfig, StreamController, StreamRuntime},
transport::{SerialTransport, SharedSerialTransport}, transport::{SerialTransport, SharedSerialTransport},
@@ -55,7 +54,7 @@ impl EskinDeviceInner {
mode: DeviceMode::Command, mode: DeviceMode::Command,
transport: Arc::new(Mutex::new(transport)), transport: Arc::new(Mutex::new(transport)),
codec: Box::new(EskinProtocolCodec), codec: Box::new(EskinProtocolCodec),
stream: None stream: None,
} }
} }
@@ -75,7 +74,7 @@ impl EskinDeviceInner {
mode: DeviceMode::Command, mode: DeviceMode::Command,
transport, transport,
codec: Box::new(EskinProtocolCodec), codec: Box::new(EskinProtocolCodec),
stream: None stream: None,
} }
} }
@@ -94,9 +93,18 @@ impl EskinDeviceInner {
let remaining = deadline let remaining = deadline
.checked_duration_since(std::time::Instant::now()) .checked_duration_since(std::time::Instant::now())
.unwrap_or(std::time::Duration::from_millis(1)); .unwrap_or(std::time::Duration::from_millis(1));
debug_println!("[device] read_exact: need {} bytes, have {} so far, remaining timeout: {:?}", buf.len() - offset, offset, remaining); debug_println!(
"[device] read_exact: need {} bytes, have {} so far, remaining timeout: {:?}",
buf.len() - offset,
offset,
remaining
);
let n = transport.read(&mut buf[offset..], Duration::from_std(remaining).unwrap())?; let n = transport.read(&mut buf[offset..], Duration::from_std(remaining).unwrap())?;
debug_println!("[device] read_exact: got {} bytes: {:02X?}", n, &buf[offset..offset + n]); debug_println!(
"[device] read_exact: got {} bytes: {:02X?}",
n,
&buf[offset..offset + n]
);
if n == 0 { if n == 0 {
return Err(SdkError::Timeout); return Err(SdkError::Timeout);
@@ -155,7 +163,7 @@ impl EskinDeviceInner {
fn ensure_command_mode(&self) -> Result<(), SdkError> { fn ensure_command_mode(&self) -> Result<(), SdkError> {
match self.mode { match self.mode {
DeviceMode::Command => Ok(()), DeviceMode::Command => Ok(()),
DeviceMode::Streaming => Err(SdkError::StreamingBusy) DeviceMode::Streaming => Err(SdkError::StreamingBusy),
} }
} }
@@ -170,8 +178,6 @@ impl EskinDeviceInner {
pub fn shared_transport(&self) -> SharedSerialTransport { pub fn shared_transport(&self) -> SharedSerialTransport {
Arc::clone(&self.transport) Arc::clone(&self.transport)
} }
} }
pub trait EskinDeviceFunc { pub trait EskinDeviceFunc {
@@ -188,7 +194,8 @@ pub trait EskinDeviceFunc {
impl EskinDeviceFunc for EskinDeviceInner { impl EskinDeviceFunc for EskinDeviceInner {
fn read_hdw_version(&mut self) -> Result<String, SdkError> { fn read_hdw_version(&mut self) -> Result<String, SdkError> {
let hdw = self.read_register(0, 2) let hdw = self
.read_register(0, 2)
.map_err(|_| SdkError::FrameError("read hardware version failed".into()))?; .map_err(|_| SdkError::FrameError("read hardware version failed".into()))?;
let version = format!("{}.{}", hdw[0], hdw[1]); let version = format!("{}.{}", hdw[0], hdw[1]);
@@ -196,39 +203,45 @@ impl EskinDeviceFunc for EskinDeviceInner {
} }
fn read_matrix_row(&mut self) -> Result<u8, SdkError> { fn read_matrix_row(&mut self) -> Result<u8, SdkError> {
let row = self.read_register(0x0015, 1) let row = self
.read_register(0x0015, 1)
.map_err(|_| SdkError::FrameError("read matrix row failed".into()))?; .map_err(|_| SdkError::FrameError("read matrix row failed".into()))?;
Ok(row[0]) Ok(row[0])
} }
fn read_matrix_col(&mut self) -> Result<u8, SdkError> { fn read_matrix_col(&mut self) -> Result<u8, SdkError> {
let col = self.read_register(0x0014, 1) let col = self
.read_register(0x0014, 1)
.map_err(|_| SdkError::FrameError("read matrix col failed".into()))?; .map_err(|_| SdkError::FrameError("read matrix col failed".into()))?;
Ok(col[0]) Ok(col[0])
} }
fn write_matrix_row(&mut self, row: u8) -> Result<u16, SdkError> { fn write_matrix_row(&mut self, row: u8) -> Result<u16, SdkError> {
let res = self.write_register(0x0015, &[row]) let res = self
.write_register(0x0015, &[row])
.map_err(|_| SdkError::FrameError("write matrix row failed".into()))?; .map_err(|_| SdkError::FrameError("write matrix row failed".into()))?;
Ok(res) Ok(res)
} }
fn write_matrix_col(&mut self, col: u8) -> Result<u16, SdkError> { fn write_matrix_col(&mut self, col: u8) -> Result<u16, SdkError> {
let res = self.write_register(0x0015, &[col]) let res = self
.write_register(0x0015, &[col])
.map_err(|_| SdkError::FrameError("write matrix row failed".into()))?; .map_err(|_| SdkError::FrameError("write matrix row failed".into()))?;
Ok(res) Ok(res)
} }
fn read_device_config1(&mut self) -> Result<u8, SdkError> { fn read_device_config1(&mut self) -> Result<u8, SdkError> {
let enabled = self.read_register(0x0017, 1) let enabled = self
.read_register(0x0017, 1)
.map_err(|_| SdkError::FrameError("read device config1 failed".into()))?; .map_err(|_| SdkError::FrameError("read device config1 failed".into()))?;
Ok(enabled[0]) Ok(enabled[0])
} }
fn read_device_config2(&mut self) -> Result<u8, SdkError> { fn read_device_config2(&mut self) -> Result<u8, SdkError> {
let enabled = self.read_register(0x0018, 1) let enabled = self
.read_register(0x0018, 1)
.map_err(|_| SdkError::FrameError("read device config2 failed".into()))?; .map_err(|_| SdkError::FrameError("read device config2 failed".into()))?;
Ok(enabled[0]) Ok(enabled[0])
} }
@@ -255,6 +268,7 @@ pub trait EskinDevice {
fn start_stream(&mut self) -> Result<(), SdkError>; fn start_stream(&mut self) -> Result<(), SdkError>;
fn stop_stream(&mut self) -> Result<(), SdkError>; fn stop_stream(&mut self) -> Result<(), SdkError>;
fn read_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>; fn read_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>;
fn read_stream_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError>;
fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>; fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>;
fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError>; fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError>;
fn write_register(&mut self, addr: u32, data: &[u8]) -> Result<u16, SdkError>; fn write_register(&mut self, addr: u32, data: &[u8]) -> Result<u16, SdkError>;
@@ -348,6 +362,10 @@ impl EskinDevice for EskinDeviceInner {
self.channels.recv_sample(timeout_ms) self.channels.recv_sample(timeout_ms)
} }
fn read_stream_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError> {
self.channels.recv_raw_frame(timeout_ms)
}
fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> { fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
self.channels.recv_event(timeout_ms) self.channels.recv_event(timeout_ms)
} }

View File

@@ -72,5 +72,5 @@ pub enum SdkError {
DeviceError(u16), DeviceError(u16),
#[error("Device is in streaming mode, command not allowed")] #[error("Device is in streaming mode, command not allowed")]
StreamingBusy StreamingBusy,
} }

View File

@@ -1,9 +1,9 @@
use std::{ptr};
use std::ffi::{CStr, c_char};
use crate::device::{DeviceMode, EskinDevice, EskinDeviceFunc}; use crate::device::{DeviceMode, EskinDevice, EskinDeviceFunc};
use crate::transport::SerialPortTransport; use crate::transport::SerialPortTransport;
use crate::types::{CombinedForce, FingerSample}; use crate::types::{CombinedForce, FingerSample};
use crate::{config::DeviceConfig, device::EskinDeviceInner, error::SdkErrorCode}; use crate::{config::DeviceConfig, device::EskinDeviceInner, error::SdkErrorCode};
use std::ffi::{CStr, c_char};
use std::ptr;
pub type EskinDeviceHandle = *mut core::ffi::c_void; pub type EskinDeviceHandle = *mut core::ffi::c_void;
@@ -67,10 +67,13 @@ fn sdk_error_to_code(err: crate::error::SdkError) -> SdkErrorCode {
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn eskin_version() -> EskinSdkVersion { pub extern "C" fn eskin_version() -> EskinSdkVersion {
EskinSdkVersion { major: 0, minor: 1, patch: 0 } EskinSdkVersion {
major: 0,
minor: 1,
patch: 0,
}
} }
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub unsafe extern "C" fn eskin_open( pub unsafe extern "C" fn eskin_open(
path: *const c_char, path: *const c_char,
@@ -80,11 +83,9 @@ pub unsafe extern "C" fn eskin_open(
return ptr::null_mut(); return ptr::null_mut();
} }
let path_str = match unsafe { let path_str = match unsafe { CStr::from_ptr(path) }.to_str() {
CStr::from_ptr(path)
}.to_str() {
Ok(s) => s.to_string(), Ok(s) => s.to_string(),
Err(_) => return ptr::null_mut() Err(_) => return ptr::null_mut(),
}; };
let device_config = if config.is_null() { let device_config = if config.is_null() {
@@ -100,9 +101,7 @@ pub unsafe extern "C" fn eskin_open(
return ptr::null_mut(); return ptr::null_mut();
} }
let wrapper = Box::new(DeviceWrapper { let wrapper = Box::new(DeviceWrapper { device });
device,
});
Box::into_raw(wrapper) as EskinDeviceHandle Box::into_raw(wrapper) as EskinDeviceHandle
} }
@@ -434,12 +433,38 @@ pub unsafe extern "C" fn eskin_read_sample(
} }
} }
/// 读取一个 stream 原始完整协议帧(流模式下调用)
#[unsafe(no_mangle)]
pub unsafe extern "C" fn eskin_read_stream_frame(
handle: EskinDeviceHandle,
timeout_ms: u32,
buf: *mut u8,
buf_len: u32,
actual_len: *mut u32,
) -> SdkErrorCode {
if handle.is_null() || buf.is_null() || actual_len.is_null() {
return SdkErrorCode::InvalidPointer;
}
let wrapper = unsafe { &mut *(handle as *mut DeviceWrapper) };
let frame = match wrapper.device.read_stream_frame(timeout_ms) {
Ok(frame) => frame,
Err(e) => return sdk_error_to_code(e),
};
let copy_len = std::cmp::min(frame.len(), buf_len as usize);
unsafe {
ptr::copy_nonoverlapping(frame.as_ptr(), buf, copy_len);
*actual_len = frame.len() as u32;
}
SdkErrorCode::Success
}
/// 查询当前设备模式Command=0, Streaming=1 /// 查询当前设备模式Command=0, Streaming=1
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub unsafe extern "C" fn eskin_get_mode( pub unsafe extern "C" fn eskin_get_mode(handle: EskinDeviceHandle, out: *mut u32) -> SdkErrorCode {
handle: EskinDeviceHandle,
out: *mut u32,
) -> SdkErrorCode {
if handle.is_null() || out.is_null() { if handle.is_null() || out.is_null() {
return SdkErrorCode::InvalidPointer; return SdkErrorCode::InvalidPointer;
} }

View File

@@ -1,10 +1,10 @@
use std::io::{self, BufRead};
use eskin_finger_sdk::{ use eskin_finger_sdk::{
config::DeviceConfig, config::DeviceConfig,
device::{EskinDevice, EskinDeviceFunc, EskinDeviceInner}, device::{EskinDevice, EskinDeviceFunc, EskinDeviceInner},
error::SdkError, error::SdkError,
transport::SerialPortTransport, transport::SerialPortTransport,
}; };
use std::io::{self, BufRead};
fn main() { fn main() {
// let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600); // let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600);
// let config = DeviceConfig::default(); // let config = DeviceConfig::default();
@@ -35,7 +35,6 @@ fn read_check_group(device: &mut EskinDeviceInner) {
print_payload_data(&group); print_payload_data(&group);
} }
fn read_row(device: &mut EskinDeviceInner) { fn read_row(device: &mut EskinDeviceInner) {
let row = device.read_register(0x0015, 1).unwrap(); let row = device.read_register(0x0015, 1).unwrap();
print_payload_data(&row); print_payload_data(&row);
@@ -92,9 +91,20 @@ fn stream_demo() {
if count % 5 == 0 { if count % 5 == 0 {
println!( println!(
"[#{count} seq={}] combined_force={:?}", "[#{count} seq={}] combined_force={:?}",
sample.sequence, sample.sequence, sample.combined_forces
sample.combined_forces
); );
match device.read_stream_frame(20) {
Ok(frame) => {
println!("raw_frame len={} [{}]", frame.len(), format_hex(&frame));
}
Err(SdkError::Timeout) => {
println!("raw_frame timeout");
}
Err(e) => {
eprintln!("read_stream_frame error: {e}");
}
}
} }
} }
Err(SdkError::Timeout) => continue, Err(SdkError::Timeout) => continue,
@@ -107,7 +117,10 @@ fn stream_demo() {
// 回到 Command 模式 // 回到 Command 模式
device.stop_stream().unwrap(); device.stop_stream().unwrap();
println!("Stream stopped, total samples: {count}, mode: {:?}", device.mode()); println!(
"Stream stopped, total samples: {count}, mode: {:?}",
device.mode()
);
// Stream 停止后Command 操作恢复正常 // Stream 停止后Command 操作恢复正常
println!("Row: {}", device.read_matrix_row().unwrap()); println!("Row: {}", device.read_matrix_row().unwrap());
@@ -119,9 +132,19 @@ fn print_payload_data(data: &[u8]) {
for (i, chunk) in data.chunks(2).enumerate() { for (i, chunk) in data.chunks(2).enumerate() {
if chunk.len() == 2 { if chunk.len() == 2 {
let val = u16::from_le_bytes([chunk[0], chunk[1]]); let val = u16::from_le_bytes([chunk[0], chunk[1]]);
println!(" [{:3}] [{:02X}] [{:02X}] => {}", i, chunk[0], chunk[1], val); println!(
" [{:3}] [{:02X}] [{:02X}] => {}",
i, chunk[0], chunk[1], val
);
} else { } else {
println!(" [{:3}] [{:02X}] (odd byte)", i, chunk[0]); println!(" [{:3}] [{:02X}] (odd byte)", i, chunk[0]);
} }
} }
} }
fn format_hex(data: &[u8]) -> String {
data.iter()
.map(|byte| format!("{byte:02X}"))
.collect::<Vec<_>>()
.join(" ")
}

View File

@@ -409,7 +409,7 @@ mod tests {
let req = ReadRequest { let req = ReadRequest {
device_addr: 0x34, device_addr: 0x34,
start_addr: 0x1C00, start_addr: 0x1C00,
read_byte_count: 168 read_byte_count: 168,
}; };
let frame = codec().encode_read_request(&req).unwrap(); let frame = codec().encode_read_request(&req).unwrap();

View File

@@ -146,7 +146,6 @@ impl RegisterMap for EskinRegisterMap {
} }
} }
pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result<CombinedForce, SdkError> { pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result<CombinedForce, SdkError> {
// println!("{:02X?}", raw); // println!("{:02X?}", raw);
// const MODULE_COUNT: usize = 28; // const MODULE_COUNT: usize = 28;
@@ -178,10 +177,13 @@ pub fn parse_combined_forces(raw: &[u8], addr: u32) -> Result<CombinedForce, Sdk
.sum(); .sum();
let force = CombinedForce { let force = CombinedForce {
module: addr.into(), module: addr.into(),
force: Force3D { fx: 0, fy: 0, fz: comb_force } force: Force3D {
fx: 0,
fy: 0,
fz: comb_force,
},
}; };
Ok(force) Ok(force)
} }

View File

@@ -3,7 +3,6 @@ use std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
}; };
use crate::{config, register::{REG_COMBINED_FORCE, REG_MODULE_ERROR}};
use chrono::Duration; use chrono::Duration;
use crate::{ use crate::{
@@ -14,9 +13,9 @@ use crate::{
types::{FingerSample, SensorModule}, types::{FingerSample, SensorModule},
}; };
use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ReadRequest}; use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, ReadRequest};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::time::Duration as StdDuration; use std::time::{Duration as StdDuration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode { pub enum StreamMode {
Polling, Polling,
@@ -43,7 +42,7 @@ impl Default for StreamConfig {
poll_interval_ms: 10, poll_interval_ms: 10,
device_addr: 0x34, device_addr: 0x34,
read_timeout_ms: 100, read_timeout_ms: 100,
finger_addr: 0x1C00 finger_addr: 0x1C00,
} }
} }
} }
@@ -53,6 +52,7 @@ pub trait StreamController: Send {
fn stop(&mut self) -> Result<(), SdkError>; fn stop(&mut self) -> Result<(), SdkError>;
fn is_running(&self) -> bool; fn is_running(&self) -> bool;
fn next_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>; fn next_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>;
fn next_raw_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError>;
fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>; fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>;
} }
@@ -94,12 +94,13 @@ impl StreamController for StreamRuntime {
return Err(SdkError::AlreadyStreaming); return Err(SdkError::AlreadyStreaming);
} }
let collector = make_collector(&config, Arc::clone(&self.transport)); // Old synchronous collector path:
// let collector = make_collector(&config, Arc::clone(&self.transport));
let worker = StreamWorker::new( let worker = StreamWorker::new(
Arc::clone(&self.running), Arc::clone(&self.running),
Arc::clone(&self.channels), Arc::clone(&self.channels),
Arc::clone(&self.transport),
config.clone(), config.clone(),
collector,
) )
.spawn(); .spawn();
@@ -133,6 +134,10 @@ impl StreamController for StreamRuntime {
self.channels.recv_sample(timeout_ms) self.channels.recv_sample(timeout_ms)
} }
fn next_raw_frame(&self, timeout_ms: u32) -> Result<Vec<u8>, SdkError> {
self.channels.recv_raw_frame(timeout_ms)
}
fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> { fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
self.channels.recv_event(timeout_ms) self.channels.recv_event(timeout_ms)
} }
@@ -141,22 +146,30 @@ impl StreamController for StreamRuntime {
pub struct StreamWorker { pub struct StreamWorker {
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
channels: Arc<ChannelManager>, channels: Arc<ChannelManager>,
transport: SharedSerialTransport,
config: StreamConfig, config: StreamConfig,
collector: Box<dyn SampleCollector>, codec: Box<dyn ProtocolCodec>,
rx_buffer: Vec<u8>,
sequence: u32,
next_request_at: Instant,
} }
impl StreamWorker { impl StreamWorker {
pub fn new( pub fn new(
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
channels: Arc<ChannelManager>, channels: Arc<ChannelManager>,
transport: SharedSerialTransport,
config: StreamConfig, config: StreamConfig,
collector: Box<dyn SampleCollector>,
) -> Self { ) -> Self {
Self { Self {
running, running,
channels, channels,
transport,
config, config,
collector, codec: Box::new(EskinProtocolCodec),
rx_buffer: Vec::with_capacity(512),
sequence: 0,
next_request_at: Instant::now(),
} }
} }
@@ -175,30 +188,124 @@ impl StreamWorker {
break; break;
} }
thread::sleep(StdDuration::from_millis( thread::sleep(StdDuration::from_millis(1));
self.config.poll_interval_ms as u64,
));
} }
} }
fn tick(&mut self) -> Result<(), SdkError> { fn tick(&mut self) -> Result<(), SdkError> {
// let _transport = self // Old synchronous polling path kept for reference:
// .transport //
// .lock() // let Some(sample) = self.collector.collect_once()? else {
// .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?; // return Ok(());
// };
// self.channels.send_sample(sample)?;
let Some(sample) = self.collector.collect_once()? else { self.send_request_if_due()?;
self.read_and_decode_available()?;
Ok(())
}
fn send_request_if_due(&mut self) -> Result<(), SdkError> {
if Instant::now() < self.next_request_at {
return Ok(()); return Ok(());
}
let request = ReadRequest {
device_addr: self.config.device_addr,
start_addr: self.config.finger_addr,
read_byte_count: 168,
};
let request_frame = self.codec.encode_read_request(&request)?;
{
let mut transport = self
.transport
.lock()
.map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
transport.write(&request_frame)?;
}
let interval = StdDuration::from_millis(self.config.poll_interval_ms as u64);
self.next_request_at = Instant::now() + interval.max(StdDuration::from_millis(1));
Ok(())
}
fn read_and_decode_available(&mut self) -> Result<(), SdkError> {
let mut buf = [0u8; 256];
let read_result = {
let mut transport = self
.transport
.lock()
.map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
transport.read(&mut buf, Duration::milliseconds(1))
};
match read_result {
Ok(0) | Err(SdkError::Timeout) => return Ok(()),
Ok(n) => self.rx_buffer.extend_from_slice(&buf[..n]),
Err(err) => return Err(err),
}
while let Some(frame) = self.pop_next_frame()? {
self.channels.send_raw_frame(frame.clone())?;
let response = self.codec.decode_read_response(&frame)?;
if response.start_addr != self.config.finger_addr {
continue;
}
let combined_forces =
crate::register::parse_combined_forces(&response.data, response.start_addr)?;
let sample = FingerSample {
timestamp_us: chrono::Utc::now().timestamp_micros() as u64,
sequence: self.next_sequence(),
combined_forces,
}; };
self.channels.send_sample(sample)?; self.channels.send_sample(sample)?;
}
Ok(()) Ok(())
// TODO: }
// 1. encode read request
// 2. transport.write() fn pop_next_frame(&mut self) -> Result<Option<Vec<u8>>, SdkError> {
// 3. transport.read() loop {
// 4. protocol.decode() let Some(start) = self
// 5. register parse .rx_buffer
// 6. channels.send_sample() .windows(FRAME_START_RESPONSE.len())
.position(|window| window == FRAME_START_RESPONSE)
else {
self.rx_buffer.clear();
return Ok(None);
};
if start > 0 {
self.rx_buffer.drain(..start);
}
if self.rx_buffer.len() < 4 {
return Ok(None);
}
let data_len = u16::from_le_bytes([self.rx_buffer[2], self.rx_buffer[3]]) as usize;
let frame_len = 4 + data_len + FRAME_CRC_LEN;
if frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN {
self.rx_buffer.drain(..FRAME_START_RESPONSE.len());
continue;
}
if self.rx_buffer.len() < frame_len {
return Ok(None);
}
return Ok(Some(self.rx_buffer.drain(..frame_len).collect()));
}
}
fn next_sequence(&mut self) -> u32 {
let sequence = self.sequence;
self.sequence = self.sequence.wrapping_add(1);
sequence
} }
} }
@@ -320,7 +427,8 @@ impl SampleCollector for PollingSampleCollector {
let combined_force_raw = self.read_register(self.config.finger_addr, 168)?; let combined_force_raw = self.read_register(self.config.finger_addr, 168)?;
let combined_forces = crate::register::parse_combined_forces(&combined_force_raw, self.config.finger_addr)?; let combined_forces =
crate::register::parse_combined_forces(&combined_force_raw, self.config.finger_addr)?;
let now = chrono::Utc::now().timestamp_micros() as u64; let now = chrono::Utc::now().timestamp_micros() as u64;
@@ -336,6 +444,7 @@ impl SampleCollector for PollingSampleCollector {
} }
} }
#[allow(dead_code)]
fn make_collector( fn make_collector(
config: &StreamConfig, config: &StreamConfig,
transport: SharedSerialTransport, transport: SharedSerialTransport,

View File

@@ -118,7 +118,6 @@ impl From<u32> for SensorModule {
} }
} }
pub const SENSOR_MODULE_COUNT: usize = 28; pub const SENSOR_MODULE_COUNT: usize = 28;
#[repr(C)] #[repr(C)]