futu_server/
push.rs

1// 推送分发:三种推送模式
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::DashMap;
7
8use crate::conn::ClientConn;
9use crate::metrics::GatewayMetrics;
10use crate::subscription::SubscriptionManager;
11
12/// 外部推送接收器 trait
13///
14/// 允许外部模块(如 REST WebSocket)接收推送事件,
15/// 不引入模块间循环依赖。
16pub trait ExternalPushSink: Send + Sync {
17    /// 行情推送
18    fn on_quote_push(&self, sec_key: &str, sub_type: i32, proto_id: u32, body: &[u8]);
19    /// 广播推送 (到价提醒、系统通知等)
20    fn on_broadcast_push(&self, proto_id: u32, body: &[u8]);
21    /// 交易推送 (订单更新、成交更新等)
22    fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8]);
23}
24
25/// 推送分发器
26pub struct PushDispatcher {
27    connections: Arc<DashMap<u64, ClientConn>>,
28    subscriptions: Arc<SubscriptionManager>,
29    metrics: Option<Arc<GatewayMetrics>>,
30    /// 外部推送接收器列表 (REST WebSocket, gRPC 等)
31    external_sinks: Vec<Arc<dyn ExternalPushSink>>,
32}
33
34impl PushDispatcher {
35    pub fn new(
36        connections: Arc<DashMap<u64, ClientConn>>,
37        subscriptions: Arc<SubscriptionManager>,
38    ) -> Self {
39        Self {
40            connections,
41            subscriptions,
42            metrics: None,
43            external_sinks: Vec::new(),
44        }
45    }
46
47    /// 设置监控指标引用
48    pub fn with_metrics(mut self, metrics: Arc<GatewayMetrics>) -> Self {
49        self.metrics = Some(metrics);
50        self
51    }
52
53    /// 添加外部推送接收器(可多次调用注册多个)
54    pub fn with_external_sink(mut self, sink: Arc<dyn ExternalPushSink>) -> Self {
55        self.external_sinks.push(sink);
56        self
57    }
58
59    fn record_push(&self) {
60        if let Some(ref m) = self.metrics {
61            m.client_pushes_sent
62                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63        }
64    }
65
66    /// 向指定连接推送(自动处理 AES 加密)
67    pub async fn push_to_conn(&self, conn_id: u64, proto_id: u32, body: Vec<u8>) {
68        if let Some(conn) = self.connections.get(&conn_id) {
69            let frame = conn.make_frame(proto_id, 0, Bytes::from(body));
70            let _ = conn.tx.send(frame).await;
71            self.record_push();
72        }
73    }
74
75    /// 向所有订阅了通知的连接广播(每个连接独立 AES 加密)
76    pub async fn push_notify(&self, proto_id: u32, body: Vec<u8>) {
77        let body = Bytes::from(body);
78        for entry in self.connections.iter() {
79            let conn = entry.value();
80            if self.subscriptions.is_subscribed_notify(conn.conn_id) {
81                let frame = conn.make_frame(proto_id, 0, body.clone());
82                let _ = conn.tx.send(frame).await;
83                self.record_push();
84            }
85        }
86    }
87
88    /// 向订阅了指定交易账户的所有连接推送
89    pub async fn push_trd_acc(&self, acc_id: u64, proto_id: u32, body: Vec<u8>) {
90        // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
91        for sink in &self.external_sinks {
92            sink.on_trade_push(acc_id, proto_id, &body);
93        }
94        let body = Bytes::from(body);
95        let subscribers = self.subscriptions.get_acc_subscribers(acc_id);
96        for conn_id in subscribers {
97            if let Some(conn) = self.connections.get(&conn_id) {
98                let frame = conn.make_frame(proto_id, 0, body.clone());
99                let _ = conn.tx.send(frame).await;
100                self.record_push();
101            }
102        }
103    }
104
105    /// 向所有已连接的客户端广播(到价提醒等,不需要订阅通知)
106    /// C++ 检查 IsConnSubRecvNotify,对齐使用 is_subscribed_notify
107    pub async fn push_broadcast(&self, proto_id: u32, body: Vec<u8>) {
108        // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
109        for sink in &self.external_sinks {
110            sink.on_broadcast_push(proto_id, &body);
111        }
112        let body = Bytes::from(body);
113        for entry in self.connections.iter() {
114            let conn = entry.value();
115            if self.subscriptions.is_subscribed_notify(conn.conn_id) {
116                let frame = conn.make_frame(proto_id, 0, body.clone());
117                let _ = conn.tx.send(frame).await;
118                self.record_push();
119            }
120        }
121    }
122
123    /// 向订阅了指定行情类型+股票的所有连接推送
124    pub async fn push_qot(&self, security_key: &str, sub_type: i32, proto_id: u32, body: Vec<u8>) {
125        // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
126        for sink in &self.external_sinks {
127            sink.on_quote_push(security_key, sub_type, proto_id, &body);
128        }
129        let body = Bytes::from(body);
130        let subscribers = self
131            .subscriptions
132            .get_qot_subscribers(security_key, sub_type);
133        for conn_id in subscribers {
134            if let Some(conn) = self.connections.get(&conn_id) {
135                let frame = conn.make_frame(proto_id, 0, body.clone());
136                let _ = conn.tx.send(frame).await;
137                self.record_push();
138            }
139        }
140    }
141}