Skip to main content

futu_rest/
ws.rs

1//! WebSocket 推送模块
2//!
3//! 在 REST API 端口上提供 /ws 路由,客户端通过 WebSocket 接收实时推送。
4//!
5//! 推送事件通过 broadcast channel 从 OpenD 核心分发到所有 WebSocket 客户端。
6
7use std::collections::{HashMap, HashSet};
8use std::net::SocketAddr;
9use std::sync::{Arc, RwLock};
10
11use axum::extract::connect_info::ConnectInfo;
12use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
13use axum::extract::{Query, State};
14use axum::http::{HeaderMap, StatusCode};
15use axum::response::IntoResponse;
16use chrono::Utc;
17use futures::{SinkExt, StreamExt};
18use tokio::sync::broadcast;
19
20use futu_auth::{KeyRecord, KeyStore, Scope};
21use futu_server::push::ExternalPushSink;
22
23use crate::adapter::RestState;
24
25/// REST `/ws` only accepts tiny JSON control messages from clients
26/// (`subscribe-notify` / `unsubscribe-notify`). Push payload size is governed by
27/// outbound serialization; this is an inbound resource boundary.
28pub const REST_WS_MAX_CONTROL_MESSAGE_SIZE_BYTES: usize = 64 * 1024;
29
30/// WebSocket 推送事件
31#[derive(Clone, Debug, serde::Serialize)]
32pub struct WsPushEvent {
33    /// 推送类型: "quote", "trade", "notify"
34    #[serde(rename = "type")]
35    pub event_type: String,
36    /// 该事件需要哪个 scope 才能被某个 client 接收(filter 用,不发到客户端)
37    #[serde(skip)]
38    pub required_scope: WsPushScope,
39    /// 协议 ID
40    pub proto_id: u32,
41    /// 证券标识 (行情推送)
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub sec_key: Option<String>,
44    /// 订阅类型 (行情推送)
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub sub_type: Option<i32>,
47    /// **v1.4.106 codex 1131 F4 [P1]**: rehab 类型 (KL push 非 0, 其它 sub_type
48    /// 为 0). 客户端用于 (sec_key, sub_type, rehab_type) 三元 key 自行 filter
49    /// 不感兴趣的 KL rehab 推送.
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub rehab_type: Option<i32>,
52    /// 交易账户 ID (交易推送)
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub acc_id: Option<u64>,
55    /// protobuf body 的 base64 编码
56    pub body_b64: String,
57    /// v1.4.105 D3 (Phase 4) T-B1: 交易推送的 trd_market 大写字符串 ("HK" /
58    /// "US" / "CN" / "HKCC" / "FUTURES" / "SG" / "AU" / "JP" / "MY" / "CA").
59    /// PushDispatcher 一次 decode body 后透传过来, 让 WS push filter Layer 3
60    /// (allowed_markets) 直接读. `None` = 非 trade event / decode 失败 /
61    /// market 未知 (Layer 3 向后兼容不 trigger drop).
62    ///
63    /// 客户端可见: trade event 出现 `trd_market` 字段, qot/notify 不出现
64    /// (`skip_serializing_if = "Option::is_none"`).
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub trd_market: Option<String>,
67}
68
69/// WS 推送事件需要的最低 scope(client 没这个 scope 就收不到)
70///
71/// - `Quote` → `qot:read`:行情类
72/// - `Notify` → `qot:read`:通用通知(如订阅状态、网关心跳)
73/// - `Trade` → `acc:read`:交易回报涉及账户隐私,必须有账户读权限
74#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
75#[non_exhaustive]
76pub enum WsPushScope {
77    /// 行情推送(订阅 symbol 后 push 的 basic_qot / order_book / ticker 等)。
78    /// 需要 [`Scope::QotRead`]。
79    #[default]
80    Quote,
81    /// 广播通知(系统事件 / 全局消息)。需要 [`Scope::QotRead`]。
82    Notify,
83    /// 交易推送(订单状态变化 / 成交回报)。需要 [`Scope::AccRead`]。
84    Trade,
85}
86
87impl WsPushScope {
88    /// 该事件类型需要的 Scope;client 必须持有这个 scope 才能收到
89    pub fn required_scope(&self) -> Scope {
90        match self {
91            WsPushScope::Quote => Scope::QotRead,
92            WsPushScope::Notify => Scope::QotRead,
93            WsPushScope::Trade => Scope::AccRead,
94        }
95    }
96}
97
98/// WebSocket 推送广播器
99///
100/// OpenD 核心推送事件 → broadcast channel → 所有 WebSocket 客户端
101///
102/// 实现 `ExternalPushSink` trait,可直接嵌入 PushDispatcher。
103#[derive(Clone)]
104pub struct WsBroadcaster {
105    tx: broadcast::Sender<WsPushEvent>,
106}
107
108impl WsBroadcaster {
109    pub fn new(capacity: usize) -> Self {
110        let (tx, _) = broadcast::channel(capacity);
111        Self { tx }
112    }
113
114    fn has_receivers(&self) -> bool {
115        self.tx.receiver_count() > 0
116    }
117
118    /// 发送推送事件到所有 WebSocket 客户端
119    pub fn send(&self, event: WsPushEvent) {
120        if !self.has_receivers() {
121            return;
122        }
123        let proto_id = event.proto_id;
124        let event_type = event.event_type.clone();
125        if self.tx.send(event).is_err() {
126            tracing::debug!(
127                proto_id,
128                event_type,
129                receiver_count = self.tx.receiver_count(),
130                "rest ws broadcast send skipped"
131            );
132        }
133    }
134
135    /// 创建接收端
136    pub fn subscribe(&self) -> broadcast::Receiver<WsPushEvent> {
137        self.tx.subscribe()
138    }
139
140    fn encode_body(body: &[u8]) -> String {
141        use base64::Engine;
142        base64::engine::general_purpose::STANDARD.encode(body)
143    }
144
145    /// 发送行情推送.
146    ///
147    /// **v1.4.106 codex 1131 F4 [P1]**: 加 `rehab_type` 参数. KL push 的
148    /// `rehab_type` ≠ 0, 其它 sub_type → 0. 当前 REST WS 仍 broadcast 所有
149    /// quote events 给 qot:read 订阅者 (per-conn 三元 key 过滤是 raw TCP 专属
150    /// 行为 — REST WS 用 broadcast 模型). 但 rehab_type 透传给客户端可见, 让
151    /// agent 自己识别 KL push 的 rehab 类型.
152    pub fn push_quote(
153        &self,
154        sec_key: &str,
155        sub_type: i32,
156        rehab_type: i32,
157        proto_id: u32,
158        body: &[u8],
159    ) {
160        if !self.has_receivers() {
161            return;
162        }
163        self.send(WsPushEvent {
164            event_type: "quote".to_string(),
165            required_scope: WsPushScope::Quote,
166            proto_id,
167            sec_key: Some(sec_key.to_string()),
168            sub_type: Some(sub_type),
169            rehab_type: Some(rehab_type),
170            acc_id: None,
171            body_b64: Self::encode_body(body),
172            trd_market: None,
173        });
174    }
175
176    /// 发送广播推送
177    pub fn push_broadcast(&self, proto_id: u32, body: &[u8]) {
178        if !self.has_receivers() {
179            return;
180        }
181        self.send(WsPushEvent {
182            event_type: "notify".to_string(),
183            required_scope: WsPushScope::Notify,
184            proto_id,
185            sec_key: None,
186            sub_type: None,
187            rehab_type: None,
188            acc_id: None,
189            body_b64: Self::encode_body(body),
190            trd_market: None,
191        });
192    }
193
194    /// 发送交易推送
195    ///
196    /// v1.4.105 D3 (Phase 4) T-B1: `trd_market` 由 [`PushDispatcher`] 一次
197    /// decode body 后透传, 直接塞 [`WsPushEvent.trd_market`] 给后续 Layer 3
198    /// filter 与客户端可见.
199    pub fn push_trade(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
200        if !self.has_receivers() {
201            return;
202        }
203        self.send(WsPushEvent {
204            event_type: "trade".to_string(),
205            required_scope: WsPushScope::Trade,
206            proto_id,
207            sec_key: None,
208            sub_type: None,
209            rehab_type: None,
210            acc_id: Some(acc_id),
211            body_b64: Self::encode_body(body),
212            trd_market: trd_market.map(|s| s.to_string()),
213        });
214    }
215}
216
217/// 实现 ExternalPushSink,使 WsBroadcaster 可嵌入 PushDispatcher
218impl ExternalPushSink for WsBroadcaster {
219    fn on_quote_push(
220        &self,
221        sec_key: &str,
222        sub_type: i32,
223        rehab_type: i32,
224        proto_id: u32,
225        body: &[u8],
226    ) {
227        self.push_quote(sec_key, sub_type, rehab_type, proto_id, body);
228    }
229
230    fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
231        self.push_broadcast(proto_id, body);
232    }
233
234    fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
235        self.push_trade(acc_id, proto_id, body, trd_market);
236    }
237}
238
239/// WebSocket 握手鉴权:从 `?token=xxx` 查询参数或 `Authorization: Bearer` header 提取 token
240///
241/// 浏览器 WebSocket API 不允许设置自定义 header,所以优先支持 `?token=`;
242/// 原生客户端(curl / websocat / tokio-tungstenite)可以用任一方式。
243fn extract_ws_token(headers: &HeaderMap, query: &HashMap<String, String>) -> Option<String> {
244    if let Some(t) = query.get("token") {
245        return Some(t.clone());
246    }
247    headers
248        .get("authorization")
249        .and_then(|v| v.to_str().ok())
250        .and_then(|v| futu_auth_pipeline::parse_bearer_scheme(v).map(|s| s.to_string()))
251}
252
253/// 校验 WebSocket 握手的 token;返回 `Ok(Some(rec))` 表示 scope 模式 + 通过;
254/// `Ok(None)` 表示 legacy 模式(未配 KeyStore),所有事件无条件放行。
255///
256/// - `key_store.is_configured() == false` → 无条件放行(legacy 模式)
257/// - 配置了 KeyStore:必须有 token,且 key 有 `qot:read` scope(最低门槛,
258///   实际收哪些事件由后续 push filter 按 scope 决定)
259fn authenticate_ws(
260    key_store: &KeyStore,
261    headers: &HeaderMap,
262    query: &HashMap<String, String>,
263) -> Result<Option<Arc<KeyRecord>>, (StatusCode, &'static str)> {
264    if !key_store.is_configured() {
265        return Ok(None);
266    }
267
268    let Some(token) = extract_ws_token(headers, query) else {
269        futu_auth::audit::reject(
270            "ws",
271            "/ws",
272            "<missing>",
273            "missing token (query or Authorization)",
274        );
275        return Err((StatusCode::UNAUTHORIZED, "missing api key"));
276    };
277
278    let Some(rec) = key_store.verify(&token) else {
279        futu_auth::audit::reject("ws", "/ws", "<invalid>", "invalid api key");
280        return Err((StatusCode::UNAUTHORIZED, "invalid api key"));
281    };
282
283    if rec.is_expired(Utc::now()) {
284        futu_auth::audit::reject("ws", "/ws", &rec.id, "key expired");
285        return Err((StatusCode::UNAUTHORIZED, "key expired"));
286    }
287
288    if !rec.scopes.contains(&Scope::QotRead) {
289        // v1.4.102 BUG-011 fix (P2): 不再泄露 scope 给请求方,
290        // 仅写本地 audit log. 与 REST / gRPC 同步.
291        futu_auth::audit::reject("ws", "/ws", &rec.id, "missing qot:read scope");
292        return Err((StatusCode::FORBIDDEN, "forbidden"));
293    }
294
295    futu_auth::audit::allow("ws", "/ws", &rec.id, Some("qot:read"));
296    Ok(Some(rec))
297}
298
299/// WebSocket 升级处理
300pub async fn ws_handler(
301    ws: WebSocketUpgrade,
302    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
303    headers: HeaderMap,
304    Query(query): Query<HashMap<String, String>>,
305    State(state): State<RestState>,
306) -> impl IntoResponse {
307    let peer_addr_string = peer_addr.to_string();
308    let session_id = headers
309        .get("x-request-id")
310        .or_else(|| headers.get("x-futu-session-id"))
311        .and_then(|v| v.to_str().ok())
312        .map(str::trim)
313        .filter(|v| !v.is_empty());
314    let audit_ctx =
315        futu_auth::audit::AuditContext::new(Some(peer_addr_string.as_str()), session_id);
316    let rec = match futu_auth::audit::with_context(audit_ctx.clone(), || {
317        authenticate_ws(&state.key_store, &headers, &query)
318    }) {
319        Ok(rec) => rec,
320        Err((code, msg)) => return (code, msg).into_response(),
321    };
322    // legacy(rec=None)时给个"全 scope"快照让 filter 全放行;scope 模式用 rec.scopes
323    let scopes: HashSet<Scope> = match &rec {
324        Some(r) => r.scopes.clone(),
325        None => all_scopes(),
326    };
327    let key_id = rec.as_ref().map(|r| r.id.clone());
328    // v1.4.102 codex 47 F1 / 48 F1 (P1): WS 必须独立 enforce key.allowed_acc_ids
329    // (per-key acc 白名单), 不能依赖 sub-acc-push 是否调过. 之前未调 sub-acc-push
330    // 时 fall-back 全 push, key 被限到 acc A 仍能收 acc B 的 trade push.
331    let allowed_acc_ids = rec.as_ref().and_then(|r| r.allowed_acc_ids.clone());
332    // v1.4.105 D3 (Phase 4) T-B1: per-key allowed_markets 硬限额 (大写字符
333    // 串 set, e.g. {"HK","US"}). `None` / 空 set = 无限制. WS Layer 3
334    // (TradePushFilter) 用此 set 过滤 trade event 的 trd_market.
335    let allowed_markets = rec.as_ref().and_then(|r| r.allowed_markets.clone());
336    let broadcaster = Arc::clone(&state.ws_broadcaster);
337    // v1.4.102 codex 46 F2 (P1): pass per-key acc subscription state into
338    // WS connection so trade push delivery can filter by sub-acc-push registrations.
339    let rest_acc_subs = Arc::clone(&state.rest_acc_subscriptions);
340    // v1.4.105 D4 (Phase 1): pass shared FilterRegistry into WS connection
341    // so push event filter (TradePushFilter) goes through unified registry.
342    let filter_registry = Arc::clone(&state.filter_registry);
343    let ctx = WsConnectionContext {
344        broadcaster,
345        scopes,
346        key_id,
347        allowed_acc_ids,
348        allowed_markets,
349        rest_acc_subscriptions: rest_acc_subs,
350        filter_registry,
351    };
352    ws.max_message_size(REST_WS_MAX_CONTROL_MESSAGE_SIZE_BYTES)
353        .max_frame_size(REST_WS_MAX_CONTROL_MESSAGE_SIZE_BYTES)
354        .on_upgrade(move |socket| handle_ws_connection(socket, ctx))
355        .into_response()
356}
357
358/// 全 scope 集合(legacy 模式用)
359fn all_scopes() -> HashSet<Scope> {
360    [
361        Scope::QotRead,
362        Scope::AccRead,
363        Scope::TradeSimulate,
364        Scope::TradeReal,
365    ]
366    .into_iter()
367    .collect()
368}
369
370/// 处理单个 WebSocket 连接
371///
372/// `scopes` 是该连接 key 持有的 scope 集合,用于按 `WsPushScope::required_scope()`
373/// 过滤推送事件。例如只有 `qot:read` 的 key 不会收到 `trade` 类推送。
374// v1.4.102 codex 47 F1 / 48 F1 (P1): per-key allowed_acc_ids 硬限额.
375// `None` = 该 key 无 acc 限制 (默认全开); `Some(set)` = 仅这些 acc 可见.
376struct WsConnectionContext {
377    broadcaster: Arc<WsBroadcaster>,
378    scopes: HashSet<Scope>,
379    key_id: Option<String>,
380    allowed_acc_ids: Option<HashSet<u64>>,
381    // v1.4.105 D3 (Phase 4) T-B1: caller key 的 allowed_markets 硬限额, 用于
382    // Layer 3 (TradePushFilter) 过滤. None / 空 set = 无限制.
383    allowed_markets: Option<HashSet<String>>,
384    rest_acc_subscriptions: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
385    // v1.4.105 D4 (Phase 1): 共享 FilterRegistry 实例 — push event 过滤走
386    // 同一 registry 与 4 surface (REST body filter / WS push) 一致.
387    filter_registry: Arc<futu_auth_pipeline::FilterRegistry>,
388}
389
390async fn handle_ws_connection(socket: WebSocket, ctx: WsConnectionContext) {
391    let WsConnectionContext {
392        broadcaster,
393        scopes,
394        key_id,
395        allowed_acc_ids,
396        allowed_markets,
397        rest_acc_subscriptions,
398        filter_registry,
399    } = ctx;
400
401    let (mut ws_tx, mut ws_rx) = socket.split();
402    let mut push_rx = broadcaster.subscribe();
403
404    tracing::info!(
405        key_id = ?key_id,
406        scopes = ?scopes,
407        "WebSocket push client connected"
408    );
409
410    // v1.4.106 codex 1125 F6 [P2]: REST WS notify subscription state.
411    //
412    // 对齐 C++ raw TCP `IsConnSubRecvNotify` (APIServer_Qot_PriceReminder.cpp:730-735):
413    // broadcast notify 类 push (e.g. price reminder) 必须 client 显式 sub 才下发.
414    //
415    // **Breaking change vs v1.4.105**: v1.4.105 之前 REST `/ws` 默认收所有
416    // broadcast notify; v1.4.106 起需 client 发 `{"action":"subscribe-notify"}`
417    // text message 才能继续收. 老 client 如果依赖 price reminder push 必须升级.
418    //
419    // Default false 对齐 raw TCP 默认 unsub 状态.
420    let notify_subscribed = Arc::new(std::sync::atomic::AtomicBool::new(false));
421    let notify_subscribed_for_send = Arc::clone(&notify_subscribed);
422    let notify_subscribed_for_recv = Arc::clone(&notify_subscribed);
423
424    // 推送任务:从 broadcast channel 读取事件 → 按 scope 过滤 → 发送给客户端
425    let send_scopes = scopes.clone();
426    let send_key_id_str = key_id.clone().unwrap_or_else(|| "<none>".to_string());
427    let send_key_id_for_filter = key_id.clone();
428    let rest_subs_for_filter = Arc::clone(&rest_acc_subscriptions);
429    let mut send_task = tokio::spawn(async move {
430        loop {
431            let event = match push_rx.recv().await {
432                Ok(event) => event,
433                Err(broadcast::error::RecvError::Lagged(n)) => {
434                    tracing::warn!(
435                        skipped = n,
436                        "REST WebSocket push client lagged, skipped events"
437                    );
438                    continue;
439                }
440                Err(broadcast::error::RecvError::Closed) => break,
441            };
442            // 按 client scope 过滤:key 没这个 scope 就不发
443            if !send_scopes.contains(&event.required_scope.required_scope()) {
444                // 记一次"被挡住的推送",供 Prometheus `/metrics` 观察
445                futu_auth::metrics::bump_ws_filtered(&event.event_type, &send_key_id_str);
446                continue;
447            }
448            // v1.4.106 codex 1125 F6 [P2]: notify subscribe gate.
449            // 对齐 C++ raw TCP `IsConnSubRecvNotify` (broadcast push 必须显式 sub).
450            if matches!(event.required_scope, WsPushScope::Notify)
451                && !notify_subscribed_for_send.load(std::sync::atomic::Ordering::Relaxed)
452            {
453                futu_auth::metrics::bump_ws_filtered("notify_unsub", &send_key_id_str);
454                continue;
455            }
456            // v1.4.102 codex 47 F1 / 48 F1 (P1): trade push 进 acc-id 过滤,
457            // 两层独立 enforce:
458            // 1. **key.allowed_acc_ids 硬限额** (Some(set)): event.acc_id 不在
459            //    set → drop. 与 sub-acc-push 是否调过无关 (老 key 没 sub 也强限).
460            // 2. **REST sub-acc-push state map** (sub_state): 仅当 key 已调过
461            //    sub-acc-push 才生效. entry 存在但不含 acc_id → drop. 未调过 →
462            //    pass (向后兼容老 client).
463            //
464            // codex 48 F2 P1 fix: REST sub state empty entry (Some(set) 但 set
465            // 空) 也算 "已 unsub all" tombstone, 不允许 fall back 到全 push.
466            //
467            // v1.4.103 codex F5.11 (P2) round 5: 抽 logic 到
468            // `should_drop_trade_event_for_caller` pure fn 让单测可验证.
469            //
470            // v1.4.105 D4 (Phase 1): 改走 `FilterRegistry::should_drop_event`
471            // 让 4 surface (REST `/ws` 现接 + 后续 gRPC subscribe_push 等)
472            // 共用同一 registry instance. 防 sibling-route bypass —
473            // 任何人加新 push event filter 只在 registry 注册一次, 不需
474            // 改各 surface inline. `should_drop_trade_event_for_caller`
475            // pure fn 仍保留作 unit test 直接验证 logic, 但 production 走 registry.
476            if matches!(event.required_scope, WsPushScope::Trade)
477                && let Some(event_acc) = event.acc_id
478            {
479                let sub_state_owned: Option<HashSet<u64>> =
480                    send_key_id_for_filter.as_ref().and_then(|kid| {
481                        crate::adapter::with_rest_acc_subscriptions_read(
482                            &rest_subs_for_filter,
483                            |subs| subs.get(kid).cloned(),
484                        )
485                    });
486                let ctx = futu_auth_pipeline::PushEventCtx {
487                    event_type: &event.event_type,
488                    event_acc: Some(event_acc),
489                    allowed_acc_ids: allowed_acc_ids.as_ref(),
490                    sub_state: sub_state_owned.as_ref(),
491                    // v1.4.105 D3 (Phase 4) T-B1: 真接 trd_market —
492                    // PushDispatcher 端一次 decode 后透传到 WsPushEvent.trd_market
493                    // (None = 老路径 / decode 失败 / market 未知, 不 trigger
494                    // Layer 3 drop).
495                    event_trd_market: event.trd_market.as_deref(),
496                    allowed_markets: allowed_markets.as_ref(),
497                };
498                if filter_registry.should_drop_event(&ctx) {
499                    // v1.4.105 F5.2 fix (codex review C4 4th): 4 surface 统一
500                    // metric label "trade_market" 跟 gRPC + raw TCP WS + MCP 一致,
501                    // 不再用 event.event_type (= "trade") 让跨 surface jq aggregate
502                    // 一致.
503                    futu_auth::metrics::bump_ws_filtered("trade_market", &send_key_id_str);
504                    continue;
505                }
506            }
507            let json = match serde_json::to_string(&event) {
508                Ok(j) => j,
509                Err(_) => continue,
510            };
511            if ws_tx.send(Message::Text(json.into())).await.is_err() {
512                break; // 客户端断开
513            }
514        }
515    });
516
517    // 接收任务:处理客户端消息(ping/pong/close + v1.4.106 codex 1125 F6 subscribe-notify)
518    let mut recv_task = tokio::spawn(async move {
519        while let Some(msg) = ws_rx.next().await {
520            match msg {
521                Ok(Message::Close(_)) | Err(_) => break,
522                Ok(Message::Ping(_data)) => {
523                    // axum 自动回复 pong,不需要手动处理
524                }
525                // v1.4.106 codex 1125 F6 [P2]: 处理 client 发的 JSON control message.
526                // 支持 `{"action":"subscribe-notify"}` / `{"action":"unsubscribe-notify"}`.
527                // 对齐 C++ raw TCP IsConnSubRecvNotify 的 sub/unsub 接口.
528                Ok(Message::Text(text)) => {
529                    if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text)
530                        && let Some(action) = val.get("action").and_then(|v| v.as_str())
531                    {
532                        match action {
533                            "subscribe-notify" => {
534                                notify_subscribed_for_recv
535                                    .store(true, std::sync::atomic::Ordering::Relaxed);
536                                tracing::info!("WS client subscribed notify push");
537                            }
538                            "unsubscribe-notify" => {
539                                notify_subscribed_for_recv
540                                    .store(false, std::sync::atomic::Ordering::Relaxed);
541                                tracing::info!("WS client unsubscribed notify push");
542                            }
543                            other => {
544                                tracing::debug!(action = %other, "WS client unknown action");
545                            }
546                        }
547                    }
548                }
549                _ => {} // 忽略其他消息
550            }
551        }
552    });
553
554    // 任一任务结束则关闭连接。JoinHandle drop 只会 detach,不会取消任务;
555    // 因此必须显式 abort sibling task,避免断开的 WS client 在下一次
556    // broadcast push 前留下悬挂 send/recv loop。
557    tokio::select! {
558        _ = &mut send_task => {
559            recv_task.abort();
560        }
561        _ = &mut recv_task => {
562            send_task.abort();
563        }
564    }
565
566    tracing::info!("WebSocket push client disconnected");
567}
568
569/// v1.4.103 codex F5.11 (P2) round 5: pure-fn 提取 trade event 过滤决策让单测
570/// 可以验证. 不直接 hit handle_ws_connection 异步通路 (需 WebSocket infra),
571/// 但 logic 与该函数 inline 顺序 1:1 对齐.
572///
573/// **行为**:
574/// - event_acc=None (非 trade event 没 acc_id): 不 drop
575/// - Layer 1: `allowed_acc_ids` 非空 + event_acc ∉ allowed → drop
576/// - Layer 2: `sub_state` 含 caller key 的 entry (即使空 tombstone) + event_acc ∉
577///   set → drop. **空 set = unsub-all tombstone, 全 drop 一切 trade**
578/// - Layer 2 missing entry (caller 从未调过 sub-acc-push) → 不 drop (向后兼容).
579///
580/// 返 `true` 表示 drop, `false` 表示 deliver.
581///
582/// v1.4.105 D4 (Phase 1): production 路径已切到 `FilterRegistry::should_drop_event`
583/// (走 `TradePushFilter` impl, logic 等价). 这个 pure fn 现仅作 unit test target
584/// 直接验证 logic, 不再 production 调用. 保留 + `#[cfg(test)]` 让 dead_code 不警告.
585#[cfg(test)]
586pub(crate) fn should_drop_trade_event_for_caller(
587    allowed_acc_ids: Option<&HashSet<u64>>,
588    sub_state: Option<&HashSet<u64>>,
589    event_acc: u64,
590) -> bool {
591    // Layer 1: hard allowed_acc_ids whitelist (caller key 限制).
592    if let Some(allowed) = allowed_acc_ids
593        && !allowed.is_empty()
594        && !allowed.contains(&event_acc)
595    {
596        return true;
597    }
598    // Layer 2: per-key REST sub state (opt-in via /api/sub-acc-push).
599    // - entry 存在 (sub-acc-push 调过): 必须 acc_id ∈ set (空 set =
600    //   unsub all tombstone → drop 一切)
601    // - entry 不存在 (从未调 sub-acc-push): 不 drop (向后兼容老 client)
602    if let Some(set) = sub_state
603        && !set.contains(&event_acc)
604    {
605        return true;
606    }
607    false
608}
609
610#[cfg(test)]
611mod tests;