1mod guard;
14mod handlers;
15mod state;
16mod tools;
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use anyhow::{Context, Result};
22use clap::Parser;
23use futu_auth::KeyStore;
24use rmcp::{transport::stdio, ServiceExt};
25use tracing_subscriber::{
26 filter::filter_fn, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer,
27};
28
29use crate::state::ServerState;
30use crate::tools::FutuServer;
31
32fn setup_logging(
37 default_level: &str,
38 audit_path: Option<&std::path::Path>,
39) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
40 let filter =
41 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
42
43 let fmt_layer = fmt::layer().with_writer(std::io::stderr).with_ansi(false);
44
45 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
46
47 if let Some(path) = audit_path {
48 let (writer, guard) = futu_auth::audit::open_writer(path)
49 .with_context(|| format!("open audit log {}", path.display()))?;
50 let audit_layer = fmt::layer()
51 .json()
52 .flatten_event(true)
53 .with_current_span(false)
54 .with_span_list(false)
55 .with_target(true)
56 .with_writer(writer)
57 .with_filter(filter_fn(|meta| meta.target() == futu_auth::audit::TARGET));
58 registry.with(audit_layer).init();
59 tracing::info!(
60 path = %path.display(),
61 "audit JSONL logger enabled (target=futu_audit)"
62 );
63 Ok(Some(guard))
64 } else {
65 registry.init();
66 Ok(None)
67 }
68}
69
70#[derive(Parser)]
72#[command(
73 name = "futu-mcp",
74 version,
75 about = "FutuOpenD-rs MCP server",
76 long_about = "通过 Model Context Protocol 暴露 Futu 行情/账户工具。默认 stdio transport。"
77)]
78struct Cli {
79 #[arg(short, long, env = "FUTU_GATEWAY", default_value = "127.0.0.1:11111")]
81 gateway: String,
82
83 #[arg(short, long)]
85 verbose: bool,
86
87 #[arg(long)]
92 keys_file: Option<PathBuf>,
93
94 #[arg(long, env = "FUTU_MCP_API_KEY", hide_env_values = true)]
98 api_key: Option<String>,
99
100 #[arg(long)]
106 enable_trading: bool,
107
108 #[arg(long, requires = "enable_trading")]
110 allow_real_trading: bool,
111
112 #[arg(long)]
119 audit_log: Option<PathBuf>,
120
121 #[arg(long)]
130 http_listen: Option<String>,
131}
132
133#[tokio::main]
134async fn main() -> Result<()> {
135 let cli = Cli::parse();
136
137 let default_level = if cli.verbose { "debug" } else { "info" };
139
140 let _audit_guard = setup_logging(default_level, cli.audit_log.as_deref())?;
142
143 let key_store = match &cli.keys_file {
145 Some(path) => {
146 let store = KeyStore::load(path)
147 .with_context(|| format!("load keys file {}", path.display()))?;
148 tracing::info!(
149 path = %path.display(),
150 keys_loaded = store.len(),
151 "scope mode: keys file loaded"
152 );
153 if cli.enable_trading || cli.allow_real_trading {
154 tracing::warn!(
155 "--enable-trading / --allow-real-trading are IGNORED in scope mode; \
156 trading permissions are controlled by API key scopes"
157 );
158 }
159 Arc::new(store)
160 }
161 None => {
162 tracing::info!("legacy mode: no keys file; using --enable-trading switches");
163 Arc::new(KeyStore::empty())
164 }
165 };
166
167 let authed_key = if key_store.is_configured() {
169 match cli.api_key.as_deref() {
170 Some(plaintext) if !plaintext.is_empty() => match key_store.verify(plaintext) {
171 Some(rec) => {
172 tracing::info!(
173 key_id = %rec.id,
174 scopes = ?rec.scopes.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
175 "API key verified"
176 );
177 Some(rec)
178 }
179 None => {
180 tracing::error!("FUTU_MCP_API_KEY does not match any key in keys.json");
181 None
182 }
183 },
184 _ => {
185 tracing::warn!(
186 "scope mode active but FUTU_MCP_API_KEY not set; \
187 all tool calls will be rejected"
188 );
189 None
190 }
191 }
192 } else {
193 None
194 };
195
196 tracing::info!(
197 gateway = %cli.gateway,
198 scope_mode = key_store.is_configured(),
199 enable_trading = cli.enable_trading,
200 allow_real_trading = cli.allow_real_trading,
201 "futu-mcp starting"
202 );
203 if !key_store.is_configured() && cli.enable_trading {
204 tracing::warn!(
205 allow_real_trading = cli.allow_real_trading,
206 "trading write tools ENABLED (legacy mode)"
207 );
208 }
209
210 let state = ServerState::new(cli.gateway)
211 .with_trading(cli.enable_trading, cli.allow_real_trading)
212 .with_key_store(key_store.clone())
213 .with_authed_key(authed_key);
214 let server = FutuServer::new(state);
215
216 #[cfg(unix)]
218 spawn_sighup_reload(key_store);
219
220 futu_auth::metrics::install(std::sync::Arc::new(futu_auth::MetricsRegistry::default()));
224
225 if let Some(listen) = cli.http_listen {
226 serve_http(server, &listen).await?;
227 } else {
228 serve_stdio(server).await?;
229 }
230
231 Ok(())
232}
233
234async fn serve_stdio(server: tools::FutuServer) -> Result<()> {
236 let service = server
237 .serve(stdio())
238 .await
239 .map_err(|e| anyhow::anyhow!("MCP service init failed: {e}"))?;
240
241 service
242 .waiting()
243 .await
244 .map_err(|e| anyhow::anyhow!("MCP service error: {e}"))?;
245 Ok(())
246}
247
248async fn serve_http(server: tools::FutuServer, listen: &str) -> Result<()> {
251 use rmcp::transport::streamable_http_server::{
252 session::local::LocalSessionManager, StreamableHttpService,
253 };
254
255 let bind_addr = if listen.starts_with(':') {
257 format!("0.0.0.0{listen}")
258 } else {
259 listen.to_string()
260 };
261
262 let session_manager = std::sync::Arc::new(LocalSessionManager::default());
266 let mcp_svc = StreamableHttpService::new(
267 {
268 let server = server.clone();
269 move || Ok::<_, std::io::Error>(server.clone())
270 },
271 session_manager,
272 Default::default(),
273 );
274
275 use axum::routing::get;
277 let app = axum::Router::new()
278 .route(
279 "/metrics",
280 get(|| async {
281 let body = futu_auth::metrics::global()
282 .map(|r| r.render_prometheus())
283 .unwrap_or_default();
284 (
285 axum::http::StatusCode::OK,
286 [(
287 axum::http::header::CONTENT_TYPE,
288 "text/plain; version=0.0.4",
289 )],
290 body,
291 )
292 }),
293 )
294 .nest_service("/mcp", mcp_svc);
295
296 let listener = tokio::net::TcpListener::bind(&bind_addr)
297 .await
298 .map_err(|e| anyhow::anyhow!("bind {bind_addr}: {e}"))?;
299 tracing::info!(
300 addr = %bind_addr,
301 "futu-mcp HTTP transport started (MCP: /mcp, metrics: /metrics)"
302 );
303 axum::serve(listener, app)
304 .await
305 .map_err(|e| anyhow::anyhow!("axum serve error: {e}"))?;
306 Ok(())
307}
308
309#[cfg(unix)]
310fn spawn_sighup_reload(store: Arc<KeyStore>) {
311 if !store.is_configured() {
312 return;
313 }
314 use tokio::signal::unix::{signal, SignalKind};
315 tokio::spawn(async move {
316 let mut sig = match signal(SignalKind::hangup()) {
317 Ok(s) => s,
318 Err(e) => {
319 tracing::error!(error = %e, "failed to install SIGHUP handler");
320 return;
321 }
322 };
323 tracing::info!("SIGHUP handler installed; send `kill -HUP <pid>` to reload keys");
324 while sig.recv().await.is_some() {
325 match store.reload() {
326 Ok(()) => tracing::warn!(keys_loaded = store.len(), "keys file reloaded on SIGHUP"),
327 Err(e) => tracing::error!(error = %e, "SIGHUP reload failed; keeping old keys"),
328 }
329 }
330 });
331}