//! 信令客户端 use anyhow::Result; use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::{mpsc, RwLock}; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; /// 信令消息 #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum SignalMessage { /// 注册设备 #[serde(rename = "register")] Register { device_id: String, verification_code: String, }, /// 心跳 #[serde(rename = "heartbeat")] Heartbeat { device_id: String }, /// 心跳响应 #[serde(rename = "heartbeat_ack")] HeartbeatAck, /// 连接请求 #[serde(rename = "connect_request")] ConnectRequest { session_id: String, from_device: String, to_device: String, verification_code: String, }, /// 连接响应 #[serde(rename = "connect_response")] ConnectResponse { session_id: String, accepted: bool, reason: Option, }, /// SDP Offer #[serde(rename = "offer")] Offer { session_id: String, from_device: String, to_device: String, sdp: String, }, /// SDP Answer #[serde(rename = "answer")] Answer { session_id: String, from_device: String, to_device: String, sdp: String, }, /// ICE Candidate #[serde(rename = "candidate")] Candidate { session_id: String, from_device: String, to_device: String, candidate: String, sdp_mid: Option, sdp_mline_index: Option, }, /// 会话结束 #[serde(rename = "session_end")] SessionEnd { session_id: String }, /// 强制下线 #[serde(rename = "force_offline")] ForceOffline { device_id: String }, /// 错误 #[serde(rename = "error")] Error { code: u32, message: String }, /// 设置允许远程 #[serde(rename = "set_allow_remote")] SetAllowRemote { device_id: String, allow: bool }, /// 刷新验证码 #[serde(rename = "refresh_code")] RefreshCode { device_id: String }, /// 验证码已刷新 #[serde(rename = "code_refreshed")] CodeRefreshed { device_id: String, verification_code: String, }, /// 屏幕帧数据 #[serde(rename = "screen_frame")] ScreenFrame { session_id: String, from_device: String, to_device: String, width: u32, height: u32, data: String, // Base64 encoded JPEG }, /// 鼠标事件 #[serde(rename = "mouse_event")] MouseEvent { session_id: String, from_device: String, to_device: String, x: f64, y: f64, event_type: String, // "move", "click", "down", "up", "scroll" button: Option, delta: Option, }, /// 键盘事件 #[serde(rename = "keyboard_event")] KeyboardEvent { session_id: String, from_device: String, to_device: String, key: String, event_type: String, // "down", "up" }, } /// 信令客户端 pub struct SignalClient { device_id: String, server_url: String, tx: Option>, connected: Arc>, } impl SignalClient { pub fn new(device_id: String, server_url: String) -> Self { Self { device_id, server_url, tx: None, connected: Arc::new(RwLock::new(false)), } } /// 连接到信令服务器 pub async fn connect( &mut self, on_message: impl Fn(SignalMessage) + Send + 'static, ) -> Result<()> { // 将 http:// 转换为 ws://,https:// 转换为 wss:// let ws_url = self.server_url .replace("http://", "ws://") .replace("https://", "wss://"); let url = format!( "{}/ws/signal?device_id={}", ws_url, self.device_id ); tracing::info!("Connecting to WebSocket: {}", url); let (ws_stream, _) = connect_async(&url).await?; let (mut write, mut read) = ws_stream.split(); let (tx, mut rx) = mpsc::channel::(32); self.tx = Some(tx); *self.connected.write().await = true; let connected = self.connected.clone(); // 发送任务 tokio::spawn(async move { while let Some(msg) = rx.recv().await { let text = serde_json::to_string(&msg).unwrap(); if write.send(Message::Text(text)).await.is_err() { break; } } }); // 接收任务 tokio::spawn(async move { while let Some(Ok(msg)) = read.next().await { if let Message::Text(text) = msg { if let Ok(signal_msg) = serde_json::from_str::(&text) { on_message(signal_msg); } } } *connected.write().await = false; }); Ok(()) } /// 发送消息 pub async fn send(&self, msg: SignalMessage) -> Result<()> { if let Some(tx) = &self.tx { tx.send(msg).await?; } Ok(()) } /// 发送心跳 pub async fn send_heartbeat(&self) -> Result<()> { self.send(SignalMessage::Heartbeat { device_id: self.device_id.clone(), }) .await } /// 发送连接请求 pub async fn request_connect( &self, session_id: String, to_device: String, verification_code: String, ) -> Result<()> { self.send(SignalMessage::ConnectRequest { session_id, from_device: self.device_id.clone(), to_device, verification_code, }) .await } /// 发送连接响应 pub async fn respond_connect( &self, session_id: String, accepted: bool, reason: Option, ) -> Result<()> { self.send(SignalMessage::ConnectResponse { session_id, accepted, reason, }) .await } /// 发送 SDP Offer pub async fn send_offer( &self, session_id: String, to_device: String, sdp: String, ) -> Result<()> { self.send(SignalMessage::Offer { session_id, from_device: self.device_id.clone(), to_device, sdp, }) .await } /// 发送 SDP Answer pub async fn send_answer( &self, session_id: String, to_device: String, sdp: String, ) -> Result<()> { self.send(SignalMessage::Answer { session_id, from_device: self.device_id.clone(), to_device, sdp, }) .await } /// 发送 ICE Candidate pub async fn send_candidate( &self, session_id: String, to_device: String, candidate: String, sdp_mid: Option, sdp_mline_index: Option, ) -> Result<()> { self.send(SignalMessage::Candidate { session_id, from_device: self.device_id.clone(), to_device, candidate, sdp_mid, sdp_mline_index, }) .await } /// 设置允许远程 pub async fn set_allow_remote(&self, allow: bool) -> Result<()> { self.send(SignalMessage::SetAllowRemote { device_id: self.device_id.clone(), allow, }) .await } /// 刷新验证码 pub async fn refresh_code(&self) -> Result<()> { self.send(SignalMessage::RefreshCode { device_id: self.device_id.clone(), }) .await } /// 结束会话 pub async fn end_session(&self, session_id: String) -> Result<()> { self.send(SignalMessage::SessionEnd { session_id }).await } /// 发送屏幕帧 pub async fn send_frame( &self, session_id: String, to_device: String, width: u32, height: u32, jpeg_data: &[u8], ) -> Result<()> { let data = BASE64.encode(jpeg_data); self.send(SignalMessage::ScreenFrame { session_id, from_device: self.device_id.clone(), to_device, width, height, data, }) .await } /// 是否已连接 pub async fn is_connected(&self) -> bool { *self.connected.read().await } }