1mod card_num_expand;
14mod guard;
15mod handlers;
16mod state;
17mod tool_account;
18mod tool_args;
19mod tool_auth;
20mod tool_enums;
21mod tools;
22mod trade_pwd;
23mod transport;
27
28use std::path::PathBuf;
29use std::sync::Arc;
30
31use anyhow::{Context, Result};
32use clap::{ArgMatches, CommandFactory, FromArgMatches, Parser, parser::ValueSource};
33use futu_auth::KeyStore;
34use rmcp::ServiceExt;
35use crate::transport::resilient_stdio;
37use tracing_subscriber::{
38 EnvFilter, Layer, filter::filter_fn, fmt, layer::SubscriberExt, util::SubscriberInitExt,
39};
40
41#[cfg(test)]
42pub(crate) use crate::card_num_expand::build_card_num_resolver;
43use crate::card_num_expand::spawn_card_num_expand_retry;
44#[cfg(unix)]
45use crate::card_num_expand::spawn_sighup_reload;
46use crate::state::ServerState;
47use crate::tools::FutuServer;
48
49fn setup_logging(
54 default_level: &str,
55 audit_path: Option<&std::path::Path>,
56) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
57 let filter =
58 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
59
60 let fmt_layer = fmt::layer()
61 .with_timer(futu_core::log::LocalRfc3339Timer)
62 .with_writer(std::io::stderr)
63 .with_ansi(false);
64
65 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
66
67 if let Some(path) = audit_path {
68 let (writer, guard) = futu_auth::audit::open_writer(path)
69 .with_context(|| format!("open audit log {}", path.display()))?;
70 let audit_layer = fmt::layer()
71 .json()
72 .with_timer(futu_core::log::LocalRfc3339Timer)
73 .flatten_event(true)
74 .with_current_span(false)
75 .with_span_list(false)
76 .with_target(true)
77 .with_writer(writer)
78 .with_filter(filter_fn(|meta| meta.target() == futu_auth::audit::TARGET));
79 registry.with(audit_layer).init();
80 tracing::info!(
81 path = %path.display(),
82 "audit JSONL logger enabled (target=futu_audit)"
83 );
84 Ok(Some(guard))
85 } else {
86 registry.init();
87 Ok(None)
88 }
89}
90
91#[derive(Parser)]
93#[command(
94 name = "futu-mcp",
95 version,
96 about = "FutuOpenD-rs MCP server",
97 long_about = "通过 Model Context Protocol 暴露 Futu 行情/账户工具。默认 stdio transport。"
98)]
99struct Cli {
100 #[arg(short, long, env = "FUTU_GATEWAY", default_value = "127.0.0.1:11111")]
102 gateway: String,
103
104 #[arg(short, long)]
106 verbose: bool,
107
108 #[arg(long)]
113 keys_file: Option<PathBuf>,
114
115 #[arg(long, env = "FUTU_MCP_API_KEY", hide_env_values = true)]
119 api_key: Option<String>,
120
121 #[arg(long, env = "FUTU_TRADE_PWD_ACCOUNT")]
127 trade_pwd_account: Option<String>,
128
129 #[arg(long)]
135 enable_trading: bool,
136
137 #[arg(long, requires = "enable_trading")]
139 allow_real_trading: bool,
140
141 #[arg(long)]
148 audit_log: Option<PathBuf>,
149
150 #[arg(long)]
159 http_listen: Option<String>,
160
161 #[arg(long, requires = "tls_key")]
166 tls_cert: Option<PathBuf>,
167
168 #[arg(long, requires = "tls_cert")]
170 tls_key: Option<PathBuf>,
171
172 #[arg(long)]
184 config: Option<PathBuf>,
185}
186
187#[derive(Debug, Default, serde::Deserialize)]
202#[serde(default, deny_unknown_fields)]
203struct FileConfig {
204 gateway: Option<String>,
205 verbose: Option<bool>,
206 keys_file: Option<PathBuf>,
207 api_key: Option<String>,
208 trade_pwd_account: Option<String>,
209 enable_trading: Option<bool>,
210 allow_real_trading: Option<bool>,
211 audit_log: Option<PathBuf>,
212 http_listen: Option<String>,
213 tls_cert: Option<PathBuf>,
214 tls_key: Option<PathBuf>,
215}
216
217fn is_cli_explicit(matches: &ArgMatches, arg_id: &str) -> bool {
233 matches!(
234 matches.value_source(arg_id),
235 Some(ValueSource::CommandLine) | Some(ValueSource::EnvVariable)
236 )
237}
238
239impl Cli {
240 fn merge_config(mut self, matches: &ArgMatches) -> Result<Self> {
246 let Some(config_path) = &self.config else {
247 return Ok(self);
248 };
249 let content = std::fs::read_to_string(config_path)
250 .with_context(|| format!("read config file {}", config_path.display()))?;
251 let fc: FileConfig = toml::from_str(&content)
252 .with_context(|| format!("parse config file {}", config_path.display()))?;
253
254 if let Some(g) = fc.gateway
257 && !is_cli_explicit(matches, "gateway")
258 {
259 self.gateway = g;
260 }
261 if self.keys_file.is_none() {
264 self.keys_file = fc.keys_file;
265 }
266 if self.api_key.is_none()
267 && let Some(k) = fc.api_key
268 {
269 self.api_key = Some(k);
270 }
271 if self.trade_pwd_account.is_none() {
272 self.trade_pwd_account = fc.trade_pwd_account;
273 }
274 if fc.verbose.is_some() && !is_cli_explicit(matches, "verbose") {
279 self.verbose = fc.verbose.unwrap_or(false);
280 }
281 if fc.enable_trading.is_some() && !is_cli_explicit(matches, "enable_trading") {
282 self.enable_trading = fc.enable_trading.unwrap_or(false);
283 }
284 if fc.allow_real_trading.is_some() && !is_cli_explicit(matches, "allow_real_trading") {
285 self.allow_real_trading = fc.allow_real_trading.unwrap_or(false);
286 }
287 if self.audit_log.is_none() {
288 self.audit_log = fc.audit_log;
289 }
290 if self.http_listen.is_none() {
291 self.http_listen = fc.http_listen;
292 }
293 if self.tls_cert.is_none() {
294 self.tls_cert = fc.tls_cert;
295 }
296 if self.tls_key.is_none() {
297 self.tls_key = fc.tls_key;
298 }
299 eprintln!("[config] loaded {}", config_path.display());
301 Ok(self)
302 }
303}
304
305#[tokio::main]
306async fn main() -> Result<()> {
307 let matches = Cli::command().get_matches();
311 let cli = Cli::from_arg_matches(&matches)
312 .map_err(|e| anyhow::anyhow!("clap derive build failed: {e}"))?
313 .merge_config(&matches)?;
314
315 let default_level = if cli.verbose { "debug" } else { "info" };
317
318 let _audit_guard = setup_logging(default_level, cli.audit_log.as_deref())?;
320
321 let key_store = match &cli.keys_file {
323 Some(path) => {
324 let store = KeyStore::load(path)
325 .with_context(|| format!("load keys file {}", path.display()))?;
326 tracing::info!(
327 path = %path.display(),
328 keys_loaded = store.len(),
329 "scope mode: keys file loaded"
330 );
331 if cli.enable_trading || cli.allow_real_trading {
332 tracing::warn!(
333 "--enable-trading / --allow-real-trading are IGNORED in scope mode; \
334 trading permissions are controlled by API key scopes"
335 );
336 }
337 Arc::new(store)
338 }
339 None => {
340 tracing::info!("legacy mode: no keys file; using --enable-trading switches");
341 Arc::new(KeyStore::empty())
342 }
343 };
344
345 let authed_key = if key_store.is_configured() {
347 match cli.api_key.as_deref() {
348 Some(plaintext) if !plaintext.is_empty() => match key_store.verify(plaintext) {
349 Some(rec) => {
350 tracing::info!(
351 key_id = %rec.id,
352 scopes = ?rec.scopes.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
353 "API key verified"
354 );
355 Some(rec)
356 }
357 None => {
358 tracing::error!("FUTU_MCP_API_KEY does not match any key in keys.json");
359 None
360 }
361 },
362 _ => {
363 tracing::warn!(
364 "scope mode active but FUTU_MCP_API_KEY not set; \
365 all tool calls will be rejected"
366 );
367 None
368 }
369 }
370 } else {
371 None
372 };
373
374 tracing::info!(
375 gateway = %cli.gateway,
376 scope_mode = key_store.is_configured(),
377 enable_trading = cli.enable_trading,
378 allow_real_trading = cli.allow_real_trading,
379 trade_pwd_account = cli.trade_pwd_account.as_deref().unwrap_or("<legacy/env>"),
380 "futu-mcp starting"
381 );
382 if !key_store.is_configured() && cli.enable_trading {
383 tracing::warn!(
384 allow_real_trading = cli.allow_real_trading,
385 "trading write tools ENABLED (legacy mode)"
386 );
387 }
388
389 let state = ServerState::new(cli.gateway)
390 .with_trading(cli.enable_trading, cli.allow_real_trading)
391 .with_key_store(key_store.clone())
392 .with_authed_key(authed_key)
393 .with_trade_pwd_account(cli.trade_pwd_account);
394 let server = FutuServer::new(state.clone());
395
396 if key_store.is_configured() && key_store.has_any_card_num_restrictions() {
413 spawn_card_num_expand_retry(state.clone(), key_store.clone());
414 } else if key_store.is_configured() {
415 tracing::debug!(
416 "v1.4.105 external report #4: keystore 无 allowed_card_nums 限制, 跳过 daemon expand"
417 );
418 }
419
420 #[cfg(unix)]
423 spawn_sighup_reload(key_store, state.clone());
424
425 futu_auth::metrics::install(std::sync::Arc::new(futu_auth::MetricsRegistry::default()));
429
430 if let Some(listen) = cli.http_listen {
431 let tls = match (cli.tls_cert, cli.tls_key) {
432 (Some(cert), Some(key)) => Some((cert, key)),
433 _ => None,
434 };
435 serve_http(server, &listen, tls).await?;
436 } else {
437 serve_stdio(server).await?;
438 }
439
440 Ok(())
441}
442
443async fn serve_stdio(server: tools::FutuServer) -> Result<()> {
450 let service = server
451 .serve(resilient_stdio())
452 .await
453 .map_err(|e| anyhow::anyhow!("MCP service init failed: {e}"))?;
454
455 service
456 .waiting()
457 .await
458 .map_err(|e| anyhow::anyhow!("MCP service error: {e}"))?;
459 Ok(())
460}
461
462fn render_mcp_metrics_body() -> String {
470 let registry = futu_auth::metrics::global();
471 render_mcp_metrics_body_for(registry.as_deref())
472}
473
474fn render_mcp_metrics_body_for(registry: Option<&futu_auth::MetricsRegistry>) -> String {
475 registry.map(|r| r.render_prometheus()).unwrap_or_else(|| {
476 concat!(
477 "# HELP futu_metrics_registry_installed Whether futu_auth metrics registry is installed (1=yes, 0=no)\n",
478 "# TYPE futu_metrics_registry_installed gauge\n",
479 "futu_metrics_registry_installed{state=\"metrics registry not installed\"} 0\n"
480 )
481 .to_string()
482 })
483}
484
485async fn serve_http(
486 server: tools::FutuServer,
487 listen: &str,
488 tls: Option<(PathBuf, PathBuf)>,
489) -> Result<()> {
490 use rmcp::transport::streamable_http_server::{
491 StreamableHttpService, session::local::LocalSessionManager,
492 };
493
494 let bind_addr = if listen.starts_with(':') {
496 format!("0.0.0.0{listen}")
497 } else {
498 listen.to_string()
499 };
500
501 let session_manager = std::sync::Arc::new(LocalSessionManager::default());
505 let mcp_svc = StreamableHttpService::new(
506 {
507 let server = server.clone();
508 move || Ok::<_, std::io::Error>(server.clone())
509 },
510 session_manager,
511 Default::default(),
512 );
513
514 use axum::routing::get;
519 let mcp_with_auth_hint = axum::Router::new()
520 .nest_service("/mcp", mcp_svc)
521 .layer(axum::middleware::from_fn(inject_www_authenticate));
522
523 let app = axum::Router::new()
524 .route(
525 "/metrics",
526 get(|| async {
527 let body = render_mcp_metrics_body();
528 (
529 axum::http::StatusCode::OK,
530 [(
531 axum::http::header::CONTENT_TYPE,
532 "text/plain; version=0.0.4",
533 )],
534 body,
535 )
536 }),
537 )
538 .route(
539 "/.well-known/oauth-protected-resource",
540 get(oauth_protected_resource_metadata),
541 )
542 .merge(mcp_with_auth_hint);
543
544 let bind_addr_sock: std::net::SocketAddr = bind_addr
545 .parse()
546 .map_err(|e| anyhow::anyhow!("invalid bind address {bind_addr}: {e}"))?;
547
548 if let Some((cert_path, key_path)) = tls {
549 let tls_config =
551 axum_server::tls_rustls::RustlsConfig::from_pem_file(&cert_path, &key_path)
552 .await
553 .with_context(|| {
554 format!(
555 "load TLS cert={} key={}",
556 cert_path.display(),
557 key_path.display()
558 )
559 })?;
560 let handle = axum_server::Handle::new();
561 let shutdown_handle = handle.clone();
562 tokio::spawn(async move {
563 shutdown_signal().await;
564 tracing::info!("graceful shutdown: draining HTTPS connections...");
565 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
566 });
567 tracing::info!(
568 addr = %bind_addr,
569 cert = %cert_path.display(),
570 "futu-mcp HTTPS transport started \
571 (MCP: /mcp, metrics: /metrics, OAuth metadata: /.well-known/oauth-protected-resource)"
572 );
573 axum_server::bind_rustls(bind_addr_sock, tls_config)
574 .handle(handle)
575 .serve(app.into_make_service())
576 .await
577 .map_err(|e| anyhow::anyhow!("axum-server TLS serve error: {e}"))?;
578 } else {
579 let listener = tokio::net::TcpListener::bind(&bind_addr)
581 .await
582 .map_err(|e| anyhow::anyhow!("bind {bind_addr}: {e}"))?;
583 tracing::info!(
584 addr = %bind_addr,
585 "futu-mcp HTTP transport started \
586 (MCP: /mcp, metrics: /metrics, OAuth metadata: /.well-known/oauth-protected-resource)"
587 );
588 axum::serve(listener, app)
589 .with_graceful_shutdown(async {
590 shutdown_signal().await;
591 tracing::info!("graceful shutdown: draining HTTP connections...");
592 })
593 .await
594 .map_err(|e| anyhow::anyhow!("axum serve error: {e}"))?;
595 }
596 tracing::info!("server stopped");
597 Ok(())
598}
599
600async fn shutdown_signal() {
603 #[cfg(unix)]
604 {
605 use tokio::signal::unix::{SignalKind, signal};
606 let sigterm = match signal(SignalKind::terminate()) {
607 Ok(signal) => Some(signal),
608 Err(e) => {
609 tracing::error!(error = %e, "failed to install SIGTERM handler");
610 None
611 }
612 };
613 let sigint = match signal(SignalKind::interrupt()) {
614 Ok(signal) => Some(signal),
615 Err(e) => {
616 tracing::error!(error = %e, "failed to install SIGINT handler");
617 None
618 }
619 };
620
621 match (sigterm, sigint) {
622 (Some(mut sigterm), Some(mut sigint)) => {
623 tokio::select! {
624 _ = sigterm.recv() => tracing::info!("received SIGTERM"),
625 _ = sigint.recv() => tracing::info!("received SIGINT"),
626 }
627 }
628 (Some(mut sigterm), None) => {
629 sigterm.recv().await;
630 tracing::info!("received SIGTERM");
631 }
632 (None, Some(mut sigint)) => {
633 sigint.recv().await;
634 tracing::info!("received SIGINT");
635 }
636 (None, None) => wait_for_ctrl_c_or_pending().await,
637 }
638 }
639 #[cfg(not(unix))]
640 {
641 wait_for_ctrl_c_or_pending().await;
642 }
643}
644
645async fn wait_for_ctrl_c_or_pending() {
646 match tokio::signal::ctrl_c().await {
647 Ok(()) => tracing::info!("received Ctrl-C"),
648 Err(e) => {
649 tracing::error!(
650 error = %e,
651 "failed to install ctrl-c handler; graceful shutdown signal unavailable"
652 );
653 std::future::pending::<()>().await;
654 }
655 }
656}
657
658async fn oauth_protected_resource_metadata() -> axum::response::Json<serde_json::Value> {
670 axum::response::Json(serde_json::json!({
671 "resource": "/mcp",
672 "bearer_methods_supported": ["header"],
673 "scopes_supported": [
674 "qot:read",
675 "acc:read",
676 "trade:simulate",
677 "trade:real",
678 "trade:unlock"
679 ],
680 "resource_name": "FutuOpenD-rs MCP",
681 "resource_documentation": "https://futuapi.com/reference/mcp/",
682 }))
683}
684
685async fn inject_www_authenticate(
691 req: axum::extract::Request,
692 next: axum::middleware::Next,
693) -> axum::response::Response {
694 let mut resp = next.run(req).await;
695 let status = resp.status();
696 if (status == axum::http::StatusCode::UNAUTHORIZED
697 || status == axum::http::StatusCode::FORBIDDEN)
698 && !resp
699 .headers()
700 .contains_key(axum::http::header::WWW_AUTHENTICATE)
701 {
702 let value = axum::http::HeaderValue::from_static(
705 "Bearer resource_metadata=\"/.well-known/oauth-protected-resource\"",
706 );
707 resp.headers_mut()
708 .insert(axum::http::header::WWW_AUTHENTICATE, value);
709 }
710 resp
711}
712
713#[cfg(test)]
714mod tests;