feat: integrate StreamRuntime with EskinDevice, add streaming mode demo

- Separate DeviceState (connection lifecycle) and DeviceMode (Command/Streaming)
- Add stream: Option<StreamRuntime> to EskinDeviceInner
- Wire up start_stream/stop_stream with StreamRuntime (worker thread)
- Add StreamingBusy error for command-mode guard
- Fix stream frame total_len calculation (was double-counting status byte)
- Add ensure_command_mode() check on read_register/write_register
- close() now auto-stops stream if active
- Add stream_demo() in main.rs with Enter-to-stop loop
- Add finger_addr config to StreamConfig
This commit is contained in:
lenn
2026-05-07 17:27:40 +08:00
parent 1b1ab46f1d
commit c195234771
6 changed files with 150 additions and 34 deletions

View File

@@ -9,7 +9,7 @@ use crate::{
EskinProtocolCodec, FRAME_START_RESPONSE, ProtocolCodec,
ReadRequest, WriteRequest,
},
stream::StreamRuntime,
stream::{StreamConfig, StreamController, StreamRuntime},
transport::{SerialTransport, SharedSerialTransport},
types::FingerSample,
};
@@ -18,17 +18,24 @@ use crate::{
pub enum DeviceState {
Closed,
Open,
Streaming,
Error,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeviceMode {
Command,
Streaming,
}
pub struct EskinDeviceInner {
pub info: DeviceInfo,
pub config: DeviceConfig,
pub channels: Arc<ChannelManager>,
pub state: DeviceState,
pub mode: DeviceMode,
pub transport: SharedSerialTransport,
pub codec: Box<dyn ProtocolCodec>,
stream: Option<StreamRuntime>,
}
impl EskinDeviceInner {
@@ -45,8 +52,10 @@ impl EskinDeviceInner {
config,
channels: Arc::new(channels),
state: DeviceState::Closed,
mode: DeviceMode::Command,
transport: Arc::new(Mutex::new(transport)),
codec: Box::new(EskinProtocolCodec),
stream: None
}
}
@@ -63,8 +72,10 @@ impl EskinDeviceInner {
config,
channels: Arc::new(channels),
state: DeviceState::Closed,
mode: DeviceMode::Command,
transport,
codec: Box::new(EskinProtocolCodec),
stream: None
}
}
@@ -135,12 +146,19 @@ impl EskinDeviceInner {
fn ensure_open(&self) -> Result<(), SdkError> {
match self.state {
DeviceState::Open | DeviceState::Streaming => Ok(()),
DeviceState::Open => Ok(()),
DeviceState::Closed => Err(SdkError::NotInitialized),
DeviceState::Error => Err(SdkError::InternalError("device is in error state".into())),
}
}
fn ensure_command_mode(&self) -> Result<(), SdkError> {
match self.mode {
DeviceMode::Command => Ok(()),
DeviceMode::Streaming => Err(SdkError::StreamingBusy)
}
}
pub fn channels(&self) -> Arc<ChannelManager> {
Arc::clone(&self.channels)
}
@@ -230,6 +248,7 @@ pub trait EskinDevice {
fn open(&mut self) -> Result<(), SdkError>;
fn close(&mut self) -> Result<(), SdkError>;
fn state(&self) -> DeviceState;
fn mode(&self) -> DeviceMode;
fn device_info(&self) -> Result<DeviceInfo, SdkError>;
fn config(&self) -> &DeviceConfig;
fn apply_config(&mut self, config: DeviceConfig) -> Result<(), SdkError>;
@@ -253,6 +272,9 @@ impl EskinDevice for EskinDeviceInner {
}
fn close(&mut self) -> Result<(), SdkError> {
if self.mode == DeviceMode::Streaming {
self.stop_stream()?;
}
{
let mut transport = self.lock_transport()?;
transport.close()?;
@@ -265,6 +287,10 @@ impl EskinDevice for EskinDeviceInner {
self.state
}
fn mode(&self) -> DeviceMode {
self.mode
}
fn device_info(&self) -> Result<DeviceInfo, SdkError> {
Ok(self.info.clone())
}
@@ -279,26 +305,42 @@ impl EskinDevice for EskinDeviceInner {
}
fn start_stream(&mut self) -> Result<(), SdkError> {
if self.state == DeviceState::Streaming {
return Err(SdkError::AlreadyStreaming);
self.ensure_open()?;
if self.mode == DeviceMode::Streaming {
return Err(SdkError::StreamingBusy);
}
if self.state != DeviceState::Open {
return Err(SdkError::NotInitialized);
}
let stream_config = StreamConfig {
mode: crate::stream::StreamMode::Polling,
device_addr: self.config.device_addr,
read_timeout_ms: self.config.read_timeout_ms,
..Default::default()
};
println!("stream_config: {:?}", stream_config);
let mut runtime = self.create_stream_runtime();
runtime.start(stream_config)?;
self.stream = Some(runtime);
self.mode = DeviceMode::Streaming;
self.state = DeviceState::Streaming;
self.channels.send_event(DeviceEvent::StreamStarted)?;
Ok(())
}
fn stop_stream(&mut self) -> Result<(), SdkError> {
if self.state != DeviceState::Streaming {
if self.mode != DeviceMode::Streaming {
return Err(SdkError::NotStreaming);
}
self.state = DeviceState::Open;
self.channels.send_event(DeviceEvent::StreamStopped)?;
if let Some(mut runtime) = self.stream.take() {
// Worker 可能已经因为 I/O 错误自行停止,忽略 NotStreaming
match runtime.stop() {
Ok(()) | Err(SdkError::NotStreaming) => {}
Err(e) => return Err(e),
}
}
self.mode = DeviceMode::Command;
Ok(())
}
@@ -312,7 +354,7 @@ impl EskinDevice for EskinDeviceInner {
fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError> {
self.ensure_open()?;
self.ensure_command_mode()?;
let request = ReadRequest {
device_addr: self.config.device_addr,
start_addr: addr,
@@ -334,7 +376,7 @@ impl EskinDevice for EskinDeviceInner {
fn write_register(&mut self, addr: u32, data: &[u8]) -> Result<u16, SdkError> {
self.ensure_open()?;
self.ensure_command_mode()?;
let request = WriteRequest {
device_addr: self.config.device_addr,
start_addr: addr,

View File

@@ -18,6 +18,7 @@ pub enum SdkErrorCode {
FrameError = 15,
ProtocolError = 16,
DeviceError = 17,
StreamingBusy = 18,
}
#[derive(Debug, thiserror::Error)]
@@ -69,4 +70,7 @@ pub enum SdkError {
#[error("Device error: status 0x{0:04X}")]
DeviceError(u16),
#[error("Device is in streaming mode, command not allowed")]
StreamingBusy
}

View File

@@ -50,6 +50,7 @@ fn sdk_error_to_code(err: crate::error::SdkError) -> SdkErrorCode {
crate::error::SdkError::BufferOverflow(_) => SdkErrorCode::BufferOverflow,
crate::error::SdkError::InvalidParameter(_) => SdkErrorCode::InvalidParameter,
crate::error::SdkError::ProtocolError(_) => SdkErrorCode::ProtocolError,
crate::error::SdkError::StreamingBusy => SdkErrorCode::StreamingBusy,
}
}

View File

@@ -1,21 +1,28 @@
use eskin_finger_sdk::{config::DeviceConfig, device::{EskinDevice, EskinDeviceFunc, EskinDeviceInner}, transport::SerialPortTransport};
use std::io::{self, BufRead};
use eskin_finger_sdk::{
config::DeviceConfig,
device::{EskinDevice, EskinDeviceFunc, EskinDeviceInner},
error::SdkError,
transport::SerialPortTransport,
};
fn main() {
let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600);
let config = DeviceConfig::default();
let mut device = EskinDeviceInner::new(config, Box::new(transport));
device.open().unwrap();
// let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600);
// let config = DeviceConfig::default();
// let mut device = EskinDeviceInner::new(config, Box::new(transport));
// device.open().unwrap();
// let data = device.read_register(0x1C00, 168).unwrap();
// print_payload_data(&data);
// // let data = device.read_register(0x1C00, 168).unwrap();
// // print_payload_data(&data);
read_hdv(&mut device);
read_check_group(&mut device);
read_row(&mut device);
write_col(&mut device, &[0x08]);
read_col(&mut device);
read_config(&mut device);
// read_hdv(&mut device);
// read_check_group(&mut device);
// read_row(&mut device);
// write_col(&mut device, &[0x08]);
// read_col(&mut device);
// read_config(&mut device);
device.close().unwrap();
// device.close().unwrap();
stream_demo();
}
fn read_hdv(device: &mut EskinDeviceInner) {
@@ -48,6 +55,66 @@ fn read_config(device: &mut EskinDeviceInner) {
print_payload_data(&conf);
}
/// Stream 模式演示:后台线程持续采集,主线程消费 sample
/// 按 Enter 键停止流式采集
fn stream_demo() {
let transport = SerialPortTransport::new("/dev/ttyUSB0", 921600);
let config = DeviceConfig::default();
let mut device = EskinDeviceInner::new(config, Box::new(transport));
device.open().unwrap();
println!("Hardware version: {}", device.read_hdw_version().unwrap());
// 进入 Streaming 模式
device.start_stream().unwrap();
println!("Stream started, mode: {:?}", device.mode());
println!("Press Enter to stop...");
// 用 stdin 阻塞线程来检测用户输入,实现优雅退出
let (tx, rx) = std::sync::mpsc::channel::<()>();
std::thread::spawn(move || {
let stdin = io::stdin();
stdin.lock().lines().next();
let _ = tx.send(());
});
let mut count: u64 = 0;
loop {
// 检查用户是否按了 Enter
if rx.try_recv().is_ok() {
println!("User requested stop.");
break;
}
match device.read_sample(200) {
Ok(sample) => {
count += 1;
if count % 100 == 0 {
println!(
"[#{count} seq={}] combined_forces[0..3]={:?}",
sample.sequence,
&sample.combined_forces[..3.min(sample.combined_forces.len())]
);
}
}
Err(SdkError::Timeout) => continue,
Err(e) => {
eprintln!("read_sample error: {e}");
break;
}
}
}
// 回到 Command 模式
device.stop_stream().unwrap();
println!("Stream stopped, total samples: {count}, mode: {:?}", device.mode());
// Stream 停止后Command 操作恢复正常
println!("Row: {}", device.read_matrix_row().unwrap());
device.close().unwrap();
}
fn print_payload_data(data: &[u8]) {
for (i, chunk) in data.chunks(2).enumerate() {
if chunk.len() == 2 {

View File

@@ -12,7 +12,7 @@ pub const REG_L_LINE: u32 = 0x0012;
pub const REG_H_LINE: u32 = 0x0013;
pub const REG_PRODUCT_CONFIG_1: u32 = 0x0030;
pub const REG_PRODUCT_CONFIG_2: u32 = 0x0032;
pub const REG_COMBINED_FORCE: u32 = 0x0500;
pub const REG_COMBINED_FORCE: u32 = 0x1C00;
pub const REG_MODULE_ERROR: u32 = 0x0700;
pub const REG_DISTRIBUTION_FORCE_BASE: u32 = 0x1000;
pub const REG_PROCESSED_VALUE_BASE: u32 = 0x2000;

View File

@@ -3,7 +3,7 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
};
use crate::register::{REG_COMBINED_FORCE, REG_MODULE_ERROR};
use crate::{config, register::{REG_COMBINED_FORCE, REG_MODULE_ERROR}};
use chrono::Duration;
use crate::{
@@ -31,6 +31,7 @@ pub struct StreamConfig {
pub poll_interval_ms: u32,
pub device_addr: u8,
pub read_timeout_ms: u32,
pub finger_addr: u32,
}
impl Default for StreamConfig {
@@ -42,6 +43,7 @@ impl Default for StreamConfig {
poll_interval_ms: 10,
device_addr: 0x34,
read_timeout_ms: 100,
finger_addr: 0x1C00
}
}
}
@@ -277,7 +279,7 @@ impl PollingSampleCollector {
}
let data_len = u16::from_le_bytes([header[2], header[3]]) as usize;
let total_len = 4 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN;
let total_len = 4 + data_len + FRAME_CRC_LEN;
let mut frame = vec![0u8; total_len];
frame[..4].copy_from_slice(&header);
@@ -296,7 +298,7 @@ impl PollingSampleCollector {
};
let request_frame = self.codec.encode_read_request(&request)?;
println!("streaming send: {:02X?}", request_frame);
let response_frame = {
let mut transport = self
.transport
@@ -317,7 +319,7 @@ impl SampleCollector for PollingSampleCollector {
fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError> {
let sequence = self.next_sequence();
let combined_force_raw = self.read_register(REG_COMBINED_FORCE, 168)?;
let combined_force_raw = self.read_register(self.config.finger_addr, 168)?;
let module_error_raw = self.read_register(REG_MODULE_ERROR, 56)?;
let combined_forces = crate::register::parse_combined_forces(&combined_force_raw)?;