Skip to main content

futu_opend/startup/
phase3.rs

//! v1.4.110 Layer 3 A: startup Phase 3 — ApiServer construction / handler registration /
//! push broadcasters / push dispatcher startup. Extracted from lines 477..531 of the original `mod.rs::run_daemon`.
//!
//! Main side effects of Phase 3 (in order):
//! 1. Read user_id from `bridge.caches().login_cache` to build `ServerConfig`
//! 2. `ApiServer::new` + `set_metrics` + `set_subscriptions`
//! 3. `install_prometheus_extension` (exposes server.metrics() at `/metrics`)
//! 4. Call the 3 domain register fns (qot / trd / sys) to register the business handlers
//! 5. Create `WsBroadcaster` + `GrpcPushBroadcaster` (capacity 1024 each)
//! 6. If push_receiver is Some → `bridge.start_push_dispatcher` (ws + grpc sinks)
11
12use std::sync::Arc;
13
14use futu_gateway_core::bridge::{GatewayBridge, PushEvent};
15use futu_server::listener::{ApiServer, ServerConfig};
16
17use crate::config::RuntimeConfig;
18
19/// Phase 3 output — Phase 4 spawn 各 surface server 时需要这些资源.
20pub(super) struct Phase3Out {
21    pub(super) server: ApiServer,
22    pub(super) server_config: ServerConfig,
23    pub(super) ws_broadcaster: Arc<futu_rest::ws::WsBroadcaster>,
24    pub(super) grpc_broadcaster: Arc<futu_grpc::server::GrpcPushBroadcaster>,
25}
26
27pub(super) fn run_phase3(
28    config: &RuntimeConfig,
29    bridge: &Arc<GatewayBridge>,
30    listen_addr: &str,
31    push_receiver: Option<tokio::sync::mpsc::Receiver<PushEvent>>,
32    shutdown_tx: tokio::sync::watch::Sender<bool>,
33) -> Phase3Out {
34    // 3. 创建 API 服务端
35    let user_id = bridge
36        .caches()
37        .login_cache
38        .get_login_state()
39        .map(|s| s.user_id as u64)
40        // Offline mode deliberately starts local surfaces without login credentials;
41        // C++/FTAPI InitConnect tolerates user_id=0 until backend login is available.
42        .unwrap_or(0);
43
44    let server_config = ServerConfig {
45        listen_addr: listen_addr.to_string(),
46        server_ver: 1000,
47        login_user_id: user_id,
48        keepalive_interval: 10,
49        rsa_private_key: config.rsa_private_key.clone(),
50    };
51    if server_config.rsa_private_key.is_some() {
52        tracing::info!("RSA encryption enabled for InitConnect");
53    }
54    let mut server = ApiServer::new(server_config.clone());
55    server.set_server_time_offset_secs(bridge.server_clock().offset_secs());
56    server.set_metrics(std::sync::Arc::clone(bridge.push_runtime().metrics()));
57    server.set_subscriptions(bridge.subscription_runtime().manager());
58
59    // v1.4.90 P1-B: 把 GatewayMetrics 注册为 futu_auth Registry 的 extension
60    // renderer, 让 `/metrics` HTTP 端点暴露 per-cmd / per-hour push counter.
61    // 之前 v1.4.83/84 只在 telnet `show_metrics` 暴露, /metrics 仅 3 个 auth
62    // counter (cmd_6212_quote / cmd_14716_trade_new 等被双 tester 报告 missing).
63    futu_server::metrics::install_prometheus_extension(std::sync::Arc::clone(server.metrics()));
64
65    // 4. 注册业务处理器
66    //
67    // v1.4.110 P0-5 T15 P4 wire: 显式调 3 个域 register fn (替原
68    // `bridge.register_handlers(&server)` 反向调用). 3 crate split 后各域
69    // register fn 在各自 crate:
70    {
71        let router = server.router();
72        futu_gateway_qot::register_handlers(router, bridge);
73        futu_gateway_trd::register_handlers(router, bridge);
74        futu_gateway_core::handlers_sys::register_handlers_with_shutdown(
75            router,
76            bridge,
77            shutdown_tx,
78        );
79        tracing::info!("all business handlers registered");
80    }
81
82    // 5. 创建推送广播器 (REST WebSocket + gRPC)
83    let ws_broadcaster = std::sync::Arc::new(futu_rest::ws::WsBroadcaster::new(1024));
84    let grpc_broadcaster = std::sync::Arc::new(futu_grpc::server::GrpcPushBroadcaster::new(1024));
85
86    // 6. 启动推送分发 (push_callback → channel → PushDispatcher → 客户端 + WebSocket + gRPC)
87    //    PushDispatcher 内部通过 ExternalPushSink 自动转发所有推送,
88    //    包括行情、广播、交易推送 (UpdateOrder/UpdateOrderFill)
89    if let Some(push_rx) = push_receiver {
90        let sinks: Vec<std::sync::Arc<dyn futu_server::push::ExternalPushSink>> = vec![
91            std::sync::Arc::clone(&ws_broadcaster) as _,
92            std::sync::Arc::clone(&grpc_broadcaster) as _,
93        ];
94        bridge.start_push_dispatcher(&server, push_rx, sinks);
95        tracing::info!("push dispatcher started (with WebSocket + gRPC broadcast)");
96    }
97
98    Phase3Out {
99        server,
100        server_config,
101        ws_broadcaster,
102        grpc_broadcaster,
103    }
104}