futu_opend/startup/
phase3.rs1use std::sync::Arc;
13
14use futu_gateway_core::bridge::{GatewayBridge, PushEvent};
15use futu_server::listener::{ApiServer, ServerConfig};
16
17use crate::config::RuntimeConfig;
18
/// Values produced by [`run_phase3`] and handed back to the caller.
///
/// NOTE(review): fields are dropped in declaration order, so `server` is
/// dropped before the broadcasters — keep that in mind before reordering.
pub(super) struct Phase3Out {
    /// The API server after `run_phase3` has attached the clock offset,
    /// metrics, subscription manager and business handlers to it.
    pub(super) server: ApiServer,
    /// The configuration `server` was built from (`ApiServer::new` received a
    /// clone of this value).
    pub(super) server_config: ServerConfig,
    /// WebSocket push broadcaster; registered as a push-dispatcher sink when
    /// `run_phase3` is given a push receiver.
    pub(super) ws_broadcaster: Arc<futu_rest::ws::WsBroadcaster>,
    /// gRPC push broadcaster; registered as a push-dispatcher sink when
    /// `run_phase3` is given a push receiver.
    pub(super) grpc_broadcaster: Arc<futu_grpc::server::GrpcPushBroadcaster>,
}
26
27pub(super) fn run_phase3(
28 config: &RuntimeConfig,
29 bridge: &Arc<GatewayBridge>,
30 listen_addr: &str,
31 push_receiver: Option<tokio::sync::mpsc::Receiver<PushEvent>>,
32 shutdown_tx: tokio::sync::watch::Sender<bool>,
33) -> Phase3Out {
34 let user_id = bridge
36 .caches()
37 .login_cache
38 .get_login_state()
39 .map(|s| s.user_id as u64)
40 .unwrap_or(0);
43
44 let server_config = ServerConfig {
45 listen_addr: listen_addr.to_string(),
46 server_ver: 1000,
47 login_user_id: user_id,
48 keepalive_interval: 10,
49 rsa_private_key: config.rsa_private_key.clone(),
50 };
51 if server_config.rsa_private_key.is_some() {
52 tracing::info!("RSA encryption enabled for InitConnect");
53 }
54 let mut server = ApiServer::new(server_config.clone());
55 server.set_server_time_offset_secs(bridge.server_clock().offset_secs());
56 server.set_metrics(std::sync::Arc::clone(bridge.push_runtime().metrics()));
57 server.set_subscriptions(bridge.subscription_runtime().manager());
58
59 futu_server::metrics::install_prometheus_extension(std::sync::Arc::clone(server.metrics()));
64
65 {
71 let router = server.router();
72 futu_gateway_qot::register_handlers(router, bridge);
73 futu_gateway_trd::register_handlers(router, bridge);
74 futu_gateway_core::handlers_sys::register_handlers_with_shutdown(
75 router,
76 bridge,
77 shutdown_tx,
78 );
79 tracing::info!("all business handlers registered");
80 }
81
82 let ws_broadcaster = std::sync::Arc::new(futu_rest::ws::WsBroadcaster::new(1024));
84 let grpc_broadcaster = std::sync::Arc::new(futu_grpc::server::GrpcPushBroadcaster::new(1024));
85
86 if let Some(push_rx) = push_receiver {
90 let sinks: Vec<std::sync::Arc<dyn futu_server::push::ExternalPushSink>> = vec![
91 std::sync::Arc::clone(&ws_broadcaster) as _,
92 std::sync::Arc::clone(&grpc_broadcaster) as _,
93 ];
94 bridge.start_push_dispatcher(&server, push_rx, sinks);
95 tracing::info!("push dispatcher started (with WebSocket + gRPC broadcast)");
96 }
97
98 Phase3Out {
99 server,
100 server_config,
101 ws_broadcaster,
102 grpc_broadcaster,
103 }
104}