Skip to main content

futu_server/
ws_listener.rs

1// WebSocket 监听器:接受 WebSocket 连接,复用 TCP 的请求路由和连接池
2//
3// 每个 WebSocket 二进制消息 = 一个完整的 FutuAPI 帧(44 字节帧头 + body)。
4// 与 TCP 共享同一个 connections DashMap、RequestRouter、SubscriptionManager。
5//
6// ## v1.0 鉴权
7//
8// 握手阶段(accept_hdr_async)校验 HTTP `Authorization: Bearer <token>` 或
9// `?token=<plaintext>` query —— 通过 `KeyStore::verify` 得到 `KeyRecord`,
10// 把 scope 集合和 key_id 存到 `ClientConn`。每条消息进 `ws_process_requests`
11// 时按 `futu_auth::scope_for_proto_id(proto_id)` 查所需 scope,不匹配 → 不 dispatch、
12// 记 audit reject。`trade:real` 额外跑 `check_and_commit` 过一道 rate + hours
13// 全局闸门。未注入 KeyStore(TCP listener 或 legacy 模式)→ scopes 空集被
14// 解释为"全放行",保持向后兼容。
15//
16// ## v1.4.104 阶段 3
17//
18// 把 inline scope gate / rate gate / body-aware acc_id check 替换为
19// `futu_auth_pipeline::authenticate_request` 单一调用. 流程:
20//   1. AES decrypt (handshake protocol INIT_CONNECT 跳过)
21//   2. (非 INIT_CONNECT + 非 1xxx 系统协议) → 调 pipeline
22//      Credential::PreVerified(rec_from_get_by_id) 拿 SIGHUP-aware 最新 rec.
23//      Reject → drop. Allow → 取 allowed_acc_ids 给 response filter.
24//   3. dispatch (router / handle_init_connect / handle_keepalive)
25//   4. response filter (proto 2001 TRD_GET_ACC_LIST 等)
26//
27// 删除: 旧 `ws_body_aware_check` (功能进 pipeline body_aware::build_check_ctxs).
28// 删除: 旧 inline scope gate + rate gate + audit allow/reject. 全 pipeline 一处.
29
30use std::collections::HashSet;
31use std::sync::Arc;
32use std::sync::atomic::AtomicI64;
33use std::time::Instant;
34
35use dashmap::DashMap;
36use tokio::net::TcpListener;
37use tokio::sync::{mpsc, watch};
38
39mod connection;
40mod handshake;
41
42use connection::run_ws_connection;
43use handshake::AuthResult;
44
45use futu_auth::{KeyStore, RuntimeCounters};
46use futu_auth_pipeline::{
47    AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, RejectKind, SurfaceId,
48    authenticate_request,
49};
50
51/// v1.4.106 D1 5c: WS surface adapter — `AuthDecision::Reject` 翻成 silent drop.
52///
53/// **历史**: WS 在 v1.4.103 / v1.4.104 阶段 3 都按 silent drop 处理 reject
54/// (与 v1.4.103 行为一致, 防 timing 探测 — 给客户端任何 wire response 都让
55/// 它知道帧被读了 vs 没读). v1.4.106 D1 把这层"翻译为 unit"也走 trait, 让 4
56/// surface SurfaceAdapter 一致.
57///
58/// **WireResponse = ()**: WS 不发任何东西回 client; 调用方拿 `Option<()>`
59/// 知道是 reject (Some) 还是 allow (None) 即可继续 dispatch / 丢弃.
60///
61/// **不变量**: `translate_reject` 不能写日志 (pipeline 已 audit::reject 一次,
62/// 不要重复). 只 drain reason / kind 让它进 `_`.
63pub struct WsAdapter;
64
65impl futu_auth_pipeline::SurfaceAdapter for WsAdapter {
66    type WireResponse = ();
67
68    fn surface_id() -> SurfaceId {
69        SurfaceId::Ws
70    }
71
72    fn translate_reject(_kind: RejectKind, _reason: String) -> Self::WireResponse {
73        // 与 v1.4.103/104 行为一致: silent drop. pipeline 已 audit reject,
74        // 不再写 log; reject fields intentionally unused to avoid leaking daemon state.
75    }
76}
77use futu_codec::header::ProtoFmtType;
78use futu_core::proto_id;
79
80use crate::conn::{ClientConn, ConnState, DisconnectNotify, IncomingRequest};
81use crate::listener::{
82    MAX_CONNECTIONS, REQUEST_QUEUE_CAPACITY, ServerConfig, default_server_time_offset_secs,
83    server_now_ts,
84};
85use crate::router::RequestRouter;
86
/// WebSocket server.
///
/// Holds the listen address, the server configuration and the shared runtime
/// objects (connection pool, router, optional subscription manager) together
/// with the optional auth / limit / response-filter collaborators.
pub struct WsServer {
    /// Address passed to `TcpListener::bind` when the server starts.
    listen_addr: String,
    /// Server configuration; cloned into the request-processing task.
    config: ServerConfig,
    /// Connection pool keyed by connection id.
    connections: Arc<DashMap<u64, ClientConn>>,
    /// Router used to dispatch every request that is not handled inline.
    router: Arc<RequestRouter>,
    /// Optional subscription manager, notified when a connection is removed.
    subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
    /// v1.0: Bearer-token authentication during the handshake. `None` or
    /// `!is_configured()` means legacy mode (everything is let through).
    key_store: Option<Arc<KeyStore>>,
    /// v1.0: limit counters shared across REST / gRPC / WS.
    counters: Option<Arc<RuntimeCounters>>,
    /// v1.4.104 phase 3: response filter registry shared across surfaces
    /// (proto 2001 TRD_GET_ACC_LIST is installed by default). When `None`,
    /// the server falls back to the built-in `with_defaults` registry.
    filter_registry: Option<Arc<FilterRegistry>>,
    /// Backend server-time offset in seconds, used for SDK-facing WS protocol fields.
    server_time_offset_secs: Arc<AtomicI64>,
}
103
/// Shared dependencies for the WebSocket server.
///
/// These are the same runtime objects the raw TCP server owns. Keeping them in a
/// bundle avoids every constructor carrying a growing positional list as auth /
/// counters / filters evolve.
pub struct WsServerDeps {
    /// Connection pool keyed by connection id.
    connections: Arc<DashMap<u64, ClientConn>>,
    /// Request router.
    router: Arc<RequestRouter>,
    /// Optional subscription manager.
    subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
}

impl WsServerDeps {
    /// Bundles the three shared runtime objects without transforming them.
    pub fn new(
        connections: Arc<DashMap<u64, ClientConn>>,
        router: Arc<RequestRouter>,
        subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
    ) -> Self {
        Self {
            connections,
            router,
            subscriptions,
        }
    }
}
128
129impl WsServer {
130    /// 创建 WsServer,共享 TCP 的连接池、路由器、订阅管理器(无鉴权,向后兼容)
131    pub fn new(
132        listen_addr: String,
133        config: ServerConfig,
134        connections: Arc<DashMap<u64, ClientConn>>,
135        router: Arc<RequestRouter>,
136        subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
137    ) -> Self {
138        Self::with_auth(
139            listen_addr,
140            config,
141            WsServerDeps::new(connections, router, subscriptions),
142            None,
143            None,
144        )
145    }
146
147    /// v1.0 入口:同时接入 KeyStore + 共享 RuntimeCounters 做握手鉴权和 per-message
148    /// scope / 限额检查。`key_store = None` 或未配置时保持 legacy(全放行)。
149    pub fn with_auth(
150        listen_addr: String,
151        config: ServerConfig,
152        deps: WsServerDeps,
153        key_store: Option<Arc<KeyStore>>,
154        counters: Option<Arc<RuntimeCounters>>,
155    ) -> Self {
156        Self {
157            listen_addr,
158            config,
159            connections: deps.connections,
160            router: deps.router,
161            subscriptions: deps.subscriptions,
162            key_store,
163            counters,
164            filter_registry: None,
165            server_time_offset_secs: default_server_time_offset_secs(),
166        }
167    }
168
169    /// Inject backend server-time offset for SDK-facing WS protocol fields.
170    pub fn with_server_time_offset_secs(mut self, offset: Arc<AtomicI64>) -> Self {
171        self.server_time_offset_secs = offset;
172        self
173    }
174
175    /// v1.4.104 阶段 3: 注入显式 FilterRegistry (跨 surface 共享同一份).
176    /// 不调用此 setter 则 run() 时 fallback 到 `FilterRegistry::with_defaults()`.
177    pub fn with_filter_registry(mut self, registry: Arc<FilterRegistry>) -> Self {
178        self.filter_registry = Some(registry);
179        self
180    }
181
182    /// 启动 WebSocket 服务端监听
183    pub async fn run(&self) -> anyhow::Result<()> {
184        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
185        self.run_until_shutdown(shutdown_rx).await
186    }
187
188    /// 启动 WebSocket 服务端监听,并在 shutdown 信号到来时停止接受新连接。
189    pub async fn run_until_shutdown(
190        &self,
191        mut shutdown_rx: watch::Receiver<bool>,
192    ) -> anyhow::Result<()> {
193        let listener = TcpListener::bind(&self.listen_addr).await?;
194        tracing::info!(addr = %self.listen_addr, "WebSocket server listening");
195
196        let (req_tx, req_rx) = mpsc::channel::<IncomingRequest>(REQUEST_QUEUE_CAPACITY);
197        // The disconnect cleanup signal is intentionally unbounded: each item
198        // is a tiny `conn_id`, and blocking this path behind request-queue
199        // backpressure would risk leaking connection/subscription state.
200        let (disconnect_tx, mut disconnect_rx) = mpsc::unbounded_channel::<DisconnectNotify>();
201
202        // 启动请求处理任务(与 TCP 共享同一逻辑)
203        let connections = Arc::clone(&self.connections);
204        let router = Arc::clone(&self.router);
205        let config = self.config.clone();
206        // v1.4.104 阶段 3: KeyStore + RuntimeCounters 总是材料化 (legacy mode 用
207        // empty / new()), 让 ws_process_requests 拿 non-Option Arc 直接调 pipeline.
208        // 行为与 v1.4.103 等价: KeyStore::empty().is_configured() = false, pipeline
209        // 走 legacy short-circuit (Allow{rec:None} 不 audit, body-aware 不 enforce).
210        let key_store_for_process = self
211            .key_store
212            .clone()
213            .unwrap_or_else(|| Arc::new(KeyStore::empty()));
214        let counters_for_process = self
215            .counters
216            .clone()
217            .unwrap_or_else(|| Arc::new(RuntimeCounters::new()));
218        let filter_registry_for_process = self
219            .filter_registry
220            .clone()
221            .unwrap_or_else(|| Arc::new(FilterRegistry::with_defaults()));
222        let server_time_offset_secs = Arc::clone(&self.server_time_offset_secs);
223        tokio::spawn(async move {
224            ws_process_requests(
225                req_rx,
226                connections,
227                router,
228                config,
229                counters_for_process,
230                key_store_for_process,
231                filter_registry_for_process,
232                server_time_offset_secs,
233            )
234            .await;
235        });
236
237        // 启动连接清理任务
238        let cleanup_connections = Arc::clone(&self.connections);
239        let cleanup_subs = self.subscriptions.clone();
240        tokio::spawn(async move {
241            while let Some(notify) = disconnect_rx.recv().await {
242                let removed = cleanup_connections.remove(&notify.conn_id);
243                if removed.is_some() {
244                    if let Some(ref subs) = cleanup_subs {
245                        subs.on_disconnect(notify.conn_id);
246                    }
247                    tracing::info!(
248                        conn_id = notify.conn_id,
249                        remaining = cleanup_connections.len(),
250                        "ws connection removed from pool"
251                    );
252                }
253            }
254        });
255
256        // 接受连接循环
257        let connections = Arc::clone(&self.connections);
258        let key_store_accept = self.key_store.clone();
259        // v1.4.104 阶段 3: scope_mode 局部计算 (KeyStore configured = 启用 auth).
260        let scope_mode = self.key_store.as_ref().is_some_and(|ks| ks.is_configured());
261        if !scope_mode {
262            // v1.4.93 P0-5 (NEW-C-02): 加强 legacy mode loud WARN —
263            // 对齐 REST mutating-blocked policy 的 startup signal。任何
264            // 未授权客户端都可 handshake + 接收 push(HTTP 101 OK)。本版
265            // **不 reject**(保持向后兼容),未来 v2 默认 reject。
266            tracing::warn!("{}", legacy_mode_warn_tracing_message());
267            eprintln!("{}", legacy_mode_warn_stderr_message());
268        }
269        loop {
270            let (stream, peer_addr) = tokio::select! {
271                _ = crate::listener::shutdown_requested(&mut shutdown_rx) => {
272                    tracing::info!("WebSocket server accept loop stopped by shutdown signal");
273                    break;
274                }
275                accepted = listener.accept() => accepted?,
276            };
277
278            if connections.len() >= MAX_CONNECTIONS {
279                tracing::warn!(
280                    peer = %peer_addr,
281                    "max connections reached ({}), rejecting ws client",
282                    MAX_CONNECTIONS,
283                );
284                drop(stream);
285                continue;
286            }
287
288            let conn_id = ClientConn::generate_conn_id();
289            let aes_key = ClientConn::generate_aes_key();
290            crate::listener::set_nodelay_with_log(&stream, peer_addr, "ws");
291
292            tracing::info!(
293                conn_id = conn_id,
294                peer = %peer_addr,
295                total = connections.len() + 1,
296                "ws client connected"
297            );
298
299            let (tx, authed) = run_ws_connection(
300                stream,
301                peer_addr,
302                conn_id,
303                aes_key,
304                req_tx.clone(),
305                disconnect_tx.clone(),
306                shutdown_rx.clone(),
307                key_store_accept.clone(),
308            )
309            .await;
310
311            // 握手鉴权失败 → run_ws_connection 已经 drop 连接;这里什么都不做
312            let Some(authed) = authed else {
313                continue;
314            };
315
316            let (key_id, scopes, allowed_markets, allowed_acc_ids) = match authed {
317                AuthResult::Authenticated(rec) => (
318                    Some(rec.id.clone()),
319                    rec.scopes.clone(),
320                    // v1.4.105 D3 (Phase 4) T-B2: 拷贝 caller key 的 allowed_markets
321                    // 到 ClientConn 让 PushDispatcher::push_trd_acc Layer 3 用.
322                    rec.allowed_markets
323                        .as_ref()
324                        .map(|s| std::sync::Arc::new(s.clone())),
325                    // codex round 1 F4 (P2) v1.4.105: 拷贝 caller key 的
326                    // allowed_acc_ids 到 ClientConn 让 PushDispatcher::push_trd_acc
327                    // Layer 1 push-time 硬过滤. 防 stale subscription /
328                    // KeyRecord reload 后 acc 范围窄化 时 push leak.
329                    rec.allowed_acc_ids
330                        .as_ref()
331                        .map(|s| std::sync::Arc::new(s.clone())),
332                ),
333                AuthResult::Legacy => (None, HashSet::new(), None, None),
334            };
335
336            let conn = ClientConn {
337                conn_id,
338                state: ConnState::Connected,
339                aes_key,
340                aes_encrypt_enabled: false,
341                proto_fmt_type: ProtoFmtType::Protobuf,
342                last_keepalive: Instant::now(),
343                recv_notify: false,
344                ai_type: 0,
345                keepalive_count: std::sync::atomic::AtomicU32::new(0),
346                tx,
347                key_id,
348                scopes,
349                allowed_markets,
350                allowed_acc_ids,
351            };
352
353            connections.insert(conn_id, conn);
354        }
355
356        Ok(())
357    }
358}
359
/// v1.4.93 P0-5 (NEW-C-02): text of the legacy-mode `tracing::warn!` message.
///
/// Kept as a standalone `const fn` so a unit test can check that the warning
/// still carries key hints such as "v2" / "reject" and guard against accidental
/// removal (same pattern as v1.4.86 SEC-003 Q4).
pub(crate) const fn legacy_mode_warn_tracing_message() -> &'static str {
    concat!(
        "WS server running WITHOUT API key auth (legacy mode); ",
        "all WS clients accept unauthenticated handshake (no-token / ",
        "wrong-bearer / bogus-query all return success). ",
        "Pass KeyStore via with_auth() to enable. ",
        "v2 will default-reject; migrate to --rest-keys-file / --ws-keys-file for production.",
    )
}
371
/// v1.4.93 P0-5 (NEW-C-02): user-visible stderr message for legacy mode.
///
/// Shorter than the `tracing::warn!` text so that systemd / docker logs can
/// capture it on a single line.
pub(crate) const fn legacy_mode_warn_stderr_message() -> &'static str {
    concat!(
        "⚠️  WS server (legacy mode, no --ws-keys-file): ",
        "unauthenticated handshakes accepted. v2 will default-reject. ",
        "Migrate to --ws-keys-file for production.",
    )
}
380
381/// 处理 WebSocket 连接的请求(逻辑与 TCP 的 process_requests 相同,额外做 scope / 限额)
382///
383/// v1.4.104 阶段 3 重构: inline scope gate + trade:real rate gate +
384/// `ws_body_aware_check` 三段折叠为单一 `authenticate_request` pipeline 调用.
385///
386/// **流程**:
387/// 1. AES decrypt (handshake INIT_CONNECT 跳过 — body 是明文 RSA 加密 InitReq)
388/// 2. 非 INIT_CONNECT + scope_for_proto_id != None → 调 pipeline:
389///    - Credential::PreVerified(`KeyStore::get_by_id(key_id)`) 拿 SIGHUP-aware
390///      最新 rec, 复用 handshake 已 verify 的身份 (skip re-verify).
391///    - Endpoint::Proto(proto_id), commit_rate=true (per-msg rate 闸门).
392///    - Reject → drop request (audit 已 emit). Allow → 拿 allowed_acc_ids.
393/// 3. dispatch (router / handle_init_connect / handle_keepalive)
394/// 4. response filter (proto 2001 等, 通过 FilterRegistry::apply).
395///
396/// **legacy mode 行为**: KeyStore::empty().is_configured() = false, pipeline
397/// 走 legacy short-circuit (Allow{rec:None}, 不 audit, body-aware 不 enforce).
398/// 1xxx 系统协议 (INIT_CONNECT / KEEP_ALIVE / GET_GLOBAL_STATE) 与 v1.4.103
399/// 一致跳过 pipeline 直接 dispatch (handshake / heartbeat / 公开协议无 scope).
400async fn ws_process_requests(
401    mut req_rx: mpsc::Receiver<IncomingRequest>,
402    connections: Arc<DashMap<u64, ClientConn>>,
403    router: Arc<RequestRouter>,
404    config: ServerConfig,
405    counters: Arc<RuntimeCounters>,
406    key_store: Arc<KeyStore>,
407    filter_registry: Arc<FilterRegistry>,
408    server_time_offset_secs: Arc<AtomicI64>,
409) {
410    use crate::listener::ApiServer;
411
412    while let Some(mut req) = req_rx.recv().await {
413        let conn_id = req.conn_id;
414        let proto_id_val = req.proto_id;
415        let serial_no = req.serial_no;
416
417        // 更新 last_keepalive(任何包都算活跃)
418        if let Some(mut conn) = connections.get_mut(&conn_id) {
419            conn.last_keepalive = Instant::now();
420        }
421
422        // ── Step 0: v1.4.106 codex 0532 F3 (P2): daemon-internal proto_id
423        // (高位 0x8000_0000 bit) 绝不应从 raw WS 公开 surface 进入 — 仅 REST
424        // handler 内部合成给 router. 在 AES decrypt 前显式 reject + log,
425        // 防探测 daemon 内部 routing.
426        if futu_auth::is_internal_proto_id(proto_id_val) {
427            tracing::warn!(
428                conn_id,
429                proto_id = proto_id_val,
430                "rejecting daemon-internal proto_id at raw WS public surface (audit 0532 F3)"
431            );
432            continue;
433        }
434
435        // ── Step 1: AES decrypt (always for non-INIT_CONNECT) ─────────────────
436        // pipeline + body-aware 都需 plaintext body. INIT_CONNECT body 是 RSA
437        // 加密的 ConnInitReq, 由 handle_init_connect 自行 RSA decrypt.
438        if proto_id_val != proto_id::INIT_CONNECT
439            && let Some(conn) = connections.get(&conn_id)
440            && conn.aes_encrypt_enabled
441        {
442            match conn.decrypt_body(&req.body) {
443                Ok(decrypted) => {
444                    req.body = bytes::Bytes::from(decrypted);
445                }
446                Err(e) => {
447                    tracing::warn!(
448                        conn_id = conn_id,
449                        proto_id = proto_id_val,
450                        error = %e,
451                        "ws AES decrypt request failed, dropping"
452                    );
453                    continue;
454                }
455            }
456        }
457
458        // ── Step 2: Pipeline auth ─────────────────────────────────────────────
459        // INIT_CONNECT (handshake) + 1xxx 系统协议 (scope_for_proto_id == None)
460        // 跳 pipeline. 与 v1.4.103 行为一致: 系统协议不审 / 不限频 / 直 dispatch.
461        let needed_scope = futu_auth_pipeline::capability::scope_for_proto_id(proto_id_val);
462        // codex 0522 F1 v1.4.106: 提前抓 conn.key_id 快照让 dispatch IncomingRequest
463        // 也能填 caller_key_id (per-call snapshot, 与 caller_allowed_acc_ids 同源).
464        // 即使 INIT_CONNECT / 1xxx 系统协议 (跳 pipeline) 也带上 — handler 可基于
465        // key_id 做 per-key 订阅配额 / cleanup / 审计.
466        let dispatch_caller_key_id: Option<String> =
467            connections.get(&conn_id).and_then(|c| c.key_id.clone());
468        let allowed_acc_ids_for_resp_filter: Option<HashSet<u64>> =
469            if proto_id_val == proto_id::INIT_CONNECT || needed_scope.is_none() {
470                None
471            } else {
472                // 从 conn 取 key_id 快照, 然后 KeyStore::get_by_id 拿 SIGHUP-aware
473                // 最新 rec (limits / scopes / allowed_acc_ids 都跟最新). 找不到 ↦
474                // Credential::None (legacy mode 放行 / scope mode reject Unauth).
475                let key_id_snap = dispatch_caller_key_id.clone();
476                let rec_opt = key_id_snap.as_ref().and_then(|id| key_store.get_by_id(id));
477                let credential = match rec_opt {
478                    Some(rec) => Credential::PreVerified(rec),
479                    None => Credential::None,
480                };
481
482                let env = AuthEnvelope {
483                    surface: SurfaceId::Ws,
484                    endpoint: Endpoint::Proto(proto_id_val),
485                    needed_scope,
486                    credential,
487                    proto_id: Some(proto_id_val),
488                    body: &req.body,
489                    explicit_acc_id: None,
490                    explicit_ctx: None,
491                    commit_rate: true, // WS per-msg 是 trade:real 唯一 rate gate
492                    audit_emit: true,
493                };
494                let session_id = conn_id.to_string();
495                let audit_ctx =
496                    futu_auth::audit::AuditContext::new(None::<&str>, Some(session_id.as_str()));
497
498                // v1.4.106 D1 5c: 走 SurfaceAdapter trait
499                // (`WsAdapter::translate_decision`), 与 4 surface 一致.
500                // Allow → Some(allowed_acc_ids), Reject → None + silent drop.
501                use futu_auth_pipeline::SurfaceAdapter;
502                match futu_auth::audit::with_context(audit_ctx.clone(), || {
503                    authenticate_request(&key_store, &counters, env)
504                }) {
505                    AuthDecision::Allow {
506                        allowed_acc_ids, ..
507                    } => allowed_acc_ids,
508                    decision @ AuthDecision::Reject { .. } => {
509                        // WsAdapter::translate_decision 返 Some(()) 表 reject
510                        // (silent drop). pipeline 已 audit reject, 这里不打 log.
511                        let silent_drop = WsAdapter::translate_decision(decision);
512                        debug_assert!(silent_drop.is_some());
513                        continue;
514                    }
515                }
516            };
517
518        // ── Step 3: Dispatch ──────────────────────────────────────────────────
519        let response_body = match proto_id_val {
520            proto_id::INIT_CONNECT => match connections.get_mut(&conn_id) {
521                Some(mut conn) => match conn.handle_init_connect(
522                    &req.body,
523                    config.server_ver,
524                    config.login_user_id,
525                    config.keepalive_interval,
526                    config.rsa_private_key.as_deref(),
527                ) {
528                    Ok(body) => Some(body),
529                    Err(error) => {
530                        tracing::warn!(
531                            conn_id,
532                            proto_id = proto_id_val,
533                            error = %error,
534                            "ws InitConnect handling failed"
535                        );
536                        None
537                    }
538                },
539                None => {
540                    tracing::warn!(
541                        conn_id,
542                        proto_id = proto_id_val,
543                        "ws InitConnect request received for missing connection"
544                    );
545                    None
546                }
547            },
548            proto_id::KEEP_ALIVE => match connections.get(&conn_id) {
549                Some(conn) => match conn
550                    .handle_keepalive_at(&req.body, server_now_ts(&server_time_offset_secs))
551                {
552                    Ok(body) => Some(body),
553                    Err(error) => {
554                        tracing::warn!(
555                            conn_id,
556                            proto_id = proto_id_val,
557                            error = %error,
558                            "ws KeepAlive handling failed"
559                        );
560                        None
561                    }
562                },
563                None => {
564                    tracing::warn!(
565                        conn_id,
566                        proto_id = proto_id_val,
567                        "ws KeepAlive request received for missing connection"
568                    );
569                    None
570                }
571            },
572            _ => {
573                // v1.4.105 D2 T-A1 fix: caller_allowed_acc_ids 从 pipeline allow
574                // decision 真填进 IncomingRequest, 让 dispatch handler (e.g.
575                // SubAccPushHandler) 端 enforce per-acc whitelist defense-in-depth.
576                // codex 0522 F1 v1.4.106: 同步填 caller_key_id (per-call snapshot
577                // 来自 conn.key_id), 让 cross-surface handler 都能识别 caller.
578                let dispatch_req = IncomingRequest::builder(
579                    req.conn_id,
580                    req.proto_id,
581                    req.serial_no,
582                    req.proto_fmt_type,
583                    req.body.clone(),
584                )
585                .with_idempotency_key(req.idempotency_key.clone())
586                .with_caller_scope(
587                    allowed_acc_ids_for_resp_filter
588                        .as_ref()
589                        .map(|s| std::sync::Arc::new(s.clone())),
590                    dispatch_caller_key_id.clone(),
591                )
592                .build();
593                router.dispatch(conn_id, &dispatch_req).await
594            }
595        };
596
597        // ── Step 4: Response filter (TRD_GET_ACC_LIST 等) ────────────────────
598        if let Some(body) = response_body {
599            // FilterRegistry::apply(): proto_id 未注册 → 原 body 不动 (no-op).
600            // 注册了 (e.g. proto 2001) → filter by allowed_acc_ids.
601            let filtered =
602                filter_registry.apply(proto_id_val, body, allowed_acc_ids_for_resp_filter.as_ref());
603            ApiServer::send_response(&connections, conn_id, proto_id_val, serial_no, filtered)
604                .await;
605        }
606    }
607}
608
609#[cfg(test)]
610mod tests;