1use async_trait::async_trait;
2use futu_core::error::FutuError;
3use futu_core::proto_id;
4
5use crate::broker::{BrokerData, BrokerEntry};
6use crate::order_detail::OrderDetailData;
7use crate::rt::TimeShare;
8use crate::ticker::Ticker;
9use crate::types::{BasicQot, KLine, OrderBookData, OrderBookEntry, Security};
10
11#[async_trait]
16pub trait QuoteHandler: Send + Sync + 'static {
17 async fn on_basic_qot_update(&self, _qot_list: Vec<BasicQot>) {}
19
20 async fn on_kl_update(&self, _security: Security, _kl_list: Vec<KLine>) {}
22
23 async fn on_order_book_update(&self, _data: OrderBookData) {}
25
26 async fn on_ticker_update(&self, _security: Security, _ticker_list: Vec<Ticker>) {}
28
29 async fn on_rt_update(&self, _security: Security, _rt_list: Vec<TimeShare>) {}
31
32 async fn on_broker_update(&self, _data: BrokerData) {}
34
35 async fn on_order_detail_update(&self, _data: OrderDetailData) {}
37}
38
/// Stateless entry point for routing quote pushes.
///
/// The type carries no data; its functionality is exposed through associated
/// functions. The common traits are derived so the public type can be logged,
/// copied and default-constructed like any other unit value.
#[derive(Debug, Clone, Copy, Default)]
pub struct QuotePushDispatcher;
41
42impl QuotePushDispatcher {
43 pub async fn dispatch(
45 handler: &dyn QuoteHandler,
46 proto_id: u32,
47 body: &[u8],
48 ) -> Result<(), FutuError> {
49 match proto_id {
50 proto_id::QOT_UPDATE_BASIC_QOT => {
51 let resp: futu_proto::qot_update_basic_qot::Response =
52 prost::Message::decode(body).map_err(FutuError::Proto)?;
53 if let Some(s2c) = resp.s2c {
54 let qot_list: Vec<BasicQot> = s2c
55 .basic_qot_list
56 .iter()
57 .map(BasicQot::from_proto)
58 .collect();
59 handler.on_basic_qot_update(qot_list).await;
60 }
61 }
62 proto_id::QOT_UPDATE_KL => {
63 let resp: futu_proto::qot_update_kl::Response =
64 prost::Message::decode(body).map_err(FutuError::Proto)?;
65 if let Some(s2c) = resp.s2c {
66 let security = Security::from_proto(&s2c.security);
67 let kl_list: Vec<KLine> = s2c.kl_list.iter().map(KLine::from_proto).collect();
68 handler.on_kl_update(security, kl_list).await;
69 }
70 }
71 proto_id::QOT_UPDATE_ORDER_BOOK => {
72 let resp: futu_proto::qot_update_order_book::Response =
73 prost::Message::decode(body).map_err(FutuError::Proto)?;
74 if let Some(s2c) = resp.s2c {
75 let data = OrderBookData {
76 security: Security::from_proto(&s2c.security),
77 ask_list: s2c
78 .order_book_ask_list
79 .iter()
80 .map(OrderBookEntry::from_proto)
81 .collect(),
82 bid_list: s2c
83 .order_book_bid_list
84 .iter()
85 .map(OrderBookEntry::from_proto)
86 .collect(),
87 };
88 handler.on_order_book_update(data).await;
89 }
90 }
91 proto_id::QOT_UPDATE_TICKER => {
92 let resp: futu_proto::qot_update_ticker::Response =
93 prost::Message::decode(body).map_err(FutuError::Proto)?;
94 if let Some(s2c) = resp.s2c {
95 let security = Security::from_proto(&s2c.security);
96 let ticker_list: Vec<Ticker> =
97 s2c.ticker_list.iter().map(Ticker::from_proto).collect();
98 handler.on_ticker_update(security, ticker_list).await;
99 }
100 }
101 proto_id::QOT_UPDATE_RT => {
102 let resp: futu_proto::qot_update_rt::Response =
103 prost::Message::decode(body).map_err(FutuError::Proto)?;
104 if let Some(s2c) = resp.s2c {
105 let security = Security::from_proto(&s2c.security);
106 let rt_list: Vec<TimeShare> =
107 s2c.rt_list.iter().map(TimeShare::from_proto).collect();
108 handler.on_rt_update(security, rt_list).await;
109 }
110 }
111 proto_id::QOT_UPDATE_BROKER => {
112 let resp: futu_proto::qot_update_broker::Response =
113 prost::Message::decode(body).map_err(FutuError::Proto)?;
114 if let Some(s2c) = resp.s2c {
115 let data = BrokerData {
116 security: Security::from_proto(&s2c.security),
117 ask_list: s2c
118 .broker_ask_list
119 .iter()
120 .map(BrokerEntry::from_proto)
121 .collect(),
122 bid_list: s2c
123 .broker_bid_list
124 .iter()
125 .map(BrokerEntry::from_proto)
126 .collect(),
127 };
128 handler.on_broker_update(data).await;
129 }
130 }
131 _ => {
134 tracing::debug!(proto_id = proto_id, "unhandled quote push");
135 }
136 }
137 Ok(())
138 }
139}