1use std::sync::Arc;
14
15use dashmap::DashMap;
16use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
17use tokio::net::TcpListener;
18use tokio::sync::watch;
19
20use crate::conn::ClientConn;
21use crate::metrics::GatewayMetrics;
22use crate::subscription::SubscriptionManager;
23
24pub type ReloginFn = Arc<dyn Fn() + Send + Sync>;
37
38pub struct TelnetServer {
40 listen_addr: String,
41 connections: Arc<DashMap<u64, ClientConn>>,
42 subscriptions: Option<Arc<SubscriptionManager>>,
43 metrics: Option<Arc<GatewayMetrics>>,
44 shutdown_tx: watch::Sender<bool>,
45 relogin_fn: Option<ReloginFn>,
48}
49
50const MAX_TELNET_CONNECTIONS: usize = 5;
51const VERSION: &str = env!("CARGO_PKG_VERSION");
52
53async fn write_telnet<W>(
54 writer: &mut W,
55 peer_addr: std::net::SocketAddr,
56 context: &'static str,
57 bytes: &[u8],
58) -> bool
59where
60 W: AsyncWrite + Unpin,
61{
62 match writer.write_all(bytes).await {
63 Ok(()) => true,
64 Err(err) => {
65 tracing::debug!(
66 peer = %peer_addr,
67 context,
68 error = %err,
69 "telnet write failed; client likely disconnected"
70 );
71 false
72 }
73 }
74}
75
76impl TelnetServer {
77 pub fn new(
78 listen_addr: String,
79 connections: Arc<DashMap<u64, ClientConn>>,
80 subscriptions: Option<Arc<SubscriptionManager>>,
81 metrics: Option<Arc<GatewayMetrics>>,
82 shutdown_tx: watch::Sender<bool>,
83 ) -> Self {
84 Self {
85 listen_addr,
86 connections,
87 subscriptions,
88 metrics,
89 shutdown_tx,
90 relogin_fn: None,
91 }
92 }
93
94 pub fn with_relogin_fn(mut self, relogin_fn: ReloginFn) -> Self {
96 self.relogin_fn = Some(relogin_fn);
97 self
98 }
99
100 pub async fn run(&self) -> anyhow::Result<()> {
101 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
102 self.run_until_shutdown(shutdown_rx).await
103 }
104
105 pub async fn run_until_shutdown(
106 &self,
107 mut shutdown_rx: watch::Receiver<bool>,
108 ) -> anyhow::Result<()> {
109 let listener = TcpListener::bind(&self.listen_addr).await?;
110 tracing::info!(addr = %self.listen_addr, "Telnet server listening");
111
112 let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
113
114 loop {
115 let (stream, peer_addr) = tokio::select! {
116 _ = crate::listener::shutdown_requested(&mut shutdown_rx) => {
117 tracing::info!("Telnet server accept loop stopped by shutdown signal");
118 break;
119 }
120 accepted = listener.accept() => accepted?,
121 };
122 let count = active.load(std::sync::atomic::Ordering::Relaxed);
123 if count >= MAX_TELNET_CONNECTIONS {
124 tracing::warn!(peer = %peer_addr, "telnet max connections reached");
125 drop(stream);
126 continue;
127 }
128
129 active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
130 let active_clone = Arc::clone(&active);
131 let connections = Arc::clone(&self.connections);
132 let subscriptions = self.subscriptions.clone();
133 let metrics = self.metrics.clone();
134 let shutdown_tx = self.shutdown_tx.clone();
135 let relogin_fn = self.relogin_fn.clone();
137
138 tokio::spawn(async move {
139 tracing::info!(peer = %peer_addr, "telnet client connected");
140 let (reader, mut writer) = stream.into_split();
141 let mut reader = BufReader::new(reader);
142
143 if !write_telnet(
144 &mut writer,
145 peer_addr,
146 "greeting",
147 b"FutuOpenD Rust Gateway Telnet Console\r\nType 'help' for commands.\r\n> ",
148 )
149 .await
150 {
151 active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
152 tracing::info!(peer = %peer_addr, "telnet client disconnected");
153 return;
154 }
155
156 let mut line = String::new();
157 loop {
158 line.clear();
159 match reader.read_line(&mut line).await {
160 Ok(0) => break, Ok(_) => {}
162 Err(_) => break,
163 }
164
165 let cmd = line.trim().to_lowercase();
166 let response = match cmd.as_str() {
167 "help" => "Available commands:\r\n\
168 help — show this help\r\n\
169 ping — connectivity test\r\n\
170 version — show version\r\n\
171 show_subinfo — show subscription info\r\n\
172 show_conn — show active connections\r\n\
173 show_metrics — show gateway metrics\r\n\
174 set_loglevel — change log level (debug/info/warn/error)\r\n\
175 relogin — clear login_cache; next P1-D tick triggers relogin (v1.4.97)\r\n\
176 exit — shutdown gateway\r\n"
177 .to_string(),
178 "ping" => "pong\r\n".to_string(),
179 "version" => format!("futu-opend-rs v{VERSION}\r\n"),
180 "show_subinfo" => {
181 if let Some(ref subs) = subscriptions {
182 let total_quota = subs.get_total_used_quota();
183 format!("Total subscription quota used: {total_quota}/4000\r\n")
184 } else {
185 "Subscription manager not available\r\n".to_string()
186 }
187 }
188 "show_conn" => {
189 let count = connections.len();
190 let mut info = format!("Active connections: {count}\r\n");
191 for entry in connections.iter() {
192 let c = entry.value();
193 info += &format!(" conn_id={} state={:?}\r\n", c.conn_id, c.state);
194 }
195 info
196 }
197 "show_metrics" => {
198 if let Some(ref m) = metrics {
199 m.report()
200 } else {
201 "Metrics not available\r\n".to_string()
202 }
203 }
204 s if s.starts_with("set_loglevel") => {
205 let parts: Vec<&str> = s.split_whitespace().collect();
206 if parts.len() < 2 {
207 "Usage: set_loglevel <debug|info|warn|error>\r\n".to_string()
208 } else {
209 let level = parts[1];
212 tracing::info!(level, "log level change requested via telnet");
213 format!(
214 "Log level change to '{level}' noted (restart for full effect)\r\n"
215 )
216 }
217 }
218 "relogin" => {
219 if let Some(ref f) = relogin_fn {
228 tracing::info!(peer = %peer_addr,
229 "v1.4.97 P1-D-F: relogin requested via telnet");
230 f();
231 "Relogin triggered. Watch daemon log for \
232 \"P1-D ladder fired\" + \"relogin succeeded\" \
233 in next 30s tick.\r\n"
234 .to_string()
235 } else {
236 "Relogin callback not wired (daemon was started \
237 without bridge handle). Relogin requires daemon \
238 restart in this configuration.\r\n"
239 .to_string()
240 }
241 }
242 "exit" => {
243 write_telnet(
244 &mut writer,
245 peer_addr,
246 "shutdown_notice",
247 b"Shutting down...\r\n",
248 )
249 .await;
250 if shutdown_tx.send(true).is_err() {
251 tracing::warn!(
252 peer = %peer_addr,
253 "telnet exit requested but shutdown receiver dropped"
254 );
255 }
256 break;
257 }
258 "" => String::new(),
259 _ => format!(
260 "Unknown command: '{cmd}'. Type 'help' for available commands.\r\n"
261 ),
262 };
263
264 if !response.is_empty()
265 && !write_telnet(&mut writer, peer_addr, "response", response.as_bytes())
266 .await
267 {
268 break;
269 }
270 if !write_telnet(&mut writer, peer_addr, "prompt", b"> ").await {
271 break;
272 }
273 }
274
275 active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
276 tracing::info!(peer = %peer_addr, "telnet client disconnected");
277 });
278 }
279
280 Ok(())
281 }
282}
283
284#[cfg(test)]
285mod tests;