1use std::sync::Arc;
14
15use dashmap::DashMap;
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::net::TcpListener;
18use tokio::sync::watch;
19
20use crate::conn::ClientConn;
21use crate::metrics::GatewayMetrics;
22use crate::subscription::SubscriptionManager;
23
24pub struct TelnetServer {
26 listen_addr: String,
27 connections: Arc<DashMap<u64, ClientConn>>,
28 subscriptions: Option<Arc<SubscriptionManager>>,
29 metrics: Option<Arc<GatewayMetrics>>,
30 shutdown_tx: watch::Sender<bool>,
31}
32
33const MAX_TELNET_CONNECTIONS: usize = 5;
34const VERSION: &str = env!("CARGO_PKG_VERSION");
35
36impl TelnetServer {
37 pub fn new(
38 listen_addr: String,
39 connections: Arc<DashMap<u64, ClientConn>>,
40 subscriptions: Option<Arc<SubscriptionManager>>,
41 metrics: Option<Arc<GatewayMetrics>>,
42 shutdown_tx: watch::Sender<bool>,
43 ) -> Self {
44 Self {
45 listen_addr,
46 connections,
47 subscriptions,
48 metrics,
49 shutdown_tx,
50 }
51 }
52
53 pub async fn run(&self) -> anyhow::Result<()> {
54 let listener = TcpListener::bind(&self.listen_addr).await?;
55 tracing::info!(addr = %self.listen_addr, "Telnet server listening");
56
57 let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
58
59 loop {
60 let (stream, peer_addr) = listener.accept().await?;
61 let count = active.load(std::sync::atomic::Ordering::Relaxed);
62 if count >= MAX_TELNET_CONNECTIONS {
63 tracing::warn!(peer = %peer_addr, "telnet max connections reached");
64 drop(stream);
65 continue;
66 }
67
68 active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
69 let active_clone = Arc::clone(&active);
70 let connections = Arc::clone(&self.connections);
71 let subscriptions = self.subscriptions.clone();
72 let metrics = self.metrics.clone();
73 let shutdown_tx = self.shutdown_tx.clone();
74
75 tokio::spawn(async move {
76 tracing::info!(peer = %peer_addr, "telnet client connected");
77 let (reader, mut writer) = stream.into_split();
78 let mut reader = BufReader::new(reader);
79
80 let _ = writer
81 .write_all(
82 b"FutuOpenD Rust Gateway Telnet Console\r\nType 'help' for commands.\r\n> ",
83 )
84 .await;
85
86 let mut line = String::new();
87 loop {
88 line.clear();
89 match reader.read_line(&mut line).await {
90 Ok(0) => break, Ok(_) => {}
92 Err(_) => break,
93 }
94
95 let cmd = line.trim().to_lowercase();
96 let response = match cmd.as_str() {
97 "help" => "Available commands:\r\n\
98 help — show this help\r\n\
99 ping — connectivity test\r\n\
100 version — show version\r\n\
101 show_subinfo — show subscription info\r\n\
102 show_conn — show active connections\r\n\
103 show_metrics — show gateway metrics\r\n\
104 set_loglevel — change log level (debug/info/warn/error)\r\n\
105 exit — shutdown gateway\r\n"
106 .to_string(),
107 "ping" => "pong\r\n".to_string(),
108 "version" => format!("futu-opend-rs v{VERSION}\r\n"),
109 "show_subinfo" => {
110 if let Some(ref subs) = subscriptions {
111 let total_quota = subs.get_total_used_quota();
112 format!("Total subscription quota used: {total_quota}/4000\r\n")
113 } else {
114 "Subscription manager not available\r\n".to_string()
115 }
116 }
117 "show_conn" => {
118 let count = connections.len();
119 let mut info = format!("Active connections: {count}\r\n");
120 for entry in connections.iter() {
121 let c = entry.value();
122 info += &format!(" conn_id={} state={:?}\r\n", c.conn_id, c.state);
123 }
124 info
125 }
126 "show_metrics" => {
127 if let Some(ref m) = metrics {
128 m.report()
129 } else {
130 "Metrics not available\r\n".to_string()
131 }
132 }
133 s if s.starts_with("set_loglevel") => {
134 let parts: Vec<&str> = s.split_whitespace().collect();
135 if parts.len() < 2 {
136 "Usage: set_loglevel <debug|info|warn|error>\r\n".to_string()
137 } else {
138 let level = parts[1];
141 tracing::info!(level, "log level change requested via telnet");
142 format!("Log level change to '{level}' noted (restart for full effect)\r\n")
143 }
144 }
145 "exit" => {
146 let _ = writer.write_all(b"Shutting down...\r\n").await;
147 let _ = shutdown_tx.send(true);
148 break;
149 }
150 "" => String::new(),
151 _ => format!(
152 "Unknown command: '{cmd}'. Type 'help' for available commands.\r\n"
153 ),
154 };
155
156 if !response.is_empty() {
157 let _ = writer.write_all(response.as_bytes()).await;
158 }
159 let _ = writer.write_all(b"> ").await;
160 }
161
162 active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
163 tracing::info!(peer = %peer_addr, "telnet client disconnected");
164 });
165 }
166 }
167}