1 Commits
0.2.0 ... main

Author SHA1 Message Date
Lenn Louis
499049dd27 修复粘包问题 2026-05-27 11:53:57 +08:00
2 changed files with 193 additions and 43 deletions

View File

@@ -346,7 +346,7 @@ impl ProtocolCodec for EskinProtocolCodec {
} }
let data_len = Self::read_u16_le(frame, 2)? as usize; let data_len = Self::read_u16_le(frame, 2)? as usize;
let expected_len = 2 + 2 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN; let expected_len = 2 + 2 + data_len + FRAME_CRC_LEN;
if frame.len() != expected_len { if frame.len() != expected_len {
return Err(SdkError::FrameError(format!( return Err(SdkError::FrameError(format!(
@@ -358,29 +358,37 @@ impl ProtocolCodec for EskinProtocolCodec {
self.validate_crc(frame)?; self.validate_crc(frame)?;
let device_addr = frame[4]; let device_addr = frame[4];
let reserved = frame[5];
let function = frame[6]; let function = frame[6];
if reserved != 0x00 {
return Err(SdkError::FrameError(format!(
"invalid stream reserved byte: 0x{reserved:02X}"
)));
}
let start_addr = Self::read_u32_le(frame, 7)?; let start_addr = Self::read_u32_le(frame, 7)?;
let payload_len = Self::read_u16_le(frame, 11)? as usize; let payload_len = Self::read_u16_le(frame, 11)? as usize;
if data_len != 9 + payload_len { if data_len != 10 + payload_len {
return Err(SdkError::FrameError(format!( return Err(SdkError::FrameError(format!(
"stream frame data length mismatch: header data_len {data_len}, payload len {payload_len}" "stream frame data length mismatch: header data_len {data_len}, payload len {payload_len}"
))); )));
} }
let payload_start = 13;
let payload_end = payload_start + payload_len;
let payload = frame
.get(payload_start..payload_end)
.ok_or_else(|| SdkError::FrameError("stream payload missing".into()))?
.to_vec();
let status_offset = 13; let status_offset = 13;
let status_raw = *frame let status_raw = *frame
.get(status_offset) .get(status_offset)
.ok_or_else(|| SdkError::FrameError("stream status missing".into()))?; .ok_or_else(|| SdkError::FrameError("stream status missing".into()))?;
let status = Self::status_from_u8(status_raw)?; let status = Self::status_from_u8(status_raw)?;
let payload_start = 14;
let payload_end = payload_start + payload_len;
let payload = frame
.get(payload_start..payload_end)
.ok_or_else(|| SdkError::FrameError("stream payload missing".into()))?
.to_vec();
Ok(ProtocolFrame { Ok(ProtocolFrame {
start, start,
device_addr, device_addr,
@@ -404,6 +412,26 @@ mod tests {
EskinProtocolCodec EskinProtocolCodec
} }
fn read_response_frame(payload: &[u8]) -> Vec<u8> {
let codec = codec();
let data_len = 10 + payload.len();
let mut frame = Vec::with_capacity(15 + payload.len());
frame.extend_from_slice(&FRAME_START_RESPONSE);
frame.extend_from_slice(&(data_len as u16).to_le_bytes());
frame.push(0x34);
frame.push(0x00);
frame.push(FUNC_RESPONSE_READ);
frame.extend_from_slice(&0x1C00u32.to_le_bytes());
frame.extend_from_slice(&(payload.len() as u16).to_le_bytes());
frame.push(0x00);
frame.extend_from_slice(payload);
let crc = codec.crc8(&frame);
frame.push(crc);
frame
}
#[test] #[test]
fn encode_read_request_has_correct_structure() { fn encode_read_request_has_correct_structure() {
let req = ReadRequest { let req = ReadRequest {
@@ -438,4 +466,18 @@ mod tests {
assert_eq!(frame.len(), 14); assert_eq!(frame.len(), 14);
} }
#[test]
fn decode_stream_frame_reads_payload_after_status_byte() {
let payload = [0x10, 0x00, 0x20, 0x00];
let frame = read_response_frame(&payload);
let decoded = codec().decode_stream_frame(&frame).unwrap();
assert_eq!(decoded.device_addr, 0x34);
assert_eq!(decoded.function, FUNC_RESPONSE_READ);
assert_eq!(decoded.start_addr, 0x1C00);
assert_eq!(decoded.status, Some(DeviceStatus::Success));
assert_eq!(decoded.payload, payload);
}
} }

View File

@@ -16,6 +16,11 @@ use crate::{
use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, ReadRequest}; use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, ReadRequest};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::time::{Duration as StdDuration, Instant}; use std::time::{Duration as StdDuration, Instant};
const STREAM_READ_BYTE_COUNT: u16 = 168;
const STREAM_RESPONSE_DATA_OVERHEAD: usize = 10;
const STREAM_FRAME_PREFIX_LEN: usize = 14;
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode { pub enum StreamMode {
Polling, Polling,
@@ -213,7 +218,7 @@ impl StreamWorker {
let request = ReadRequest { let request = ReadRequest {
device_addr: self.config.device_addr, device_addr: self.config.device_addr,
start_addr: self.config.finger_addr, start_addr: self.config.finger_addr,
read_byte_count: 168, read_byte_count: STREAM_READ_BYTE_COUNT,
}; };
let request_frame = self.codec.encode_read_request(&request)?; let request_frame = self.codec.encode_read_request(&request)?;
@@ -268,38 +273,7 @@ impl StreamWorker {
} }
fn pop_next_frame(&mut self) -> Result<Option<Vec<u8>>, SdkError> { fn pop_next_frame(&mut self) -> Result<Option<Vec<u8>>, SdkError> {
loop { pop_next_stream_frame(&mut self.rx_buffer, self.codec.as_ref())
let Some(start) = self
.rx_buffer
.windows(FRAME_START_RESPONSE.len())
.position(|window| window == FRAME_START_RESPONSE)
else {
self.rx_buffer.clear();
return Ok(None);
};
if start > 0 {
self.rx_buffer.drain(..start);
}
if self.rx_buffer.len() < 4 {
return Ok(None);
}
let data_len = u16::from_le_bytes([self.rx_buffer[2], self.rx_buffer[3]]) as usize;
let frame_len = 4 + data_len + FRAME_CRC_LEN;
if frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN {
self.rx_buffer.drain(..FRAME_START_RESPONSE.len());
continue;
}
if self.rx_buffer.len() < frame_len {
return Ok(None);
}
return Ok(Some(self.rx_buffer.drain(..frame_len).collect()));
}
} }
fn next_sequence(&mut self) -> u32 { fn next_sequence(&mut self) -> u32 {
@@ -309,6 +283,64 @@ impl StreamWorker {
} }
} }
fn pop_next_stream_frame(
rx_buffer: &mut Vec<u8>,
codec: &dyn ProtocolCodec,
) -> Result<Option<Vec<u8>>, SdkError> {
loop {
let Some(start) = rx_buffer
.windows(FRAME_START_RESPONSE.len())
.position(|window| window == FRAME_START_RESPONSE)
else {
rx_buffer.clear();
return Ok(None);
};
if start > 0 {
rx_buffer.drain(..start);
}
if rx_buffer.len() < 4 {
return Ok(None);
}
let data_len = u16::from_le_bytes([rx_buffer[2], rx_buffer[3]]) as usize;
let frame_len = 4 + data_len + FRAME_CRC_LEN;
if data_len < STREAM_RESPONSE_DATA_OVERHEAD
|| frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN
{
rx_buffer.drain(..1);
continue;
}
if rx_buffer.len() < STREAM_FRAME_PREFIX_LEN {
return Ok(None);
}
let expected_data_len = u16::from_le_bytes([rx_buffer[11], rx_buffer[12]]) as usize;
if expected_data_len != STREAM_READ_BYTE_COUNT as usize
|| data_len != STREAM_RESPONSE_DATA_OVERHEAD + expected_data_len
{
rx_buffer.drain(..1);
continue;
}
if rx_buffer.len() < frame_len {
return Ok(None);
}
let expected_crc = rx_buffer[frame_len - 1];
let actual_crc = codec.crc8(&rx_buffer[..frame_len - 1]);
if expected_crc != actual_crc {
rx_buffer.drain(..1);
continue;
}
return Ok(Some(rx_buffer.drain(..frame_len).collect()));
}
}
pub trait SampleCollector: Send { pub trait SampleCollector: Send {
fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError>; fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError>;
} }
@@ -454,3 +486,79 @@ fn make_collector(
StreamMode::AutoDistribution => Box::new(NoopSampleCollector), StreamMode::AutoDistribution => Box::new(NoopSampleCollector),
} }
} }
#[cfg(test)]
mod tests {
use super::*;
fn stream_frame(payload_seed: u8) -> Vec<u8> {
let codec = EskinProtocolCodec;
let payload = vec![payload_seed; STREAM_READ_BYTE_COUNT as usize];
let data_len = STREAM_RESPONSE_DATA_OVERHEAD + payload.len();
let mut frame = Vec::with_capacity(STREAM_FRAME_PREFIX_LEN + payload.len() + 1);
frame.extend_from_slice(&FRAME_START_RESPONSE);
frame.extend_from_slice(&(data_len as u16).to_le_bytes());
frame.push(0x34);
frame.push(0x00);
frame.push(crate::protocol::FUNC_RESPONSE_READ);
frame.extend_from_slice(&0x1C00u32.to_le_bytes());
frame.extend_from_slice(&(payload.len() as u16).to_le_bytes());
frame.push(0x00);
frame.extend_from_slice(&payload);
let crc = codec.crc8(&frame);
frame.push(crc);
frame
}
#[test]
fn pop_next_stream_frame_keeps_partial_frame_buffered() {
let codec = EskinProtocolCodec;
let frame = stream_frame(0x11);
let mut buffer = frame[..20].to_vec();
let popped = pop_next_stream_frame(&mut buffer, &codec).unwrap();
assert!(popped.is_none());
assert_eq!(buffer, frame[..20]);
}
#[test]
fn pop_next_stream_frame_splits_sticky_frames() {
let codec = EskinProtocolCodec;
let first = stream_frame(0x11);
let second = stream_frame(0x22);
let mut buffer = Vec::new();
buffer.extend_from_slice(&first);
buffer.extend_from_slice(&second);
assert_eq!(
pop_next_stream_frame(&mut buffer, &codec).unwrap(),
Some(first)
);
assert_eq!(
pop_next_stream_frame(&mut buffer, &codec).unwrap(),
Some(second)
);
assert!(buffer.is_empty());
}
#[test]
fn pop_next_stream_frame_resyncs_after_bad_crc() {
let codec = EskinProtocolCodec;
let mut bad = stream_frame(0x11);
let good = stream_frame(0x22);
let last = bad.len() - 1;
bad[last] ^= 0xFF;
let mut buffer = bad;
buffer.extend_from_slice(&good);
assert_eq!(
pop_next_stream_frame(&mut buffer, &codec).unwrap(),
Some(good)
);
assert!(buffer.is_empty());
}
}