futu_server/ws_listener.rs
1// WebSocket 监听器:接受 WebSocket 连接,复用 TCP 的请求路由和连接池
2//
3// 每个 WebSocket 二进制消息 = 一个完整的 FutuAPI 帧(44 字节帧头 + body)。
4// 与 TCP 共享同一个 connections DashMap、RequestRouter、SubscriptionManager。
5//
6// ## v1.0 鉴权
7//
8// 握手阶段(accept_hdr_async)校验 HTTP `Authorization: Bearer <token>` 或
9// `?token=<plaintext>` query —— 通过 `KeyStore::verify` 得到 `KeyRecord`,
10// 把 scope 集合和 key_id 存到 `ClientConn`。每条消息进 `ws_process_requests`
11// 时按 `futu_auth::scope_for_proto_id(proto_id)` 查所需 scope,不匹配 → 不 dispatch、
12// 记 audit reject。`trade:real` 额外跑 `check_and_commit` 过一道 rate + hours
13// 全局闸门。未注入 KeyStore(TCP listener 或 legacy 模式)→ scopes 空集被
14// 解释为"全放行",保持向后兼容。
15//
16// ## v1.4.104 阶段 3
17//
18// 把 inline scope gate / rate gate / body-aware acc_id check 替换为
19// `futu_auth_pipeline::authenticate_request` 单一调用. 流程:
20// 1. AES decrypt (handshake protocol INIT_CONNECT 跳过)
21// 2. (非 INIT_CONNECT + 非 1xxx 系统协议) → 调 pipeline
22// Credential::PreVerified(rec_from_get_by_id) 拿 SIGHUP-aware 最新 rec.
23// Reject → drop. Allow → 取 allowed_acc_ids 给 response filter.
24// 3. dispatch (router / handle_init_connect / handle_keepalive)
25// 4. response filter (proto 2001 TRD_GET_ACC_LIST 等)
26//
27// 删除: 旧 `ws_body_aware_check` (功能进 pipeline body_aware::build_check_ctxs).
28// 删除: 旧 inline scope gate + rate gate + audit allow/reject. 全 pipeline 一处.
29
30use std::collections::HashSet;
31use std::sync::Arc;
32use std::sync::atomic::AtomicI64;
33use std::time::Instant;
34
35use dashmap::DashMap;
36use tokio::net::TcpListener;
37use tokio::sync::{mpsc, watch};
38
39mod connection;
40mod handshake;
41
42use connection::run_ws_connection;
43use handshake::AuthResult;
44
45use futu_auth::{KeyStore, RuntimeCounters};
46use futu_auth_pipeline::{
47 AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, RejectKind, SurfaceId,
48 authenticate_request,
49};
50
/// WebSocket surface adapter: an `AuthDecision::Reject` becomes a silent drop
/// (v1.4.106 D1 5c).
///
/// **History**: v1.4.103 and v1.4.104 phase 3 both handled WS rejects by
/// silently dropping the frame, so that a client cannot tell from any wire
/// response whether its frame was read (guards against timing probes).
/// v1.4.106 D1 routes this "translate to unit" step through the trait too, so
/// that all four `SurfaceAdapter` surfaces look the same.
///
/// **`WireResponse = ()`**: nothing is sent back to the client. Callers only
/// inspect the resulting `Option<()>` — `Some` means reject, `None` means
/// allow — to decide between dropping the frame and dispatching it.
///
/// **Invariant**: `translate_reject` must not write logs. The pipeline has
/// already emitted the audit reject once; the kind and reason are discarded.
pub struct WsAdapter;
64
65impl futu_auth_pipeline::SurfaceAdapter for WsAdapter {
66 type WireResponse = ();
67
68 fn surface_id() -> SurfaceId {
69 SurfaceId::Ws
70 }
71
72 fn translate_reject(_kind: RejectKind, _reason: String) -> Self::WireResponse {
73 // 与 v1.4.103/104 行为一致: silent drop. pipeline 已 audit reject,
74 // 不再写 log; reject fields intentionally unused to avoid leaking daemon state.
75 }
76}
77use futu_codec::header::ProtoFmtType;
78use futu_core::proto_id;
79
80use crate::conn::{ClientConn, ConnState, DisconnectNotify, IncomingRequest};
81use crate::listener::{
82 MAX_CONNECTIONS, REQUEST_QUEUE_CAPACITY, ServerConfig, default_server_time_offset_secs,
83 server_now_ts,
84};
85use crate::router::RequestRouter;
86
87/// WebSocket 服务端
88pub struct WsServer {
89 listen_addr: String,
90 config: ServerConfig,
91 connections: Arc<DashMap<u64, ClientConn>>,
92 router: Arc<RequestRouter>,
93 subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
94 /// v1.0:握手时做 Bearer token 鉴权。None 或 `!is_configured()` → legacy 模式放行
95 key_store: Option<Arc<KeyStore>>,
96 /// v1.0:跨 REST / gRPC / WS 共享的限额 counters
97 counters: Option<Arc<RuntimeCounters>>,
98 /// v1.4.104 阶段 3: 跨 surface 共享的 response filter registry (proto 2001
99 /// TRD_GET_ACC_LIST 默认装入). None 时 fallback 到内置 with_defaults.
100 filter_registry: Option<Arc<FilterRegistry>>,
101 server_time_offset_secs: Arc<AtomicI64>,
102}
103
104/// Shared dependencies for the WebSocket server.
105///
106/// These are the same runtime objects the raw TCP server owns. Keeping them in a
107/// bundle avoids every constructor carrying a growing positional list as auth /
108/// counters / filters evolve.
109pub struct WsServerDeps {
110 connections: Arc<DashMap<u64, ClientConn>>,
111 router: Arc<RequestRouter>,
112 subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
113}
114
115impl WsServerDeps {
116 pub fn new(
117 connections: Arc<DashMap<u64, ClientConn>>,
118 router: Arc<RequestRouter>,
119 subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
120 ) -> Self {
121 Self {
122 connections,
123 router,
124 subscriptions,
125 }
126 }
127}
128
129impl WsServer {
130 /// 创建 WsServer,共享 TCP 的连接池、路由器、订阅管理器(无鉴权,向后兼容)
131 pub fn new(
132 listen_addr: String,
133 config: ServerConfig,
134 connections: Arc<DashMap<u64, ClientConn>>,
135 router: Arc<RequestRouter>,
136 subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
137 ) -> Self {
138 Self::with_auth(
139 listen_addr,
140 config,
141 WsServerDeps::new(connections, router, subscriptions),
142 None,
143 None,
144 )
145 }
146
147 /// v1.0 入口:同时接入 KeyStore + 共享 RuntimeCounters 做握手鉴权和 per-message
148 /// scope / 限额检查。`key_store = None` 或未配置时保持 legacy(全放行)。
149 pub fn with_auth(
150 listen_addr: String,
151 config: ServerConfig,
152 deps: WsServerDeps,
153 key_store: Option<Arc<KeyStore>>,
154 counters: Option<Arc<RuntimeCounters>>,
155 ) -> Self {
156 Self {
157 listen_addr,
158 config,
159 connections: deps.connections,
160 router: deps.router,
161 subscriptions: deps.subscriptions,
162 key_store,
163 counters,
164 filter_registry: None,
165 server_time_offset_secs: default_server_time_offset_secs(),
166 }
167 }
168
169 /// Inject backend server-time offset for SDK-facing WS protocol fields.
170 pub fn with_server_time_offset_secs(mut self, offset: Arc<AtomicI64>) -> Self {
171 self.server_time_offset_secs = offset;
172 self
173 }
174
175 /// v1.4.104 阶段 3: 注入显式 FilterRegistry (跨 surface 共享同一份).
176 /// 不调用此 setter 则 run() 时 fallback 到 `FilterRegistry::with_defaults()`.
177 pub fn with_filter_registry(mut self, registry: Arc<FilterRegistry>) -> Self {
178 self.filter_registry = Some(registry);
179 self
180 }
181
182 /// 启动 WebSocket 服务端监听
183 pub async fn run(&self) -> anyhow::Result<()> {
184 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
185 self.run_until_shutdown(shutdown_rx).await
186 }
187
188 /// 启动 WebSocket 服务端监听,并在 shutdown 信号到来时停止接受新连接。
189 pub async fn run_until_shutdown(
190 &self,
191 mut shutdown_rx: watch::Receiver<bool>,
192 ) -> anyhow::Result<()> {
193 let listener = TcpListener::bind(&self.listen_addr).await?;
194 tracing::info!(addr = %self.listen_addr, "WebSocket server listening");
195
196 let (req_tx, req_rx) = mpsc::channel::<IncomingRequest>(REQUEST_QUEUE_CAPACITY);
197 // The disconnect cleanup signal is intentionally unbounded: each item
198 // is a tiny `conn_id`, and blocking this path behind request-queue
199 // backpressure would risk leaking connection/subscription state.
200 let (disconnect_tx, mut disconnect_rx) = mpsc::unbounded_channel::<DisconnectNotify>();
201
202 // 启动请求处理任务(与 TCP 共享同一逻辑)
203 let connections = Arc::clone(&self.connections);
204 let router = Arc::clone(&self.router);
205 let config = self.config.clone();
206 // v1.4.104 阶段 3: KeyStore + RuntimeCounters 总是材料化 (legacy mode 用
207 // empty / new()), 让 ws_process_requests 拿 non-Option Arc 直接调 pipeline.
208 // 行为与 v1.4.103 等价: KeyStore::empty().is_configured() = false, pipeline
209 // 走 legacy short-circuit (Allow{rec:None} 不 audit, body-aware 不 enforce).
210 let key_store_for_process = self
211 .key_store
212 .clone()
213 .unwrap_or_else(|| Arc::new(KeyStore::empty()));
214 let counters_for_process = self
215 .counters
216 .clone()
217 .unwrap_or_else(|| Arc::new(RuntimeCounters::new()));
218 let filter_registry_for_process = self
219 .filter_registry
220 .clone()
221 .unwrap_or_else(|| Arc::new(FilterRegistry::with_defaults()));
222 let server_time_offset_secs = Arc::clone(&self.server_time_offset_secs);
223 tokio::spawn(async move {
224 ws_process_requests(
225 req_rx,
226 connections,
227 router,
228 config,
229 counters_for_process,
230 key_store_for_process,
231 filter_registry_for_process,
232 server_time_offset_secs,
233 )
234 .await;
235 });
236
237 // 启动连接清理任务
238 let cleanup_connections = Arc::clone(&self.connections);
239 let cleanup_subs = self.subscriptions.clone();
240 tokio::spawn(async move {
241 while let Some(notify) = disconnect_rx.recv().await {
242 let removed = cleanup_connections.remove(¬ify.conn_id);
243 if removed.is_some() {
244 if let Some(ref subs) = cleanup_subs {
245 subs.on_disconnect(notify.conn_id);
246 }
247 tracing::info!(
248 conn_id = notify.conn_id,
249 remaining = cleanup_connections.len(),
250 "ws connection removed from pool"
251 );
252 }
253 }
254 });
255
256 // 接受连接循环
257 let connections = Arc::clone(&self.connections);
258 let key_store_accept = self.key_store.clone();
259 // v1.4.104 阶段 3: scope_mode 局部计算 (KeyStore configured = 启用 auth).
260 let scope_mode = self.key_store.as_ref().is_some_and(|ks| ks.is_configured());
261 if !scope_mode {
262 // v1.4.93 P0-5 (NEW-C-02): 加强 legacy mode loud WARN —
263 // 对齐 REST mutating-blocked policy 的 startup signal。任何
264 // 未授权客户端都可 handshake + 接收 push(HTTP 101 OK)。本版
265 // **不 reject**(保持向后兼容),未来 v2 默认 reject。
266 tracing::warn!("{}", legacy_mode_warn_tracing_message());
267 eprintln!("{}", legacy_mode_warn_stderr_message());
268 }
269 loop {
270 let (stream, peer_addr) = tokio::select! {
271 _ = crate::listener::shutdown_requested(&mut shutdown_rx) => {
272 tracing::info!("WebSocket server accept loop stopped by shutdown signal");
273 break;
274 }
275 accepted = listener.accept() => accepted?,
276 };
277
278 if connections.len() >= MAX_CONNECTIONS {
279 tracing::warn!(
280 peer = %peer_addr,
281 "max connections reached ({}), rejecting ws client",
282 MAX_CONNECTIONS,
283 );
284 drop(stream);
285 continue;
286 }
287
288 let conn_id = ClientConn::generate_conn_id();
289 let aes_key = ClientConn::generate_aes_key();
290 crate::listener::set_nodelay_with_log(&stream, peer_addr, "ws");
291
292 tracing::info!(
293 conn_id = conn_id,
294 peer = %peer_addr,
295 total = connections.len() + 1,
296 "ws client connected"
297 );
298
299 let (tx, authed) = run_ws_connection(
300 stream,
301 peer_addr,
302 conn_id,
303 aes_key,
304 req_tx.clone(),
305 disconnect_tx.clone(),
306 shutdown_rx.clone(),
307 key_store_accept.clone(),
308 )
309 .await;
310
311 // 握手鉴权失败 → run_ws_connection 已经 drop 连接;这里什么都不做
312 let Some(authed) = authed else {
313 continue;
314 };
315
316 let (key_id, scopes, allowed_markets, allowed_acc_ids) = match authed {
317 AuthResult::Authenticated(rec) => (
318 Some(rec.id.clone()),
319 rec.scopes.clone(),
320 // v1.4.105 D3 (Phase 4) T-B2: 拷贝 caller key 的 allowed_markets
321 // 到 ClientConn 让 PushDispatcher::push_trd_acc Layer 3 用.
322 rec.allowed_markets
323 .as_ref()
324 .map(|s| std::sync::Arc::new(s.clone())),
325 // codex round 1 F4 (P2) v1.4.105: 拷贝 caller key 的
326 // allowed_acc_ids 到 ClientConn 让 PushDispatcher::push_trd_acc
327 // Layer 1 push-time 硬过滤. 防 stale subscription /
328 // KeyRecord reload 后 acc 范围窄化 时 push leak.
329 rec.allowed_acc_ids
330 .as_ref()
331 .map(|s| std::sync::Arc::new(s.clone())),
332 ),
333 AuthResult::Legacy => (None, HashSet::new(), None, None),
334 };
335
336 let conn = ClientConn {
337 conn_id,
338 state: ConnState::Connected,
339 aes_key,
340 aes_encrypt_enabled: false,
341 proto_fmt_type: ProtoFmtType::Protobuf,
342 last_keepalive: Instant::now(),
343 recv_notify: false,
344 ai_type: 0,
345 keepalive_count: std::sync::atomic::AtomicU32::new(0),
346 tx,
347 key_id,
348 scopes,
349 allowed_markets,
350 allowed_acc_ids,
351 };
352
353 connections.insert(conn_id, conn);
354 }
355
356 Ok(())
357 }
358}
359
/// Text of the `tracing::warn!` emitted when the WS server starts in legacy
/// mode (v1.4.93 P0-5, NEW-C-02).
///
/// Kept as a standalone `const fn` so that a unit test can assert the warning
/// still carries key hints such as "v2" and "reject", guarding against
/// accidental removal (same pattern as v1.4.86 SEC-003 Q4).
pub(crate) const fn legacy_mode_warn_tracing_message() -> &'static str {
    concat!(
        "WS server running WITHOUT API key auth (legacy mode); ",
        "all WS clients accept unauthenticated handshake (no-token / ",
        "wrong-bearer / bogus-query all return success). ",
        "Pass KeyStore via with_auth() to enable. ",
        "v2 will default-reject; migrate to --rest-keys-file / --ws-keys-file for production.",
    )
}
371
/// User-visible stderr message printed when the WS server starts in legacy
/// mode (v1.4.93 P0-5, NEW-C-02).
///
/// Shorter than the `tracing::warn!` text so that it is easy to spot in
/// systemd / docker logs.
pub(crate) const fn legacy_mode_warn_stderr_message() -> &'static str {
    concat!(
        "⚠️ WS server (legacy mode, no --ws-keys-file): ",
        "unauthenticated handshakes accepted. v2 will default-reject. ",
        "Migrate to --ws-keys-file for production.",
    )
}
380
381/// 处理 WebSocket 连接的请求(逻辑与 TCP 的 process_requests 相同,额外做 scope / 限额)
382///
383/// v1.4.104 阶段 3 重构: inline scope gate + trade:real rate gate +
384/// `ws_body_aware_check` 三段折叠为单一 `authenticate_request` pipeline 调用.
385///
386/// **流程**:
387/// 1. AES decrypt (handshake INIT_CONNECT 跳过 — body 是明文 RSA 加密 InitReq)
388/// 2. 非 INIT_CONNECT + scope_for_proto_id != None → 调 pipeline:
389/// - Credential::PreVerified(`KeyStore::get_by_id(key_id)`) 拿 SIGHUP-aware
390/// 最新 rec, 复用 handshake 已 verify 的身份 (skip re-verify).
391/// - Endpoint::Proto(proto_id), commit_rate=true (per-msg rate 闸门).
392/// - Reject → drop request (audit 已 emit). Allow → 拿 allowed_acc_ids.
393/// 3. dispatch (router / handle_init_connect / handle_keepalive)
394/// 4. response filter (proto 2001 等, 通过 FilterRegistry::apply).
395///
396/// **legacy mode 行为**: KeyStore::empty().is_configured() = false, pipeline
397/// 走 legacy short-circuit (Allow{rec:None}, 不 audit, body-aware 不 enforce).
398/// 1xxx 系统协议 (INIT_CONNECT / KEEP_ALIVE / GET_GLOBAL_STATE) 与 v1.4.103
399/// 一致跳过 pipeline 直接 dispatch (handshake / heartbeat / 公开协议无 scope).
400async fn ws_process_requests(
401 mut req_rx: mpsc::Receiver<IncomingRequest>,
402 connections: Arc<DashMap<u64, ClientConn>>,
403 router: Arc<RequestRouter>,
404 config: ServerConfig,
405 counters: Arc<RuntimeCounters>,
406 key_store: Arc<KeyStore>,
407 filter_registry: Arc<FilterRegistry>,
408 server_time_offset_secs: Arc<AtomicI64>,
409) {
410 use crate::listener::ApiServer;
411
412 while let Some(mut req) = req_rx.recv().await {
413 let conn_id = req.conn_id;
414 let proto_id_val = req.proto_id;
415 let serial_no = req.serial_no;
416
417 // 更新 last_keepalive(任何包都算活跃)
418 if let Some(mut conn) = connections.get_mut(&conn_id) {
419 conn.last_keepalive = Instant::now();
420 }
421
422 // ── Step 0: v1.4.106 codex 0532 F3 (P2): daemon-internal proto_id
423 // (高位 0x8000_0000 bit) 绝不应从 raw WS 公开 surface 进入 — 仅 REST
424 // handler 内部合成给 router. 在 AES decrypt 前显式 reject + log,
425 // 防探测 daemon 内部 routing.
426 if futu_auth::is_internal_proto_id(proto_id_val) {
427 tracing::warn!(
428 conn_id,
429 proto_id = proto_id_val,
430 "rejecting daemon-internal proto_id at raw WS public surface (audit 0532 F3)"
431 );
432 continue;
433 }
434
435 // ── Step 1: AES decrypt (always for non-INIT_CONNECT) ─────────────────
436 // pipeline + body-aware 都需 plaintext body. INIT_CONNECT body 是 RSA
437 // 加密的 ConnInitReq, 由 handle_init_connect 自行 RSA decrypt.
438 if proto_id_val != proto_id::INIT_CONNECT
439 && let Some(conn) = connections.get(&conn_id)
440 && conn.aes_encrypt_enabled
441 {
442 match conn.decrypt_body(&req.body) {
443 Ok(decrypted) => {
444 req.body = bytes::Bytes::from(decrypted);
445 }
446 Err(e) => {
447 tracing::warn!(
448 conn_id = conn_id,
449 proto_id = proto_id_val,
450 error = %e,
451 "ws AES decrypt request failed, dropping"
452 );
453 continue;
454 }
455 }
456 }
457
458 // ── Step 2: Pipeline auth ─────────────────────────────────────────────
459 // INIT_CONNECT (handshake) + 1xxx 系统协议 (scope_for_proto_id == None)
460 // 跳 pipeline. 与 v1.4.103 行为一致: 系统协议不审 / 不限频 / 直 dispatch.
461 let needed_scope = futu_auth_pipeline::capability::scope_for_proto_id(proto_id_val);
462 // codex 0522 F1 v1.4.106: 提前抓 conn.key_id 快照让 dispatch IncomingRequest
463 // 也能填 caller_key_id (per-call snapshot, 与 caller_allowed_acc_ids 同源).
464 // 即使 INIT_CONNECT / 1xxx 系统协议 (跳 pipeline) 也带上 — handler 可基于
465 // key_id 做 per-key 订阅配额 / cleanup / 审计.
466 let dispatch_caller_key_id: Option<String> =
467 connections.get(&conn_id).and_then(|c| c.key_id.clone());
468 let allowed_acc_ids_for_resp_filter: Option<HashSet<u64>> =
469 if proto_id_val == proto_id::INIT_CONNECT || needed_scope.is_none() {
470 None
471 } else {
472 // 从 conn 取 key_id 快照, 然后 KeyStore::get_by_id 拿 SIGHUP-aware
473 // 最新 rec (limits / scopes / allowed_acc_ids 都跟最新). 找不到 ↦
474 // Credential::None (legacy mode 放行 / scope mode reject Unauth).
475 let key_id_snap = dispatch_caller_key_id.clone();
476 let rec_opt = key_id_snap.as_ref().and_then(|id| key_store.get_by_id(id));
477 let credential = match rec_opt {
478 Some(rec) => Credential::PreVerified(rec),
479 None => Credential::None,
480 };
481
482 let env = AuthEnvelope {
483 surface: SurfaceId::Ws,
484 endpoint: Endpoint::Proto(proto_id_val),
485 needed_scope,
486 credential,
487 proto_id: Some(proto_id_val),
488 body: &req.body,
489 explicit_acc_id: None,
490 explicit_ctx: None,
491 commit_rate: true, // WS per-msg 是 trade:real 唯一 rate gate
492 audit_emit: true,
493 };
494 let session_id = conn_id.to_string();
495 let audit_ctx =
496 futu_auth::audit::AuditContext::new(None::<&str>, Some(session_id.as_str()));
497
498 // v1.4.106 D1 5c: 走 SurfaceAdapter trait
499 // (`WsAdapter::translate_decision`), 与 4 surface 一致.
500 // Allow → Some(allowed_acc_ids), Reject → None + silent drop.
501 use futu_auth_pipeline::SurfaceAdapter;
502 match futu_auth::audit::with_context(audit_ctx.clone(), || {
503 authenticate_request(&key_store, &counters, env)
504 }) {
505 AuthDecision::Allow {
506 allowed_acc_ids, ..
507 } => allowed_acc_ids,
508 decision @ AuthDecision::Reject { .. } => {
509 // WsAdapter::translate_decision 返 Some(()) 表 reject
510 // (silent drop). pipeline 已 audit reject, 这里不打 log.
511 let silent_drop = WsAdapter::translate_decision(decision);
512 debug_assert!(silent_drop.is_some());
513 continue;
514 }
515 }
516 };
517
518 // ── Step 3: Dispatch ──────────────────────────────────────────────────
519 let response_body = match proto_id_val {
520 proto_id::INIT_CONNECT => match connections.get_mut(&conn_id) {
521 Some(mut conn) => match conn.handle_init_connect(
522 &req.body,
523 config.server_ver,
524 config.login_user_id,
525 config.keepalive_interval,
526 config.rsa_private_key.as_deref(),
527 ) {
528 Ok(body) => Some(body),
529 Err(error) => {
530 tracing::warn!(
531 conn_id,
532 proto_id = proto_id_val,
533 error = %error,
534 "ws InitConnect handling failed"
535 );
536 None
537 }
538 },
539 None => {
540 tracing::warn!(
541 conn_id,
542 proto_id = proto_id_val,
543 "ws InitConnect request received for missing connection"
544 );
545 None
546 }
547 },
548 proto_id::KEEP_ALIVE => match connections.get(&conn_id) {
549 Some(conn) => match conn
550 .handle_keepalive_at(&req.body, server_now_ts(&server_time_offset_secs))
551 {
552 Ok(body) => Some(body),
553 Err(error) => {
554 tracing::warn!(
555 conn_id,
556 proto_id = proto_id_val,
557 error = %error,
558 "ws KeepAlive handling failed"
559 );
560 None
561 }
562 },
563 None => {
564 tracing::warn!(
565 conn_id,
566 proto_id = proto_id_val,
567 "ws KeepAlive request received for missing connection"
568 );
569 None
570 }
571 },
572 _ => {
573 // v1.4.105 D2 T-A1 fix: caller_allowed_acc_ids 从 pipeline allow
574 // decision 真填进 IncomingRequest, 让 dispatch handler (e.g.
575 // SubAccPushHandler) 端 enforce per-acc whitelist defense-in-depth.
576 // codex 0522 F1 v1.4.106: 同步填 caller_key_id (per-call snapshot
577 // 来自 conn.key_id), 让 cross-surface handler 都能识别 caller.
578 let dispatch_req = IncomingRequest::builder(
579 req.conn_id,
580 req.proto_id,
581 req.serial_no,
582 req.proto_fmt_type,
583 req.body.clone(),
584 )
585 .with_idempotency_key(req.idempotency_key.clone())
586 .with_caller_scope(
587 allowed_acc_ids_for_resp_filter
588 .as_ref()
589 .map(|s| std::sync::Arc::new(s.clone())),
590 dispatch_caller_key_id.clone(),
591 )
592 .build();
593 router.dispatch(conn_id, &dispatch_req).await
594 }
595 };
596
597 // ── Step 4: Response filter (TRD_GET_ACC_LIST 等) ────────────────────
598 if let Some(body) = response_body {
599 // FilterRegistry::apply(): proto_id 未注册 → 原 body 不动 (no-op).
600 // 注册了 (e.g. proto 2001) → filter by allowed_acc_ids.
601 let filtered =
602 filter_registry.apply(proto_id_val, body, allowed_acc_ids_for_resp_filter.as_ref());
603 ApiServer::send_response(&connections, conn_id, proto_id_val, serial_no, filtered)
604 .await;
605 }
606 }
607}
608
609#[cfg(test)]
610mod tests;