futu_server/
telnet.rs

1// Telnet 管理接口 — 运维调试用
2//
3// 支持的命令:
4//   help          — 显示帮助
5//   ping          — 连通测试
6//   version       — 版本信息
7//   show_subinfo  — 显示订阅状态
8//   set_loglevel  — 设置日志级别
9//   exit          — 关闭网关
10//
11// C++ 对齐: APIServer_Telnet.cpp (最大 5 个并发连接)
12
13use std::sync::Arc;
14
15use dashmap::DashMap;
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::net::TcpListener;
18use tokio::sync::watch;
19
20use crate::conn::ClientConn;
21use crate::metrics::GatewayMetrics;
22use crate::subscription::SubscriptionManager;
23
24/// Telnet 管理服务
25pub struct TelnetServer {
26    listen_addr: String,
27    connections: Arc<DashMap<u64, ClientConn>>,
28    subscriptions: Option<Arc<SubscriptionManager>>,
29    metrics: Option<Arc<GatewayMetrics>>,
30    shutdown_tx: watch::Sender<bool>,
31}
32
33const MAX_TELNET_CONNECTIONS: usize = 5;
34const VERSION: &str = env!("CARGO_PKG_VERSION");
35
36impl TelnetServer {
37    pub fn new(
38        listen_addr: String,
39        connections: Arc<DashMap<u64, ClientConn>>,
40        subscriptions: Option<Arc<SubscriptionManager>>,
41        metrics: Option<Arc<GatewayMetrics>>,
42        shutdown_tx: watch::Sender<bool>,
43    ) -> Self {
44        Self {
45            listen_addr,
46            connections,
47            subscriptions,
48            metrics,
49            shutdown_tx,
50        }
51    }
52
53    pub async fn run(&self) -> anyhow::Result<()> {
54        let listener = TcpListener::bind(&self.listen_addr).await?;
55        tracing::info!(addr = %self.listen_addr, "Telnet server listening");
56
57        let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
58
59        loop {
60            let (stream, peer_addr) = listener.accept().await?;
61            let count = active.load(std::sync::atomic::Ordering::Relaxed);
62            if count >= MAX_TELNET_CONNECTIONS {
63                tracing::warn!(peer = %peer_addr, "telnet max connections reached");
64                drop(stream);
65                continue;
66            }
67
68            active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
69            let active_clone = Arc::clone(&active);
70            let connections = Arc::clone(&self.connections);
71            let subscriptions = self.subscriptions.clone();
72            let metrics = self.metrics.clone();
73            let shutdown_tx = self.shutdown_tx.clone();
74
75            tokio::spawn(async move {
76                tracing::info!(peer = %peer_addr, "telnet client connected");
77                let (reader, mut writer) = stream.into_split();
78                let mut reader = BufReader::new(reader);
79
80                let _ = writer
81                    .write_all(
82                        b"FutuOpenD Rust Gateway Telnet Console\r\nType 'help' for commands.\r\n> ",
83                    )
84                    .await;
85
86                let mut line = String::new();
87                loop {
88                    line.clear();
89                    match reader.read_line(&mut line).await {
90                        Ok(0) => break, // EOF
91                        Ok(_) => {}
92                        Err(_) => break,
93                    }
94
95                    let cmd = line.trim().to_lowercase();
96                    let response = match cmd.as_str() {
97                        "help" => "Available commands:\r\n\
98                             help           — show this help\r\n\
99                             ping           — connectivity test\r\n\
100                             version        — show version\r\n\
101                             show_subinfo   — show subscription info\r\n\
102                             show_conn      — show active connections\r\n\
103                             show_metrics   — show gateway metrics\r\n\
104                             set_loglevel   — change log level (debug/info/warn/error)\r\n\
105                             exit           — shutdown gateway\r\n"
106                            .to_string(),
107                        "ping" => "pong\r\n".to_string(),
108                        "version" => format!("futu-opend-rs v{VERSION}\r\n"),
109                        "show_subinfo" => {
110                            if let Some(ref subs) = subscriptions {
111                                let total_quota = subs.get_total_used_quota();
112                                format!("Total subscription quota used: {total_quota}/4000\r\n")
113                            } else {
114                                "Subscription manager not available\r\n".to_string()
115                            }
116                        }
117                        "show_conn" => {
118                            let count = connections.len();
119                            let mut info = format!("Active connections: {count}\r\n");
120                            for entry in connections.iter() {
121                                let c = entry.value();
122                                info += &format!("  conn_id={} state={:?}\r\n", c.conn_id, c.state);
123                            }
124                            info
125                        }
126                        "show_metrics" => {
127                            if let Some(ref m) = metrics {
128                                m.report()
129                            } else {
130                                "Metrics not available\r\n".to_string()
131                            }
132                        }
133                        s if s.starts_with("set_loglevel") => {
134                            let parts: Vec<&str> = s.split_whitespace().collect();
135                            if parts.len() < 2 {
136                                "Usage: set_loglevel <debug|info|warn|error>\r\n".to_string()
137                            } else {
138                                // tracing 动态级别调整需要 reload handle,
139                                // 简化实现:只记录意图
140                                let level = parts[1];
141                                tracing::info!(level, "log level change requested via telnet");
142                                format!("Log level change to '{level}' noted (restart for full effect)\r\n")
143                            }
144                        }
145                        "exit" => {
146                            let _ = writer.write_all(b"Shutting down...\r\n").await;
147                            let _ = shutdown_tx.send(true);
148                            break;
149                        }
150                        "" => String::new(),
151                        _ => format!(
152                            "Unknown command: '{cmd}'. Type 'help' for available commands.\r\n"
153                        ),
154                    };
155
156                    if !response.is_empty() {
157                        let _ = writer.write_all(response.as_bytes()).await;
158                    }
159                    let _ = writer.write_all(b"> ").await;
160                }
161
162                active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
163                tracing::info!(peer = %peer_addr, "telnet client disconnected");
164            });
165        }
166    }
167}