futu_trd/
push.rs

1use async_trait::async_trait;
2use futu_core::error::FutuError;
3use futu_core::proto_id;
4
5/// 交易推送回调 trait
6#[async_trait]
7pub trait TradeHandler: Send + Sync + 'static {
8    /// 订单状态变更推送
9    async fn on_order_update(&self, _header: OrderUpdateInfo) {}
10
11    /// 成交推送
12    async fn on_order_fill_update(&self, _header: OrderFillUpdateInfo) {}
13
14    /// 交易通知推送
15    async fn on_trd_notify(&self, _info: TrdNotifyInfo) {}
16}
17
18/// 订单更新信息
19#[derive(Debug, Clone)]
20pub struct OrderUpdateInfo {
21    pub trd_env: i32,
22    pub acc_id: u64,
23    pub trd_market: i32,
24    // 订单详情由调用方根据需要从 proto 解析
25    pub raw_body: bytes::Bytes,
26}
27
28/// 成交更新信息
29#[derive(Debug, Clone)]
30pub struct OrderFillUpdateInfo {
31    pub trd_env: i32,
32    pub acc_id: u64,
33    pub trd_market: i32,
34    pub raw_body: bytes::Bytes,
35}
36
/// Dispatcher for trade push messages.
///
/// This is a stateless unit type; it only serves as a namespace for the
/// dispatch logic.
#[derive(Debug, Clone, Copy, Default)]
pub struct TradePushDispatcher;
39
40impl TradePushDispatcher {
41    /// 分发交易推送消息
42    pub async fn dispatch(
43        handler: &dyn TradeHandler,
44        proto_id_val: u32,
45        body: &[u8],
46    ) -> Result<(), FutuError> {
47        match proto_id_val {
48            proto_id::TRD_UPDATE_ORDER => {
49                let resp: futu_proto::trd_update_order::Response =
50                    prost::Message::decode(body).map_err(FutuError::Proto)?;
51                if let Some(s2c) = &resp.s2c {
52                    let info = OrderUpdateInfo {
53                        trd_env: s2c.header.trd_env,
54                        acc_id: s2c.header.acc_id,
55                        trd_market: s2c.header.trd_market,
56                        raw_body: bytes::Bytes::copy_from_slice(body),
57                    };
58                    handler.on_order_update(info).await;
59                }
60            }
61            proto_id::TRD_UPDATE_ORDER_FILL => {
62                let resp: futu_proto::trd_update_order_fill::Response =
63                    prost::Message::decode(body).map_err(FutuError::Proto)?;
64                if let Some(s2c) = &resp.s2c {
65                    let info = OrderFillUpdateInfo {
66                        trd_env: s2c.header.trd_env,
67                        acc_id: s2c.header.acc_id,
68                        trd_market: s2c.header.trd_market,
69                        raw_body: bytes::Bytes::copy_from_slice(body),
70                    };
71                    handler.on_order_fill_update(info).await;
72                }
73            }
74            proto_id::TRD_NOTIFY => {
75                let resp: futu_proto::trd_notify::Response =
76                    prost::Message::decode(body).map_err(FutuError::Proto)?;
77                if let Some(s2c) = &resp.s2c {
78                    let info = TrdNotifyInfo {
79                        trd_env: s2c.header.trd_env,
80                        acc_id: s2c.header.acc_id,
81                        trd_market: s2c.header.trd_market,
82                        notify_type: s2c.r#type,
83                    };
84                    handler.on_trd_notify(info).await;
85                }
86            }
87            _ => {
88                tracing::debug!(proto_id = proto_id_val, "unhandled trade push");
89            }
90        }
91        Ok(())
92    }
93}
94
/// Trade notification information delivered to the trade-notify callback.
///
/// All fields are plain integers, so values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrdNotifyInfo {
    /// `trd_env` value taken from the push header.
    pub trd_env: i32,
    /// `acc_id` value taken from the push header.
    pub acc_id: u64,
    /// `trd_market` value taken from the push header.
    pub trd_market: i32,
    /// Raw `type` value of the notification, as carried in the push.
    pub notify_type: i32,
}