futu_mcp/state/push_filter.rs
1/// v1.4.105 T-C2: 从 daemon push 的 raw body 里解 `(acc_id, trd_market)`,
2/// 用于按 caller key `allowed_markets` 白名单过滤 trade push event。
3///
4/// 只处理 trade push(TRD_UPDATE_ORDER / TRD_UPDATE_ORDER_FILL),proto Header
5/// 同时含 `acc_id` (u64) 与 `trd_market` (i32 enum)。返 `Some((acc_id, trd_market))`
6/// 仅当 decode 成功且 s2c.header 存在.
7///
8/// 行情 push (QOT_UPDATE_*) 不含 trd_market 概念 → 返 None → 调用方按
9/// `event_acc_id=None` 路径不做 acc/market gate, 直接 broadcast.
10pub(crate) fn extract_acc_id_and_market_from_push(
11 proto_id: u32,
12 body: &[u8],
13) -> Option<(u64, i32)> {
14 use prost::Message;
15 match proto_id {
16 TRD_UPDATE_ORDER_PROTO_ID => {
17 let rsp = match futu_proto::trd_update_order::Response::decode(body) {
18 Ok(rsp) => rsp,
19 Err(e) => {
20 tracing::debug!(
21 proto_id,
22 body_len = body.len(),
23 error = %e,
24 "MCP trade push body decode failed while extracting acc_id"
25 );
26 return None;
27 }
28 };
29 let h = rsp.s2c?.header;
30 Some((h.acc_id, h.trd_market))
31 }
32 TRD_UPDATE_ORDER_FILL_PROTO_ID => {
33 let rsp = match futu_proto::trd_update_order_fill::Response::decode(body) {
34 Ok(rsp) => rsp,
35 Err(e) => {
36 tracing::debug!(
37 proto_id,
38 body_len = body.len(),
39 error = %e,
40 "MCP trade push body decode failed while extracting acc_id"
41 );
42 return None;
43 }
44 };
45 let h = rsp.s2c?.header;
46 Some((h.acc_id, h.trd_market))
47 }
48 _ => None,
49 }
50}
51
52/// v1.4.106 codex 0932 F6/F7: trade push proto_ids (set membership 测试).
53const TRD_UPDATE_ORDER_PROTO_ID: u32 = 2208;
54const TRD_UPDATE_ORDER_FILL_PROTO_ID: u32 = 2218;
55
56/// v1.4.106 codex 0932 F6 [P2]: 仅按 proto_id 判断是否 trade push 类.
57///
58/// 修 F6 silent-misclassify bug: 旧实装靠 `extract_acc_id_and_market_from_push`
59/// 返 None 推断 "非 trade" — 但 trade push (proto_id 2208/2218) 若 body decode
60/// 失败也返 None → 误归 quote → restricted key (有 allowed_acc_ids 限额) 看到
61/// 该 push 时按 quote 路径直接 broadcast (绕过 trade-market filter 的 ACL).
62///
63/// **正确语义**: event_type 由 proto_id 决定 (set membership), 与 body 完整性
64/// 无关. body 解码状态另用 `decode_status` 字段表达 (F7).
65pub(crate) fn is_trade_push_proto_id(proto_id: u32) -> bool {
66 matches!(
67 proto_id,
68 TRD_UPDATE_ORDER_PROTO_ID | TRD_UPDATE_ORDER_FILL_PROTO_ID
69 )
70}
71
72/// v1.4.106 codex 0932 F6 [P2]: trade push 的解码结果 + 分发决策.
73#[derive(Debug, Clone)]
74pub(crate) enum TradePushDecode {
75 /// proto_id 不在 trade set — 非 trade push (quote / notify / 其他).
76 NotTrade,
77 /// trade push body decode 成功 — 含 (acc_id, trd_market).
78 Decoded { acc_id: u64, trd_market: i32 },
79 /// trade push body decode 失败 — caller 必须按 trade 语义处理 (event_type="trade")
80 /// 但无 acc_id / market gate 信息. restricted key 应 drop, unrestricted
81 /// 应透传带 `decode_status="failed"`.
82 DecodeFailed,
83}
84
85pub(crate) fn classify_trade_push(proto_id: u32, body: &[u8]) -> TradePushDecode {
86 if !is_trade_push_proto_id(proto_id) {
87 return TradePushDecode::NotTrade;
88 }
89 match extract_acc_id_and_market_from_push(proto_id, body) {
90 Some((acc_id, trd_market)) => TradePushDecode::Decoded { acc_id, trd_market },
91 None => TradePushDecode::DecodeFailed,
92 }
93}
94
95/// v1.4.105 T-C2: `Trd_Common.TrdMarket` enum int → 字符串 (与 `keys.json`
96/// 配 `allowed_markets` 中字符串一致). 实际映射由 `futu_trd::market`
97/// 统一维护,避免 MCP push 过滤、CLI 展示和 trade read projection 漂移.
98///
99/// 来源: `Trd_Common.proto::TrdMarket` enum.
100pub(crate) fn trd_market_int_to_str(i: i32) -> &'static str {
101 futu_trd::market::trd_market_label(i).unwrap_or("")
102}
103
104/// v1.4.39 Phase 5 filter 核心决策(pure function,便于单测):
105///
106/// 判断 push 是否应推给某个订阅者。2 层 gate:
107/// 1. **subscriber acc_ids**:`futu_sub_acc_push` 参数里用户指定的 acc_id 列表
108/// (空集 = subscribe-all,即不做 subscriber 级过滤)
109/// 2. **key allowed_acc_ids** (v1.4.39):注册时快照的 per-key 白名单。defense-in-depth
110/// 层,防止 agent 订阅了 key 无权限的 acc_id(主 auth 在 guard.rs tool 调用
111/// 时 enforce,但 push 是服务端主动发起绕过 tool 调用)
112///
113/// 两层都 pass 才推。行情类 push (`push_acc_id=None`) 跳过所有 acc_id gate。
114/// v1.4.58 MED-NEW-3(2nd code review): summary 过滤决策 pure function。
115///
116/// 用于 `push_subscribers_summary` 的 cross-tenant filter —— scope mode 下 caller
117/// 只能看到**自己 allowed_acc_ids 有交集的订阅**(防止跨租户泄漏其他 agent 的
118/// acc_ids)。
119///
120/// 规则:
121/// - `caller_allowed = None` 或空集(legacy / unrestricted)→ 全可见
122/// - 被查订阅的 `sub_acc_ids` 空集(subscribe-all)→ 全可见(没有 specific 账户可泄漏)
123/// - 否则 → 只要 `sub_acc_ids` 与 `caller_allowed` 有**任一交集** → 可见
124pub(crate) fn subscriber_visible_to_caller(
125 sub_acc_ids: &std::collections::HashSet<u64>,
126 caller_allowed: Option<&std::collections::HashSet<u64>>,
127) -> bool {
128 match caller_allowed {
129 Some(allowed) if !allowed.is_empty() => {
130 sub_acc_ids.is_empty() || sub_acc_ids.iter().any(|a| allowed.contains(a))
131 }
132 _ => true,
133 }
134}
135
136/// v1.4.105 T-C2: 生产路径已 migrate 到 `subscriber_should_receive_with_market`
137/// (含 market gate Layer 3). 本 helper 保留作向后兼容入口 (老 6 个 filter_*
138/// pure-fn 测仍引用), 不入生产路径.
139///
140/// **v1.4.105 F5 fix**: production migrated to `FilterRegistry::should_drop_event`,
141/// 本 fn 跟 `subscriber_should_receive_with_market` 同 `#[cfg(test)]` gate 防
142/// dead_code 警告.
143#[cfg(test)]
144pub(crate) fn subscriber_should_receive(
145 push_acc_id: Option<u64>,
146 sub_acc_ids: &std::collections::HashSet<u64>,
147 key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
148) -> bool {
149 // v1.4.105 T-C2: 旧 API 委托新 API (无 market 信息 → 不做 market gate).
150 // 保留为向后兼容入口 (单测仍以 sub_acc_ids + allowed_acc_ids 双 gate 模式验证).
151 subscriber_should_receive_with_market(
152 push_acc_id,
153 None, // 无 market context → 不做 market gate
154 sub_acc_ids,
155 key_allowed_acc_ids,
156 None, // 无 allowed_markets snapshot → 不做 market gate
157 )
158}
159
160/// v1.4.105 T-C2: subscriber_should_receive 的扩展版 — 加 market gate (Layer 3).
161///
162/// 完整 3-layer trade push filter:
163/// 1. **subscriber acc_ids** (`futu_sub_acc_push` 参数): 空 = subscribe-all
164/// 2. **key allowed_acc_ids** (caller key 注册时快照): 硬白名单
165/// 3. **key allowed_markets** (caller key 注册时快照, v1.4.105 加): 硬白名单
166///
167/// 三 gate 都 pass 才推. `event_trd_market=None` (decode 失败 / 行情 push) 跳过
168/// market gate. `key_allowed_markets=None` / 空 = 不做 market gate (stdio /
169/// legacy / 未配 allowed_markets 的 key).
170///
171/// 行情类 push (`push_acc_id=None`) 仍按 v1.4.39 行为全推 — 行情无 acc / market
172/// 概念, 不参与 trade event filter.
173///
174/// **v1.4.105 F5 fix (codex review C4)**: production 路径已 migrate 到
175/// `FilterRegistry::should_drop_event` (跟 4 surface 一致). 本 fn 现仅作
176/// `#[cfg(test)]` unit test target — 直接验证 logic, 不再 production 调用.
177/// 保留 + cfg-gate 让 12 既有 test 继续 lock 行为契约 (logic 等价两套断言:
178/// 这里 pure-fn assert + cross_surface_invariants FilterRegistry 路径 assert).
179#[cfg(test)]
180pub(crate) fn subscriber_should_receive_with_market(
181 push_acc_id: Option<u64>,
182 event_trd_market: Option<i32>,
183 sub_acc_ids: &std::collections::HashSet<u64>,
184 key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
185 key_allowed_markets: Option<&std::collections::HashSet<String>>,
186) -> bool {
187 let Some(aid) = push_acc_id else {
188 return true; // 非 trade push,全推(行情类无 key 级 gate 概念)
189 };
190 // Layer 1: 订阅者 acc_ids 过滤
191 let sub_gate = sub_acc_ids.is_empty() || sub_acc_ids.contains(&aid);
192 // Layer 2: caller key allowed_acc_ids 硬白名单
193 let key_acc_gate = match key_allowed_acc_ids {
194 Some(allowed) if !allowed.is_empty() => allowed.contains(&aid),
195 _ => true, // 无 key 级约束(allowed_acc_ids=None 或空)/ stdio 模式
196 };
197 // Layer 3 (v1.4.105 T-C2): caller key allowed_markets 硬白名单
198 //
199 // 行为契约:
200 // - `key_allowed_markets=None` 或空集 → 不做 market gate (向后兼容 + stdio)
201 // - `event_trd_market=None` → 不做 market gate (decode 失败保守 — 不 silent
202 // drop, 让 caller 看到原始 push, downstream 单测覆盖 decode 失败路径)
203 // - `event_trd_market=Some(int)` → 转字符串与 allowed 集合比对
204 // (注: int=0/未知 → trd_market_int_to_str 返 "" → 不在任何非空 allowed
205 // 集合内 → drop. 这是 fail-closed 语义: 未知 market 配 restricted key
206 // 应 drop 而非 silently leak)
207 let key_market_gate = match key_allowed_markets {
208 Some(allowed) if !allowed.is_empty() => match event_trd_market {
209 Some(int) => allowed.contains(trd_market_int_to_str(int)),
210 None => true, // decode 失败 → 不 drop (向后兼容 — 单独 metric 提示)
211 },
212 _ => true, // 无 market 限制
213 };
214 sub_gate && key_acc_gate && key_market_gate
215}