Skip to main content

futu_mcp/
state.rs

1//! 共享状态:网关连接 + 订阅状态 + 授权
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::{Context, Result, anyhow};
7use futu_auth::{KeyRecord, KeyStore, RuntimeCounters};
8use futu_net::client::{ClientConfig, FutuClient, ReconnectingClient};
9use futu_net::reconnect::ReconnectPolicy;
10use futu_qot::types::Security;
11use rmcp::{RoleServer, service::Peer};
12use tokio::sync::Mutex;
13
14mod push_filter;
15#[cfg(test)]
16mod tests;
17
18use push_filter::{
19    TradePushDecode, classify_trade_push, subscriber_visible_to_caller, trd_market_int_to_str,
20};
21#[cfg(test)]
22use push_filter::{
23    extract_acc_id_and_market_from_push, is_trade_push_proto_id, subscriber_should_receive,
24    subscriber_should_receive_with_market,
25};
26
27const MCP_CONNECT_TOTAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
28const MCP_CONNECT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
29
30/// v1.4.38 Phase 5 helper: bytes → base64 (用于 push body 安全包进 JSON)
31fn base64_encode_bytes(bytes: &[u8]) -> String {
32    use base64::Engine as _;
33    base64::engine::general_purpose::STANDARD.encode(bytes)
34}
35
36/// v1.4.38 Phase 5: 订阅了 push 通知的 MCP 客户端 session 记录。
37///
38/// 每个 session 用 `futu_sub_acc_push` 注册时登记一条。daemon push 事件来到
39/// MCP server 时,按 `acc_ids` 过滤后用 `peer.notify_logging_message()` 推回。
40///
41/// v1.4.38 100%:`acc_ids` 过滤已生效(state.rs drain loop 实装)。
42/// caller key ownership / scope 快照在注册时解析并保存,后续 push 分发不再
43/// 重新读取 bearer 明文。
44#[derive(Clone)]
45pub struct PushSubscriber {
46    /// rmcp 对 MCP session 的抽象。clone 便宜(内部 Arc)。
47    pub peer: Peer<RoleServer>,
48    /// 该 session 关心的 acc_id 列表。空集合表示"不过滤"(接收所有 acc 的 push)。
49    pub acc_ids: std::collections::HashSet<u64>,
50    /// v1.4.39 per-key acc_id 白名单**注册时快照**(非 live-reload)。
51    ///
52    /// Some(set) + non-empty → push 的 acc_id 必须在 set 里才推。
53    /// Some(empty) / None → 不做 key 级过滤(兼容无 allowed_acc_ids 约束的 key
54    /// 或 stdio / legacy 模式)。
55    ///
56    /// 快照语义:注册后 SIGHUP 重载 keys.json 修改 allowed_acc_ids 不会立即
57    /// 反映到已注册订阅者。用户需重新 `futu_sub_acc_push` 才应用新 scope。
58    /// 这是 defense-in-depth 层(主 auth 在 tool 调用时 guard.rs),可接受。
59    pub allowed_acc_ids_snapshot: Option<std::collections::HashSet<u64>>,
60    /// v1.4.105 T-C2: per-key `allowed_markets` 注册时快照, Layer 3 trade push gate.
61    ///
62    /// `Some(set)` + non-empty → push event 的 `trd_market` 必须 ∈ set 才推 (按
63    /// `Trd_Common.TrdMarket` int → 字符串映射, 与 `keys.json::allowed_markets`
64    /// 配置字符串一致, e.g. "HK"/"US"/"FUTURES").
65    /// `None` / `Some(empty)` → 不做 market gate (兼容 stdio / legacy / 未配
66    /// allowed_markets 的 key).
67    ///
68    /// 与 `allowed_acc_ids_snapshot` 同样**注册时快照**, SIGHUP 重载不影响
69    /// 已注册订阅者. 用户需重新 `futu_sub_acc_push` 才应用新 scope.
70    /// 配套 main auth (guard.rs / require_acc_read_with_acc_id) 仍在 tool 调用
71    /// 时 enforce, 这是 defense-in-depth 层 (push 走 server-initiated channel
72    /// 绕过 tool 调用 → 必须独立 enforce).
73    pub allowed_markets_snapshot: Option<std::collections::HashSet<String>>,
74    /// 注册时间(用于 session 硬上限清理,4h 默认 TTL)
75    pub registered_at: std::time::Instant,
76    /// v1.4.103 (codex 50 F6 / 53 F4 — B8): owner key id (KeyRecord.id).
77    /// 注册时填的 caller key id (HTTP Bearer 或 startup key); 用于 unsub
78    /// ownership check — 任何 caller 拿到 session_id 后想 unsub 必须 key id
79    /// 匹配 owner_key_id (admin scope 例外).
80    ///
81    /// None = legacy / stdio 模式无 key (ownership 退化为 "anyone can unsub",
82    /// 与本来 v1.4.102 行为一致).
83    pub owner_key_id: Option<String>,
84}
85
86struct PushDelivery {
87    peer: Peer<RoleServer>,
88    data: serde_json::Value,
89    session_id: String,
90    owner_key_id: Option<String>,
91    proto_id: u32,
92}
93
94/// MCP server 运行时状态
95#[derive(Clone)]
96pub struct ServerState {
97    /// [`Inner`] 共享可变状态(gateway 地址 + 懒加载的 [`FutuClient`])
98    inner: Arc<Mutex<Inner>>,
99    /// 是否启用交易写工具(place/modify/cancel)。默认 false。旧开关,仅当
100    /// `key_store.is_configured() == false` 时生效。
101    enable_trading: bool,
102    /// 是否允许对 real 环境下单。默认 false。旧开关,同上。
103    allow_real_trading: bool,
104    /// keys.json 加载的 KeyStore。`is_configured()` 为 true 时走 scope 授权模式。
105    key_store: Arc<KeyStore>,
106    /// 调用方传入的 API Key 对应的记录;None 表示未提供 key。
107    authed_key: Option<Arc<KeyRecord>>,
108    /// 交易密码所属登录账号。用于 `futu_unlock_trade` 从账号级 keychain
109    /// `trade-password.<login-account>` 读取密码;None 时走 legacy/global/env 兼容路径。
110    trade_pwd_account: Option<String>,
111    /// 限额运行时(日累计计数器)
112    counters: Arc<RuntimeCounters>,
113    /// v1.4.38 Phase 5: MCP push 订阅者注册表(session_uuid → subscriber)。
114    /// `futu_sub_acc_push` 工具在 HTTP 模式下调用时注册当前 session,daemon
115    /// push 到 MCP 后按 acc_id filter 向注册的 peer 发
116    /// `notify_logging_message`(server-initiated notification)。
117    push_subscribers: Arc<Mutex<HashMap<String, PushSubscriber>>>,
118}
119
120/// ServerState 内部可变部分,加锁存放 gateway 地址 + 懒加载的 [`FutuClient`]。
121struct Inner {
122    /// 网关 TCP 地址(如 `127.0.0.1:11111`)
123    gateway: String,
124    /// 懒加载的底层连接;首次调用 [`ServerState::client`] 时建立,后续复用
125    client: Option<Arc<FutuClient>>,
126}
127
128impl ServerState {
129    /// 创建默认 state:`enable_trading=false` / `allow_real_trading=false` /
130    /// 空 [`KeyStore`] / 无 authed_key。使用 `with_*` 链式方法注入额外能力。
131    pub fn new(gateway: String) -> Self {
132        Self {
133            inner: Arc::new(Mutex::new(Inner {
134                gateway,
135                client: None,
136            })),
137            enable_trading: false,
138            allow_real_trading: false,
139            key_store: Arc::new(KeyStore::empty()),
140            authed_key: None,
141            trade_pwd_account: None,
142            counters: Arc::new(RuntimeCounters::new()),
143            push_subscribers: Arc::new(Mutex::new(HashMap::new())),
144        }
145    }
146
147    /// v1.4.38 Phase 5: 注册当前 session 接收指定 acc_id 的 push。返回 session
148    /// UUID(调用方存着,后续可 unregister)。
149    ///
150    /// v1.4.38: 已 wire 到 `futu_sub_acc_push` tool。tool 被调用时拿到
151    /// `RequestContext.peer`,`acc_ids` 从工具 args 解析,注册完成后
152    /// state.rs 的 push drain loop 会按 acc_ids filter 转 notify 给该 peer。
153    /// v1.4.103 (codex 50 F5 / 53 F2 / 58 F3 — B7) + (codex 50 F6 / 53 F4 — B8):
154    /// 注册当前 session 接收指定 acc_id 的 push。
155    ///
156    /// `owner_key_id_override` 由 caller 传入 (例如从 HTTP Bearer 解析得到 key id);
157    /// 若 None → fall back 到 bearer_token 解析 → fall back 到 startup `authed_key.id`.
158    /// `allowed_acc_ids_snapshot` 同样 fall back 链: bearer → startup.
159    pub async fn register_push_subscriber_with_owner(
160        &self,
161        peer: Peer<RoleServer>,
162        acc_ids: std::collections::HashSet<u64>,
163        bearer_token: Option<String>,
164        owner_key_id_override: Option<String>,
165    ) -> String {
166        use std::time::Instant;
167        let session_id = format!("sub-{}", rand::random::<u64>());
168
169        // 解析 caller-specific KeyRecord (HTTP Bearer 优先, fallback startup key).
170        // 用于 (a) allowed_acc_ids_snapshot (B7) (b) owner_key_id (B8).
171        //
172        // v1.4.106 codex 0608 F2 (P1): startup fallback 路径用
173        // `get_by_id_for_current_machine` 替代裸 `get_by_id`, 与 verify (Bearer 路径
174        // 自带 machine 校验) 行为对称, 让 SIGHUP 收紧 allowed_machines 后能立即拒绝.
175        let bearer_key_rec = bearer_token
176            .as_deref()
177            .filter(|pt| !pt.is_empty())
178            .and_then(|pt| self.key_store.verify(pt));
179        let startup_key_rec = self
180            .authed_key
181            .as_ref()
182            .and_then(|k| self.key_store.get_by_id_for_current_machine(&k.id));
183        let effective_key_rec = bearer_key_rec.as_ref().or(startup_key_rec.as_ref());
184
185        // v1.4.103 (B7): allowed_acc_ids_snapshot 优先 HTTP Bearer 解析,
186        // 否则 fall back startup key.allowed_acc_ids; 都无 → None (无限制).
187        let allowed_acc_ids_snapshot =
188            effective_key_rec.and_then(|rec| rec.allowed_acc_ids.clone());
189
190        // v1.4.105 T-C2: allowed_markets_snapshot 同链 — 与 acc_ids_snapshot
191        // 同样从 effective_key_rec (Bearer / startup) 取 allowed_markets.
192        // 用于 push event Layer 3 market gate (state.rs::subscriber_should_receive_with_market).
193        let allowed_markets_snapshot =
194            effective_key_rec.and_then(|rec| rec.allowed_markets.clone());
195
196        // v1.4.103 (B8): owner_key_id 优先 caller 显式传入, 否则 effective_key_rec.id.
197        let owner_key_id =
198            owner_key_id_override.or_else(|| effective_key_rec.map(|rec| rec.id.clone()));
199
200        let subscriber = PushSubscriber {
201            peer,
202            acc_ids,
203            allowed_acc_ids_snapshot,
204            allowed_markets_snapshot,
205            registered_at: Instant::now(),
206            owner_key_id,
207        };
208        self.push_subscribers
209            .lock()
210            .await
211            .insert(session_id.clone(), subscriber);
212        session_id
213    }
214
215    /// v1.4.103 (codex 50 F6 / 53 F4 — B8): unsub session ownership check.
216    ///
217    /// 行为:
218    /// - 无 caller_key_id (legacy / stdio): 退化为旧行为 (任何 caller 可 unsub).
219    /// - 有 caller_key_id + subscriber.owner_key_id 匹配: 删除, 返 Ok(true).
220    /// - 有 caller_key_id + subscriber.owner_key_id 不匹配: **拒绝**, 返
221    ///   Err(reason) — 防其他 caller 拿可见 session_id 强制 unsub.
222    /// - session_id 不存在: 返 Ok(false) (idempotent, 不报错).
223    /// - subscriber.owner_key_id = None (legacy 注册): 退化为旧行为 — 任何 caller
224    ///   可 unsub (向后兼容).
225    pub async fn unregister_push_subscriber_with_owner_check(
226        &self,
227        session_id: &str,
228        caller_key_id: Option<&str>,
229    ) -> Result<bool, String> {
230        let mut subs = self.push_subscribers.lock().await;
231        // 不存在 → idempotent Ok(false), 不报错
232        let Some(sub) = subs.get(session_id) else {
233            return Ok(false);
234        };
235        // ownership check
236        match (caller_key_id, sub.owner_key_id.as_deref()) {
237            (None, _) => {
238                // 无 caller key: legacy / stdio 模式, 退化旧行为
239                subs.remove(session_id);
240                Ok(true)
241            }
242            (Some(caller), None) => {
243                // session 注册时无 owner_key_id (legacy): 任何 caller 可 unsub
244                tracing::warn!(
245                    session_id,
246                    caller,
247                    "v1.4.103 B8: unsub legacy session (no owner_key_id) — \
248                     allowed for backward-compat"
249                );
250                subs.remove(session_id);
251                Ok(true)
252            }
253            (Some(caller), Some(owner)) if caller == owner => {
254                subs.remove(session_id);
255                Ok(true)
256            }
257            (Some(caller), Some(owner)) => {
258                // ownership 不匹配: reject. 当前 MCP subscription ownership contract
259                // 没有 admin override surface;若要扩展,必须先在 spec / auth
260                // pipeline / integration tests 中定义清楚。
261                Err(format!(
262                    "session_id {session_id:?} owned by key_id {owner:?}, \
263                     caller key_id {caller:?} not allowed to unsub"
264                ))
265            }
266        }
267    }
268
269    /// 当前活跃订阅数。生产诊断使用 `push_subscribers_summary`,这里只作为
270    /// state 单测的轻量断言入口保留。
271    #[cfg(test)]
272    pub async fn push_subscriber_count(&self) -> usize {
273        self.push_subscribers.lock().await.len()
274    }
275
276    /// v1.4.58 Phase A: 列出所有 push 订阅 summary(tool diagnostic 用)。
277    ///
278    /// 返 Vec<(session_id, acc_ids, age_secs)>。
279    ///
280    /// **MED-NEW-3 修(2nd review)**:加 `caller_allowed_acc_ids` 参数做
281    /// scope-mode 多租过滤。当 caller 的 key 有 `allowed_acc_ids` 白名单时,
282    /// **只返** subscription 的 `acc_ids` 与 caller 白名单有交集的条目。
283    /// 避免 agent A(acc_ids=[100, 200])通过本 tool 看到 agent B 订阅的
284    /// acc_id=[300, 400]。
285    ///
286    /// `caller_allowed_acc_ids=None` / empty → 不过滤(legacy mode / no-scope key)。
287    ///
288    /// **rmcp 版本兼容**:rmcp 1.4.0 `Peer<RoleServer>` 不实装 `PartialEq`,
289    /// 无法按 peer 身份直接过滤。若未来 rmcp 加 PartialEq,可切到更精确的
290    /// per-session-owner 过滤(当前只能靠 acc_id 权限交集近似)。
291    pub async fn push_subscribers_summary(
292        &self,
293        caller_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
294    ) -> Vec<(String, std::collections::HashSet<u64>, u64)> {
295        let subs = self.push_subscribers.lock().await;
296        let now = std::time::Instant::now();
297        subs.iter()
298            .filter(|(_, sub)| subscriber_visible_to_caller(&sub.acc_ids, caller_allowed_acc_ids))
299            .map(|(id, sub)| {
300                let age = now
301                    .checked_duration_since(sub.registered_at)
302                    .map(|d| d.as_secs())
303                    .unwrap_or(0);
304                // LOW-3RD-1(3rd code review):scope mode 下返回的 acc_ids 要与
305                // caller allowed 求交集 — 防止 caller=[100] 看到 sub=[100, 999]
306                // 时知道 999 这个 acc 存在。sub.acc_ids 空集(subscribe-all)不做
307                // 交集(概念上 caller 看到的是"有个 catch-all subscriber",不泄漏
308                // 具体账户信息)。
309                let visible_accs = match caller_allowed_acc_ids {
310                    Some(allowed) if !allowed.is_empty() && !sub.acc_ids.is_empty() => {
311                        sub.acc_ids.intersection(allowed).copied().collect()
312                    }
313                    _ => sub.acc_ids.clone(),
314                };
315                (id.clone(), visible_accs, age)
316            })
317            .collect()
318    }
319
320    /// 启用交易写工具(构造器式链式设置)
321    pub fn with_trading(mut self, enable_trading: bool, allow_real_trading: bool) -> Self {
322        self.enable_trading = enable_trading;
323        self.allow_real_trading = allow_real_trading;
324        self
325    }
326
327    /// 设置 KeyStore(新授权模式)
328    pub fn with_key_store(mut self, store: Arc<KeyStore>) -> Self {
329        self.key_store = store;
330        self
331    }
332
333    /// 设置已通过验证的 API Key 记录
334    pub fn with_authed_key(mut self, key: Option<Arc<KeyRecord>>) -> Self {
335        self.authed_key = key;
336        self
337    }
338
339    /// 设置交易密码所属登录账号(MCP 只连 gateway,本身无法可靠推断 daemon
340    /// 的 login account;由 CLI/env/config 显式注入)。
341    pub fn with_trade_pwd_account(mut self, account: Option<String>) -> Self {
342        self.trade_pwd_account = account;
343        self
344    }
345
346    /// 是否启用了 scope 授权模式
347    pub fn is_scope_mode(&self) -> bool {
348        self.key_store.is_configured()
349    }
350
351    /// 交易写工具开关(legacy mode)。
352    pub fn enable_trading(&self) -> bool {
353        self.enable_trading
354    }
355
356    /// real 环境交易写工具开关(legacy mode)。
357    pub fn allow_real_trading(&self) -> bool {
358        self.allow_real_trading
359    }
360
361    /// 当前 MCP API key store。返回共享引用,避免调用方替换 runtime storage。
362    pub fn key_store(&self) -> &Arc<KeyStore> {
363        &self.key_store
364    }
365
366    /// startup 阶段验证过的 key 快照;调用方需要 fresh record 时仍应按 id 回查 key store。
367    pub fn authed_key(&self) -> Option<Arc<KeyRecord>> {
368        self.authed_key.clone()
369    }
370
371    /// 交易密码所属登录账号。
372    pub fn trade_pwd_account(&self) -> Option<&str> {
373        self.trade_pwd_account.as_deref()
374    }
375
376    /// 限额运行时计数器。返回共享引用,避免调用方替换 runtime storage。
377    pub fn counters(&self) -> &Arc<RuntimeCounters> {
378        &self.counters
379    }
380
381    /// 当前配置的 gateway 地址。
382    pub async fn gateway(&self) -> String {
383        self.inner.lock().await.gateway.clone()
384    }
385
386    /// 获取(或懒加载)网关客户端
387    pub async fn client(&self) -> Result<Arc<FutuClient>> {
388        let gateway = {
389            let guard = self.inner.lock().await;
390            if let Some(c) = &guard.client {
391                return Ok(c.clone());
392            }
393            guard.gateway.clone()
394        };
395
396        let config = ClientConfig {
397            addr: gateway.clone(),
398            client_ver: env!("CARGO_PKG_VERSION").to_string(),
399            client_id: "futu-mcp".to_string(),
400            recv_notify: false,
401            rsa_key: None,
402        };
403        let policy =
404            ReconnectPolicy::new(MCP_CONNECT_RETRY_DELAY, MCP_CONNECT_RETRY_DELAY, Some(1));
405        let mut reconnector = ReconnectingClient::new(config).with_policy(policy);
406        let connect_result =
407            tokio::time::timeout(MCP_CONNECT_TOTAL_TIMEOUT, reconnector.connect()).await;
408        let (client, mut push_rx, _info) = match connect_result {
409            Ok(result) => {
410                result.with_context(|| format!("connect to futu gateway at {gateway}"))?
411            }
412            Err(_) => {
413                return Err(anyhow!(
414                    "connect to futu gateway at {gateway} timed out after {}s",
415                    MCP_CONNECT_TOTAL_TIMEOUT.as_secs()
416                ));
417            }
418        };
419
420        let arc = Arc::new(client);
421        {
422            let mut guard = self.inner.lock().await;
423            if let Some(c) = &guard.client {
424                return Ok(c.clone());
425            }
426            guard.client = Some(arc.clone());
427        }
428
429        // v1.4.38 Phase 5 (100%): 按 acc_ids 过滤的 push broadcast
430        //
431        // 流程:
432        // 1. push_rx 收 daemon 转发的 push
433        // 2. 对 TRD_UPDATE_ORDER (2208) / TRD_UPDATE_ORDER_FILL (2218) 解包
434        //    提取 acc_id
435        // 3. 遍历订阅者,**只推给 acc_ids 匹配的**(或订阅者 acc_ids 空 = 不
436        //    过滤,所有 acc 都收)
437        // 4. 行情 push(QOT_UPDATE_*)无 acc_id 语义,广播给所有订阅者
438        //
439        // Per-session 独立 spawn notify,避免一个慢 session 阻塞其他
440        let subs_for_push = Arc::downgrade(&self.push_subscribers);
441        // v1.4.105 F5 fix (codex review C4 USER_ACK B): MCP push filter 改用
442        // FilterRegistry::should_drop_event 单一注册中心 (跟 4 surface 一致),
443        // 替代之前 inline subscriber_should_receive_with_market. 防 sibling-route
444        // bypass — 加新 push event filter 维度只在 install_defaults 注册一次,
445        // MCP 自动覆盖.
446        let filter_registry =
447            std::sync::Arc::new(futu_auth_pipeline::FilterRegistry::with_defaults());
448        tokio::spawn(async move {
449            while let Some(push) = push_rx.recv().await {
450                let Some(subs_for_push) = subs_for_push.upgrade() else {
451                    break;
452                };
453                let subscribers = {
454                    let subs = subs_for_push.lock().await;
455                    if subs.is_empty() {
456                        Vec::new()
457                    } else {
458                        subs.iter()
459                            .map(|(session_id, sub)| (session_id.clone(), sub.clone()))
460                            .collect::<Vec<_>>()
461                    }
462                };
463                if subscribers.is_empty() {
464                    continue; // fast path: no listeners, drop
465                }
466                // v1.4.105 T-C2 + v1.4.106 codex 0932 F6/F7: classify push by proto_id
467                // (set membership), 不再靠 body decode 成功推断. trade body decode
468                // 失败现在归 TradePushDecode::DecodeFailed (event_type="trade",
469                // 无 acc/market gate 信息) — restricted key 应 drop, unrestricted
470                // 透传带 decode_status="failed".
471                let decode_result = classify_trade_push(push.proto_id, &push.body);
472                let (push_acc_id, push_trd_market, decode_status, event_type) = match &decode_result
473                {
474                    TradePushDecode::NotTrade => (None, None, "ok", "quote"),
475                    TradePushDecode::Decoded { acc_id, trd_market } => {
476                        (Some(*acc_id), Some(*trd_market), "ok", "trade")
477                    }
478                    TradePushDecode::DecodeFailed => (None, None, "failed", "trade"),
479                };
480                let push_trd_market_str = push_trd_market.map(trd_market_int_to_str);
481                // v1.4.106 codex 0932 F7 [P3]: payload 加 event_type / trd_market /
482                // decode_status — 让客户端不需要按 proto_id 自己 derive (4 surface 一致).
483                // body_base64 后向兼容保留.
484                let payload = serde_json::json!({
485                    "kind": "futu_push",
486                    "proto_id": push.proto_id,
487                    "acc_id": push_acc_id,
488                    "event_type": event_type,
489                    "trd_market": push_trd_market_str,
490                    "decode_status": decode_status,
491                    "body_base64": base64_encode_bytes(&push.body),
492                });
493                let deliveries = {
494                    let mut deliveries = Vec::with_capacity(subscribers.len());
495                    for (session_id, sub) in subscribers.iter() {
496                        // v1.4.106 codex 0932 F6 [P2]: trade decode-failed + restricted
497                        // key (allowed_acc_ids 非 None) → DROP. 不能让 restricted key
498                        // 看到无 acc gate 信息的 trade body 透传 (绕过 ACL).
499                        // unrestricted key (allowed_acc_ids None / 空) 仍透传带 decode_status="failed".
500                        if matches!(decode_result, TradePushDecode::DecodeFailed) {
501                            let restricted = sub
502                                .allowed_acc_ids_snapshot
503                                .as_ref()
504                                .map(|s| !s.is_empty())
505                                .unwrap_or(false);
506                            if restricted {
507                                let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
508                                // 复用 cross-surface metric — reason="trade_decode_failed"
509                                futu_auth::metrics::bump_ws_filtered("trade_decode_failed", key_id);
510                                tracing::warn!(
511                                    proto_id = push.proto_id,
512                                    key_id,
513                                    "v1.4.106 audit 0932 F6: trade push body decode failed; \
514                                     dropped for restricted key (allowed_acc_ids set, \
515                                     cannot ACL-gate body without acc_id)"
516                                );
517                                continue;
518                            }
519                            // unrestricted: fall through, broadcast 带 decode_status="failed"
520                        }
521                        // v1.4.105 F5 fix: 改用 FilterRegistry::should_drop_event.
522                        // 行为对齐 4 surface — sub.acc_ids (MCP 显式订阅 list) 喂给
523                        // ctx.sub_state (REST sub-acc-push state 同语义); sub.allowed_*
524                        // _snapshot 喂给 ctx.allowed_* (caller key 限额).
525                        //
526                        // **行为微差** (与老 inline fn 一致, 不破老行为):
527                        // - sub.acc_ids 空 = MCP 老语义"无限制订阅" → 传 None
528                        //   (避免 REST sub_state 空集 tombstone 语义触发 drop-all)
529                        // - sub.acc_ids 非空 → 传 Some(&sub.acc_ids), 跟 REST 一致
530                        let sub_state_for_ctx = if sub.acc_ids.is_empty() {
531                            None
532                        } else {
533                            Some(&sub.acc_ids)
534                        };
535                        let ctx = futu_auth_pipeline::PushEventCtx {
536                            event_type,
537                            event_acc: push_acc_id,
538                            allowed_acc_ids: sub.allowed_acc_ids_snapshot.as_ref(),
539                            sub_state: sub_state_for_ctx,
540                            event_trd_market: push_trd_market_str,
541                            allowed_markets: sub.allowed_markets_snapshot.as_ref(),
542                        };
543                        if filter_registry.should_drop_event(&ctx) {
544                            // v1.4.105 T-C2 + F3 (codex review C4): bump filtered metric.
545                            // **统一 label 命名** 跟 4 surface 一致 — gRPC subscribe_push
546                            // / push_trd_acc / 都用 "trade_market" 标 Layer 3 (allowed_markets)
547                            // 拒. 老 "push.trade" 命名是 surface-specific (T-C2 sole), 改成
548                            // canonical "trade_market" 让跨 surface metrics jq aggregate 一致.
549                            let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
550                            futu_auth::metrics::bump_ws_filtered("trade_market", key_id);
551                            continue;
552                        }
553                        deliveries.push(PushDelivery {
554                            peer: sub.peer.clone(),
555                            data: payload.clone(),
556                            session_id: session_id.clone(),
557                            owner_key_id: sub.owner_key_id.clone(),
558                            proto_id: push.proto_id,
559                        });
560                    }
561                    deliveries
562                };
563                for delivery in deliveries {
564                    tokio::spawn(async move {
565                        let params = rmcp::model::LoggingMessageNotificationParam {
566                            level: rmcp::model::LoggingLevel::Info,
567                            logger: Some("futu_push".to_string()),
568                            data: delivery.data,
569                        };
570                        if let Err(err) = delivery.peer.notify_logging_message(params).await {
571                            tracing::warn!(
572                                proto_id = delivery.proto_id,
573                                session_id = delivery.session_id,
574                                owner_key_id = delivery.owner_key_id.as_deref().unwrap_or("<none>"),
575                                error = %err,
576                                "mcp push notification send failed"
577                            );
578                        }
579                    });
580                }
581            }
582        });
583
584        // v1.4.39 Phase 5 stale cleanup: 5 分钟跑一次,移除 registered_at > 4h
585        // 的订阅者。避免长跑 daemon 累积陈旧 subscriber(客户端断开 /  rmcp
586        // session gone 但没显式 unregister 的情况)。
587        let subs_for_purge = Arc::downgrade(&self.push_subscribers);
588        tokio::spawn(async move {
589            use std::time::Duration;
590            const PURGE_INTERVAL: Duration = Duration::from_secs(5 * 60);
591            const MAX_AGE: Duration = Duration::from_secs(4 * 3600);
592            let mut ticker = tokio::time::interval(PURGE_INTERVAL);
593            ticker.tick().await; // skip the immediate first tick
594            loop {
595                ticker.tick().await;
596                let Some(subs_for_purge) = subs_for_purge.upgrade() else {
597                    break;
598                };
599                let now = std::time::Instant::now();
600                let mut subs = subs_for_purge.lock().await;
601                let before = subs.len();
602                subs.retain(|_, sub| {
603                    now.checked_duration_since(sub.registered_at)
604                        .map(|age| age < MAX_AGE)
605                        .unwrap_or(true)
606                });
607                let purged = before - subs.len();
608                if purged > 0 {
609                    tracing::info!(
610                        purged,
611                        remaining = subs.len(),
612                        max_age_secs = MAX_AGE.as_secs(),
613                        "v1.4.39 Phase 5: purged stale push subscribers (> 4h registered)"
614                    );
615                }
616            }
617        });
618
619        Ok(arc)
620    }
621}
622
623// ========== symbol 解析 ==========
624
625pub fn parse_symbol(s: &str) -> Result<Security> {
626    futu_qot::symbol::parse_symbol(s)
627}
628
629/// 格式化 Security 为 "MARKET.CODE"
630pub fn format_symbol(sec: &Security) -> String {
631    futu_qot::symbol::format_symbol(sec)
632}
633
634/// v1.4.90 P2-C: audit log Option<T> 序列化助手。
635///
636/// **背景**:之前 audit log 把 `Option<f64>` 用 `?req.price`(tracing 的 Debug
637/// shorthand)记录,渲染成 JSON 字符串 `"Some(400.0)"` / `"None"`,下游 jq /
638/// DuckDB 数值聚合炸(aggregator 期望 `400.0` number 或 `null`)。
639///
640/// **修法**:用 NaN sentinel 把 `Option<f64>` flatten 成 `f64`,tracing-subscriber
641/// 的 JSON formatter 内部走 `serde_json::Value::from(f64::NAN)` →
642/// `Number::from_f64(NaN) = None` → `Value::Null`。
643/// 整数 / 字符串同理(i32 → f64 NaN sentinel;&str → "" 哨兵)。
644///
645/// 验证依据:
646/// - `tracing_subscriber::fmt::format::json` line 501 `record_f64` 直接调
647///   `serde_json::Value::from(value)`
648/// - `serde_json::Value::from(f64)` impl: `Number::from_f64(f).map_or(Value::Null, Value::Number)`
649pub mod audit_fmt {
650    /// `Option<f64>` → `f64`(None → NaN)。tracing JSON 渲染 NaN 为 `null`。
651    #[inline]
652    pub fn opt_f64(v: Option<f64>) -> f64 {
653        v.unwrap_or(f64::NAN)
654    }
655
656    /// `Option<i32>` → `f64`(None → NaN,Some(n) → n as f64)。
657    /// i32 ≤ 2^31 < 2^52 mantissa,无精度损失。
658    #[inline]
659    pub fn opt_i32(v: Option<i32>) -> f64 {
660        v.map(f64::from).unwrap_or(f64::NAN)
661    }
662
663    /// `Option<&str>` → `&str`(None → "")。"" 哨兵在 audit 上下文里足以区分
664    /// 不传 vs 传空(因为 Symbol / owner 等业务字段不会是空字符串)。
665    #[inline]
666    pub fn opt_str(v: Option<&str>) -> &str {
667        v.unwrap_or("")
668    }
669}