futu_mcp/
main.rs

1//! FutuOpenD-rs MCP 服务器
2//!
3//! 通过 Model Context Protocol 把 Futu 行情/账户能力暴露给 Claude / LLM 客户端。
4//!
5//! 授权有两种模式:
6//!
7//! - **Scope 模式**:`--keys-file <path>` 启用,客户端必须通过 `FUTU_MCP_API_KEY`
8//!   环境变量传入明文 key。服务器用 SHA-256 hash 比对 keys.json 中的记录,
9//!   按 scope + 限额放行。
10//! - **Legacy 模式**:未提供 keys-file 时回退到旧的
11//!   `--enable-trading` / `--allow-real-trading` 两级开关。
12
13mod guard;
14mod handlers;
15mod state;
16mod tools;
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use anyhow::{Context, Result};
22use clap::Parser;
23use futu_auth::KeyStore;
24use rmcp::{transport::stdio, ServiceExt};
25use tracing_subscriber::{
26    filter::filter_fn, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer,
27};
28
29use crate::state::ServerState;
30use crate::tools::FutuServer;
31
32/// 初始化 stderr 日志 + 可选 audit JSONL 层
33///
34/// - 常规事件走 stderr(no-ansi,因为 MCP client 的 stderr 往往不是 tty)
35/// - 如果 `audit_path` 传了,加一个 target=futu_audit 的 JSON 层写到文件/目录
36fn setup_logging(
37    default_level: &str,
38    audit_path: Option<&std::path::Path>,
39) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
40    let filter =
41        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
42
43    let fmt_layer = fmt::layer().with_writer(std::io::stderr).with_ansi(false);
44
45    let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
46
47    if let Some(path) = audit_path {
48        let (writer, guard) = futu_auth::audit::open_writer(path)
49            .with_context(|| format!("open audit log {}", path.display()))?;
50        let audit_layer = fmt::layer()
51            .json()
52            .flatten_event(true)
53            .with_current_span(false)
54            .with_span_list(false)
55            .with_target(true)
56            .with_writer(writer)
57            .with_filter(filter_fn(|meta| meta.target() == futu_auth::audit::TARGET));
58        registry.with(audit_layer).init();
59        tracing::info!(
60            path = %path.display(),
61            "audit JSONL logger enabled (target=futu_audit)"
62        );
63        Ok(Some(guard))
64    } else {
65        registry.init();
66        Ok(None)
67    }
68}
69
70/// FutuOpenD-rs MCP server
71#[derive(Parser)]
72#[command(
73    name = "futu-mcp",
74    version,
75    about = "FutuOpenD-rs MCP server",
76    long_about = "通过 Model Context Protocol 暴露 Futu 行情/账户工具。默认 stdio transport。"
77)]
78struct Cli {
79    /// 网关地址(可用 FUTU_GATEWAY 环境变量覆盖)
80    #[arg(short, long, env = "FUTU_GATEWAY", default_value = "127.0.0.1:11111")]
81    gateway: String,
82
83    /// 启用 debug 日志
84    #[arg(short, long)]
85    verbose: bool,
86
87    /// Scope 模式:加载 keys.json 文件(API Key 授权)。
88    ///
89    /// 启用后所有工具调用必须带 FUTU_MCP_API_KEY 环境变量,
90    /// scope / 限额由 keys.json 配置决定;此时 --enable-trading / --allow-real-trading 被忽略。
91    #[arg(long)]
92    keys_file: Option<PathBuf>,
93
94    /// 调用方 API Key 明文(等价于 FUTU_MCP_API_KEY 环境变量)
95    ///
96    /// 生产环境强烈建议用环境变量而非命令行参数(后者会进 `ps` 输出)。
97    #[arg(long, env = "FUTU_MCP_API_KEY", hide_env_values = true)]
98    api_key: Option<String>,
99
100    /// [Legacy] 启用交易写工具(place / modify / cancel)。默认关闭。
101    ///
102    /// 开启后默认仅允许 simulate 环境;要操作真实账户需额外 --allow-real-trading。
103    /// 注意:下单前网关必须已 unlock_trade(密码不经过 MCP / LLM)。
104    /// 若提供了 --keys-file,此开关被忽略,改由 key 的 scope 决定。
105    #[arg(long)]
106    enable_trading: bool,
107
108    /// [Legacy] 允许交易写工具对 real 环境执行。必须与 --enable-trading 搭配。
109    #[arg(long, requires = "enable_trading")]
110    allow_real_trading: bool,
111
112    /// 审计日志输出:JSONL 文件路径或目录
113    ///
114    /// - 带扩展名(`/var/log/futu-mcp-audit.jsonl`)→ 单文件 append
115    /// - 不带扩展名 / 以 `/` 结尾 → 每日滚动 `futu-audit.log` + 日期
116    ///
117    /// 只记录 auth / 交易 事件(target = `futu_audit`)。
118    #[arg(long)]
119    audit_log: Option<PathBuf>,
120
121    /// 以 HTTP transport 启动(streamable HTTP),监听该端口(格式 `host:port` 或 `:port`)
122    ///
123    /// 默认 stdio:LLM 客户端启子进程走 stdin/stdout。开 HTTP 后可以让多个
124    /// 客户端连同一个 MCP 进程,并同时暴露 `/metrics`。per-call key 覆盖依然
125    /// 走 tool args 的 `api_key` 字段;HTTP-layer 的 Authorization header 未来
126    /// 版本再接(v1.0 先做传输层切换)。
127    ///
128    /// 例:`--http-listen 127.0.0.1:3000` / `--http-listen :3000`
129    #[arg(long)]
130    http_listen: Option<String>,
131}
132
133#[tokio::main]
134async fn main() -> Result<()> {
135    let cli = Cli::parse();
136
137    // MCP 用 stdout 传协议帧,所有日志必须写 stderr
138    let default_level = if cli.verbose { "debug" } else { "info" };
139
140    // audit 日志 guard 必须活到 main 返回;否则后台 flush 可能丢事件
141    let _audit_guard = setup_logging(default_level, cli.audit_log.as_deref())?;
142
143    // ---------- 加载 KeyStore ----------
144    let key_store = match &cli.keys_file {
145        Some(path) => {
146            let store = KeyStore::load(path)
147                .with_context(|| format!("load keys file {}", path.display()))?;
148            tracing::info!(
149                path = %path.display(),
150                keys_loaded = store.len(),
151                "scope mode: keys file loaded"
152            );
153            if cli.enable_trading || cli.allow_real_trading {
154                tracing::warn!(
155                    "--enable-trading / --allow-real-trading are IGNORED in scope mode; \
156                     trading permissions are controlled by API key scopes"
157                );
158            }
159            Arc::new(store)
160        }
161        None => {
162            tracing::info!("legacy mode: no keys file; using --enable-trading switches");
163            Arc::new(KeyStore::empty())
164        }
165    };
166
167    // ---------- 校验调用方 API key ----------
168    let authed_key = if key_store.is_configured() {
169        match cli.api_key.as_deref() {
170            Some(plaintext) if !plaintext.is_empty() => match key_store.verify(plaintext) {
171                Some(rec) => {
172                    tracing::info!(
173                        key_id = %rec.id,
174                        scopes = ?rec.scopes.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
175                        "API key verified"
176                    );
177                    Some(rec)
178                }
179                None => {
180                    tracing::error!("FUTU_MCP_API_KEY does not match any key in keys.json");
181                    None
182                }
183            },
184            _ => {
185                tracing::warn!(
186                    "scope mode active but FUTU_MCP_API_KEY not set; \
187                     all tool calls will be rejected"
188                );
189                None
190            }
191        }
192    } else {
193        None
194    };
195
196    tracing::info!(
197        gateway = %cli.gateway,
198        scope_mode = key_store.is_configured(),
199        enable_trading = cli.enable_trading,
200        allow_real_trading = cli.allow_real_trading,
201        "futu-mcp starting"
202    );
203    if !key_store.is_configured() && cli.enable_trading {
204        tracing::warn!(
205            allow_real_trading = cli.allow_real_trading,
206            "trading write tools ENABLED (legacy mode)"
207        );
208    }
209
210    let state = ServerState::new(cli.gateway)
211        .with_trading(cli.enable_trading, cli.allow_real_trading)
212        .with_key_store(key_store.clone())
213        .with_authed_key(authed_key);
214    let server = FutuServer::new(state);
215
216    // SIGHUP 热重载 keys.json(unix only)
217    #[cfg(unix)]
218    spawn_sighup_reload(key_store);
219
220    // v1.0:install 全局 metrics registry,让 audit::* 的 counter hook 起作用
221    // (HTTP 模式下 /metrics 端点消费这套;stdio 模式虽然没 HTTP,但写进内存
222    // 方便 debug 和后续加 transport)
223    futu_auth::metrics::install(std::sync::Arc::new(futu_auth::MetricsRegistry::default()));
224
225    if let Some(listen) = cli.http_listen {
226        serve_http(server, &listen).await?;
227    } else {
228        serve_stdio(server).await?;
229    }
230
231    Ok(())
232}
233
234/// stdio 模式:MCP 客户端启动子进程,stdin/stdout 传协议帧
235async fn serve_stdio(server: tools::FutuServer) -> Result<()> {
236    let service = server
237        .serve(stdio())
238        .await
239        .map_err(|e| anyhow::anyhow!("MCP service init failed: {e}"))?;
240
241    service
242        .waiting()
243        .await
244        .map_err(|e| anyhow::anyhow!("MCP service error: {e}"))?;
245    Ok(())
246}
247
248/// HTTP 模式:axum + rmcp StreamableHttpService,`/mcp` 路径跑 MCP,
249/// `/metrics` 暴露 Prometheus counters(无需 token)。
250async fn serve_http(server: tools::FutuServer, listen: &str) -> Result<()> {
251    use rmcp::transport::streamable_http_server::{
252        session::local::LocalSessionManager, StreamableHttpService,
253    };
254
255    // 补齐 `:port` 写法:bind 到 0.0.0.0:port
256    let bind_addr = if listen.starts_with(':') {
257        format!("0.0.0.0{listen}")
258    } else {
259        listen.to_string()
260    };
261
262    // rmcp StreamableHttpService 要求一个 service factory —— 每个 HTTP 会话要
263    // 一份独立的 MCP Service 实例。FutuServer 目前是 Clone 的(ServerState 内
264    // Arc 共享),所以 factory 里 clone 给出去就行
265    let session_manager = std::sync::Arc::new(LocalSessionManager::default());
266    let mcp_svc = StreamableHttpService::new(
267        {
268            let server = server.clone();
269            move || Ok::<_, std::io::Error>(server.clone())
270        },
271        session_manager,
272        Default::default(),
273    );
274
275    // axum 0.8 router:/mcp 挂 MCP tower service,/metrics 独立路由
276    use axum::routing::get;
277    let app = axum::Router::new()
278        .route(
279            "/metrics",
280            get(|| async {
281                let body = futu_auth::metrics::global()
282                    .map(|r| r.render_prometheus())
283                    .unwrap_or_default();
284                (
285                    axum::http::StatusCode::OK,
286                    [(
287                        axum::http::header::CONTENT_TYPE,
288                        "text/plain; version=0.0.4",
289                    )],
290                    body,
291                )
292            }),
293        )
294        .nest_service("/mcp", mcp_svc);
295
296    let listener = tokio::net::TcpListener::bind(&bind_addr)
297        .await
298        .map_err(|e| anyhow::anyhow!("bind {bind_addr}: {e}"))?;
299    tracing::info!(
300        addr = %bind_addr,
301        "futu-mcp HTTP transport started (MCP: /mcp, metrics: /metrics)"
302    );
303    axum::serve(listener, app)
304        .await
305        .map_err(|e| anyhow::anyhow!("axum serve error: {e}"))?;
306    Ok(())
307}
308
309#[cfg(unix)]
310fn spawn_sighup_reload(store: Arc<KeyStore>) {
311    if !store.is_configured() {
312        return;
313    }
314    use tokio::signal::unix::{signal, SignalKind};
315    tokio::spawn(async move {
316        let mut sig = match signal(SignalKind::hangup()) {
317            Ok(s) => s,
318            Err(e) => {
319                tracing::error!(error = %e, "failed to install SIGHUP handler");
320                return;
321            }
322        };
323        tracing::info!("SIGHUP handler installed; send `kill -HUP <pid>` to reload keys");
324        while sig.recv().await.is_some() {
325            match store.reload() {
326                Ok(()) => tracing::warn!(keys_loaded = store.len(), "keys file reloaded on SIGHUP"),
327                Err(e) => tracing::error!(error = %e, "SIGHUP reload failed; keeping old keys"),
328            }
329        }
330    });
331}