Files
JE-Skin/src-tauri/src/devkit/client.rs

296 lines
9.5 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! DevKit gRPC Client
//!
//! Rust 端作为 gRPC client
//! 1. 以 client-streaming 方式推送实时帧SensorPush.Upload
//! 2. 以 unary 方式发送导出文件路径做后处理ExportProcessor.ProcessFile
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tauri::{AppHandle, Emitter};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use super::proto::sensor_push_client::SensorPushClient;
use super::proto::export_processor_client::ExportProcessorClient;
use super::proto::{ProcessRequest, SensorFrame};
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct DevKitPztAngleEvent {
seq: u64,
timestamp_ms: u64,
dts_ms: u32,
angle: f32,
}
// ── DevKit 配置 ────────────────────────────────────────────────────
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DevKitConfig {
/// 导出过滤抬起:导出 CSV 后自动调用 Python 做梯度过滤
pub filter_lift_enabled: bool,
/// 以 xlsx 保存Python 处理后输出 xlsx 并删除源 CSV
pub save_as_xlsx: bool,
}
impl Default for DevKitConfig {
fn default() -> Self {
Self {
filter_lift_enabled: true,
save_as_xlsx: false,
}
}
}
impl DevKitConfig {
fn config_path() -> PathBuf {
let base = dirs::config_dir()
.or_else(|| dirs::data_dir())
.unwrap_or_else(|| PathBuf::from("."));
base.join("JE-Skin").join("devkit_config.json")
}
/// 从文件加载配置,失败则返回默认值
pub fn load() -> Self {
let path = Self::config_path();
match std::fs::read_to_string(&path) {
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => Self::default(),
}
}
/// 保存配置到文件
pub fn save(&self) -> Result<(), String> {
let path = Self::config_path();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create config dir: {e}"))?;
}
let json = serde_json::to_string_pretty(self)
.map_err(|e| format!("Failed to serialize config: {e}"))?;
std::fs::write(&path, json)
.map_err(|e| format!("Failed to write config: {e}"))?;
Ok(())
}
}
// ── 导出处理结果 ───────────────────────────────────────────────────
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExportProcessResult {
pub ok: bool,
pub output_path: String,
pub groups_used: u32,
pub mean_value: f64,
pub threshold: f64,
pub rows_total: u32,
pub rows_kept: u32,
pub message: String,
}
// ── Tauri 状态 ─────────────────────────────────────────────────────
/// DevKit 全局状态,由 Tauri manage
#[derive(Clone)]
pub struct DevKitState {
pub running: Arc<AtomicBool>,
pub port: Arc<std::sync::Mutex<u16>>,
pub frame_count: Arc<AtomicU32>,
pub config: Arc<std::sync::Mutex<DevKitConfig>>,
frame_tx: Arc<std::sync::Mutex<Option<mpsc::Sender<SensorFrame>>>>,
client_handle: Arc<std::sync::Mutex<Option<JoinHandle<()>>>>,
}
impl Default for DevKitState {
fn default() -> Self {
Self {
running: Arc::new(AtomicBool::new(false)),
port: Arc::new(std::sync::Mutex::new(50051)),
frame_count: Arc::new(AtomicU32::new(0)),
config: Arc::new(std::sync::Mutex::new(DevKitConfig::load())),
frame_tx: Arc::new(std::sync::Mutex::new(None)),
client_handle: Arc::new(std::sync::Mutex::new(None)),
}
}
}
/// 前端查询到的状态快照
#[derive(Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DevKitStatusSnapshot {
pub enabled: bool,
pub running: bool,
pub port: u16,
pub frames_sent: u32,
pub config: DevKitConfig,
}
impl DevKitState {
pub fn status(&self) -> DevKitStatusSnapshot {
let cfg = self.config.lock().unwrap().clone();
DevKitStatusSnapshot {
enabled: true,
running: self.running.load(Ordering::SeqCst),
port: *self.port.lock().unwrap(),
frames_sent: self.frame_count.load(Ordering::SeqCst),
config: cfg,
}
}
/// 获取当前配置
pub fn get_config(&self) -> DevKitConfig {
self.config.lock().unwrap().clone()
}
/// 更新配置并持久化
pub fn set_config(&self, new_config: DevKitConfig) -> Result<(), String> {
new_config.save()?;
*self.config.lock().unwrap() = new_config;
Ok(())
}
/// 启动 gRPC client连接到 Python server 并开始推送数据
pub async fn start(&self, app: AppHandle, port: u16) -> Result<(), String> {
if self.running.load(Ordering::SeqCst) {
return Err("AlreadyRunning".into());
}
let addr = format!("http://127.0.0.1:{port}");
*self.port.lock().unwrap() = port;
self.running.store(true, Ordering::SeqCst);
self.frame_count.store(0, Ordering::SeqCst);
// mpsc channel: 主线程 send 帧 → gRPC task 推送给 Python
let (tx, rx) = mpsc::channel::<SensorFrame>(512);
*self.frame_tx.lock().unwrap() = Some(tx);
let running = Arc::clone(&self.running);
let frame_count = Arc::clone(&self.frame_count);
let app_handle = app.clone();
let handle = tokio::spawn(async move {
if let Err(e) = run_grpc_upload(app_handle, addr, rx, frame_count).await {
::log::error!("DevKit gRPC upload error: {e:?}");
}
running.store(false, Ordering::SeqCst);
});
*self.client_handle.lock().unwrap() = Some(handle);
::log::info!("DevKit gRPC client started, connecting to 127.0.0.1:{port}");
Ok(())
}
/// 停止 gRPC client
pub async fn stop(&self) -> Result<(), String> {
if !self.running.load(Ordering::SeqCst) {
return Err("NotRunning".into());
}
*self.frame_tx.lock().unwrap() = None;
if let Some(handle) = self.client_handle.lock().unwrap().take() {
handle.abort();
}
self.running.store(false, Ordering::SeqCst);
::log::info!("DevKit gRPC client stopped");
Ok(())
}
/// 推送一帧数据到 gRPC stream由主线程调用
pub fn push_frame(&self, frame: SensorFrame) {
if !self.running.load(Ordering::SeqCst) {
return;
}
if let Some(tx) = self.frame_tx.lock().unwrap().as_ref() {
let _ = tx.try_send(frame);
}
}
/// 调用 Python ExportProcessor.ProcessFile 做导出后处理unary
pub async fn process_export(
&self,
csv_path: &str,
save_as_xlsx: bool,
) -> Result<ExportProcessResult, String> {
let port = *self.port.lock().unwrap();
let addr = format!("http://127.0.0.1:{port}");
let mut client = ExportProcessorClient::connect(addr)
.await
.map_err(|e| format!("Failed to connect to DevKit server: {e}"))?;
let request = ProcessRequest {
csv_path: csv_path.to_string(),
save_as_xlsx,
};
let response = client
.process_file(request)
.await
.map_err(|e| format!("ProcessFile RPC failed: {e}"))?;
let resp = response.into_inner();
Ok(ExportProcessResult {
ok: resp.ok,
output_path: resp.output_path,
groups_used: resp.groups_used,
mean_value: resp.mean_value,
threshold: resp.threshold,
rows_total: resp.rows_total,
rows_kept: resp.rows_kept,
message: resp.message,
})
}
}
// ── gRPC Upload Client ─────────────────────────────────────────────
async fn run_grpc_upload(
app: AppHandle,
addr: String,
mut rx: mpsc::Receiver<SensorFrame>,
frame_count: Arc<AtomicU32>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut client = SensorPushClient::connect(addr.clone()).await?;
let stream = async_stream::stream! {
while let Some(frame) = rx.recv().await {
frame_count.fetch_add(1, Ordering::SeqCst);
yield frame;
}
};
let response = client.upload(stream).await?;
let mut inbound = response.into_inner();
while let Some(message) = inbound.message().await? {
if message.ok {
let payload = DevKitPztAngleEvent {
seq: message.seq,
timestamp_ms: message.timestamp_ms,
dts_ms: message.dts_ms,
angle: message.angle,
};
::log::debug!(
"python pzt angle: seq={} dts_ms={} angle={:.2}",
message.seq,
message.dts_ms,
message.angle
);
app.emit("devkit_pzt_angle", payload)?;
} else {
::log::warn!("DevKit PZT response error: {}", message.message);
}
}
::log::info!("DevKit upload stream closed");
Ok(())
}