1use std::collections::{BTreeMap, BTreeSet};
18
19use prost::Message;
20
21use futu_core::error::{FutuError, Result};
22
23use crate::conn::BackendConn;
24use crate::proto_internal::ft_cmd_stock_quote_sub;
25use crate::proto_internal::ft_cmd_stock_quote_sub_data;
26use crate::proto_internal::ft_cmd_tick;
27
28mod sub_bits;
29
30pub use sub_bits::{
31 SubBitOptions, SubscribeBitInfo, sub_type_to_bit_infos_with_options, sub_type_to_bits,
32 sub_type_to_bits_with_options,
33};
34
35pub const CMD_QOT_SUB: u16 = 6211;
37pub const CMD_QOT_PUSH: u16 = 6212;
39pub const CMD_QOT_PULL_TICKER: u16 = 6128;
44
45const TICKER_FETCH_LATEST_KEY: u64 = u64::MAX;
47const TICKER_LATEST_DATE_TIME_S: u32 = u32::MAX;
49
50mod nn_quote_session {
51 pub const RTH: i32 = 0;
52 pub const ETH: i32 = 1;
53 pub const ALL: i32 = 2;
54}
55
56mod tick_period_type {
57 pub const NORMAL: u32 = 0;
59 pub const BEFORE: u32 = 1;
60 pub const AFTER: u32 = 2;
61 pub const OVERNIGHT: u32 = 4;
62}
63
64pub mod sub_type {
66 pub const BASIC: i32 = 1;
67 pub const ORDER_BOOK: i32 = 2;
68 pub const TICKER: i32 = 4;
69 pub const RT: i32 = 5;
70 pub const KL_DAY: i32 = 6;
71 pub const KL_5MIN: i32 = 7;
72 pub const KL_15MIN: i32 = 8;
73 pub const KL_30MIN: i32 = 9;
74 pub const KL_60MIN: i32 = 10;
75 pub const KL_1MIN: i32 = 11;
76 pub const KL_WEEK: i32 = 12;
77 pub const KL_MONTH: i32 = 13;
78 pub const BROKER: i32 = 14;
79 pub const KL_QUARTER: i32 = 15;
80 pub const KL_YEAR: i32 = 16;
81 pub const KL_3MIN: i32 = 17;
82 pub const ORDER_BOOK_ODD: i32 = 22;
83}
84
85fn common_session_to_nn(session: i32) -> i32 {
86 match session {
87 2 => nn_quote_session::ETH,
89 3 => nn_quote_session::ALL,
90 _ => nn_quote_session::RTH,
91 }
92}
93
94fn ticker_periods_for_nn_session(nn_session: i32) -> Vec<u32> {
95 match nn_session {
96 nn_quote_session::ALL => vec![
97 tick_period_type::NORMAL,
98 tick_period_type::BEFORE,
99 tick_period_type::AFTER,
100 tick_period_type::OVERNIGHT,
101 ],
102 nn_quote_session::ETH => vec![
103 tick_period_type::NORMAL,
104 tick_period_type::BEFORE,
105 tick_period_type::AFTER,
106 ],
107 _ => vec![tick_period_type::NORMAL],
108 }
109}
110
111pub async fn pull_latest_ticker(
120 backend: &BackendConn,
121 stock_id: u64,
122 nn_mkt_type: u8,
123 common_session: i32,
124 pull_count: u32,
125 broker_id: Option<i32>,
126) -> Result<ft_cmd_tick::TickRsp> {
127 if stock_id == 0 || pull_count == 0 {
128 return Err(FutuError::Codec(format!(
129 "PullLatestTicker: invalid stock_id={stock_id} pull_count={pull_count}"
130 )));
131 }
132
133 let nn_session = if nn_mkt_type == ftapi_market_to_quote_mkt(11) {
136 common_session_to_nn(common_session)
137 } else {
138 nn_quote_session::RTH
139 };
140
141 let req = ft_cmd_tick::TickReq {
142 security_id: Some(stock_id),
143 date_time_s: Some(TICKER_LATEST_DATE_TIME_S),
144 begin_tick_key: Some(TICKER_FETCH_LATEST_KEY),
145 tick_count: Some(pull_count),
146 tick_period_type: None,
147 tick_period_type_ex: ticker_periods_for_nn_session(nn_session),
148 req_auth: None,
149 end_tick_key: None,
150 date_time_s_v2: None,
151 broker_id,
154 };
155
156 let mut reserved = [0u8; 10];
157 reserved[0] = nn_mkt_type;
158 let frame = backend
161 .request_with_reserved(CMD_QOT_PULL_TICKER, req.encode_to_vec(), reserved)
162 .await?;
163 let rsp: ft_cmd_tick::TickRsp = Message::decode(frame.body.as_ref())?;
164 let result = rsp.result.unwrap_or(-1);
165 if result != 0 {
166 return Err(FutuError::ServerError {
167 ret_type: result,
168 msg: format!("CMD6128 PullLatestTicker result={result}"),
169 });
170 }
171 Ok(rsp)
172}
173
174pub mod sbit {
176 pub const PRICE: u32 = 0;
177 pub const STOCK_STATE: u32 = 1;
178 pub const STOCK_TYPE_SPECIFIC: u32 = 2;
179 pub const ORDER_BOOK: u32 = 3;
180 pub const ORDER_BOOK_DETAIL: u32 = 4; pub const DEAL_STATISTICS: u32 = 5;
182 pub const HK_BROKER_QUEUE: u32 = 9;
183 pub const HK_BROKER_DETAIL: u32 = 10; pub const US_PREMARKET_AFTERHOURS: u32 = 13;
185 pub const US_LV2_ORDER: u32 = 17;
186 pub const TIME_SHARING: u32 = 20;
187 pub const KLINE_1MIN: u32 = 21;
188 pub const KLINE_3MIN: u32 = 22;
189 pub const KLINE_5MIN: u32 = 23;
190 pub const KLINE_15MIN: u32 = 24;
191 pub const KLINE_30MIN: u32 = 25;
192 pub const KLINE_60MIN: u32 = 26;
193 pub const KLINE_DAY: u32 = 27;
194 pub const KLINE_WEEK: u32 = 28;
195 pub const KLINE_MONTH: u32 = 29;
196 pub const KLINE_QUARTER: u32 = 30;
197 pub const KLINE_YEAR: u32 = 31;
198 pub const TICK: u32 = 35;
199 pub const MEGER_LV2_ORDER: u32 = 39;
200}
201
202#[derive(Debug, Clone)]
214pub struct SecurityWithOpts {
215 pub stock_id: u64,
216 pub ftapi_market: i32,
217 pub sub_types_with_opts: Vec<(i32, SubBitOptions)>,
218 pub broker_id: Option<std::num::NonZeroU32>,
223}
224
225impl SecurityWithOpts {
226 pub fn new(
228 stock_id: u64,
229 ftapi_market: i32,
230 sub_types_with_opts: Vec<(i32, SubBitOptions)>,
231 ) -> Self {
232 Self {
233 stock_id,
234 ftapi_market,
235 sub_types_with_opts,
236 broker_id: None,
239 }
240 }
241
242 pub fn with_broker(
244 stock_id: u64,
245 ftapi_market: i32,
246 sub_types_with_opts: Vec<(i32, SubBitOptions)>,
247 broker_id: u32,
248 ) -> Self {
249 Self {
250 stock_id,
251 ftapi_market,
252 sub_types_with_opts,
253 broker_id: std::num::NonZeroU32::new(broker_id),
254 }
255 }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
264pub struct EmptyDesiredMarket {
265 pub mkt_type: u8,
266 pub is_depth: bool,
267}
268
269pub fn empty_desired_market_for_sub(
270 ftapi_market: i32,
271 sub_type: i32,
272) -> Option<EmptyDesiredMarket> {
273 let mkt_type = ftapi_market_to_quote_mkt(ftapi_market);
274 if mkt_type == 0 {
275 return None;
276 }
277 Some(EmptyDesiredMarket {
278 mkt_type,
279 is_depth: is_depth_sub_type(sub_type),
280 })
281}
282
283#[derive(Debug)]
287pub enum QotSubError {
288 BackendRejected { result: i32, warning: i32 },
290 DecodeFailed(String),
292 Transport(FutuError),
294 UnsupportedMarket { offending: Vec<i32> },
299 PartialMarketFailure { succeeded: Vec<u8>, failed: Vec<u8> },
304}
305
306impl std::fmt::Display for QotSubError {
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 match self {
309 QotSubError::BackendRejected { result, warning } => {
310 write!(
311 f,
312 "backend rejected CMD6211: result={result} warning={warning}"
313 )
314 }
315 QotSubError::DecodeFailed(s) => write!(f, "CMD6211 response decode failed: {s}"),
316 QotSubError::Transport(e) => write!(f, "CMD6211 transport error: {e}"),
317 QotSubError::UnsupportedMarket { offending } => write!(
318 f,
319 "CMD6211 unsupported ftapi_market(s): {offending:?} \
320 (ftapi_market_to_quote_mkt returned 0). Caller must validate \
321 ftapi_market before submit_global_desired_set."
322 ),
323 QotSubError::PartialMarketFailure { succeeded, failed } => write!(
324 f,
325 "CMD6211 partial-market failure: succeeded={succeeded:?} \
326 failed={failed:?}. State is split: succeeded markets are \
327 applied, failed markets need re-submit."
328 ),
329 }
330 }
331}
332
333impl std::error::Error for QotSubError {
334 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
335 match self {
336 QotSubError::Transport(e) => Some(e),
337 _ => None,
338 }
339 }
340}
341
342impl From<FutuError> for QotSubError {
343 fn from(e: FutuError) -> Self {
344 QotSubError::Transport(e)
345 }
346}
347
348pub type SecuritySubscribeInput = (u64, Option<std::num::NonZeroU32>, Vec<(i32, SubBitOptions)>);
354
355pub fn build_subscribe_req_with_options(
369 securities: &[SecuritySubscribeInput],
370) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
371 build_subscribe_req_with_options_inner(securities, None)
372}
373
374pub fn build_keep_subscribe_req_with_options(
375 securities: &[SecuritySubscribeInput],
376) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
377 build_subscribe_req_with_options_inner(securities, Some(1))
380}
381
382fn build_subscribe_req_with_options_inner(
383 securities: &[SecuritySubscribeInput],
384 timer_sub: Option<i32>,
385) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
386 let mut security_list = Vec::new();
387
388 for (stock_id, broker_id, sub_types_with_opts) in securities {
389 let mut bit_info_list = Vec::new();
390 for (st, opts) in sub_types_with_opts {
391 for info in sub_type_to_bit_infos_with_options(*st, *opts) {
392 bit_info_list.push(ft_cmd_stock_quote_sub_data::BitInfo {
393 bit: Some(info.bit),
394 prob: Some(info.prob),
395 prob2: info.prob2,
396 prob2_v2: info.prob2_v2,
397 });
398 }
399 }
400 security_list.push(ft_cmd_stock_quote_sub_data::SecuritySubscribe {
401 security_id: Some(*stock_id),
402 bit_info_list,
403 broker_id: broker_id.map(|nz| nz.get() as i32),
406 });
407 }
408
409 ft_cmd_stock_quote_sub::SubscribeSetReq {
410 security_list,
411 reserved: None,
412 timer_sub,
413 }
414}
415
416pub fn ftapi_market_to_quote_mkt(market: i32) -> u8 {
420 match market {
421 1 => 1, 11 => 2, 21 => 3, 22 => 4, 5 => 5, 6 => 6, 9 => 9, 13 => 13, 31 => 15, 15 => 7, 14 => 8, 16 => 16, 41 => 25, 61 => 27, 23 => 10, 91 => 17, _ => 0,
448 }
449}
450
451fn is_depth_sub_type(sub_type: i32) -> bool {
458 matches!(
459 sub_type,
460 sub_type::ORDER_BOOK | sub_type::ORDER_BOOK_ODD | sub_type::BROKER
461 )
462}
463
464pub async fn submit_global_desired_set(
485 backend: &BackendConn,
486 securities: &[SecurityWithOpts],
487) -> std::result::Result<i32, QotSubError> {
488 submit_global_desired_set_inner(backend, securities, false).await
489}
490
491pub async fn submit_keep_subscribe_desired_set(
496 backend: &BackendConn,
497 securities: &[SecurityWithOpts],
498) -> std::result::Result<i32, QotSubError> {
499 submit_global_desired_set_inner(backend, securities, true).await
500}
501
502async fn submit_global_desired_set_inner(
503 backend: &BackendConn,
504 securities: &[SecurityWithOpts],
505 is_timer_sub: bool,
506) -> std::result::Result<i32, QotSubError> {
507 if securities.is_empty() {
508 return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
512 }
513
514 let offending: Vec<i32> = securities
521 .iter()
522 .filter_map(|sec| {
523 if ftapi_market_to_quote_mkt(sec.ftapi_market) == 0 {
524 Some(sec.ftapi_market)
525 } else {
526 None
527 }
528 })
529 .collect();
530 if !offending.is_empty() {
531 let mut dedup: Vec<i32> = offending;
532 dedup.sort_unstable();
533 dedup.dedup();
534 tracing::warn!(
535 offending = ?dedup,
536 "v1.4.106 audit 0631 F3: submit_global_desired_set rejected — unsupported ftapi_market(s)"
537 );
538 return Err(QotSubError::UnsupportedMarket { offending: dedup });
539 }
540
541 type MktGroup = Vec<SecuritySubscribeInput>;
552 let mut by_market: BTreeMap<u8, MktGroup> = BTreeMap::new();
553 for sec in securities {
554 let mkt = ftapi_market_to_quote_mkt(sec.ftapi_market);
555 debug_assert!(mkt != 0, "F3 validate 应已拒未知 market");
557 by_market.entry(mkt).or_default().push((
558 sec.stock_id,
559 sec.broker_id,
560 sec.sub_types_with_opts.clone(),
561 ));
562 }
563
564 let mut max_sub_count = 0i32;
570 let mut succeeded_markets: Vec<u8> = Vec::new();
571 let mut failed_markets: Vec<u8> = Vec::new();
572 let mut first_transport_err: Option<FutuError> = None;
573 for (mkt_type, secs) in &by_market {
574 let contains_depth_type = secs
575 .iter()
576 .any(|(_, _, sub_types)| sub_types.iter().any(|(st, _)| is_depth_sub_type(*st)));
577 match submit_subscribe_with_market(
578 backend,
579 secs,
580 *mkt_type,
581 contains_depth_type,
582 false,
583 is_timer_sub,
584 )
585 .await
586 {
587 Ok(count) => {
588 if count > max_sub_count {
589 max_sub_count = count;
590 }
591 if !succeeded_markets.contains(mkt_type) {
592 succeeded_markets.push(*mkt_type);
593 }
594 }
595 Err(QotSubError::Transport(e)) => {
596 first_transport_err = Some(e);
599 break;
600 }
601 Err(_) => {
602 if !failed_markets.contains(mkt_type) {
603 failed_markets.push(*mkt_type);
604 }
605 }
606 }
607 }
608 if let Some(e) = first_transport_err {
609 return Err(QotSubError::Transport(e));
610 }
611 if !failed_markets.is_empty() {
612 succeeded_markets.sort_unstable();
613 failed_markets.sort_unstable();
614 tracing::warn!(
615 succeeded = ?succeeded_markets,
616 failed = ?failed_markets,
617 "v1.4.106 audit 0631 F3: submit_global_desired_set partial failure"
618 );
619 return Err(QotSubError::PartialMarketFailure {
620 succeeded: succeeded_markets,
621 failed: failed_markets,
622 });
623 }
624
625 Ok(max_sub_count)
626}
627
628pub async fn submit_empty_desired_set_for_markets(
629 backend: &BackendConn,
630 markets: &[EmptyDesiredMarket],
631) -> std::result::Result<i32, QotSubError> {
632 if markets.is_empty() {
633 return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
634 }
635
636 let groups: Vec<EmptyDesiredMarket> = BTreeSet::from_iter(markets.iter().copied())
637 .into_iter()
638 .collect();
639 if groups.iter().any(|g| g.mkt_type == 0) {
640 return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
641 }
642
643 let mut max_sub_count = 0i32;
644 let mut succeeded_markets: Vec<u8> = Vec::new();
645 let mut failed_markets: Vec<u8> = Vec::new();
646 let mut first_transport_err: Option<FutuError> = None;
647
648 for group in groups {
649 match submit_subscribe_with_market(
650 backend,
651 &[],
652 group.mkt_type,
653 group.is_depth,
654 true,
655 false,
656 )
657 .await
658 {
659 Ok(count) => {
660 max_sub_count = max_sub_count.max(count);
661 if !succeeded_markets.contains(&group.mkt_type) {
662 succeeded_markets.push(group.mkt_type);
663 }
664 }
665 Err(QotSubError::Transport(e)) => {
666 first_transport_err = Some(e);
667 break;
668 }
669 Err(_) => {
670 if !failed_markets.contains(&group.mkt_type) {
671 failed_markets.push(group.mkt_type);
672 }
673 }
674 }
675 }
676
677 if let Some(e) = first_transport_err {
678 return Err(QotSubError::Transport(e));
679 }
680 if !failed_markets.is_empty() {
681 succeeded_markets.sort_unstable();
682 succeeded_markets.dedup();
683 failed_markets.sort_unstable();
684 failed_markets.dedup();
685 return Err(QotSubError::PartialMarketFailure {
686 succeeded: succeeded_markets,
687 failed: failed_markets,
688 });
689 }
690
691 Ok(max_sub_count)
692}
693
694async fn submit_subscribe_with_market(
699 backend: &BackendConn,
700 secs: &[SecuritySubscribeInput],
701 mkt_type: u8,
702 is_depth: bool,
703 is_unsub_all: bool,
704 is_timer_sub: bool,
705) -> std::result::Result<i32, QotSubError> {
706 let req = if is_unsub_all {
707 ft_cmd_stock_quote_sub::SubscribeSetReq {
709 security_list: vec![],
710 reserved: Some(1),
711 timer_sub: None,
712 }
713 } else if is_timer_sub {
714 build_keep_subscribe_req_with_options(secs)
715 } else {
716 build_subscribe_req_with_options(secs)
717 };
718 let body = req.encode_to_vec();
719
720 let mut reserved = [0u8; 10];
721 reserved[0] = mkt_type;
722 let request_bits: Vec<(u64, Vec<(u32, i64)>)> = secs
725 .iter()
726 .map(|(stock_id, _broker_id, sub_types)| {
727 let bits = sub_types
728 .iter()
729 .flat_map(|(sub_type, opts)| sub_type_to_bits_with_options(*sub_type, *opts))
730 .collect();
731 (*stock_id, bits)
732 })
733 .collect();
734
735 tracing::info!(
736 mkt_type,
737 is_depth,
738 is_unsub_all,
739 is_timer_sub,
740 count = secs.len(),
741 body_len = body.len(),
742 request_bits = ?request_bits,
743 "v1.4.106 audit 1131 F1: sending CMD6211 subscribe (set-state)"
744 );
745
746 let resp = backend
747 .request_with_reserved(CMD_QOT_SUB, body, reserved)
748 .await
749 .map_err(QotSubError::Transport)?;
750
751 let parsed: ft_cmd_stock_quote_sub::SubscribeSetRsp = Message::decode(resp.body.as_ref())
752 .map_err(|e| QotSubError::DecodeFailed(format!("{e}")))?;
753
754 let result = parsed.result.unwrap_or(-1);
755 let warning = parsed.warning_code.unwrap_or(0);
756 let max_sub_count = parsed.max_sub_count.unwrap_or(0);
757
758 if result != 0 {
759 tracing::warn!(
761 mkt_type,
762 is_depth,
763 is_timer_sub,
764 result,
765 warning,
766 request_bits = ?request_bits,
767 "v1.4.106 audit 1131 F1: CMD6211 backend rejected"
768 );
769 return Err(QotSubError::BackendRejected { result, warning });
770 }
771
772 tracing::info!(
773 mkt_type,
774 is_depth,
775 is_timer_sub,
776 max_sub_count,
777 "v1.4.106 audit 1131 F1: CMD6211 ok"
778 );
779 Ok(max_sub_count)
780}
781
782#[cfg(test)]
783mod tests;