Skip to main content

futu_opend/startup/
phase4.rs

1//! v1.4.110 Layer 3 A: startup Phase 4 — surface server (WS/REST/gRPC/Telnet)
2//! spawn / card_num expand / SIGHUP unified handler / TCP fail-closed gate /
3//! 主 `tokio::select!` 循环 + 清理. 抽自原 `mod.rs::run_daemon` 533..1075 行段.
4//!
5//! Phase 4 取走 `Phase1Out` 所有权 (保证 `_audit_guard` 直到 fn 返回才 drop),
6//! 同时取 `bridge: Arc<GatewayBridge>` 与 `Phase3Out` 启动各 surface server.
7
8use anyhow::{Context, Result};
9use std::sync::Arc;
10
11use futu_gateway_core::bridge::GatewayBridge;
12use futu_server::ws_listener::{WsServer, WsServerDeps};
13
14use crate::config::RuntimeConfig;
15use crate::startup::phase1::Phase1Out;
16use crate::startup::phase3::Phase3Out;
17
18mod shutdown;
19#[cfg(test)]
20mod tests;
21
22use shutdown::{
23    BackgroundJoinHandle, SHUTDOWN_GRACE_PERIOD, SurfaceJoinHandle, await_background_shutdown,
24    await_surface_shutdown, log_surface_task_result, request_surface_shutdown,
25    surface_task_result_or_pending,
26};
27
28fn merge_push_health_snapshots_for_rest(
29    push: serde_json::Value,
30    qot_login: serde_json::Value,
31    backend_connected: bool,
32) -> serde_json::Value {
33    let mut combined = match push {
34        serde_json::Value::Object(map) => map,
35        other => {
36            let mut map = serde_json::Map::new();
37            map.insert(
38                "error".to_string(),
39                serde_json::Value::String(format!(
40                    "push_health snapshot serialized to non-object {}",
41                    json_value_kind(&other)
42                )),
43            );
44            map
45        }
46    };
47    combined.insert("qot_login_health".to_string(), qot_login);
48    combined.insert(
49        "backend_connected".to_string(),
50        serde_json::Value::Bool(backend_connected),
51    );
52    serde_json::Value::Object(combined)
53}
54
55fn json_value_kind(value: &serde_json::Value) -> &'static str {
56    match value {
57        serde_json::Value::Null => "null",
58        serde_json::Value::Bool(_) => "bool",
59        serde_json::Value::Number(_) => "number",
60        serde_json::Value::String(_) => "string",
61        serde_json::Value::Array(_) => "array",
62        serde_json::Value::Object(_) => "object",
63    }
64}
65
/// One network surface that is reachable off-host without API-key auth.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct LegacyNetworkExposureWarning {
    /// Surface name shown to the operator (for example `"REST"`).
    surface: &'static str,
    /// CLI flag(s) that would enable API-key auth for this surface.
    key_flag: &'static str,
}
71
/// Returns `true` when the bind address `ip` only accepts connections from the
/// local host.
///
/// IP literals are parsed and checked with `IpAddr::is_loopback`, which covers
/// all of `127.0.0.0/8` and `::1`. The following are also accepted:
/// - bracketed IPv6 literals such as `[::1]`;
/// - IPv4-mapped IPv6 loopback such as `::ffff:127.0.0.1`.
///
/// The only hostname treated as loopback is `localhost`, matched
/// case-insensitively.
///
/// Every other string counts as network-exposed, so the exposure warning is
/// never suppressed by mistake. That includes hostnames that merely start with
/// `127.` (for example `127.example.com`) and non-canonical shorthands such as
/// `127.1`.
fn is_loopback_bind(ip: &str) -> bool {
    if ip.eq_ignore_ascii_case("localhost") {
        return true;
    }
    // Accept bracketed IPv6 literals such as `[::1]`.
    let literal = ip
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
        .unwrap_or(ip);
    match literal.parse::<std::net::IpAddr>() {
        Ok(std::net::IpAddr::V4(v4)) => v4.is_loopback(),
        Ok(std::net::IpAddr::V6(v6)) => {
            v6.is_loopback() || matches!(v6.to_ipv4_mapped(), Some(v4) if v4.is_loopback())
        }
        // A hostname cannot be proven loopback without DNS. Warn rather than
        // trust a name that only looks local.
        Err(_) => false,
    }
}
75
76fn legacy_network_exposure_warnings(
77    ip: &str,
78    any_keys_configured: bool,
79    allow_tcp_unauthenticated: bool,
80    rest_key_configured: Option<bool>,
81    grpc_key_configured: Option<bool>,
82    ws_key_configured: Option<bool>,
83) -> Vec<LegacyNetworkExposureWarning> {
84    if is_loopback_bind(ip) {
85        return Vec::new();
86    }
87
88    let mut warnings = Vec::new();
89    if !any_keys_configured || allow_tcp_unauthenticated {
90        warnings.push(LegacyNetworkExposureWarning {
91            surface: "FTAPI TCP",
92            key_flag: "--rest-keys-file/--grpc-keys-file/--ws-keys-file",
93        });
94    }
95    for (enabled_keyed, surface, key_flag) in [
96        (rest_key_configured, "REST", "--rest-keys-file"),
97        (grpc_key_configured, "gRPC", "--grpc-keys-file"),
98        (ws_key_configured, "WS", "--ws-keys-file"),
99    ] {
100        if enabled_keyed == Some(false) {
101            warnings.push(LegacyNetworkExposureWarning { surface, key_flag });
102        }
103    }
104    warnings
105}
106
107fn warn_legacy_network_exposure(config: &RuntimeConfig, any_keys_configured: bool) {
108    for warning in legacy_network_exposure_warnings(
109        &config.ip,
110        any_keys_configured,
111        config.allow_tcp_unauthenticated,
112        config.rest_port.map(|_| config.rest_keys_file.is_some()),
113        config.grpc_port.map(|_| config.grpc_keys_file.is_some()),
114        config.websocket_port.map(|_| config.ws_keys_file.is_some()),
115    ) {
116        tracing::warn!(
117            bind_ip = %config.ip,
118            surface = warning.surface,
119            key_flag = warning.key_flag,
120            "legacy network exposure: surface is reachable on a non-loopback bind without API-key auth"
121        );
122        eprintln!(
123            "⚠️  {} legacy mode is reachable on {} without API-key auth. \
124             For production, bind 127.0.0.1 or configure {}.",
125            warning.surface, config.ip, warning.key_flag
126        );
127    }
128}
129
130pub(super) async fn run_phase4(
131    config: &RuntimeConfig,
132    phase1: Phase1Out,
133    bridge: Arc<GatewayBridge>,
134    phase3: Phase3Out,
135    shutdown_tx: tokio::sync::watch::Sender<bool>,
136    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
137) -> Result<()> {
138    let Phase1Out {
139        _audit_guard,
140        shared_counters,
141        listen_addr,
142        rest_keys_file,
143        ws_keys_file,
144        grpc_keys_file,
145        allow_tcp_unauthenticated,
146    } = phase1;
147    let Phase3Out {
148        server,
149        server_config,
150        ws_broadcaster,
151        grpc_broadcaster,
152    } = phase3;
153
154    // v1.4.103 (B10): 把 3 个 server 的 key_store Option 提升到外层作用域,
155    // 让后面的 expand_allowed_card_nums spawn task 能跨 server 块捕获.
156    // 各 server 块仍各自加载 (允许独立 keys.json), 这里只共享 Arc clone.
157    let mut ws_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
158    let mut rest_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
159    let mut grpc_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
160    let any_keys_configured =
161        rest_keys_file.is_some() || grpc_keys_file.is_some() || ws_keys_file.is_some();
162    warn_legacy_network_exposure(config, any_keys_configured);
163
164    // 7. 启动 WebSocket 服务(可选)
165    let mut ws_handle = if let Some(ws_port) = config.websocket_port {
166        let ws_addr = format!("{}:{}", config.ip, ws_port);
167        // v1.0:WS 握手鉴权 —— 复用 REST 的 key store 设计(`--ws-keys-file` 独立
168        // 指定,不指定时 legacy 放行)
169        // v1.4.102 BUG-007 fix (P1, leaf v1.4.100 报告): keys-file load 失败
170        // 必须 fail-closed (abort daemon), 不再 silent fallback to legacy mode.
171        // 历史: 用户传 `--ws-keys-file` 表示明确意图启用 auth, 文件 typo / parse
172        // 错时 daemon 只 log error 然后继续无 auth → 用户以为有门锁, 实际门锁
173        // 没装上 (security misconfig 比纯 legacy 更危险).
174        let ws_key_store = match &ws_keys_file {
175            Some(path) => match futu_auth::KeyStore::load(path) {
176                Ok(ks) => {
177                    tracing::info!(
178                        path = %path.display(),
179                        keys_loaded = ks.len(),
180                        "WS keys file loaded (Bearer/?token auth enabled)"
181                    );
182                    Some(std::sync::Arc::new(ks))
183                }
184                Err(e) => {
185                    // v1.4.102 BUG-007: fail-closed
186                    tracing::error!(
187                        error = %e,
188                        path = %path.display(),
189                        "failed to load WS keys file (--ws-keys-file 明确指定 → fail-closed). \
190                         daemon abort. fix the keys file then restart. \
191                         不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
192                    );
193                    return Err(anyhow::anyhow!(
194                        "failed to load WS keys file {}: {e}. \
195                         --ws-keys-file 明确指定 → fail-closed (BUG-007).",
196                        path.display()
197                    ));
198                }
199            },
200            None => None,
201        };
202        let ws_counters = std::sync::Arc::clone(&shared_counters);
203        // v1.4.103 (B10): clone Arc 给外层 holder 持有, 同时让 ws_server 仍接管原 Arc.
204        ws_key_store_holder = ws_key_store.as_ref().map(std::sync::Arc::clone);
205        let ws_server = WsServer::with_auth(
206            ws_addr.clone(),
207            server_config.clone(),
208            WsServerDeps::new(
209                std::sync::Arc::clone(server.connections()),
210                std::sync::Arc::clone(server.router()),
211                Some(bridge.subscription_runtime().manager()),
212            ),
213            ws_key_store,
214            Some(ws_counters),
215        )
216        .with_server_time_offset_secs(bridge.server_clock().offset_secs());
217        tracing::info!(addr = %ws_addr, "starting WebSocket server");
218        let ws_shutdown_rx = shutdown_rx.clone();
219        Some(tokio::spawn(async move {
220            ws_server
221                .run_until_shutdown(ws_shutdown_rx)
222                .await
223                .context("WebSocket server error")
224        }))
225    } else {
226        None
227    };
228
229    // 8. 启动 REST API 服务(可选,含 WebSocket 推送)
230    let mut rest_handle = if let Some(rest_port) = config.rest_port {
231        let rest_addr = format!("{}:{}", config.ip, rest_port);
232        let router = std::sync::Arc::clone(server.router());
233        let broadcaster = std::sync::Arc::clone(&ws_broadcaster);
234        // v1.4.102 BUG-007 fix: 同 WS 路径 (fail-closed when keys-file 显式指定).
235        let rest_key_store = match &rest_keys_file {
236            Some(path) => match futu_auth::KeyStore::load(path) {
237                Ok(ks) => {
238                    tracing::info!(
239                        path = %path.display(),
240                        keys_loaded = ks.len(),
241                        "REST keys file loaded (Bearer auth enabled)"
242                    );
243                    std::sync::Arc::new(ks)
244                }
245                Err(e) => {
246                    tracing::error!(
247                        error = %e,
248                        path = %path.display(),
249                        "failed to load REST keys file (--rest-keys-file 明确指定 → fail-closed). \
250                         daemon abort. fix the keys file then restart. \
251                         不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
252                    );
253                    return Err(anyhow::anyhow!(
254                        "failed to load REST keys file {}: {e}. \
255                         --rest-keys-file 明确指定 → fail-closed (BUG-007).",
256                        path.display()
257                    ));
258                }
259            },
260            None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
261        };
262        tracing::info!(addr = %rest_addr, "starting REST API server (WebSocket: /ws)");
263
264        // v1.4.103 (B10): clone Arc 给外层 holder 持有 (跨 server 块共享给
265        // expand_allowed_card_nums spawn task). 仅在 keys 实际配置时填充
266        // (empty store 不需要 expand).
267        if rest_key_store.is_configured() {
268            rest_key_store_holder = Some(std::sync::Arc::clone(&rest_key_store));
269        }
270
271        // v1.4.103 codex F3.1 (P1) round 3: REST 单独的 SIGHUP reload listener
272        // 已**移除** — 防与 card_num expand 间的 race (多 listener 并发顺序乱
273        // 可能让 reload 覆盖 expand 写入的 sentinel, 受限 key 在 reload 窗口
274        // 内 silent unrestricted). 现在 reload + expand 由文件末尾的 unified
275        // SIGHUP handler 顺序操作 (调 card_num_reload_and_expand_fn(true)).
276
277        let rest_counters = std::sync::Arc::clone(&shared_counters);
278        // v1.4.32+ admin snapshot provider:closure 捕获 `Arc<GatewayBridge>`
279        // 每次被 admin_status handler 调用时返回实时 StatusSnapshot JSON。
280        // bridge 已经在上面 Arc 化,这里只做一次 clone。
281        let bridge_for_status = std::sync::Arc::clone(&bridge);
282        let admin_status_provider: futu_rest::adapter::AdminStatusProvider =
283            std::sync::Arc::new(move || {
284                serde_json::to_value(bridge_for_status.snapshot_status())
285                    .unwrap_or_else(|_| serde_json::json!({"error": "snapshot serialize failed"}))
286            });
287        let rest_shutdown_tx = shutdown_tx.clone();
288        let admin_shutdown_handler: futu_rest::adapter::AdminShutdownHandler =
289            std::sync::Arc::new(move || {
290                rest_shutdown_tx
291                    .send(true)
292                    .map_err(|e| format!("shutdown receiver dropped: {e}"))
293            });
294        // v1.4.32+ admin reload handler:closure 调 Bridge::reload() 清 cipher cache
295        // v1.4.34: reload 升级为 async(内部刷 credentials 走网络 I/O),
296        // handler 返 Future,axum admin_reload async handler await 之
297        // v1.4.106 codex 0554 F3 [P2]: reload 拆两阶段后变 sync (sync clear +
298        // tokio::spawn refresh). closure 仍返 Future 类型不变 (向后兼容
299        // AdminReloadHandler API), 但内部不 await — sync 阶段已完成 + spawn
300        // 已派发, ReloadReport 立即可用.
301        let bridge_for_reload = std::sync::Arc::clone(&bridge);
302        let admin_reload_handler: futu_rest::adapter::AdminReloadHandler =
303            std::sync::Arc::new(move || {
304                let bridge = std::sync::Arc::clone(&bridge_for_reload);
305                Box::pin(async move {
306                    serde_json::to_value(bridge.reload())
307                        .unwrap_or_else(|_| serde_json::json!({"error": "reload serialize failed"}))
308                })
309            });
310        // v1.4.83 §9 Phase 2 F5: push health snapshot provider —— closure
311        // 捕获 bridge.push_runtime().push_health Arc, 每次
312        // `/api/push-subscriber-info` 调用时返当前真实 snapshot.
313        // v1.4.91 P1-D wiring: closure 同时捕获 bridge.push_runtime().qot_login_health,
314        // 在返 push_health 同时多带一个 qot_login_health 字段供 ops 看
315        // qot_logined self-heal counter (修 P1-D non-deterministic gap).
316        let bridge_for_push_health = std::sync::Arc::clone(&bridge);
317        let push_health_snapshot_provider: futu_rest::adapter::PushHealthSnapshotProvider =
318            std::sync::Arc::new(move || {
319                let push = serde_json::to_value(
320                    bridge_for_push_health
321                        .push_runtime()
322                        .push_health()
323                        .snapshot(),
324                )
325                .unwrap_or_else(
326                    |_| serde_json::json!({"error": "push_health snapshot serialize failed"}),
327                );
328                let qot_login = serde_json::to_value(
329                    bridge_for_push_health
330                        .push_runtime()
331                        .qot_login_health()
332                        .snapshot(),
333                )
334                .unwrap_or_else(
335                    |_| serde_json::json!({"error": "qot_login_health snapshot serialize failed"}),
336                );
337                let backend_connected =
338                    bridge_for_push_health.broker_runtime().platform_connected();
339                merge_push_health_snapshots_for_rest(push, qot_login, backend_connected)
340            });
341        // v1.4.105 D12 (Phase 2): 注入 card_num resolver 让 REST trade
342        // handler (place_order / modify_order / cancel_all_order) 能解析
343        // user 传 `card_num` 字段 → acc_id (覆盖 c2s.header.acc_id).
344        // closure 捕获 bridge.caches().trd_cache, 调
345        // `find_acc_ids_by_card_num(input) -> Vec<u64>`.
346        let bridge_for_card_num = std::sync::Arc::clone(&bridge);
347        let card_num_resolver: futu_rest::adapter::CardNumResolver =
348            std::sync::Arc::new(move |cn: &str| {
349                bridge_for_card_num
350                    .caches()
351                    .trd_cache
352                    .find_acc_ids_by_card_num(cn)
353            });
354        let rest_shutdown_rx = shutdown_rx.clone();
355        Some(tokio::spawn(async move {
356            futu_rest::server::start_with_auth_full_admin_until_shutdown(
357                &rest_addr,
358                router,
359                broadcaster,
360                rest_key_store,
361                rest_counters,
362                futu_rest::server::RestAdminHooks {
363                    admin_status_provider: Some(admin_status_provider),
364                    admin_shutdown_handler: Some(admin_shutdown_handler),
365                    admin_reload_handler: Some(admin_reload_handler),
366                    push_health_snapshot_provider: Some(push_health_snapshot_provider),
367                    card_num_resolver: Some(card_num_resolver),
368                },
369                rest_shutdown_rx,
370            )
371            .await
372            .context("REST API server error")
373        }))
374    } else {
375        None
376    };
377
378    // 9. 启动 gRPC 服务(可选,含流式推送)
379    let mut grpc_handle = if let Some(grpc_port) = config.grpc_port {
380        let grpc_addr = format!("{}:{}", config.ip, grpc_port);
381        let router = std::sync::Arc::clone(server.router());
382        let broadcaster = std::sync::Arc::clone(&grpc_broadcaster);
383        // v1.4.102 BUG-007 fix: 同 WS / REST 路径 (fail-closed when keys-file 显式指定).
384        let grpc_key_store = match &grpc_keys_file {
385            Some(path) => match futu_auth::KeyStore::load(path) {
386                Ok(ks) => {
387                    tracing::info!(
388                        path = %path.display(),
389                        keys_loaded = ks.len(),
390                        "gRPC keys file loaded (Bearer auth enabled)"
391                    );
392                    std::sync::Arc::new(ks)
393                }
394                Err(e) => {
395                    tracing::error!(
396                        error = %e,
397                        path = %path.display(),
398                        "failed to load gRPC keys file (--grpc-keys-file 明确指定 → fail-closed). \
399                         daemon abort. fix the keys file then restart. \
400                         不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
401                    );
402                    return Err(anyhow::anyhow!(
403                        "failed to load gRPC keys file {}: {e}. \
404                         --grpc-keys-file 明确指定 → fail-closed (BUG-007).",
405                        path.display()
406                    ));
407                }
408            },
409            None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
410        };
411        tracing::info!(addr = %grpc_addr, "starting gRPC server (SubscribePush: streaming)");
412
413        // v1.4.103 (B10): clone Arc 给外层 holder 持有.
414        if grpc_key_store.is_configured() {
415            grpc_key_store_holder = Some(std::sync::Arc::clone(&grpc_key_store));
416        }
417
418        // v1.4.103 codex F3.1 (P1) round 3: gRPC 单独的 SIGHUP reload listener
419        // 已**移除** — 同 REST 块, 由文件末尾的 unified SIGHUP handler 顺序
420        // reload + expand 防 race.
421
422        let grpc_counters = std::sync::Arc::clone(&shared_counters);
423        let grpc_shutdown_rx = shutdown_rx.clone();
424        Some(tokio::spawn(async move {
425            futu_grpc::server::start_with_auth_until_shutdown(
426                &grpc_addr,
427                router,
428                broadcaster,
429                grpc_key_store,
430                grpc_counters,
431                grpc_shutdown_rx,
432            )
433            .await
434            .map_err(|e| anyhow::anyhow!("gRPC server error: {e}"))
435        }))
436    } else {
437        None
438    };
439
440    // 10. 启动 Telnet 管理服务(可选)
441    let mut telnet_handle = if let Some(telnet_port) = config.telnet_port {
442        let telnet_addr = format!("{}:{}", config.ip, telnet_port);
443        // v1.4.97 P1-D-F: relogin callback closes over bridge to clear
444        // login_cache; next 30s P1-D health tick triggers relogin.
445        // Aligns with C++ GTWCmd_ReLogin (FTGateway/FTGTW_Define_Key.h:5).
446        let bridge_for_relogin = std::sync::Arc::clone(&bridge);
447        let relogin_fn: futu_server::telnet::ReloginFn = std::sync::Arc::new(move || {
448            tracing::warn!(
449                "v1.4.97 P1-D-F: telnet relogin clearing login_cache; \
450                     next P1-D tick will trigger AuthRefresher relogin"
451            );
452            bridge_for_relogin.caches().login_cache.clear();
453        });
454        let telnet_server = futu_server::telnet::TelnetServer::new(
455            telnet_addr.clone(),
456            std::sync::Arc::clone(server.connections()),
457            Some(bridge.subscription_runtime().manager()),
458            Some(std::sync::Arc::clone(server.metrics())),
459            shutdown_tx.clone(),
460        )
461        .with_relogin_fn(relogin_fn);
462        tracing::info!(addr = %telnet_addr, "starting Telnet server");
463        let telnet_shutdown_rx = shutdown_rx.clone();
464        Some(tokio::spawn(async move {
465            telnet_server
466                .run_until_shutdown(telnet_shutdown_rx)
467                .await
468                .context("Telnet server error")
469        }))
470    } else {
471        None
472    };
473
474    tracing::info!("gateway ready, accepting connections on {listen_addr}");
475    tracing::info!("press Ctrl+C to exit");
476
477    // v1.4.103 (B10) + codex F2 (P1): 立即跑首次 expand + background retry
478    // + SIGHUP reload 钩子 (fail-closed sentinel 即时生效, 无 startup window).
479    //
480    // 行为:
481    // 1. **立即跑首次 expand** — 即便 cache 空, fail-closed sentinel (codex F1)
482    //    也写进 allowed_acc_ids, 短路 startup window 的 silent unrestricted.
483    // 2. **background retry**: 每 10s 检查 cache, 加载后 re-expand 真 acc_id
484    //    覆盖 sentinel. 60s 上限.
485    // 3. **SIGHUP reload 钩子**: 重载 keys.json 后再 expand 一次 (codex F2 reload window).
486    // v1.4.103 codex F3.1 (P1) round 3: 合并 reload + expand 为单一 ordered op,
487    // 防 race. 当 SIGHUP 触发时, 此 fn 同步顺序: (1) reload 所有 store (从
488    // keys.json 读 raw allowed_card_nums + ArcSwap.store) (2) expand (resolve
489    // card_num → acc_ids + ArcSwap.store with sentinel/resolved). 因为是单线
490    // 调用, 不存在多 SIGHUP listener 间的乱序 race.
491    //
492    // `do_reload` 控制是否先 reload (启动时第一次 / 60s retry 不需 reload,
493    // 直接 expand; SIGHUP 路径需要先 reload).
494    let card_num_reload_and_expand_fn: std::sync::Arc<dyn Fn(bool) + Send + Sync> = {
495        let bridge_for_expand = std::sync::Arc::clone(&bridge);
496        let ws_ks = ws_key_store_holder.clone();
497        let rest_ks = rest_key_store_holder.clone();
498        let grpc_ks = grpc_key_store_holder.clone();
499        std::sync::Arc::new(move |do_reload: bool| {
500            // (1) Reload phase — 仅 SIGHUP 路径调
501            if do_reload {
502                for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
503                    let Some(ks) = ks_opt.as_ref() else { continue };
504                    match ks.reload() {
505                        Ok(()) => tracing::warn!(
506                            ks = ks_name,
507                            keys_loaded = ks.len(),
508                            "v1.4.103 F3.1: keys reloaded on SIGHUP (before card_num expand)"
509                        ),
510                        Err(e) => tracing::error!(
511                            ks = ks_name,
512                            error = %e,
513                            "v1.4.103 F3.1: keys reload failed (skipping expand for this store)"
514                        ),
515                    }
516                }
517            }
518            // (2) Expand phase
519            let trd_cache = std::sync::Arc::clone(&bridge_for_expand.caches().trd_cache);
520            let resolver = {
521                let cache_clone = std::sync::Arc::clone(&trd_cache);
522                move |cn: &str| cache_clone.find_acc_ids_by_card_num(cn)
523            };
524            for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
525                let Some(ks) = ks_opt.as_ref() else { continue };
526                let (resolved, unresolved, ambiguous) = ks.expand_allowed_card_nums(
527                    &resolver,
528                    |key_id, cn| {
529                        tracing::warn!(
530                            key_id = %key_id,
531                            card_num = %cn,
532                            "v1.4.103 B10/F1 fail-closed: card_num not found in trd_cache; \
533                             writing sentinel acc_id=0 to enforce restrictive denylist \
534                             (limits.contains check 永远 false → reject 真账户)"
535                        );
536                    },
537                    |key_id, cn, candidates| {
538                        tracing::warn!(
539                            key_id = %key_id,
540                            card_num = %cn,
541                            candidates = ?candidates,
542                            "v1.4.103 B10/F1 fail-closed: ambiguous card_num suffix \
543                             matched multiple accounts (skipped, write 完整 16 位 / specific 4 位)"
544                        );
545                    },
546                );
547                tracing::info!(
548                    ks = ks_name,
549                    resolved,
550                    unresolved,
551                    ambiguous,
552                    "v1.4.103 B10: expanded allowed_card_nums into allowed_acc_ids"
553                );
554            }
555        })
556    };
557    // 老 alias (不 reload, 仅 expand) 用于启动 + retry 路径
558    let card_num_expand_fn: std::sync::Arc<dyn Fn() + Send + Sync> = {
559        let inner = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
560        std::sync::Arc::new(move || (inner)(false))
561    };
562
563    // codex F2 (P1) 立即跑首次: fail-closed sentinel 即时生效, 不留 startup window
564    (card_num_expand_fn)();
565
566    // background retry loop: cache 加载后 re-expand 覆盖 sentinel
567    let card_num_retry_handle: BackgroundJoinHandle = {
568        let card_num_expand_fn_loop = std::sync::Arc::clone(&card_num_expand_fn);
569        let bridge_for_check = std::sync::Arc::clone(&bridge);
570        let mut card_num_retry_shutdown_rx = shutdown_rx.clone();
571        tokio::spawn(async move {
572            let trd_cache = std::sync::Arc::clone(&bridge_for_check.caches().trd_cache);
573            let mut attempts = 0u32;
574            let max_attempts = 6u32; // 6 × 10s = 60s
575            loop {
576                tokio::select! {
577                    changed = card_num_retry_shutdown_rx.changed() => {
578                        if changed.is_err() || *card_num_retry_shutdown_rx.borrow() {
579                            tracing::debug!(
580                                "v1.4.111: card_num retry loop received shutdown signal"
581                            );
582                            return;
583                        }
584                    }
585                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {}
586                }
587                attempts += 1;
588                let accounts = trd_cache.get_accounts();
589                if accounts.is_empty() {
590                    if attempts >= max_attempts {
591                        tracing::warn!(
592                            "v1.4.103 B10: trd_cache 仍空 (after {max_attempts} × 10s); \
593                             受限 key 仍走 fail-closed sentinel reject 直到 SIGHUP / cache 加载."
594                        );
595                        return;
596                    }
597                    continue;
598                }
599                (card_num_expand_fn_loop)();
600                return;
601            }
602        })
603    };
604
605    // codex F2 (P1) + F3.1 (P1) round 3: 单一 SIGHUP 钩子 — reload + expand
606    // 顺序操作避免 race. 之前每 server (REST/gRPC) 各自 SIGHUP 监听 reload,
607    // 加上 card_num expand 单独监听 — 多 SIGHUP listener 并发执行无序, 可能
608    // expand 先跑 (写 sentinel) 然后 reload 后跑 (overwrite sentinel 用 raw
609    // allowed_card_nums) → 受限 key 在 reload window 内 silent unrestricted.
610    //
611    // 现在: **唯一 SIGHUP listener**, 顺序 reload → expand. 删除 REST/gRPC 各
612    // 自的 SIGHUP reload listener (上面 server 块内已注释).
613    #[cfg(unix)]
614    let sighup_handle: Option<BackgroundJoinHandle> = {
615        let unified_sighup_fn = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
616        let mut sighup_shutdown_rx = shutdown_rx.clone();
617        Some(tokio::spawn(async move {
618            use tokio::signal::unix::{SignalKind, signal};
619            let mut sig = match signal(SignalKind::hangup()) {
620                Ok(s) => s,
621                Err(e) => {
622                    tracing::error!(error = %e, "SIGHUP install failed (unified reload+expand)");
623                    return;
624                }
625            };
626            tracing::info!(
627                "v1.4.103 F3.1: unified SIGHUP handler installed (reload all keys + expand card_num)"
628            );
629            loop {
630                tokio::select! {
631                    signal = sig.recv() => {
632                        if signal.is_none() {
633                            return;
634                        }
635                        tracing::info!(
636                            "v1.4.103 F3.1: SIGHUP received — running reload_all_stores + \
637                             expand_allowed_card_nums (single ordered op, no race)"
638                        );
639                        (unified_sighup_fn)(true); // do_reload=true
640                    }
641                    changed = sighup_shutdown_rx.changed() => {
642                        if changed.is_err() || *sighup_shutdown_rx.borrow() {
643                            tracing::debug!(
644                                "v1.4.111: unified SIGHUP handler received shutdown signal"
645                            );
646                            return;
647                        }
648                    }
649                }
650            }
651        }))
652    };
653    #[cfg(not(unix))]
654    let sighup_handle: Option<BackgroundJoinHandle> = None;
655
656    // 11. v1.4.104 external reviewer S-001 (P0): native TCP keystore guard.
657    //
658    // 配任一 keys file (rest / grpc / ws) → 用户**意图**启用 scope mode.
659    // 但 TCP FTAPI 协议 InitConnect 没有 Bearer 字段, 无法做 caller-specific
660    // scope check (S-001). 默认 fail-closed: 不启 TCP listener, 用户需要
661    // 通过 REST/gRPC/WS 与 daemon 交互 (这些 surface 都已加 pipeline auth).
662    //
663    // 显式 `--allow-tcp-unauthenticated` opt-in → 启 TCP, 但 daemon 启动
664    // loud warn 该端口完全无 auth, agent skill 等 local process 可任意调用.
665    // 用之前 captured 的 local 变量, args 已被 merge_config 消费
666    let tcp_disabled = any_keys_configured && !allow_tcp_unauthenticated;
667
668    if tcp_disabled {
669        tracing::warn!(
670            listen_addr = %listen_addr,
671            "v1.4.104 external report S-001 (P0) fix: TCP listener (port {}) NOT started — \
672             keys file configured but --allow-tcp-unauthenticated not set. \
673             native TCP FTAPI protocol has no Bearer field, cannot enforce \
674             caller-specific scope check; defaulting to fail-closed (skip TCP). \
675             Use REST/gRPC/WS endpoints for authenticated access. \
676             To restore TCP (legacy Python SDK clients) add --allow-tcp-unauthenticated, \
677             but be aware that port {} will accept ANY local connection without \
678             scope check (跨账户 leak risk).",
679            config.port,
680            config.port,
681        );
682        eprintln!(
683            "⚠️  TCP listener (port {}) DISABLED (v1.4.104 external report S-001 fix): \
684             keys file configured + no --allow-tcp-unauthenticated. \
685             Pass --allow-tcp-unauthenticated to restore (with security warning).",
686            config.port,
687        );
688    } else if any_keys_configured && allow_tcp_unauthenticated {
689        tracing::warn!(
690            listen_addr = %listen_addr,
691            "⚠️  v1.4.104: TCP listener running WITHOUT scope check despite keys configured \
692             (--allow-tcp-unauthenticated set). Port {} accepts ANY local connection — \
693             跨账户 leak risk. Use REST/gRPC/WS for authenticated clients; reserve \
694             TCP only for legacy Python SDK / C++ OpenD where Bearer not feasible.",
695            config.port,
696        );
697        eprintln!(
698            "⚠️  TCP port {} ACCEPTS UNAUTHENTICATED connections (--allow-tcp-unauthenticated). \
699             受限 keys 不在该 surface 强制. 推荐改用 REST/gRPC/WS.",
700            config.port,
701        );
702    }
703
704    let tcp_shutdown_rx = shutdown_rx.clone();
705    let mut tcp_handle: Option<SurfaceJoinHandle> = if tcp_disabled {
706        None
707    } else {
708        Some(tokio::spawn(async move {
709            server
710                .run_until_shutdown(tcp_shutdown_rx)
711                .await
712                .context("API server error")
713        }))
714    };
715
716    // 11. 等待任一 surface 退出、Ctrl+C 或 telnet shutdown
717    let phase4_result: anyhow::Result<()> = tokio::select! {
718        result = surface_task_result_or_pending(&mut tcp_handle) => {
719            tcp_handle = None;
720            log_surface_task_result("API server", result)
721        }
722        result = surface_task_result_or_pending(&mut ws_handle) => {
723            ws_handle = None;
724            log_surface_task_result("WebSocket", result)
725        }
726        result = surface_task_result_or_pending(&mut rest_handle) => {
727            rest_handle = None;
728            log_surface_task_result("REST API", result)
729        }
730        result = surface_task_result_or_pending(&mut grpc_handle) => {
731            grpc_handle = None;
732            log_surface_task_result("gRPC", result)
733        }
734        result = surface_task_result_or_pending(&mut telnet_handle) => {
735            telnet_handle = None;
736            log_surface_task_result("Telnet", result)
737        }
738        _ = tokio::signal::ctrl_c() => {
739            tracing::info!("received Ctrl+C, shutting down gracefully...");
740            Ok(())
741        }
742        _ = async {
743            while shutdown_rx.changed().await.is_ok() {
744                if *shutdown_rx.borrow() {
745                    break;
746                }
747            }
748        } => {
749            tracing::info!("shutdown requested via telnet");
750            Ok(())
751        }
752    };
753
754    // 清理后台任务
755    request_surface_shutdown(&shutdown_tx, "phase4 exit");
756    await_background_shutdown(
757        "card_num retry",
758        card_num_retry_handle,
759        SHUTDOWN_GRACE_PERIOD,
760    )
761    .await;
762    if let Some(handle) = sighup_handle {
763        await_background_shutdown("SIGHUP reload", handle, SHUTDOWN_GRACE_PERIOD).await;
764    }
765    if let Some(handle) = tcp_handle {
766        await_surface_shutdown("API server", handle, SHUTDOWN_GRACE_PERIOD).await;
767    }
768    if let Some(handle) = ws_handle {
769        await_surface_shutdown("WebSocket", handle, SHUTDOWN_GRACE_PERIOD).await;
770    }
771    if let Some(handle) = rest_handle {
772        await_surface_shutdown("REST API", handle, SHUTDOWN_GRACE_PERIOD).await;
773    }
774    if let Some(handle) = grpc_handle {
775        await_surface_shutdown("gRPC", handle, SHUTDOWN_GRACE_PERIOD).await;
776    }
777    if let Some(handle) = telnet_handle {
778        await_surface_shutdown("Telnet", handle, SHUTDOWN_GRACE_PERIOD).await;
779    }
780
781    phase4_result?;
782    tracing::info!("gateway stopped");
783    Ok(())
784}