修复粘包问题
This commit is contained in:
@@ -346,7 +346,7 @@ impl ProtocolCodec for EskinProtocolCodec {
|
||||
}
|
||||
|
||||
let data_len = Self::read_u16_le(frame, 2)? as usize;
|
||||
let expected_len = 2 + 2 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN;
|
||||
let expected_len = 2 + 2 + data_len + FRAME_CRC_LEN;
|
||||
|
||||
if frame.len() != expected_len {
|
||||
return Err(SdkError::FrameError(format!(
|
||||
@@ -358,29 +358,37 @@ impl ProtocolCodec for EskinProtocolCodec {
|
||||
self.validate_crc(frame)?;
|
||||
|
||||
let device_addr = frame[4];
|
||||
let reserved = frame[5];
|
||||
let function = frame[6];
|
||||
|
||||
if reserved != 0x00 {
|
||||
return Err(SdkError::FrameError(format!(
|
||||
"invalid stream reserved byte: 0x{reserved:02X}"
|
||||
)));
|
||||
}
|
||||
|
||||
let start_addr = Self::read_u32_le(frame, 7)?;
|
||||
let payload_len = Self::read_u16_le(frame, 11)? as usize;
|
||||
if data_len != 9 + payload_len {
|
||||
if data_len != 10 + payload_len {
|
||||
return Err(SdkError::FrameError(format!(
|
||||
"stream frame data length mismatch: header data_len {data_len}, payload len {payload_len}"
|
||||
)));
|
||||
}
|
||||
|
||||
let payload_start = 13;
|
||||
let payload_end = payload_start + payload_len;
|
||||
|
||||
let payload = frame
|
||||
.get(payload_start..payload_end)
|
||||
.ok_or_else(|| SdkError::FrameError("stream payload missing".into()))?
|
||||
.to_vec();
|
||||
|
||||
let status_offset = 13;
|
||||
let status_raw = *frame
|
||||
.get(status_offset)
|
||||
.ok_or_else(|| SdkError::FrameError("stream status missing".into()))?;
|
||||
let status = Self::status_from_u8(status_raw)?;
|
||||
|
||||
let payload_start = 14;
|
||||
let payload_end = payload_start + payload_len;
|
||||
|
||||
let payload = frame
|
||||
.get(payload_start..payload_end)
|
||||
.ok_or_else(|| SdkError::FrameError("stream payload missing".into()))?
|
||||
.to_vec();
|
||||
|
||||
Ok(ProtocolFrame {
|
||||
start,
|
||||
device_addr,
|
||||
@@ -404,6 +412,26 @@ mod tests {
|
||||
EskinProtocolCodec
|
||||
}
|
||||
|
||||
fn read_response_frame(payload: &[u8]) -> Vec<u8> {
|
||||
let codec = codec();
|
||||
let data_len = 10 + payload.len();
|
||||
let mut frame = Vec::with_capacity(15 + payload.len());
|
||||
|
||||
frame.extend_from_slice(&FRAME_START_RESPONSE);
|
||||
frame.extend_from_slice(&(data_len as u16).to_le_bytes());
|
||||
frame.push(0x34);
|
||||
frame.push(0x00);
|
||||
frame.push(FUNC_RESPONSE_READ);
|
||||
frame.extend_from_slice(&0x1C00u32.to_le_bytes());
|
||||
frame.extend_from_slice(&(payload.len() as u16).to_le_bytes());
|
||||
frame.push(0x00);
|
||||
frame.extend_from_slice(payload);
|
||||
|
||||
let crc = codec.crc8(&frame);
|
||||
frame.push(crc);
|
||||
frame
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encode_read_request_has_correct_structure() {
|
||||
let req = ReadRequest {
|
||||
@@ -438,4 +466,18 @@ mod tests {
|
||||
|
||||
assert_eq!(frame.len(), 14);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_stream_frame_reads_payload_after_status_byte() {
|
||||
let payload = [0x10, 0x00, 0x20, 0x00];
|
||||
let frame = read_response_frame(&payload);
|
||||
|
||||
let decoded = codec().decode_stream_frame(&frame).unwrap();
|
||||
|
||||
assert_eq!(decoded.device_addr, 0x34);
|
||||
assert_eq!(decoded.function, FUNC_RESPONSE_READ);
|
||||
assert_eq!(decoded.start_addr, 0x1C00);
|
||||
assert_eq!(decoded.status, Some(DeviceStatus::Success));
|
||||
assert_eq!(decoded.payload, payload);
|
||||
}
|
||||
}
|
||||
|
||||
174
src/stream.rs
174
src/stream.rs
@@ -16,6 +16,11 @@ use crate::{
|
||||
use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, ReadRequest};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::{Duration as StdDuration, Instant};
|
||||
|
||||
const STREAM_READ_BYTE_COUNT: u16 = 168;
|
||||
const STREAM_RESPONSE_DATA_OVERHEAD: usize = 10;
|
||||
const STREAM_FRAME_PREFIX_LEN: usize = 14;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum StreamMode {
|
||||
Polling,
|
||||
@@ -213,7 +218,7 @@ impl StreamWorker {
|
||||
let request = ReadRequest {
|
||||
device_addr: self.config.device_addr,
|
||||
start_addr: self.config.finger_addr,
|
||||
read_byte_count: 168,
|
||||
read_byte_count: STREAM_READ_BYTE_COUNT,
|
||||
};
|
||||
let request_frame = self.codec.encode_read_request(&request)?;
|
||||
|
||||
@@ -268,38 +273,7 @@ impl StreamWorker {
|
||||
}
|
||||
|
||||
fn pop_next_frame(&mut self) -> Result<Option<Vec<u8>>, SdkError> {
|
||||
loop {
|
||||
let Some(start) = self
|
||||
.rx_buffer
|
||||
.windows(FRAME_START_RESPONSE.len())
|
||||
.position(|window| window == FRAME_START_RESPONSE)
|
||||
else {
|
||||
self.rx_buffer.clear();
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if start > 0 {
|
||||
self.rx_buffer.drain(..start);
|
||||
}
|
||||
|
||||
if self.rx_buffer.len() < 4 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let data_len = u16::from_le_bytes([self.rx_buffer[2], self.rx_buffer[3]]) as usize;
|
||||
let frame_len = 4 + data_len + FRAME_CRC_LEN;
|
||||
|
||||
if frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN {
|
||||
self.rx_buffer.drain(..FRAME_START_RESPONSE.len());
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.rx_buffer.len() < frame_len {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
return Ok(Some(self.rx_buffer.drain(..frame_len).collect()));
|
||||
}
|
||||
pop_next_stream_frame(&mut self.rx_buffer, self.codec.as_ref())
|
||||
}
|
||||
|
||||
fn next_sequence(&mut self) -> u32 {
|
||||
@@ -309,6 +283,64 @@ impl StreamWorker {
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_next_stream_frame(
|
||||
rx_buffer: &mut Vec<u8>,
|
||||
codec: &dyn ProtocolCodec,
|
||||
) -> Result<Option<Vec<u8>>, SdkError> {
|
||||
loop {
|
||||
let Some(start) = rx_buffer
|
||||
.windows(FRAME_START_RESPONSE.len())
|
||||
.position(|window| window == FRAME_START_RESPONSE)
|
||||
else {
|
||||
rx_buffer.clear();
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if start > 0 {
|
||||
rx_buffer.drain(..start);
|
||||
}
|
||||
|
||||
if rx_buffer.len() < 4 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let data_len = u16::from_le_bytes([rx_buffer[2], rx_buffer[3]]) as usize;
|
||||
let frame_len = 4 + data_len + FRAME_CRC_LEN;
|
||||
|
||||
if data_len < STREAM_RESPONSE_DATA_OVERHEAD
|
||||
|| frame_len < crate::protocol::MIN_RESPONSE_FRAME_LEN
|
||||
{
|
||||
rx_buffer.drain(..1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if rx_buffer.len() < STREAM_FRAME_PREFIX_LEN {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let expected_data_len = u16::from_le_bytes([rx_buffer[11], rx_buffer[12]]) as usize;
|
||||
if expected_data_len != STREAM_READ_BYTE_COUNT as usize
|
||||
|| data_len != STREAM_RESPONSE_DATA_OVERHEAD + expected_data_len
|
||||
{
|
||||
rx_buffer.drain(..1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if rx_buffer.len() < frame_len {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let expected_crc = rx_buffer[frame_len - 1];
|
||||
let actual_crc = codec.crc8(&rx_buffer[..frame_len - 1]);
|
||||
if expected_crc != actual_crc {
|
||||
rx_buffer.drain(..1);
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(Some(rx_buffer.drain(..frame_len).collect()));
|
||||
}
|
||||
}
|
||||
|
||||
pub trait SampleCollector: Send {
|
||||
fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError>;
|
||||
}
|
||||
@@ -454,3 +486,79 @@ fn make_collector(
|
||||
StreamMode::AutoDistribution => Box::new(NoopSampleCollector),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn stream_frame(payload_seed: u8) -> Vec<u8> {
|
||||
let codec = EskinProtocolCodec;
|
||||
let payload = vec![payload_seed; STREAM_READ_BYTE_COUNT as usize];
|
||||
let data_len = STREAM_RESPONSE_DATA_OVERHEAD + payload.len();
|
||||
|
||||
let mut frame = Vec::with_capacity(STREAM_FRAME_PREFIX_LEN + payload.len() + 1);
|
||||
frame.extend_from_slice(&FRAME_START_RESPONSE);
|
||||
frame.extend_from_slice(&(data_len as u16).to_le_bytes());
|
||||
frame.push(0x34);
|
||||
frame.push(0x00);
|
||||
frame.push(crate::protocol::FUNC_RESPONSE_READ);
|
||||
frame.extend_from_slice(&0x1C00u32.to_le_bytes());
|
||||
frame.extend_from_slice(&(payload.len() as u16).to_le_bytes());
|
||||
frame.push(0x00);
|
||||
frame.extend_from_slice(&payload);
|
||||
|
||||
let crc = codec.crc8(&frame);
|
||||
frame.push(crc);
|
||||
frame
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pop_next_stream_frame_keeps_partial_frame_buffered() {
|
||||
let codec = EskinProtocolCodec;
|
||||
let frame = stream_frame(0x11);
|
||||
let mut buffer = frame[..20].to_vec();
|
||||
|
||||
let popped = pop_next_stream_frame(&mut buffer, &codec).unwrap();
|
||||
|
||||
assert!(popped.is_none());
|
||||
assert_eq!(buffer, frame[..20]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pop_next_stream_frame_splits_sticky_frames() {
|
||||
let codec = EskinProtocolCodec;
|
||||
let first = stream_frame(0x11);
|
||||
let second = stream_frame(0x22);
|
||||
let mut buffer = Vec::new();
|
||||
buffer.extend_from_slice(&first);
|
||||
buffer.extend_from_slice(&second);
|
||||
|
||||
assert_eq!(
|
||||
pop_next_stream_frame(&mut buffer, &codec).unwrap(),
|
||||
Some(first)
|
||||
);
|
||||
assert_eq!(
|
||||
pop_next_stream_frame(&mut buffer, &codec).unwrap(),
|
||||
Some(second)
|
||||
);
|
||||
assert!(buffer.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pop_next_stream_frame_resyncs_after_bad_crc() {
|
||||
let codec = EskinProtocolCodec;
|
||||
let mut bad = stream_frame(0x11);
|
||||
let good = stream_frame(0x22);
|
||||
let last = bad.len() - 1;
|
||||
bad[last] ^= 0xFF;
|
||||
|
||||
let mut buffer = bad;
|
||||
buffer.extend_from_slice(&good);
|
||||
|
||||
assert_eq!(
|
||||
pop_next_stream_frame(&mut buffer, &codec).unwrap(),
|
||||
Some(good)
|
||||
);
|
||||
assert!(buffer.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user