add multi port and export

This commit is contained in:
lenn
2026-04-24 14:33:14 +08:00
parent 6e639313e8
commit 0f23be2bb2
4 changed files with 183 additions and 10 deletions

View File

@@ -7,7 +7,10 @@ use std::{
fs::{self, File}, fs::{self, File},
io::BufWriter, io::BufWriter,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Mutex}, sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@@ -44,10 +47,38 @@ impl SerialSession {
} }
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectorMode {
Normal,
Poll,
}
impl CollectorMode {
pub fn parse(input: &str) -> Option<Self> {
match input.trim().to_ascii_lowercase().as_str() {
"normal" => Some(Self::Normal),
"poll" => Some(Self::Poll),
_ => None,
}
}
pub fn as_str(self) -> &'static str {
match self {
Self::Normal => "normal",
Self::Poll => "poll",
}
}
fn is_poll(self) -> bool {
matches!(self, Self::Poll)
}
}
pub struct SerialConnectionState { pub struct SerialConnectionState {
sessions: Mutex<HashMap<String, Arc<SerialSession>>>, sessions: Mutex<HashMap<String, Arc<SerialSession>>>,
last_records: Mutex<HashMap<String, Arc<Mutex<TactileARecording>>>>, last_records: Mutex<HashMap<String, Arc<Mutex<TactileARecording>>>>,
export_dir: Mutex<PathBuf>, export_dir: Mutex<PathBuf>,
poll_enabled: Arc<AtomicBool>,
} }
impl SerialConnectionState { impl SerialConnectionState {
@@ -56,6 +87,7 @@ impl SerialConnectionState {
sessions: Mutex::new(HashMap::new()), sessions: Mutex::new(HashMap::new()),
last_records: Mutex::new(HashMap::new()), last_records: Mutex::new(HashMap::new()),
export_dir: Mutex::new(default_export_dir()), export_dir: Mutex::new(default_export_dir()),
poll_enabled: Arc::new(AtomicBool::new(false)),
} }
} }
@@ -112,6 +144,22 @@ impl SerialConnectionState {
Ok(export_dir) Ok(export_dir)
} }
pub fn current_mode(&self) -> CollectorMode {
if self.poll_enabled.load(Ordering::Relaxed) {
CollectorMode::Poll
} else {
CollectorMode::Normal
}
}
pub fn set_mode(&self, mode: CollectorMode) {
self.poll_enabled.store(mode.is_poll(), Ordering::Relaxed);
}
pub fn poll_enabled_handle(&self) -> Arc<AtomicBool> {
Arc::clone(&self.poll_enabled)
}
pub fn export_port_recording(&self, port: &str) -> anyhow::Result<PathBuf> { pub fn export_port_recording(&self, port: &str) -> anyhow::Result<PathBuf> {
let port_name = port.trim(); let port_name = port.trim();
if port_name.is_empty() { if port_name.is_empty() {
@@ -227,12 +275,14 @@ pub async fn serial_connect(
let task_session = Arc::clone(&session); let task_session = Arc::clone(&session);
let task_record = Arc::clone(&current_record); let task_record = Arc::clone(&current_record);
let task_port = port_name.clone(); let task_port = port_name.clone();
let poll_enabled = state.poll_enabled_handle();
let session_started_at = Instant::now(); let session_started_at = Instant::now();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let codec = TactileACodec::new(7, 12); let codec = TactileACodec::new(7, 12);
let handler = TactileAHandler; let handler = TactileAHandler;
let poll_mode = PollMode::Enabled(Box::new(TactileAPollRequester::new( let poll_mode = PollMode::Enabled(Box::new(TactileAPollRequester::new(
poll_enabled,
Duration::from_millis(10), Duration::from_millis(10),
7, 7,
12, 12,
@@ -401,3 +451,21 @@ fn sanitize_file_component(value: &str) -> String {
trimmed.to_string() trimmed.to_string()
} }
} }
#[cfg(test)]
mod tests {
use super::{CollectorMode, SerialConnectionState};
#[test]
fn default_mode_is_normal() {
let state = SerialConnectionState::new();
assert_eq!(state.current_mode(), CollectorMode::Normal);
}
#[test]
fn set_mode_updates_current_mode() {
let state = SerialConnectionState::new();
state.set_mode(CollectorMode::Poll);
assert_eq!(state.current_mode(), CollectorMode::Poll);
}
}

View File

@@ -3,7 +3,10 @@ use std::sync::Arc;
use anyhow::{Result, bail}; use anyhow::{Result, bail};
use crate::{ use crate::{
app::{SerialConnectionState, serial_connect, serial_disconnect_port, shutdown_all_sessions}, app::{
CollectorMode, SerialConnectionState, serial_connect, serial_disconnect_port,
shutdown_all_sessions,
},
serial_core::{error::SerialError, utils::serial_enum}, serial_core::{error::SerialError, utils::serial_enum},
}; };
@@ -15,6 +18,7 @@ pub enum Command {
Status, Status,
Echo(String), Echo(String),
Open(String), Open(String),
Mode(String),
Close(Option<String>), Close(Option<String>),
Export(String), Export(String),
SetExport(String), SetExport(String),
@@ -63,6 +67,12 @@ pub fn parse_command(input: &str) -> Result<Command> {
} }
Command::Open(rest.to_string()) Command::Open(rest.to_string())
} }
"mode" => {
if rest.is_empty() {
bail!("/mode requires a mode");
}
Command::Mode(rest.to_string())
}
"close" => Command::Close(if rest.is_empty() { "close" => Command::Close(if rest.is_empty() {
None None
} else { } else {
@@ -131,6 +141,7 @@ pub async fn execute_command(
}, },
Command::Status => match state.collector_lines() { Command::Status => match state.collector_lines() {
Ok(mut lines) => { Ok(mut lines) => {
lines.push(format!("Mode: {}", state.current_mode().as_str()));
match state.current_export_dir() { match state.current_export_dir() {
Ok(export_dir) => { Ok(export_dir) => {
lines.push(format!("Export directory: {}", export_dir.display())) lines.push(format!("Export directory: {}", export_dir.display()))
@@ -143,9 +154,45 @@ pub async fn execute_command(
}, },
Command::Echo(text) => CommandResponse::from_line(text), Command::Echo(text) => CommandResponse::from_line(text),
Command::Open(port) => match serial_connect(port.clone(), Arc::clone(&state)).await { Command::Open(port) => match serial_connect(port.clone(), Arc::clone(&state)).await {
Ok(()) => CommandResponse::from_line(format!("Serial {port} is collecting...")), Ok(()) => CommandResponse::new(
vec![
format!("Serial {port} is collecting..."),
format!("Current mode: {}", state.current_mode().as_str()),
],
false,
),
Err(err) => CommandResponse::from_line(open_error_message(&port, err)), Err(err) => CommandResponse::from_line(open_error_message(&port, err)),
}, },
Command::Mode(mode) => match CollectorMode::parse(&mode) {
Some(mode) => {
state.set_mode(mode);
let active_count = state.active_ports().map(|ports| ports.len()).unwrap_or(0);
let detail = match (mode, active_count) {
(CollectorMode::Normal, 0) => {
"New collectors will wait for manual data transmission.".to_string()
}
(CollectorMode::Normal, count) => {
format!(
"Polling stopped for {count} active collector(s); they will wait for manual data transmission."
)
}
(CollectorMode::Poll, 0) => {
"New collectors will continuously send req frames.".to_string()
}
(CollectorMode::Poll, count) => {
format!("Polling started for {count} active collector(s).")
}
};
CommandResponse::new(
vec![format!("Mode set to {}.", mode.as_str()), detail],
false,
)
}
None => CommandResponse::from_line(
"Invalid mode. Use /mode normal or /mode poll.".to_string(),
),
},
Command::Close(Some(port)) => match serial_disconnect_port(&port, state.as_ref()).await { Command::Close(Some(port)) => match serial_disconnect_port(&port, state.as_ref()).await {
Ok(()) => CommandResponse::from_line(format!("Serial {port} stopped collecting.")), Ok(()) => CommandResponse::from_line(format!("Serial {port} stopped collecting.")),
Err(SerialError::NotConnected) => { Err(SerialError::NotConnected) => {
@@ -212,8 +259,9 @@ fn help_lines() -> Vec<String> {
"Available commands:".to_string(), "Available commands:".to_string(),
" /help Show help".to_string(), " /help Show help".to_string(),
" /scan List serial ports".to_string(), " /scan List serial ports".to_string(),
" /status Show active collectors and export directory".to_string(), " /status Show active collectors, mode, and export directory".to_string(),
" /open <path> Start collecting on a serial port".to_string(), " /open <path> Start collecting on a serial port".to_string(),
" /mode <normal|poll> Set collection mode".to_string(),
" /close <path> Stop collecting on one serial port".to_string(), " /close <path> Stop collecting on one serial port".to_string(),
" /close Stop collecting on all serial ports".to_string(), " /close Stop collecting on all serial ports".to_string(),
" /export <port> Export one serial recording to CSV".to_string(), " /export <port> Export one serial recording to CSV".to_string(),
@@ -261,6 +309,14 @@ mod tests {
); );
} }
#[test]
fn parse_mode_command() {
assert_eq!(
parse_command("/mode poll").unwrap(),
Command::Mode("poll".to_string())
);
}
#[test] #[test]
fn reject_non_command_input() { fn reject_non_command_input() {
assert!(parse_command("scan").is_err()); assert!(parse_command("scan").is_err());

View File

@@ -5,7 +5,10 @@ use crate::serial_core::record::Recording;
use crate::serial_core::record::{FrameTiming, RecordedFrame}; use crate::serial_core::record::{FrameTiming, RecordedFrame};
use anyhow::Result; use anyhow::Result;
use std::future::pending; use std::future::pending;
use std::sync::{Arc, Mutex}; use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::Instant; use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::{self, Duration, MissedTickBehavior}; use tokio::time::{self, Duration, MissedTickBehavior};
@@ -43,6 +46,7 @@ pub struct NoopPollRequester;
impl<F> PollRequester<F> for NoopPollRequester {} impl<F> PollRequester<F> for NoopPollRequester {}
pub struct TactileAPollRequester { pub struct TactileAPollRequester {
enabled: Arc<AtomicBool>,
period: Duration, period: Duration,
cols: usize, cols: usize,
rows: usize, rows: usize,
@@ -52,8 +56,15 @@ pub struct TactileAPollRequester {
} }
impl TactileAPollRequester { impl TactileAPollRequester {
pub fn new(period: Duration, cols: usize, rows: usize, reply_timeout: Duration) -> Self { pub fn new(
enabled: Arc<AtomicBool>,
period: Duration,
cols: usize,
rows: usize,
reply_timeout: Duration,
) -> Self {
Self { Self {
enabled,
period, period,
cols, cols,
rows, rows,
@@ -70,6 +81,12 @@ impl PollRequester<TactileAFrame> for TactileAPollRequester {
} }
fn should_request(&mut self) -> bool { fn should_request(&mut self) -> bool {
if !self.enabled.load(Ordering::Relaxed) {
self.awaiting_reply = false;
self.last_request_at = None;
return false;
}
if !self.awaiting_reply { if !self.awaiting_reply {
return true; return true;
} }

View File

@@ -22,8 +22,10 @@ const MAX_COMMAND_LINES: usize = 512;
const COMMAND_INPUT_TITLE: &str = "Command Input [Enter=run Tab=complete]"; const COMMAND_INPUT_TITLE: &str = "Command Input [Enter=run Tab=complete]";
const COMPLETION_PREVIEW_LIMIT: usize = 4; const COMPLETION_PREVIEW_LIMIT: usize = 4;
const COMMAND_COMPLETIONS: &[&str] = &[ const COMMAND_COMPLETIONS: &[&str] = &[
"/help", "/scan", "/status", "/open", "/close", "/export", "/set", "/echo", "/exit", "/quit", "/help", "/scan", "/status", "/open", "/mode", "/close", "/export", "/set", "/echo", "/exit",
"/quit",
]; ];
const MODE_COMPLETIONS: &[&str] = &["normal", "poll"];
const SETTING_COMPLETIONS: &[&str] = &["export"]; const SETTING_COMPLETIONS: &[&str] = &["export"];
pub async fn run() -> Result<()> { pub async fn run() -> Result<()> {
@@ -64,7 +66,8 @@ impl TuiApp {
app.push_command_lines([ app.push_command_lines([
"JE-Skin CLI TUI".to_string(), "JE-Skin CLI TUI".to_string(),
"Streaming serial text has been disabled to keep the terminal responsive.".to_string(), "Streaming serial text has been disabled to keep the terminal responsive.".to_string(),
"Use /scan, /open <port>, /status, /export <port>, /set export <dir>, /close <port>, /close, /exit.".to_string(), "Default mode is normal. Use /mode poll to enable continuous req frames.".to_string(),
"Use /scan, /open <port>, /mode <normal|poll>, /status, /export <port>, /set export <dir>, /close <port>, /close, /exit.".to_string(),
"The right pane now shows active collectors only.".to_string(), "The right pane now shows active collectors only.".to_string(),
"Press Tab to autocomplete commands and paths.".to_string(), "Press Tab to autocomplete commands and paths.".to_string(),
]); ]);
@@ -204,6 +207,7 @@ impl TuiApp {
fn completion_candidates(&self, request: &CompletionRequest) -> Vec<String> { fn completion_candidates(&self, request: &CompletionRequest) -> Vec<String> {
let mut candidates = match &request.kind { let mut candidates = match &request.kind {
CompletionKind::Command => command_completion_candidates(&request.token), CompletionKind::Command => command_completion_candidates(&request.token),
CompletionKind::Mode => mode_completion_candidates(&request.token),
CompletionKind::Setting => setting_completion_candidates(&request.token), CompletionKind::Setting => setting_completion_candidates(&request.token),
CompletionKind::SerialPort { command_name } => { CompletionKind::SerialPort { command_name } => {
serial_port_completion_candidates(command_name, &request.token, &self.serial_state) serial_port_completion_candidates(command_name, &request.token, &self.serial_state)
@@ -338,7 +342,7 @@ fn new_command_input() -> TextArea<'static> {
input.set_cursor_line_style(Style::default()); input.set_cursor_line_style(Style::default());
input.set_style(Style::default().fg(Color::White)); input.set_style(Style::default().fg(Color::White));
input.set_placeholder_text( input.set_placeholder_text(
"/scan | /open /dev/ttyUSB0 | /export /dev/ttyUSB0 | /set export ./exports", "/scan | /open /dev/ttyUSB0 | /mode poll | /export /dev/ttyUSB0 | /set export ./exports",
); );
input input
} }
@@ -365,6 +369,7 @@ impl CompletionCycle {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
enum CompletionKind { enum CompletionKind {
Command, Command,
Mode,
Setting, Setting,
SerialPort { command_name: String }, SerialPort { command_name: String },
FileSystemPath, FileSystemPath,
@@ -424,6 +429,15 @@ fn build_completion_request(line: &str, cursor_col: usize) -> Option<CompletionR
} }
} }
if command_name == "/mode" && token_position == 1 {
return Some(CompletionRequest {
kind: CompletionKind::Mode,
start,
end,
token,
});
}
if matches!(command_name.as_str(), "/open" | "/close" | "/export") { if matches!(command_name.as_str(), "/open" | "/close" | "/export") {
return Some(CompletionRequest { return Some(CompletionRequest {
kind: CompletionKind::SerialPort { command_name }, kind: CompletionKind::SerialPort { command_name },
@@ -452,6 +466,14 @@ fn setting_completion_candidates(prefix: &str) -> Vec<String> {
.collect() .collect()
} }
fn mode_completion_candidates(prefix: &str) -> Vec<String> {
MODE_COMPLETIONS
.iter()
.filter(|mode| mode.starts_with(prefix))
.map(|mode| (*mode).to_string())
.collect()
}
fn serial_port_completion_candidates( fn serial_port_completion_candidates(
command_name: &str, command_name: &str,
prefix: &str, prefix: &str,
@@ -544,6 +566,13 @@ fn finalize_unique_completion(request: &CompletionRequest, candidate: &str, line
candidate.to_string() candidate.to_string()
} }
} }
CompletionKind::Mode => {
if request.end == line.chars().count() {
format!("{candidate} ")
} else {
candidate.to_string()
}
}
CompletionKind::Setting => { CompletionKind::Setting => {
if request.end == line.chars().count() { if request.end == line.chars().count() {
format!("{candidate} ") format!("{candidate} ")
@@ -563,7 +592,10 @@ fn finalize_unique_completion(request: &CompletionRequest, candidate: &str, line
} }
fn command_takes_argument(command: &str) -> bool { fn command_takes_argument(command: &str) -> bool {
matches!(command, "/open" | "/close" | "/export" | "/set" | "/echo") matches!(
command,
"/open" | "/mode" | "/close" | "/export" | "/set" | "/echo"
)
} }
fn completion_preview(candidates: &[String]) -> String { fn completion_preview(candidates: &[String]) -> String {