futu_mcp/state.rs
1//! 共享状态:网关连接 + 订阅状态 + 授权
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::{Context, Result, anyhow};
7use futu_auth::{KeyRecord, KeyStore, RuntimeCounters};
8use futu_net::client::{ClientConfig, FutuClient, ReconnectingClient};
9use futu_net::reconnect::ReconnectPolicy;
10use futu_qot::types::Security;
11use rmcp::{RoleServer, service::Peer};
12use tokio::sync::Mutex;
13
14mod push_filter;
15#[cfg(test)]
16mod tests;
17
18use push_filter::{
19 TradePushDecode, classify_trade_push, subscriber_visible_to_caller, trd_market_int_to_str,
20};
21#[cfg(test)]
22use push_filter::{
23 extract_acc_id_and_market_from_push, is_trade_push_proto_id, subscriber_should_receive,
24 subscriber_should_receive_with_market,
25};
26
27const MCP_CONNECT_TOTAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
28const MCP_CONNECT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
29
30/// v1.4.38 Phase 5 helper: bytes → base64 (用于 push body 安全包进 JSON)
31fn base64_encode_bytes(bytes: &[u8]) -> String {
32 use base64::Engine as _;
33 base64::engine::general_purpose::STANDARD.encode(bytes)
34}
35
36/// v1.4.38 Phase 5: 订阅了 push 通知的 MCP 客户端 session 记录。
37///
38/// 每个 session 用 `futu_sub_acc_push` 注册时登记一条。daemon push 事件来到
39/// MCP server 时,按 `acc_ids` 过滤后用 `peer.notify_logging_message()` 推回。
40///
41/// v1.4.38 100%:`acc_ids` 过滤已生效(state.rs drain loop 实装)。
42/// caller key ownership / scope 快照在注册时解析并保存,后续 push 分发不再
43/// 重新读取 bearer 明文。
44#[derive(Clone)]
45pub struct PushSubscriber {
46 /// rmcp 对 MCP session 的抽象。clone 便宜(内部 Arc)。
47 pub peer: Peer<RoleServer>,
48 /// 该 session 关心的 acc_id 列表。空集合表示"不过滤"(接收所有 acc 的 push)。
49 pub acc_ids: std::collections::HashSet<u64>,
50 /// v1.4.39 per-key acc_id 白名单**注册时快照**(非 live-reload)。
51 ///
52 /// Some(set) + non-empty → push 的 acc_id 必须在 set 里才推。
53 /// Some(empty) / None → 不做 key 级过滤(兼容无 allowed_acc_ids 约束的 key
54 /// 或 stdio / legacy 模式)。
55 ///
56 /// 快照语义:注册后 SIGHUP 重载 keys.json 修改 allowed_acc_ids 不会立即
57 /// 反映到已注册订阅者。用户需重新 `futu_sub_acc_push` 才应用新 scope。
58 /// 这是 defense-in-depth 层(主 auth 在 tool 调用时 guard.rs),可接受。
59 pub allowed_acc_ids_snapshot: Option<std::collections::HashSet<u64>>,
60 /// v1.4.105 T-C2: per-key `allowed_markets` 注册时快照, Layer 3 trade push gate.
61 ///
62 /// `Some(set)` + non-empty → push event 的 `trd_market` 必须 ∈ set 才推 (按
63 /// `Trd_Common.TrdMarket` int → 字符串映射, 与 `keys.json::allowed_markets`
64 /// 配置字符串一致, e.g. "HK"/"US"/"FUTURES").
65 /// `None` / `Some(empty)` → 不做 market gate (兼容 stdio / legacy / 未配
66 /// allowed_markets 的 key).
67 ///
68 /// 与 `allowed_acc_ids_snapshot` 同样**注册时快照**, SIGHUP 重载不影响
69 /// 已注册订阅者. 用户需重新 `futu_sub_acc_push` 才应用新 scope.
70 /// 配套 main auth (guard.rs / require_acc_read_with_acc_id) 仍在 tool 调用
71 /// 时 enforce, 这是 defense-in-depth 层 (push 走 server-initiated channel
72 /// 绕过 tool 调用 → 必须独立 enforce).
73 pub allowed_markets_snapshot: Option<std::collections::HashSet<String>>,
74 /// 注册时间(用于 session 硬上限清理,4h 默认 TTL)
75 pub registered_at: std::time::Instant,
76 /// v1.4.103 (codex 50 F6 / 53 F4 — B8): owner key id (KeyRecord.id).
77 /// 注册时填的 caller key id (HTTP Bearer 或 startup key); 用于 unsub
78 /// ownership check — 任何 caller 拿到 session_id 后想 unsub 必须 key id
79 /// 匹配 owner_key_id (admin scope 例外).
80 ///
81 /// None = legacy / stdio 模式无 key (ownership 退化为 "anyone can unsub",
82 /// 与本来 v1.4.102 行为一致).
83 pub owner_key_id: Option<String>,
84}
85
86struct PushDelivery {
87 peer: Peer<RoleServer>,
88 data: serde_json::Value,
89 session_id: String,
90 owner_key_id: Option<String>,
91 proto_id: u32,
92}
93
94/// MCP server 运行时状态
95#[derive(Clone)]
96pub struct ServerState {
97 /// [`Inner`] 共享可变状态(gateway 地址 + 懒加载的 [`FutuClient`])
98 inner: Arc<Mutex<Inner>>,
99 /// 是否启用交易写工具(place/modify/cancel)。默认 false。旧开关,仅当
100 /// `key_store.is_configured() == false` 时生效。
101 enable_trading: bool,
102 /// 是否允许对 real 环境下单。默认 false。旧开关,同上。
103 allow_real_trading: bool,
104 /// keys.json 加载的 KeyStore。`is_configured()` 为 true 时走 scope 授权模式。
105 key_store: Arc<KeyStore>,
106 /// 调用方传入的 API Key 对应的记录;None 表示未提供 key。
107 authed_key: Option<Arc<KeyRecord>>,
108 /// 交易密码所属登录账号。用于 `futu_unlock_trade` 从账号级 keychain
109 /// `trade-password.<login-account>` 读取密码;None 时走 legacy/global/env 兼容路径。
110 trade_pwd_account: Option<String>,
111 /// 限额运行时(日累计计数器)
112 counters: Arc<RuntimeCounters>,
113 /// v1.4.38 Phase 5: MCP push 订阅者注册表(session_uuid → subscriber)。
114 /// `futu_sub_acc_push` 工具在 HTTP 模式下调用时注册当前 session,daemon
115 /// push 到 MCP 后按 acc_id filter 向注册的 peer 发
116 /// `notify_logging_message`(server-initiated notification)。
117 push_subscribers: Arc<Mutex<HashMap<String, PushSubscriber>>>,
118}
119
120/// ServerState 内部可变部分,加锁存放 gateway 地址 + 懒加载的 [`FutuClient`]。
121struct Inner {
122 /// 网关 TCP 地址(如 `127.0.0.1:11111`)
123 gateway: String,
124 /// 懒加载的底层连接;首次调用 [`ServerState::client`] 时建立,后续复用
125 client: Option<Arc<FutuClient>>,
126}
127
128impl ServerState {
129 /// 创建默认 state:`enable_trading=false` / `allow_real_trading=false` /
130 /// 空 [`KeyStore`] / 无 authed_key。使用 `with_*` 链式方法注入额外能力。
131 pub fn new(gateway: String) -> Self {
132 Self {
133 inner: Arc::new(Mutex::new(Inner {
134 gateway,
135 client: None,
136 })),
137 enable_trading: false,
138 allow_real_trading: false,
139 key_store: Arc::new(KeyStore::empty()),
140 authed_key: None,
141 trade_pwd_account: None,
142 counters: Arc::new(RuntimeCounters::new()),
143 push_subscribers: Arc::new(Mutex::new(HashMap::new())),
144 }
145 }
146
147 /// v1.4.38 Phase 5: 注册当前 session 接收指定 acc_id 的 push。返回 session
148 /// UUID(调用方存着,后续可 unregister)。
149 ///
150 /// v1.4.38: 已 wire 到 `futu_sub_acc_push` tool。tool 被调用时拿到
151 /// `RequestContext.peer`,`acc_ids` 从工具 args 解析,注册完成后
152 /// state.rs 的 push drain loop 会按 acc_ids filter 转 notify 给该 peer。
153 /// v1.4.103 (codex 50 F5 / 53 F2 / 58 F3 — B7) + (codex 50 F6 / 53 F4 — B8):
154 /// 注册当前 session 接收指定 acc_id 的 push。
155 ///
156 /// `owner_key_id_override` 由 caller 传入 (例如从 HTTP Bearer 解析得到 key id);
157 /// 若 None → fall back 到 bearer_token 解析 → fall back 到 startup `authed_key.id`.
158 /// `allowed_acc_ids_snapshot` 同样 fall back 链: bearer → startup.
159 pub async fn register_push_subscriber_with_owner(
160 &self,
161 peer: Peer<RoleServer>,
162 acc_ids: std::collections::HashSet<u64>,
163 bearer_token: Option<String>,
164 owner_key_id_override: Option<String>,
165 ) -> String {
166 use std::time::Instant;
167 let session_id = format!("sub-{}", rand::random::<u64>());
168
169 // 解析 caller-specific KeyRecord (HTTP Bearer 优先, fallback startup key).
170 // 用于 (a) allowed_acc_ids_snapshot (B7) (b) owner_key_id (B8).
171 //
172 // v1.4.106 codex 0608 F2 (P1): startup fallback 路径用
173 // `get_by_id_for_current_machine` 替代裸 `get_by_id`, 与 verify (Bearer 路径
174 // 自带 machine 校验) 行为对称, 让 SIGHUP 收紧 allowed_machines 后能立即拒绝.
175 let bearer_key_rec = bearer_token
176 .as_deref()
177 .filter(|pt| !pt.is_empty())
178 .and_then(|pt| self.key_store.verify(pt));
179 let startup_key_rec = self
180 .authed_key
181 .as_ref()
182 .and_then(|k| self.key_store.get_by_id_for_current_machine(&k.id));
183 let effective_key_rec = bearer_key_rec.as_ref().or(startup_key_rec.as_ref());
184
185 // v1.4.103 (B7): allowed_acc_ids_snapshot 优先 HTTP Bearer 解析,
186 // 否则 fall back startup key.allowed_acc_ids; 都无 → None (无限制).
187 let allowed_acc_ids_snapshot =
188 effective_key_rec.and_then(|rec| rec.allowed_acc_ids.clone());
189
190 // v1.4.105 T-C2: allowed_markets_snapshot 同链 — 与 acc_ids_snapshot
191 // 同样从 effective_key_rec (Bearer / startup) 取 allowed_markets.
192 // 用于 push event Layer 3 market gate (state.rs::subscriber_should_receive_with_market).
193 let allowed_markets_snapshot =
194 effective_key_rec.and_then(|rec| rec.allowed_markets.clone());
195
196 // v1.4.103 (B8): owner_key_id 优先 caller 显式传入, 否则 effective_key_rec.id.
197 let owner_key_id =
198 owner_key_id_override.or_else(|| effective_key_rec.map(|rec| rec.id.clone()));
199
200 let subscriber = PushSubscriber {
201 peer,
202 acc_ids,
203 allowed_acc_ids_snapshot,
204 allowed_markets_snapshot,
205 registered_at: Instant::now(),
206 owner_key_id,
207 };
208 self.push_subscribers
209 .lock()
210 .await
211 .insert(session_id.clone(), subscriber);
212 session_id
213 }
214
215 /// v1.4.103 (codex 50 F6 / 53 F4 — B8): unsub session ownership check.
216 ///
217 /// 行为:
218 /// - 无 caller_key_id (legacy / stdio): 退化为旧行为 (任何 caller 可 unsub).
219 /// - 有 caller_key_id + subscriber.owner_key_id 匹配: 删除, 返 Ok(true).
220 /// - 有 caller_key_id + subscriber.owner_key_id 不匹配: **拒绝**, 返
221 /// Err(reason) — 防其他 caller 拿可见 session_id 强制 unsub.
222 /// - session_id 不存在: 返 Ok(false) (idempotent, 不报错).
223 /// - subscriber.owner_key_id = None (legacy 注册): 退化为旧行为 — 任何 caller
224 /// 可 unsub (向后兼容).
225 pub async fn unregister_push_subscriber_with_owner_check(
226 &self,
227 session_id: &str,
228 caller_key_id: Option<&str>,
229 ) -> Result<bool, String> {
230 let mut subs = self.push_subscribers.lock().await;
231 // 不存在 → idempotent Ok(false), 不报错
232 let Some(sub) = subs.get(session_id) else {
233 return Ok(false);
234 };
235 // ownership check
236 match (caller_key_id, sub.owner_key_id.as_deref()) {
237 (None, _) => {
238 // 无 caller key: legacy / stdio 模式, 退化旧行为
239 subs.remove(session_id);
240 Ok(true)
241 }
242 (Some(caller), None) => {
243 // session 注册时无 owner_key_id (legacy): 任何 caller 可 unsub
244 tracing::warn!(
245 session_id,
246 caller,
247 "v1.4.103 B8: unsub legacy session (no owner_key_id) — \
248 allowed for backward-compat"
249 );
250 subs.remove(session_id);
251 Ok(true)
252 }
253 (Some(caller), Some(owner)) if caller == owner => {
254 subs.remove(session_id);
255 Ok(true)
256 }
257 (Some(caller), Some(owner)) => {
258 // ownership 不匹配: reject. 当前 MCP subscription ownership contract
259 // 没有 admin override surface;若要扩展,必须先在 spec / auth
260 // pipeline / integration tests 中定义清楚。
261 Err(format!(
262 "session_id {session_id:?} owned by key_id {owner:?}, \
263 caller key_id {caller:?} not allowed to unsub"
264 ))
265 }
266 }
267 }
268
269 /// 当前活跃订阅数。生产诊断使用 `push_subscribers_summary`,这里只作为
270 /// state 单测的轻量断言入口保留。
271 #[cfg(test)]
272 pub async fn push_subscriber_count(&self) -> usize {
273 self.push_subscribers.lock().await.len()
274 }
275
276 /// v1.4.58 Phase A: 列出所有 push 订阅 summary(tool diagnostic 用)。
277 ///
278 /// 返 Vec<(session_id, acc_ids, age_secs)>。
279 ///
280 /// **MED-NEW-3 修(2nd review)**:加 `caller_allowed_acc_ids` 参数做
281 /// scope-mode 多租过滤。当 caller 的 key 有 `allowed_acc_ids` 白名单时,
282 /// **只返** subscription 的 `acc_ids` 与 caller 白名单有交集的条目。
283 /// 避免 agent A(acc_ids=[100, 200])通过本 tool 看到 agent B 订阅的
284 /// acc_id=[300, 400]。
285 ///
286 /// `caller_allowed_acc_ids=None` / empty → 不过滤(legacy mode / no-scope key)。
287 ///
288 /// **rmcp 版本兼容**:rmcp 1.4.0 `Peer<RoleServer>` 不实装 `PartialEq`,
289 /// 无法按 peer 身份直接过滤。若未来 rmcp 加 PartialEq,可切到更精确的
290 /// per-session-owner 过滤(当前只能靠 acc_id 权限交集近似)。
291 pub async fn push_subscribers_summary(
292 &self,
293 caller_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
294 ) -> Vec<(String, std::collections::HashSet<u64>, u64)> {
295 let subs = self.push_subscribers.lock().await;
296 let now = std::time::Instant::now();
297 subs.iter()
298 .filter(|(_, sub)| subscriber_visible_to_caller(&sub.acc_ids, caller_allowed_acc_ids))
299 .map(|(id, sub)| {
300 let age = now
301 .checked_duration_since(sub.registered_at)
302 .map(|d| d.as_secs())
303 .unwrap_or(0);
304 // LOW-3RD-1(3rd code review):scope mode 下返回的 acc_ids 要与
305 // caller allowed 求交集 — 防止 caller=[100] 看到 sub=[100, 999]
306 // 时知道 999 这个 acc 存在。sub.acc_ids 空集(subscribe-all)不做
307 // 交集(概念上 caller 看到的是"有个 catch-all subscriber",不泄漏
308 // 具体账户信息)。
309 let visible_accs = match caller_allowed_acc_ids {
310 Some(allowed) if !allowed.is_empty() && !sub.acc_ids.is_empty() => {
311 sub.acc_ids.intersection(allowed).copied().collect()
312 }
313 _ => sub.acc_ids.clone(),
314 };
315 (id.clone(), visible_accs, age)
316 })
317 .collect()
318 }
319
320 /// 启用交易写工具(构造器式链式设置)
321 pub fn with_trading(mut self, enable_trading: bool, allow_real_trading: bool) -> Self {
322 self.enable_trading = enable_trading;
323 self.allow_real_trading = allow_real_trading;
324 self
325 }
326
327 /// 设置 KeyStore(新授权模式)
328 pub fn with_key_store(mut self, store: Arc<KeyStore>) -> Self {
329 self.key_store = store;
330 self
331 }
332
333 /// 设置已通过验证的 API Key 记录
334 pub fn with_authed_key(mut self, key: Option<Arc<KeyRecord>>) -> Self {
335 self.authed_key = key;
336 self
337 }
338
339 /// 设置交易密码所属登录账号(MCP 只连 gateway,本身无法可靠推断 daemon
340 /// 的 login account;由 CLI/env/config 显式注入)。
341 pub fn with_trade_pwd_account(mut self, account: Option<String>) -> Self {
342 self.trade_pwd_account = account;
343 self
344 }
345
346 /// 是否启用了 scope 授权模式
347 pub fn is_scope_mode(&self) -> bool {
348 self.key_store.is_configured()
349 }
350
351 /// 交易写工具开关(legacy mode)。
352 pub fn enable_trading(&self) -> bool {
353 self.enable_trading
354 }
355
356 /// real 环境交易写工具开关(legacy mode)。
357 pub fn allow_real_trading(&self) -> bool {
358 self.allow_real_trading
359 }
360
361 /// 当前 MCP API key store。返回共享引用,避免调用方替换 runtime storage。
362 pub fn key_store(&self) -> &Arc<KeyStore> {
363 &self.key_store
364 }
365
366 /// startup 阶段验证过的 key 快照;调用方需要 fresh record 时仍应按 id 回查 key store。
367 pub fn authed_key(&self) -> Option<Arc<KeyRecord>> {
368 self.authed_key.clone()
369 }
370
371 /// 交易密码所属登录账号。
372 pub fn trade_pwd_account(&self) -> Option<&str> {
373 self.trade_pwd_account.as_deref()
374 }
375
376 /// 限额运行时计数器。返回共享引用,避免调用方替换 runtime storage。
377 pub fn counters(&self) -> &Arc<RuntimeCounters> {
378 &self.counters
379 }
380
381 /// 当前配置的 gateway 地址。
382 pub async fn gateway(&self) -> String {
383 self.inner.lock().await.gateway.clone()
384 }
385
386 /// 获取(或懒加载)网关客户端
387 pub async fn client(&self) -> Result<Arc<FutuClient>> {
388 let gateway = {
389 let guard = self.inner.lock().await;
390 if let Some(c) = &guard.client {
391 return Ok(c.clone());
392 }
393 guard.gateway.clone()
394 };
395
396 let config = ClientConfig {
397 addr: gateway.clone(),
398 client_ver: env!("CARGO_PKG_VERSION").to_string(),
399 client_id: "futu-mcp".to_string(),
400 recv_notify: false,
401 rsa_key: None,
402 };
403 let policy =
404 ReconnectPolicy::new(MCP_CONNECT_RETRY_DELAY, MCP_CONNECT_RETRY_DELAY, Some(1));
405 let mut reconnector = ReconnectingClient::new(config).with_policy(policy);
406 let connect_result =
407 tokio::time::timeout(MCP_CONNECT_TOTAL_TIMEOUT, reconnector.connect()).await;
408 let (client, mut push_rx, _info) = match connect_result {
409 Ok(result) => {
410 result.with_context(|| format!("connect to futu gateway at {gateway}"))?
411 }
412 Err(_) => {
413 return Err(anyhow!(
414 "connect to futu gateway at {gateway} timed out after {}s",
415 MCP_CONNECT_TOTAL_TIMEOUT.as_secs()
416 ));
417 }
418 };
419
420 let arc = Arc::new(client);
421 {
422 let mut guard = self.inner.lock().await;
423 if let Some(c) = &guard.client {
424 return Ok(c.clone());
425 }
426 guard.client = Some(arc.clone());
427 }
428
429 // v1.4.38 Phase 5 (100%): 按 acc_ids 过滤的 push broadcast
430 //
431 // 流程:
432 // 1. push_rx 收 daemon 转发的 push
433 // 2. 对 TRD_UPDATE_ORDER (2208) / TRD_UPDATE_ORDER_FILL (2218) 解包
434 // 提取 acc_id
435 // 3. 遍历订阅者,**只推给 acc_ids 匹配的**(或订阅者 acc_ids 空 = 不
436 // 过滤,所有 acc 都收)
437 // 4. 行情 push(QOT_UPDATE_*)无 acc_id 语义,广播给所有订阅者
438 //
439 // Per-session 独立 spawn notify,避免一个慢 session 阻塞其他
440 let subs_for_push = Arc::downgrade(&self.push_subscribers);
441 // v1.4.105 F5 fix (codex review C4 USER_ACK B): MCP push filter 改用
442 // FilterRegistry::should_drop_event 单一注册中心 (跟 4 surface 一致),
443 // 替代之前 inline subscriber_should_receive_with_market. 防 sibling-route
444 // bypass — 加新 push event filter 维度只在 install_defaults 注册一次,
445 // MCP 自动覆盖.
446 let filter_registry =
447 std::sync::Arc::new(futu_auth_pipeline::FilterRegistry::with_defaults());
448 tokio::spawn(async move {
449 while let Some(push) = push_rx.recv().await {
450 let Some(subs_for_push) = subs_for_push.upgrade() else {
451 break;
452 };
453 let subscribers = {
454 let subs = subs_for_push.lock().await;
455 if subs.is_empty() {
456 Vec::new()
457 } else {
458 subs.iter()
459 .map(|(session_id, sub)| (session_id.clone(), sub.clone()))
460 .collect::<Vec<_>>()
461 }
462 };
463 if subscribers.is_empty() {
464 continue; // fast path: no listeners, drop
465 }
466 // v1.4.105 T-C2 + v1.4.106 codex 0932 F6/F7: classify push by proto_id
467 // (set membership), 不再靠 body decode 成功推断. trade body decode
468 // 失败现在归 TradePushDecode::DecodeFailed (event_type="trade",
469 // 无 acc/market gate 信息) — restricted key 应 drop, unrestricted
470 // 透传带 decode_status="failed".
471 let decode_result = classify_trade_push(push.proto_id, &push.body);
472 let (push_acc_id, push_trd_market, decode_status, event_type) = match &decode_result
473 {
474 TradePushDecode::NotTrade => (None, None, "ok", "quote"),
475 TradePushDecode::Decoded { acc_id, trd_market } => {
476 (Some(*acc_id), Some(*trd_market), "ok", "trade")
477 }
478 TradePushDecode::DecodeFailed => (None, None, "failed", "trade"),
479 };
480 let push_trd_market_str = push_trd_market.map(trd_market_int_to_str);
481 // v1.4.106 codex 0932 F7 [P3]: payload 加 event_type / trd_market /
482 // decode_status — 让客户端不需要按 proto_id 自己 derive (4 surface 一致).
483 // body_base64 后向兼容保留.
484 let payload = serde_json::json!({
485 "kind": "futu_push",
486 "proto_id": push.proto_id,
487 "acc_id": push_acc_id,
488 "event_type": event_type,
489 "trd_market": push_trd_market_str,
490 "decode_status": decode_status,
491 "body_base64": base64_encode_bytes(&push.body),
492 });
493 let deliveries = {
494 let mut deliveries = Vec::with_capacity(subscribers.len());
495 for (session_id, sub) in subscribers.iter() {
496 // v1.4.106 codex 0932 F6 [P2]: trade decode-failed + restricted
497 // key (allowed_acc_ids 非 None) → DROP. 不能让 restricted key
498 // 看到无 acc gate 信息的 trade body 透传 (绕过 ACL).
499 // unrestricted key (allowed_acc_ids None / 空) 仍透传带 decode_status="failed".
500 if matches!(decode_result, TradePushDecode::DecodeFailed) {
501 let restricted = sub
502 .allowed_acc_ids_snapshot
503 .as_ref()
504 .map(|s| !s.is_empty())
505 .unwrap_or(false);
506 if restricted {
507 let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
508 // 复用 cross-surface metric — reason="trade_decode_failed"
509 futu_auth::metrics::bump_ws_filtered("trade_decode_failed", key_id);
510 tracing::warn!(
511 proto_id = push.proto_id,
512 key_id,
513 "v1.4.106 audit 0932 F6: trade push body decode failed; \
514 dropped for restricted key (allowed_acc_ids set, \
515 cannot ACL-gate body without acc_id)"
516 );
517 continue;
518 }
519 // unrestricted: fall through, broadcast 带 decode_status="failed"
520 }
521 // v1.4.105 F5 fix: 改用 FilterRegistry::should_drop_event.
522 // 行为对齐 4 surface — sub.acc_ids (MCP 显式订阅 list) 喂给
523 // ctx.sub_state (REST sub-acc-push state 同语义); sub.allowed_*
524 // _snapshot 喂给 ctx.allowed_* (caller key 限额).
525 //
526 // **行为微差** (与老 inline fn 一致, 不破老行为):
527 // - sub.acc_ids 空 = MCP 老语义"无限制订阅" → 传 None
528 // (避免 REST sub_state 空集 tombstone 语义触发 drop-all)
529 // - sub.acc_ids 非空 → 传 Some(&sub.acc_ids), 跟 REST 一致
530 let sub_state_for_ctx = if sub.acc_ids.is_empty() {
531 None
532 } else {
533 Some(&sub.acc_ids)
534 };
535 let ctx = futu_auth_pipeline::PushEventCtx {
536 event_type,
537 event_acc: push_acc_id,
538 allowed_acc_ids: sub.allowed_acc_ids_snapshot.as_ref(),
539 sub_state: sub_state_for_ctx,
540 event_trd_market: push_trd_market_str,
541 allowed_markets: sub.allowed_markets_snapshot.as_ref(),
542 };
543 if filter_registry.should_drop_event(&ctx) {
544 // v1.4.105 T-C2 + F3 (codex review C4): bump filtered metric.
545 // **统一 label 命名** 跟 4 surface 一致 — gRPC subscribe_push
546 // / push_trd_acc / 都用 "trade_market" 标 Layer 3 (allowed_markets)
547 // 拒. 老 "push.trade" 命名是 surface-specific (T-C2 sole), 改成
548 // canonical "trade_market" 让跨 surface metrics jq aggregate 一致.
549 let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
550 futu_auth::metrics::bump_ws_filtered("trade_market", key_id);
551 continue;
552 }
553 deliveries.push(PushDelivery {
554 peer: sub.peer.clone(),
555 data: payload.clone(),
556 session_id: session_id.clone(),
557 owner_key_id: sub.owner_key_id.clone(),
558 proto_id: push.proto_id,
559 });
560 }
561 deliveries
562 };
563 for delivery in deliveries {
564 tokio::spawn(async move {
565 let params = rmcp::model::LoggingMessageNotificationParam {
566 level: rmcp::model::LoggingLevel::Info,
567 logger: Some("futu_push".to_string()),
568 data: delivery.data,
569 };
570 if let Err(err) = delivery.peer.notify_logging_message(params).await {
571 tracing::warn!(
572 proto_id = delivery.proto_id,
573 session_id = delivery.session_id,
574 owner_key_id = delivery.owner_key_id.as_deref().unwrap_or("<none>"),
575 error = %err,
576 "mcp push notification send failed"
577 );
578 }
579 });
580 }
581 }
582 });
583
584 // v1.4.39 Phase 5 stale cleanup: 5 分钟跑一次,移除 registered_at > 4h
585 // 的订阅者。避免长跑 daemon 累积陈旧 subscriber(客户端断开 / rmcp
586 // session gone 但没显式 unregister 的情况)。
587 let subs_for_purge = Arc::downgrade(&self.push_subscribers);
588 tokio::spawn(async move {
589 use std::time::Duration;
590 const PURGE_INTERVAL: Duration = Duration::from_secs(5 * 60);
591 const MAX_AGE: Duration = Duration::from_secs(4 * 3600);
592 let mut ticker = tokio::time::interval(PURGE_INTERVAL);
593 ticker.tick().await; // skip the immediate first tick
594 loop {
595 ticker.tick().await;
596 let Some(subs_for_purge) = subs_for_purge.upgrade() else {
597 break;
598 };
599 let now = std::time::Instant::now();
600 let mut subs = subs_for_purge.lock().await;
601 let before = subs.len();
602 subs.retain(|_, sub| {
603 now.checked_duration_since(sub.registered_at)
604 .map(|age| age < MAX_AGE)
605 .unwrap_or(true)
606 });
607 let purged = before - subs.len();
608 if purged > 0 {
609 tracing::info!(
610 purged,
611 remaining = subs.len(),
612 max_age_secs = MAX_AGE.as_secs(),
613 "v1.4.39 Phase 5: purged stale push subscribers (> 4h registered)"
614 );
615 }
616 }
617 });
618
619 Ok(arc)
620 }
621}
622
623// ========== symbol 解析 ==========
624
625pub fn parse_symbol(s: &str) -> Result<Security> {
626 futu_qot::symbol::parse_symbol(s)
627}
628
629/// 格式化 Security 为 "MARKET.CODE"
630pub fn format_symbol(sec: &Security) -> String {
631 futu_qot::symbol::format_symbol(sec)
632}
633
634/// v1.4.90 P2-C: audit log Option<T> 序列化助手。
635///
636/// **背景**:之前 audit log 把 `Option<f64>` 用 `?req.price`(tracing 的 Debug
637/// shorthand)记录,渲染成 JSON 字符串 `"Some(400.0)"` / `"None"`,下游 jq /
638/// DuckDB 数值聚合炸(aggregator 期望 `400.0` number 或 `null`)。
639///
640/// **修法**:用 NaN sentinel 把 `Option<f64>` flatten 成 `f64`,tracing-subscriber
641/// 的 JSON formatter 内部走 `serde_json::Value::from(f64::NAN)` →
642/// `Number::from_f64(NaN) = None` → `Value::Null`。
643/// 整数 / 字符串同理(i32 → f64 NaN sentinel;&str → "" 哨兵)。
644///
645/// 验证依据:
646/// - `tracing_subscriber::fmt::format::json` line 501 `record_f64` 直接调
647/// `serde_json::Value::from(value)`
648/// - `serde_json::Value::from(f64)` impl: `Number::from_f64(f).map_or(Value::Null, Value::Number)`
649pub mod audit_fmt {
650 /// `Option<f64>` → `f64`(None → NaN)。tracing JSON 渲染 NaN 为 `null`。
651 #[inline]
652 pub fn opt_f64(v: Option<f64>) -> f64 {
653 v.unwrap_or(f64::NAN)
654 }
655
656 /// `Option<i32>` → `f64`(None → NaN,Some(n) → n as f64)。
657 /// i32 ≤ 2^31 < 2^52 mantissa,无精度损失。
658 #[inline]
659 pub fn opt_i32(v: Option<i32>) -> f64 {
660 v.map(f64::from).unwrap_or(f64::NAN)
661 }
662
663 /// `Option<&str>` → `&str`(None → "")。"" 哨兵在 audit 上下文里足以区分
664 /// 不传 vs 传空(因为 Symbol / owner 等业务字段不会是空字符串)。
665 #[inline]
666 pub fn opt_str(v: Option<&str>) -> &str {
667 v.unwrap_or("")
668 }
669}