Skip to main content

futu_mcp/state/
push_filter.rs

1/// v1.4.105 T-C2: 从 daemon push 的 raw body 里解 `(acc_id, trd_market)`,
2/// 用于按 caller key `allowed_markets` 白名单过滤 trade push event。
3///
4/// 只处理 trade push(TRD_UPDATE_ORDER / TRD_UPDATE_ORDER_FILL),proto Header
5/// 同时含 `acc_id` (u64) 与 `trd_market` (i32 enum)。返 `Some((acc_id, trd_market))`
6/// 仅当 decode 成功且 s2c.header 存在.
7///
8/// 行情 push (QOT_UPDATE_*) 不含 trd_market 概念 → 返 None → 调用方按
9/// `event_acc_id=None` 路径不做 acc/market gate, 直接 broadcast.
10pub(crate) fn extract_acc_id_and_market_from_push(
11    proto_id: u32,
12    body: &[u8],
13) -> Option<(u64, i32)> {
14    use prost::Message;
15    match proto_id {
16        TRD_UPDATE_ORDER_PROTO_ID => {
17            let rsp = match futu_proto::trd_update_order::Response::decode(body) {
18                Ok(rsp) => rsp,
19                Err(e) => {
20                    tracing::debug!(
21                        proto_id,
22                        body_len = body.len(),
23                        error = %e,
24                        "MCP trade push body decode failed while extracting acc_id"
25                    );
26                    return None;
27                }
28            };
29            let h = rsp.s2c?.header;
30            Some((h.acc_id, h.trd_market))
31        }
32        TRD_UPDATE_ORDER_FILL_PROTO_ID => {
33            let rsp = match futu_proto::trd_update_order_fill::Response::decode(body) {
34                Ok(rsp) => rsp,
35                Err(e) => {
36                    tracing::debug!(
37                        proto_id,
38                        body_len = body.len(),
39                        error = %e,
40                        "MCP trade push body decode failed while extracting acc_id"
41                    );
42                    return None;
43                }
44            };
45            let h = rsp.s2c?.header;
46            Some((h.acc_id, h.trd_market))
47        }
48        _ => None,
49    }
50}
51
52/// v1.4.106 codex 0932 F6/F7: trade push proto_ids (set membership 测试).
53const TRD_UPDATE_ORDER_PROTO_ID: u32 = 2208;
54const TRD_UPDATE_ORDER_FILL_PROTO_ID: u32 = 2218;
55
56/// v1.4.106 codex 0932 F6 [P2]: 仅按 proto_id 判断是否 trade push 类.
57///
58/// 修 F6 silent-misclassify bug: 旧实装靠 `extract_acc_id_and_market_from_push`
59/// 返 None 推断 "非 trade" — 但 trade push (proto_id 2208/2218) 若 body decode
60/// 失败也返 None → 误归 quote → restricted key (有 allowed_acc_ids 限额) 看到
61/// 该 push 时按 quote 路径直接 broadcast (绕过 trade-market filter 的 ACL).
62///
63/// **正确语义**: event_type 由 proto_id 决定 (set membership), 与 body 完整性
64/// 无关. body 解码状态另用 `decode_status` 字段表达 (F7).
65pub(crate) fn is_trade_push_proto_id(proto_id: u32) -> bool {
66    matches!(
67        proto_id,
68        TRD_UPDATE_ORDER_PROTO_ID | TRD_UPDATE_ORDER_FILL_PROTO_ID
69    )
70}
71
72/// v1.4.106 codex 0932 F6 [P2]: trade push 的解码结果 + 分发决策.
73#[derive(Debug, Clone)]
74pub(crate) enum TradePushDecode {
75    /// proto_id 不在 trade set — 非 trade push (quote / notify / 其他).
76    NotTrade,
77    /// trade push body decode 成功 — 含 (acc_id, trd_market).
78    Decoded { acc_id: u64, trd_market: i32 },
79    /// trade push body decode 失败 — caller 必须按 trade 语义处理 (event_type="trade")
80    /// 但无 acc_id / market gate 信息. restricted key 应 drop, unrestricted
81    /// 应透传带 `decode_status="failed"`.
82    DecodeFailed,
83}
84
85pub(crate) fn classify_trade_push(proto_id: u32, body: &[u8]) -> TradePushDecode {
86    if !is_trade_push_proto_id(proto_id) {
87        return TradePushDecode::NotTrade;
88    }
89    match extract_acc_id_and_market_from_push(proto_id, body) {
90        Some((acc_id, trd_market)) => TradePushDecode::Decoded { acc_id, trd_market },
91        None => TradePushDecode::DecodeFailed,
92    }
93}
94
95/// v1.4.105 T-C2: `Trd_Common.TrdMarket` enum int → 字符串 (与 `keys.json`
96/// 配 `allowed_markets` 中字符串一致). 实际映射由 `futu_trd::market`
97/// 统一维护,避免 MCP push 过滤、CLI 展示和 trade read projection 漂移.
98///
99/// 来源: `Trd_Common.proto::TrdMarket` enum.
100pub(crate) fn trd_market_int_to_str(i: i32) -> &'static str {
101    futu_trd::market::trd_market_label(i).unwrap_or("")
102}
103
104/// v1.4.39 Phase 5 filter 核心决策(pure function,便于单测):
105///
106/// 判断 push 是否应推给某个订阅者。2 层 gate:
107/// 1. **subscriber acc_ids**:`futu_sub_acc_push` 参数里用户指定的 acc_id 列表
108///    (空集 = subscribe-all,即不做 subscriber 级过滤)
109/// 2. **key allowed_acc_ids** (v1.4.39):注册时快照的 per-key 白名单。defense-in-depth
110///    层,防止 agent 订阅了 key 无权限的 acc_id(主 auth 在 guard.rs tool 调用
111///    时 enforce,但 push 是服务端主动发起绕过 tool 调用)
112///
113/// 两层都 pass 才推。行情类 push (`push_acc_id=None`) 跳过所有 acc_id gate。
114/// v1.4.58 MED-NEW-3(2nd code review): summary 过滤决策 pure function。
115///
116/// 用于 `push_subscribers_summary` 的 cross-tenant filter —— scope mode 下 caller
117/// 只能看到**自己 allowed_acc_ids 有交集的订阅**(防止跨租户泄漏其他 agent 的
118/// acc_ids)。
119///
120/// 规则:
121/// - `caller_allowed = None` 或空集(legacy / unrestricted)→ 全可见
122/// - 被查订阅的 `sub_acc_ids` 空集(subscribe-all)→ 全可见(没有 specific 账户可泄漏)
123/// - 否则 → 只要 `sub_acc_ids` 与 `caller_allowed` 有**任一交集** → 可见
124pub(crate) fn subscriber_visible_to_caller(
125    sub_acc_ids: &std::collections::HashSet<u64>,
126    caller_allowed: Option<&std::collections::HashSet<u64>>,
127) -> bool {
128    match caller_allowed {
129        Some(allowed) if !allowed.is_empty() => {
130            sub_acc_ids.is_empty() || sub_acc_ids.iter().any(|a| allowed.contains(a))
131        }
132        _ => true,
133    }
134}
135
136/// v1.4.105 T-C2: 生产路径已 migrate 到 `subscriber_should_receive_with_market`
137/// (含 market gate Layer 3). 本 helper 保留作向后兼容入口 (老 6 个 filter_*
138/// pure-fn 测仍引用), 不入生产路径.
139///
140/// **v1.4.105 F5 fix**: production migrated to `FilterRegistry::should_drop_event`,
141/// 本 fn 跟 `subscriber_should_receive_with_market` 同 `#[cfg(test)]` gate 防
142/// dead_code 警告.
143#[cfg(test)]
144pub(crate) fn subscriber_should_receive(
145    push_acc_id: Option<u64>,
146    sub_acc_ids: &std::collections::HashSet<u64>,
147    key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
148) -> bool {
149    // v1.4.105 T-C2: 旧 API 委托新 API (无 market 信息 → 不做 market gate).
150    // 保留为向后兼容入口 (单测仍以 sub_acc_ids + allowed_acc_ids 双 gate 模式验证).
151    subscriber_should_receive_with_market(
152        push_acc_id,
153        None, // 无 market context → 不做 market gate
154        sub_acc_ids,
155        key_allowed_acc_ids,
156        None, // 无 allowed_markets snapshot → 不做 market gate
157    )
158}
159
160/// v1.4.105 T-C2: subscriber_should_receive 的扩展版 — 加 market gate (Layer 3).
161///
162/// 完整 3-layer trade push filter:
163/// 1. **subscriber acc_ids** (`futu_sub_acc_push` 参数): 空 = subscribe-all
164/// 2. **key allowed_acc_ids** (caller key 注册时快照): 硬白名单
165/// 3. **key allowed_markets** (caller key 注册时快照, v1.4.105 加): 硬白名单
166///
167/// 三 gate 都 pass 才推. `event_trd_market=None` (decode 失败 / 行情 push) 跳过
168/// market gate. `key_allowed_markets=None` / 空 = 不做 market gate (stdio /
169/// legacy / 未配 allowed_markets 的 key).
170///
171/// 行情类 push (`push_acc_id=None`) 仍按 v1.4.39 行为全推 — 行情无 acc / market
172/// 概念, 不参与 trade event filter.
173///
174/// **v1.4.105 F5 fix (codex review C4)**: production 路径已 migrate 到
175/// `FilterRegistry::should_drop_event` (跟 4 surface 一致). 本 fn 现仅作
176/// `#[cfg(test)]` unit test target — 直接验证 logic, 不再 production 调用.
177/// 保留 + cfg-gate 让 12 既有 test 继续 lock 行为契约 (logic 等价两套断言:
178/// 这里 pure-fn assert + cross_surface_invariants FilterRegistry 路径 assert).
179#[cfg(test)]
180pub(crate) fn subscriber_should_receive_with_market(
181    push_acc_id: Option<u64>,
182    event_trd_market: Option<i32>,
183    sub_acc_ids: &std::collections::HashSet<u64>,
184    key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
185    key_allowed_markets: Option<&std::collections::HashSet<String>>,
186) -> bool {
187    let Some(aid) = push_acc_id else {
188        return true; // 非 trade push,全推(行情类无 key 级 gate 概念)
189    };
190    // Layer 1: 订阅者 acc_ids 过滤
191    let sub_gate = sub_acc_ids.is_empty() || sub_acc_ids.contains(&aid);
192    // Layer 2: caller key allowed_acc_ids 硬白名单
193    let key_acc_gate = match key_allowed_acc_ids {
194        Some(allowed) if !allowed.is_empty() => allowed.contains(&aid),
195        _ => true, // 无 key 级约束(allowed_acc_ids=None 或空)/ stdio 模式
196    };
197    // Layer 3 (v1.4.105 T-C2): caller key allowed_markets 硬白名单
198    //
199    // 行为契约:
200    // - `key_allowed_markets=None` 或空集 → 不做 market gate (向后兼容 + stdio)
201    // - `event_trd_market=None` → 不做 market gate (decode 失败保守 — 不 silent
202    //   drop, 让 caller 看到原始 push, downstream 单测覆盖 decode 失败路径)
203    // - `event_trd_market=Some(int)` → 转字符串与 allowed 集合比对
204    //   (注: int=0/未知 → trd_market_int_to_str 返 "" → 不在任何非空 allowed
205    //    集合内 → drop. 这是 fail-closed 语义: 未知 market 配 restricted key
206    //    应 drop 而非 silently leak)
207    let key_market_gate = match key_allowed_markets {
208        Some(allowed) if !allowed.is_empty() => match event_trd_market {
209            Some(int) => allowed.contains(trd_market_int_to_str(int)),
210            None => true, // decode 失败 → 不 drop (向后兼容 — 单独 metric 提示)
211        },
212        _ => true, // 无 market 限制
213    };
214    sub_gate && key_acc_gate && key_market_gate
215}