// futu_server/telnet.rs

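// Telnet admin console — for operations and debugging.
//
// Supported commands:
//   help          — show help
//   ping          — connectivity test
//   version       — version info
//   show_subinfo  — show subscription status
//   show_conn     — show active connections
//   show_metrics  — show gateway metrics
//   set_loglevel  — request a log-level change
//   relogin       — clear the login cache so the next health tick re-logs in
//   exit          — shut down the gateway
//
// C++ parity: APIServer_Telnet.cpp (at most 5 concurrent connections)
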
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use dashmap::DashMap;
use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::watch;

use crate::conn::ClientConn;
use crate::metrics::GatewayMetrics;
use crate::subscription::SubscriptionManager;

/// v1.4.97 P1-D-F: callback invoked by the telnet `relogin` command.
///
/// The command name mirrors C++ `GTWCmd_ReLogin` (FTGateway/FTGTW_Define_Key.h:5 +
/// GTWCmdAndPushReply.cpp:1780-1799) instead of inventing a new
/// `force_reconnect_backend` command (per CLAUDE.md pitfall #51: "aligning with
/// C++ means subtracting, not adding").
///
/// `main.rs` wires a closure that clears `bridge.caches.login_cache`
/// (`LoginCache::clear`), so the next 30s P1-D health tick observes
/// `qot_logined=false` and automatically triggers an `AuthRefresher` relogin
/// through the existing v1.4.92 P1-D ladder path.
///
/// A plain shared closure is sufficient here: the callback needs no mutable
/// state, and concurrent invocations from several telnet clients are
/// idempotent (clearing an already-cleared cache is harmless).
pub type ReloginFn = Arc<dyn Fn() + Send + Sync>;

38/// Telnet 管理服务
39pub struct TelnetServer {
40    listen_addr: String,
41    connections: Arc<DashMap<u64, ClientConn>>,
42    subscriptions: Option<Arc<SubscriptionManager>>,
43    metrics: Option<Arc<GatewayMetrics>>,
44    shutdown_tx: watch::Sender<bool>,
45    /// v1.4.97 P1-D-F: optional relogin callback (None = command unavailable
46    /// with helpful error msg).
47    relogin_fn: Option<ReloginFn>,
48}
49
50const MAX_TELNET_CONNECTIONS: usize = 5;
51const VERSION: &str = env!("CARGO_PKG_VERSION");
52
53async fn write_telnet<W>(
54    writer: &mut W,
55    peer_addr: std::net::SocketAddr,
56    context: &'static str,
57    bytes: &[u8],
58) -> bool
59where
60    W: AsyncWrite + Unpin,
61{
62    match writer.write_all(bytes).await {
63        Ok(()) => true,
64        Err(err) => {
65            tracing::debug!(
66                peer = %peer_addr,
67                context,
68                error = %err,
69                "telnet write failed; client likely disconnected"
70            );
71            false
72        }
73    }
74}
75
76impl TelnetServer {
77    pub fn new(
78        listen_addr: String,
79        connections: Arc<DashMap<u64, ClientConn>>,
80        subscriptions: Option<Arc<SubscriptionManager>>,
81        metrics: Option<Arc<GatewayMetrics>>,
82        shutdown_tx: watch::Sender<bool>,
83    ) -> Self {
84        Self {
85            listen_addr,
86            connections,
87            subscriptions,
88            metrics,
89            shutdown_tx,
90            relogin_fn: None,
91        }
92    }
93
94    /// v1.4.97 P1-D-F: builder-style setter for relogin callback.
95    pub fn with_relogin_fn(mut self, relogin_fn: ReloginFn) -> Self {
96        self.relogin_fn = Some(relogin_fn);
97        self
98    }
99
100    pub async fn run(&self) -> anyhow::Result<()> {
101        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
102        self.run_until_shutdown(shutdown_rx).await
103    }
104
105    pub async fn run_until_shutdown(
106        &self,
107        mut shutdown_rx: watch::Receiver<bool>,
108    ) -> anyhow::Result<()> {
109        let listener = TcpListener::bind(&self.listen_addr).await?;
110        tracing::info!(addr = %self.listen_addr, "Telnet server listening");
111
112        let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
113
114        loop {
115            let (stream, peer_addr) = tokio::select! {
116                _ = crate::listener::shutdown_requested(&mut shutdown_rx) => {
117                    tracing::info!("Telnet server accept loop stopped by shutdown signal");
118                    break;
119                }
120                accepted = listener.accept() => accepted?,
121            };
122            let count = active.load(std::sync::atomic::Ordering::Relaxed);
123            if count >= MAX_TELNET_CONNECTIONS {
124                tracing::warn!(peer = %peer_addr, "telnet max connections reached");
125                drop(stream);
126                continue;
127            }
128
129            active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
130            let active_clone = Arc::clone(&active);
131            let connections = Arc::clone(&self.connections);
132            let subscriptions = self.subscriptions.clone();
133            let metrics = self.metrics.clone();
134            let shutdown_tx = self.shutdown_tx.clone();
135            // v1.4.97 P1-D-F: clone relogin callback into spawned task.
136            let relogin_fn = self.relogin_fn.clone();
137
138            tokio::spawn(async move {
139                tracing::info!(peer = %peer_addr, "telnet client connected");
140                let (reader, mut writer) = stream.into_split();
141                let mut reader = BufReader::new(reader);
142
143                if !write_telnet(
144                    &mut writer,
145                    peer_addr,
146                    "greeting",
147                    b"FutuOpenD Rust Gateway Telnet Console\r\nType 'help' for commands.\r\n> ",
148                )
149                .await
150                {
151                    active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
152                    tracing::info!(peer = %peer_addr, "telnet client disconnected");
153                    return;
154                }
155
156                let mut line = String::new();
157                loop {
158                    line.clear();
159                    match reader.read_line(&mut line).await {
160                        Ok(0) => break, // EOF
161                        Ok(_) => {}
162                        Err(_) => break,
163                    }
164
165                    let cmd = line.trim().to_lowercase();
166                    let response = match cmd.as_str() {
167                        "help" => "Available commands:\r\n\
168                             help           — show this help\r\n\
169                             ping           — connectivity test\r\n\
170                             version        — show version\r\n\
171                             show_subinfo   — show subscription info\r\n\
172                             show_conn      — show active connections\r\n\
173                             show_metrics   — show gateway metrics\r\n\
174                             set_loglevel   — change log level (debug/info/warn/error)\r\n\
175                             relogin        — clear login_cache; next P1-D tick triggers relogin (v1.4.97)\r\n\
176                             exit           — shutdown gateway\r\n"
177                            .to_string(),
178                        "ping" => "pong\r\n".to_string(),
179                        "version" => format!("futu-opend-rs v{VERSION}\r\n"),
180                        "show_subinfo" => {
181                            if let Some(ref subs) = subscriptions {
182                                let total_quota = subs.get_total_used_quota();
183                                format!("Total subscription quota used: {total_quota}/4000\r\n")
184                            } else {
185                                "Subscription manager not available\r\n".to_string()
186                            }
187                        }
188                        "show_conn" => {
189                            let count = connections.len();
190                            let mut info = format!("Active connections: {count}\r\n");
191                            for entry in connections.iter() {
192                                let c = entry.value();
193                                info += &format!("  conn_id={} state={:?}\r\n", c.conn_id, c.state);
194                            }
195                            info
196                        }
197                        "show_metrics" => {
198                            if let Some(ref m) = metrics {
199                                m.report()
200                            } else {
201                                "Metrics not available\r\n".to_string()
202                            }
203                        }
204                        s if s.starts_with("set_loglevel") => {
205                            let parts: Vec<&str> = s.split_whitespace().collect();
206                            if parts.len() < 2 {
207                                "Usage: set_loglevel <debug|info|warn|error>\r\n".to_string()
208                            } else {
209                                // tracing 动态级别调整需要 reload handle,
210                                // 简化实现:只记录意图
211                                let level = parts[1];
212                                tracing::info!(level, "log level change requested via telnet");
213                                format!(
214                                    "Log level change to '{level}' noted (restart for full effect)\r\n"
215                                )
216                            }
217                        }
218                        "relogin" => {
219                            // v1.4.97 P1-D-F: align with C++ GTWCmd_ReLogin
220                            // (FTGateway/FTGTW_Define_Key.h:5).
221                            //
222                            // Clear login_cache → next 30s health-loop tick
223                            // sees qot_logined=false → triggers AuthRefresher
224                            // relogin via existing v1.4.92 P1-D ladder path.
225                            // Tester: wait up to ~30s, see daemon log
226                            // "P1-D loop alive" + "relogin succeeded".
227                            if let Some(ref f) = relogin_fn {
228                                tracing::info!(peer = %peer_addr,
229                                    "v1.4.97 P1-D-F: relogin requested via telnet");
230                                f();
231                                "Relogin triggered. Watch daemon log for \
232                                 \"P1-D ladder fired\" + \"relogin succeeded\" \
233                                 in next 30s tick.\r\n"
234                                    .to_string()
235                            } else {
236                                "Relogin callback not wired (daemon was started \
237                                 without bridge handle). Relogin requires daemon \
238                                 restart in this configuration.\r\n"
239                                    .to_string()
240                            }
241                        }
242                        "exit" => {
243                            write_telnet(
244                                &mut writer,
245                                peer_addr,
246                                "shutdown_notice",
247                                b"Shutting down...\r\n",
248                            )
249                            .await;
250                            if shutdown_tx.send(true).is_err() {
251                                tracing::warn!(
252                                    peer = %peer_addr,
253                                    "telnet exit requested but shutdown receiver dropped"
254                                );
255                            }
256                            break;
257                        }
258                        "" => String::new(),
259                        _ => format!(
260                            "Unknown command: '{cmd}'. Type 'help' for available commands.\r\n"
261                        ),
262                    };
263
264                    if !response.is_empty()
265                        && !write_telnet(&mut writer, peer_addr, "response", response.as_bytes())
266                            .await
267                    {
268                        break;
269                    }
270                    if !write_telnet(&mut writer, peer_addr, "prompt", b"> ").await {
271                        break;
272                    }
273                }
274
275                active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
276                tracing::info!(peer = %peer_addr, "telnet client disconnected");
277            });
278        }
279
280        Ok(())
281    }
282}
283
#[cfg(test)]
mod tests;