Skip to main content

futu_backend/
quote_sub.rs

1// 行情订阅管理
2//
3// FTAPI SubType → 后端 SubscribeBit 映射
4// CMD 6211: 发送订阅请求到后端 (set-state semantics, server 覆盖式)
5// CMD 6212: 接收行情推送
6//
7// **v1.4.106 codex 1131 F1+F2** 重构:
8// - submit_global_desired_set 公共 fn 发"全集"CMD6211,并返回 QotSubError。
9//   backend reject / decode 错 / timeout 都 loud 返回,caller 据此决定
10//   ack-then-commit (F1);旧 silent legacy wrapper 已在 v1.4.109 删除。
11// - backend 覆盖式
12//   ("每次请求,客户端都需要提供当前需要订阅的所有股票的所有订阅位")
13//   unsub 通过发**新的更小集**实现 — 与 sub 同 entry 复用 (F2)
14// - SubscribeRequestParams 携带 session / orderbook_detail / broker_detail /
15//   rehab_type / first_push 等 backend req fields (F6)
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use prost::Message;
20
21use futu_core::error::{FutuError, Result};
22
23use crate::conn::BackendConn;
24use crate::proto_internal::ft_cmd_stock_quote_sub;
25use crate::proto_internal::ft_cmd_stock_quote_sub_data;
26use crate::proto_internal::ft_cmd_tick;
27
28mod sub_bits;
29
30pub use sub_bits::{
31    SubBitOptions, SubscribeBitInfo, sub_type_to_bit_infos_with_options, sub_type_to_bits,
32    sub_type_to_bits_with_options,
33};
34
/// Backend command ID for quote subscription (CMD 6211, set-state request).
pub const CMD_QOT_SUB: u16 = 6211;
/// Backend command ID for quote push (CMD 6212).
pub const CMD_QOT_PUSH: u16 = 6212;
/// Command ID for actively pulling tick-by-tick (ticker) data.
///
/// Ref: C++ `NNBiz_Qot_PullQot.cpp:327-365`
/// `SendPullTickerListReq(... NN_ProtoCmd_Qot_Pull_Ticker ...)`.
pub const CMD_QOT_PULL_TICKER: u16 = 6128;

/// C++ `NN_QuoteTickerKey_FetchLatest ((u64_t)-1)`.
const TICKER_FETCH_LATEST_KEY: u64 = u64::MAX;
/// C++ `NN_QuotePullTicker_LatestTime ((u32_t)-1)`.
const TICKER_LATEST_DATE_TIME_S: u32 = u32::MAX;
49
/// Backend-side ("NN") quote session codes; `common_session_to_nn` maps the
/// FTAPI `Common.Session` values onto these.
mod nn_quote_session {
    pub const RTH: i32 = 0;
    pub const ETH: i32 = 1;
    pub const ALL: i32 = 2;
}
55
/// Tick period selectors placed into `TickReq.tick_period_type_ex`.
mod tick_period_type {
    // Ref: C++ `FTCmdTick.proto` TickPeriodType.
    pub const NORMAL: u32 = 0;
    pub const BEFORE: u32 = 1;
    pub const AFTER: u32 = 2;
    // NOTE(review): 3 is skipped here; presumably it mirrors the proto enum — confirm.
    pub const OVERNIGHT: u32 = 4;
}
63
/// FTAPI `SubType` enum values.
///
/// `ORDER_BOOK`, `ORDER_BOOK_ODD` and `BROKER` are the ones this file treats as
/// "depth" sub types (see `is_depth_sub_type`).
pub mod sub_type {
    pub const BASIC: i32 = 1;
    pub const ORDER_BOOK: i32 = 2;
    pub const TICKER: i32 = 4;
    pub const RT: i32 = 5;
    pub const KL_DAY: i32 = 6;
    pub const KL_5MIN: i32 = 7;
    pub const KL_15MIN: i32 = 8;
    pub const KL_30MIN: i32 = 9;
    pub const KL_60MIN: i32 = 10;
    pub const KL_1MIN: i32 = 11;
    pub const KL_WEEK: i32 = 12;
    pub const KL_MONTH: i32 = 13;
    pub const BROKER: i32 = 14;
    pub const KL_QUARTER: i32 = 15;
    pub const KL_YEAR: i32 = 16;
    pub const KL_3MIN: i32 = 17;
    pub const ORDER_BOOK_ODD: i32 = 22;
}
84
85fn common_session_to_nn(session: i32) -> i32 {
86    match session {
87        // FTAPI Common.Session: 1=RTH, 2=ETH, 3=ALL.
88        2 => nn_quote_session::ETH,
89        3 => nn_quote_session::ALL,
90        _ => nn_quote_session::RTH,
91    }
92}
93
94fn ticker_periods_for_nn_session(nn_session: i32) -> Vec<u32> {
95    match nn_session {
96        nn_quote_session::ALL => vec![
97            tick_period_type::NORMAL,
98            tick_period_type::BEFORE,
99            tick_period_type::AFTER,
100            tick_period_type::OVERNIGHT,
101        ],
102        nn_quote_session::ETH => vec![
103            tick_period_type::NORMAL,
104            tick_period_type::BEFORE,
105            tick_period_type::AFTER,
106        ],
107        _ => vec![tick_period_type::NORMAL],
108    }
109}
110
111/// 拉取最新逐笔,用于对齐 C++ Qot_Sub 成功后“订阅逐笔要提前拉一根”。
112///
113/// Ref:
114/// - `APIServer_Qot_Sub.cpp:265-280`: subscribe Ticker 后调用
115///   `PullNewestTickerList_Lot(..., 1, false, SessionToNN(enSession))`.
116/// - `NNBiz_Qot_PullQot.cpp:126-129`: 非 US 强制 RTH,再从 latest key 拉取。
117/// - `NNBiz_Qot_PullQot.cpp:327-365`: CMD6128 body + reserved[0]=head market,
118///   reserved[1]=NN_QuoteExType_SECURITY(0).
119pub async fn pull_latest_ticker(
120    backend: &BackendConn,
121    stock_id: u64,
122    nn_mkt_type: u8,
123    common_session: i32,
124    pull_count: u32,
125    broker_id: Option<i32>,
126) -> Result<ft_cmd_tick::TickRsp> {
127    if stock_id == 0 || pull_count == 0 {
128        return Err(FutuError::Codec(format!(
129            "PullLatestTicker: invalid stock_id={stock_id} pull_count={pull_count}"
130        )));
131    }
132
133    // C++ `PullNewestTickerList`: only US keeps ETH/ALL; other markets collapse
134    // to RTH because backend does not distinguish pre/after sessions there.
135    let nn_session = if nn_mkt_type == ftapi_market_to_quote_mkt(11) {
136        common_session_to_nn(common_session)
137    } else {
138        nn_quote_session::RTH
139    };
140
141    let req = ft_cmd_tick::TickReq {
142        security_id: Some(stock_id),
143        date_time_s: Some(TICKER_LATEST_DATE_TIME_S),
144        begin_tick_key: Some(TICKER_FETCH_LATEST_KEY),
145        tick_count: Some(pull_count),
146        tick_period_type: None,
147        tick_period_type_ex: ticker_periods_for_nn_session(nn_session),
148        req_auth: None,
149        end_tick_key: None,
150        date_time_s_v2: None,
151        // v1.4.110 codex Phase 3 Slice 6a: caller-provided broker_id (crypto-only).
152        // 对齐 C++ NNBiz_Qot_PullQot.cpp:344-349 `pbReq.set_broker_id(...)`.
153        broker_id,
154    };
155
156    let mut reserved = [0u8; 10];
157    reserved[0] = nn_mkt_type;
158    // reserved[1] = 0 == NN_QuoteExType_SECURITY.
159
160    let frame = backend
161        .request_with_reserved(CMD_QOT_PULL_TICKER, req.encode_to_vec(), reserved)
162        .await?;
163    let rsp: ft_cmd_tick::TickRsp = Message::decode(frame.body.as_ref())?;
164    let result = rsp.result.unwrap_or(-1);
165    if result != 0 {
166        return Err(FutuError::ServerError {
167            ret_type: result,
168            msg: format!("CMD6128 PullLatestTicker result={result}"),
169        });
170    }
171    Ok(rsp)
172}
173
/// Backend `SubscribeBit` values.
pub mod sbit {
    pub const PRICE: u32 = 0;
    pub const STOCK_STATE: u32 = 1;
    pub const STOCK_TYPE_SPECIFIC: u32 = 2;
    pub const ORDER_BOOK: u32 = 3;
    pub const ORDER_BOOK_DETAIL: u32 = 4; // v1.4.106 codex 1131 F6: detail uses its own bit
    pub const DEAL_STATISTICS: u32 = 5;
    pub const HK_BROKER_QUEUE: u32 = 9;
    pub const HK_BROKER_DETAIL: u32 = 10; // v1.4.106 codex 1131 F6: HK broker detail
    pub const US_PREMARKET_AFTERHOURS: u32 = 13;
    pub const US_LV2_ORDER: u32 = 17;
    pub const TIME_SHARING: u32 = 20;
    pub const KLINE_1MIN: u32 = 21;
    pub const KLINE_3MIN: u32 = 22;
    pub const KLINE_5MIN: u32 = 23;
    pub const KLINE_15MIN: u32 = 24;
    pub const KLINE_30MIN: u32 = 25;
    pub const KLINE_60MIN: u32 = 26;
    pub const KLINE_DAY: u32 = 27;
    pub const KLINE_WEEK: u32 = 28;
    pub const KLINE_MONTH: u32 = 29;
    pub const KLINE_QUARTER: u32 = 30;
    pub const KLINE_YEAR: u32 = 31;
    pub const TICK: u32 = 35;
    // NOTE(review): "MEGER" looks like a typo for "MERGE"; the name is public,
    // so it is kept as-is here.
    pub const MEGER_LV2_ORDER: u32 = 39;
}
201
/// **v1.4.106 codex 1131 F1+F2**: one `(security, sub_types_with_opts)` entry.
/// Lets the caller express `(stock_id, ftapi_market, [(sub_type, SubBitOptions)])`
/// as a single value.
///
/// **v1.4.110 Phase 2 Slice 4**: upgraded from a 3-tuple to a struct and gained
/// the `broker_id` field so that CMD6211 `SecuritySubscribe.broker_id`
/// (FTCmdStockQuoteSubData.proto:269, aligned with C++
/// `MktQotSub.cpp:454-463 SecuritySubscribe::set_broker_id`) is actually written
/// to the wire. `broker_id = None` takes the no-broker path (ordinary stocks);
/// `Some(NonZeroU32)` = crypto multi-broker isolation.
///
/// Currently (Phase 2) every caller passes `broker_id = None` (only in Phase 3
/// does the SubHandler entry point resolve `securityFirm` into a broker; see the
/// codex research 12:13 increment).
#[derive(Debug, Clone)]
pub struct SecurityWithOpts {
    pub stock_id: u64,
    pub ftapi_market: i32,
    pub sub_types_with_opts: Vec<(i32, SubBitOptions)>,
    /// **v1.4.110 Phase 2 Slice 4**: broker-aware subscription (C++
    /// `StockKey(stockID, brokerID)`, see the Phase 1 doc in `qot_stock_key.rs`).
    /// - `None`: C++ `m_hasBroker = false`, CMD6211 does not write the broker_id field
    /// - `Some(N)`: C++ `m_hasBroker = true`, CMD6211 writes broker_id = N
    pub broker_id: Option<std::num::NonZeroU32>,
}
224
225impl SecurityWithOpts {
226    /// 不带 broker 的快捷构造 (普通股 / 非 crypto).
227    pub fn new(
228        stock_id: u64,
229        ftapi_market: i32,
230        sub_types_with_opts: Vec<(i32, SubBitOptions)>,
231    ) -> Self {
232        Self {
233            stock_id,
234            ftapi_market,
235            sub_types_with_opts,
236            // v1.4.110 codex audit P3 #10: 默认 no-broker. 普通股 / non-crypto
237            // 路径走这里; crypto multi-broker 用 `with_broker(...)` 构造.
238            broker_id: None,
239        }
240    }
241
242    /// 带 broker 的快捷构造 (crypto multi-broker). `broker_id = 0` 自动降级 None.
243    pub fn with_broker(
244        stock_id: u64,
245        ftapi_market: i32,
246        sub_types_with_opts: Vec<(i32, SubBitOptions)>,
247        broker_id: u32,
248    ) -> Self {
249        Self {
250            stock_id,
251            ftapi_market,
252            sub_types_with_opts,
253            broker_id: std::num::NonZeroU32::new(broker_id),
254        }
255    }
256}
257
/// Explicit backend market group for an empty CMD6211 set-state request.
///
/// Empty requests are used when the last desired item in a market bucket is
/// removed. They still must carry the original backend route market in protocol
/// header `reserved[0]`; routeMarketID=0 is rejected by backend.
///
/// The derived ordering compares `mkt_type` first and `is_depth` second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EmptyDesiredMarket {
    /// Backend market type, as returned by `ftapi_market_to_quote_mkt`; written
    /// into header `reserved[0]` when the empty request is sent.
    pub mkt_type: u8,
    /// Whether the bucket being cleared was a "depth" sub type bucket.
    pub is_depth: bool,
}
268
269pub fn empty_desired_market_for_sub(
270    ftapi_market: i32,
271    sub_type: i32,
272) -> Option<EmptyDesiredMarket> {
273    let mkt_type = ftapi_market_to_quote_mkt(ftapi_market);
274    if mkt_type == 0 {
275        return None;
276    }
277    Some(EmptyDesiredMarket {
278        mkt_type,
279        is_depth: is_depth_sub_type(sub_type),
280    })
281}
282
/// **v1.4.106 codex 1131 F1**: error type of `submit_global_desired_set`.
/// Lets the caller (SubHandler) tell backend reject vs decode error vs timeout
/// apart → ack-then-commit pattern (F1 P1).
#[derive(Debug)]
pub enum QotSubError {
    /// Backend returned `SubscribeSetRsp.result != 0` (reject); carries the
    /// result number and the warning code.
    BackendRejected { result: i32, warning: i32 },
    /// Response decode failed — the state of that batch is unknown, local state
    /// must not be written.
    DecodeFailed(String),
    /// Network / TCP error (timeout / connection dropped). Passes the inner
    /// error through.
    Transport(FutuError),
    /// **v1.4.106 codex 0631 F3 [P2]**: the caller passed an `ftapi_market` the
    /// backend mapping does not know (`ftapi_market_to_quote_mkt → 0`).
    /// Defensive fail-loud: no CMD6211 is sent, the whole batch is rejected.
    /// Callers should validate early before calling. Carries the offending list
    /// so the caller can report a clear error to the user.
    UnsupportedMarket { offending: Vec<i32> },
    /// **v1.4.106 codex 0631 F3 [P2]**: when sending per-market batches, some
    /// markets failed at the backend (BackendRejected / DecodeFailed) while
    /// others succeeded — the batch is partially applied (split state). The
    /// caller must not treat it as a full success and should tell the user
    /// "succeeded markets are applied, failed markets need re-submit".
    PartialMarketFailure { succeeded: Vec<u8>, failed: Vec<u8> },
}
305
306impl std::fmt::Display for QotSubError {
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        match self {
309            QotSubError::BackendRejected { result, warning } => {
310                write!(
311                    f,
312                    "backend rejected CMD6211: result={result} warning={warning}"
313                )
314            }
315            QotSubError::DecodeFailed(s) => write!(f, "CMD6211 response decode failed: {s}"),
316            QotSubError::Transport(e) => write!(f, "CMD6211 transport error: {e}"),
317            QotSubError::UnsupportedMarket { offending } => write!(
318                f,
319                "CMD6211 unsupported ftapi_market(s): {offending:?} \
320                 (ftapi_market_to_quote_mkt returned 0). Caller must validate \
321                 ftapi_market before submit_global_desired_set."
322            ),
323            QotSubError::PartialMarketFailure { succeeded, failed } => write!(
324                f,
325                "CMD6211 partial-market failure: succeeded={succeeded:?} \
326                 failed={failed:?}. State is split: succeeded markets are \
327                 applied, failed markets need re-submit."
328            ),
329        }
330    }
331}
332
333impl std::error::Error for QotSubError {
334    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
335        match self {
336            QotSubError::Transport(e) => Some(e),
337            _ => None,
338        }
339    }
340}
341
342impl From<FutuError> for QotSubError {
343    fn from(e: FutuError) -> Self {
344        QotSubError::Transport(e)
345    }
346}
347
/// **v1.4.110 Phase 2 Slice 4**: per-security input — the
/// `(stock_id, broker_id, sub_types_with_opts)` 3-tuple. `broker_id = None`
/// takes the no-broker path (ordinary stocks, the Phase 2 default);
/// `Some(NonZeroU32)` takes the broker-aware path (crypto multi-broker).
///
/// Extracted as a type alias to avoid the clippy `type_complexity` warning.
pub type SecuritySubscribeInput = (u64, Option<std::num::NonZeroU32>, Vec<(i32, SubBitOptions)>);
354
/// **v1.4.106 codex 1131 F6**: builds a subscribe request that honours
/// `SubBitOptions` — session / detail / extended_time are really passed to the
/// backend.
///
/// **v1.4.110 Phase 2 Slice 4**: input upgraded to the 3-tuple
/// `(stock_id, broker_id, sub_types)` so that `SecuritySubscribe.broker_id` is
/// really written (aligned with C++ `MktQotSub.cpp:454-463`):
/// ```cpp
/// SecuritySubscribe::set_security_id(stSecKey.nStockID);
/// if (stSecKey.HasBroker()) {
///     SecuritySubscribe::set_broker_id(stSecKey.GetBrokerID());
/// }
/// ```
/// - `broker_id = None` ⟺ C++ `m_hasBroker = false`, `set_broker_id` is not called
/// - `broker_id = Some(N)` ⟺ C++ `m_hasBroker = true`, writes `broker_id = N`
///
/// The returned request leaves `timer_sub` unset.
pub fn build_subscribe_req_with_options(
    securities: &[SecuritySubscribeInput],
) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
    build_subscribe_req_with_options_inner(securities, None)
}
373
/// Same as `build_subscribe_req_with_options`, but marks the request as a
/// periodic keep-subscribe replay by setting `timer_sub = Some(1)`.
pub fn build_keep_subscribe_req_with_options(
    securities: &[SecuritySubscribeInput],
) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
    // Ref: C++ `MktQotSub.cpp:532-535` stores the last normal SubscribeSetReq
    // and sets `timer_sub=1` only for periodic keep-sub replay.
    build_subscribe_req_with_options_inner(securities, Some(1))
}
381
382fn build_subscribe_req_with_options_inner(
383    securities: &[SecuritySubscribeInput],
384    timer_sub: Option<i32>,
385) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
386    let mut security_list = Vec::new();
387
388    for (stock_id, broker_id, sub_types_with_opts) in securities {
389        let mut bit_info_list = Vec::new();
390        for (st, opts) in sub_types_with_opts {
391            for info in sub_type_to_bit_infos_with_options(*st, *opts) {
392                bit_info_list.push(ft_cmd_stock_quote_sub_data::BitInfo {
393                    bit: Some(info.bit),
394                    prob: Some(info.prob),
395                    prob2: info.prob2,
396                    prob2_v2: info.prob2_v2,
397                });
398            }
399        }
400        security_list.push(ft_cmd_stock_quote_sub_data::SecuritySubscribe {
401            security_id: Some(*stock_id),
402            bit_info_list,
403            // v1.4.110 Phase 2 Slice 4: broker-aware CMD6211 wire.
404            // Some(NZ) → 写 i32 (crypto multi-broker); None → 不写 (普通股).
405            broker_id: broker_id.map(|nz| nz.get() as i32),
406        });
407    }
408
409    ft_cmd_stock_quote_sub::SubscribeSetReq {
410        security_list,
411        reserved: None,
412        timer_sub,
413    }
414}
415
/// Maps an FTAPI market value to the backend `NN_QuoteMktType`.
///
/// Returns 0 for an unknown market (caller should reject loud per F7).
pub fn ftapi_market_to_quote_mkt(market: i32) -> u8 {
    /// `(ftapi_market, NN_QuoteMktType)` pairs; anything not listed maps to 0.
    const QUOTE_MKT_BY_FTAPI_MARKET: &[(i32, u8)] = &[
        (1, 1),  // HK → NN_QuoteMktType_HK
        (11, 2), // US → NN_QuoteMktType_US
        (21, 3), // SH → NN_QuoteMktType_SH
        (22, 4), // SZ → NN_QuoteMktType_SZ
        // Internal subscribe-market markers derived from static security info.
        // Public QotMarket maps HK futures back to HK_Security (C++
        // APIServer_Inner_API.cpp::Market_NNToAPI), but CMD6211 reserved[0]
        // still needs the backend NN_QuoteMktType. These values are not
        // client-facing QotMarket values; SubHandler derives them from
        // CachedSecurityInfo.sec_type/mkt_id before entering this function.
        (5, 5),   // HK futures legacy → NN_QuoteMktType_FUT_HK
        (6, 6),   // HK futures current → NN_QuoteMktType_FUT_HK_NEW
        (9, 9),   // HK_OPTIONS → NN_QuoteMktType_HK_OPTIONS
        (13, 13), // SG futures → NN_QuoteMktType_SG_FUTURE
        (31, 15), // SG stock → NN_QuoteMktType_SG_SECURITY
        (15, 7),  // US_OPTIONS → NN_QuoteMktType_US_OPTIONS
        (14, 8),  // US_FUTURE → NN_QuoteMktType_US_FUT
        (16, 16), // JP futures → NN_QuoteMktType_JP_FUTURE
        (41, 25), // JP stock → NN_QuoteMktType_JP_SECURITY
        (61, 27), // MY stock → NN_QuoteMktType_MY_SECURITY
        (23, 10), // SH_KCB → NN_QuoteMktType_SH_KCB
        // v1.4.110 codex Phase 3 Slice 6b: QotMarket_CC_Security=91 (Crypto)
        // → NN_QuoteMktType_CRYPTO=17. C++ `NNBase_Define_Enum.h:517`.
        (91, 17), // Crypto → NN_QuoteMktType_CRYPTO
    ];

    // F7 [P2]: AU=51 / CA=71 / FX=81 currently have no clear NN_QuoteMktType
    // counterpart, so they fall through to 0. SubHandler rejects loudly (per F7)
    // whenever this function returns 0.
    QUOTE_MKT_BY_FTAPI_MARKET
        .iter()
        .find(|(ftapi, _)| *ftapi == market)
        .map_or(0, |(_, quote_mkt)| *quote_mkt)
}
450
451/// "深度"类 SubType.
452///
453/// 仅用于旧的 empty desired-set bucket 标记和 trace;非空 CMD6211 不能按
454/// normal/depth 拆分。CMD6211 是 backend 覆盖式 set-state,请求必须携带同一
455/// market 的完整订阅 bit 集合;C++ `MktQotSub.cpp::Timer_SendSubReq` 也是每个
456/// market 发一条 `SubscribeSetReq`,在同一个 `SecuritySubscribe` 内追加全部 bit。
457fn is_depth_sub_type(sub_type: i32) -> bool {
458    matches!(
459        sub_type,
460        sub_type::ORDER_BOOK | sub_type::ORDER_BOOK_ODD | sub_type::BROKER
461    )
462}
463
/// **v1.4.106 codex 1131 F1+F2**: the set-state entry point for the caller
/// (SubHandler).
///
/// **Semantics**: sends the whole desired set of
/// (stock_id, ftapi_market, sub_type_with_opts) → backend answers with
/// `SubscribeSetRsp`. The backend is "overwrite-style" — every old subscription
/// that is not in the new set is cancelled automatically (per the
/// FTCmdStockQuoteSub.proto design doc: the server overwrites this client's
/// previous subscriptions and proactively pushes the data of the newly added
/// subscribe bits once).
///
/// **F1 ack-then-commit**: the caller must write SubscriptionManager state only
/// after `Ok`. On `Err` → do not write state; return ret_type=-1 plus the error
/// reason to the user.
///
/// **F2 unsub via fresh set**: unsubscribing is done by passing a new desired
/// set that no longer contains the removed key; there is no separate unsub
/// backend command. SubHandler computes `current global - removed` and then
/// calls this fn.
///
/// **Return value**: the `max_sub_count` quota the backend sent in its response.
/// The caller should feed it into
/// `SubscriptionManager::set_total_quota_from_backend(max_sub_count as u32)` to
/// sync the real total quota (F5 P2 dynamic quota).
pub async fn submit_global_desired_set(
    backend: &BackendConn,
    securities: &[SecurityWithOpts],
) -> std::result::Result<i32, QotSubError> {
    // `false` = a normal submit, not a timer keep-subscribe replay.
    submit_global_desired_set_inner(backend, securities, false).await
}
490
/// Periodically replay the current CMD6211 desired set with `timer_sub=1`.
///
/// Ref: C++ `MktQotSub.cpp:543-580` `Timer_KeepSubStatus()` sends the cached
/// `m_pbLastSendReq`; `MktQotSub.cpp:532-535` sets `timer_sub=1` for that replay.
///
/// Takes the same arguments and returns the same result as
/// `submit_global_desired_set`; the only difference is the timer flag.
pub async fn submit_keep_subscribe_desired_set(
    backend: &BackendConn,
    securities: &[SecurityWithOpts],
) -> std::result::Result<i32, QotSubError> {
    // `true` = mark every per-market request as a keep-subscribe replay.
    submit_global_desired_set_inner(backend, securities, true).await
}
501
502async fn submit_global_desired_set_inner(
503    backend: &BackendConn,
504    securities: &[SecurityWithOpts],
505    is_timer_sub: bool,
506) -> std::result::Result<i32, QotSubError> {
507    if securities.is_empty() {
508        // Empty desired-set submit is ambiguous without the original backend
509        // route market. Call `submit_empty_desired_set_for_markets` with the
510        // market bucket(s) being cleared instead.
511        return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
512    }
513
514    // **v1.4.106 codex 0631 F3 [P2]**: 入口 validate 全部 ftapi_market 已知.
515    // 任一未知 → Err(UnsupportedMarket{offending}), 不发任何 CMD6211 请求.
516    //
517    // 老代码 (v1.4.106 codex 1131 F7) 是 silent skip + warn —— 全部未知时返
518    // Ok(()) 但 0 backend 请求 = silent success (CLAUDE.md 反模式 D
519    // silent-success). 修法: validate first, fail loud, caller 早 reject.
520    let offending: Vec<i32> = securities
521        .iter()
522        .filter_map(|sec| {
523            if ftapi_market_to_quote_mkt(sec.ftapi_market) == 0 {
524                Some(sec.ftapi_market)
525            } else {
526                None
527            }
528        })
529        .collect();
530    if !offending.is_empty() {
531        let mut dedup: Vec<i32> = offending;
532        dedup.sort_unstable();
533        dedup.dedup();
534        tracing::warn!(
535            offending = ?dedup,
536            "v1.4.106 audit 0631 F3: submit_global_desired_set rejected — unsupported ftapi_market(s)"
537        );
538        return Err(QotSubError::UnsupportedMarket { offending: dedup });
539    }
540
541    // 按后端市场类型分组(C++ 每个市场独立发送订阅请求)。
542    //
543    // 不再按 normal/depth 拆分: CMD6211 是覆盖式 set-state;同一 market 拆两条
544    // 请求会让后发请求覆盖先发请求,表现为 QUOTE 或 ORDER_BOOK 只能有一边持续
545    // push。C++ 对齐点: `MktQotSub.cpp::Timer_SendSubReq` 为一个 market 构造
546    // 单个 `SubscribeSetReq`,同一个 security 内聚合全部 `SubBit`。
547    //
548    // v1.4.110 Phase 2 Slice 4: MktGroup 内每项升 `(stock_id, broker_id, sub_types_with_opts)`
549    // 3-tuple, 让 build_subscribe_req_with_options 能写出
550    // SecuritySubscribe.broker_id.
551    type MktGroup = Vec<SecuritySubscribeInput>;
552    let mut by_market: BTreeMap<u8, MktGroup> = BTreeMap::new();
553    for sec in securities {
554        let mkt = ftapi_market_to_quote_mkt(sec.ftapi_market);
555        // F3 已 validate, 不可能是 0 — 此处 debug_assert.
556        debug_assert!(mkt != 0, "F3 validate 应已拒未知 market");
557        by_market.entry(mkt).or_default().push((
558            sec.stock_id,
559            sec.broker_id,
560            sec.sub_types_with_opts.clone(),
561        ));
562    }
563
564    // **v1.4.106 codex 0631 F3 [P2]**: 多 market 分批发送, 部分失败 → 不能当
565    // 全成功. 收集 succeeded / failed market 列表, 返 PartialMarketFailure.
566    //
567    // 老代码 .await? 短路: 第一个失败市场退出, max_sub_count=0 — caller 看不到
568    // 哪些市场已成功 (split state). 改为收集所有结果, 部分失败 loud Err.
569    let mut max_sub_count = 0i32;
570    let mut succeeded_markets: Vec<u8> = Vec::new();
571    let mut failed_markets: Vec<u8> = Vec::new();
572    let mut first_transport_err: Option<FutuError> = None;
573    for (mkt_type, secs) in &by_market {
574        let contains_depth_type = secs
575            .iter()
576            .any(|(_, _, sub_types)| sub_types.iter().any(|(st, _)| is_depth_sub_type(*st)));
577        match submit_subscribe_with_market(
578            backend,
579            secs,
580            *mkt_type,
581            contains_depth_type,
582            false,
583            is_timer_sub,
584        )
585        .await
586        {
587            Ok(count) => {
588                if count > max_sub_count {
589                    max_sub_count = count;
590                }
591                if !succeeded_markets.contains(mkt_type) {
592                    succeeded_markets.push(*mkt_type);
593                }
594            }
595            Err(QotSubError::Transport(e)) => {
596                // Transport error: 网络断 / TCP 错 — 整批中断, 透传不分批.
597                // 这种情况下 partial 没意义 (后续市场也会同样 Transport 错).
598                first_transport_err = Some(e);
599                break;
600            }
601            Err(_) => {
602                if !failed_markets.contains(mkt_type) {
603                    failed_markets.push(*mkt_type);
604                }
605            }
606        }
607    }
608    if let Some(e) = first_transport_err {
609        return Err(QotSubError::Transport(e));
610    }
611    if !failed_markets.is_empty() {
612        succeeded_markets.sort_unstable();
613        failed_markets.sort_unstable();
614        tracing::warn!(
615            succeeded = ?succeeded_markets,
616            failed = ?failed_markets,
617            "v1.4.106 audit 0631 F3: submit_global_desired_set partial failure"
618        );
619        return Err(QotSubError::PartialMarketFailure {
620            succeeded: succeeded_markets,
621            failed: failed_markets,
622        });
623    }
624
625    Ok(max_sub_count)
626}
627
628pub async fn submit_empty_desired_set_for_markets(
629    backend: &BackendConn,
630    markets: &[EmptyDesiredMarket],
631) -> std::result::Result<i32, QotSubError> {
632    if markets.is_empty() {
633        return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
634    }
635
636    let groups: Vec<EmptyDesiredMarket> = BTreeSet::from_iter(markets.iter().copied())
637        .into_iter()
638        .collect();
639    if groups.iter().any(|g| g.mkt_type == 0) {
640        return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
641    }
642
643    let mut max_sub_count = 0i32;
644    let mut succeeded_markets: Vec<u8> = Vec::new();
645    let mut failed_markets: Vec<u8> = Vec::new();
646    let mut first_transport_err: Option<FutuError> = None;
647
648    for group in groups {
649        match submit_subscribe_with_market(
650            backend,
651            &[],
652            group.mkt_type,
653            group.is_depth,
654            true,
655            false,
656        )
657        .await
658        {
659            Ok(count) => {
660                max_sub_count = max_sub_count.max(count);
661                if !succeeded_markets.contains(&group.mkt_type) {
662                    succeeded_markets.push(group.mkt_type);
663                }
664            }
665            Err(QotSubError::Transport(e)) => {
666                first_transport_err = Some(e);
667                break;
668            }
669            Err(_) => {
670                if !failed_markets.contains(&group.mkt_type) {
671                    failed_markets.push(group.mkt_type);
672                }
673            }
674        }
675    }
676
677    if let Some(e) = first_transport_err {
678        return Err(QotSubError::Transport(e));
679    }
680    if !failed_markets.is_empty() {
681        succeeded_markets.sort_unstable();
682        succeeded_markets.dedup();
683        failed_markets.sort_unstable();
684        failed_markets.dedup();
685        return Err(QotSubError::PartialMarketFailure {
686            succeeded: succeeded_markets,
687            failed: failed_markets,
688        });
689    }
690
691    Ok(max_sub_count)
692}
693
694/// 发送单个 (mkt_type, is_depth) 的 CMD6211 请求, 返 max_sub_count.
695///
696/// v1.4.110 Phase 2 Slice 4: secs 升 3-tuple `(stock_id, broker_id, sub_types)`
697/// 让 build_subscribe_req_with_options 能写出 SecuritySubscribe.broker_id.
698async fn submit_subscribe_with_market(
699    backend: &BackendConn,
700    secs: &[SecuritySubscribeInput],
701    mkt_type: u8,
702    is_depth: bool,
703    is_unsub_all: bool,
704    is_timer_sub: bool,
705) -> std::result::Result<i32, QotSubError> {
706    let req = if is_unsub_all {
707        // 全退场景 — security_list 空, reserved=1 让 body 非零 (Windows backend 兼容).
708        ft_cmd_stock_quote_sub::SubscribeSetReq {
709            security_list: vec![],
710            reserved: Some(1),
711            timer_sub: None,
712        }
713    } else if is_timer_sub {
714        build_keep_subscribe_req_with_options(secs)
715    } else {
716        build_subscribe_req_with_options(secs)
717    };
718    let body = req.encode_to_vec();
719
720    let mut reserved = [0u8; 10];
721    reserved[0] = mkt_type;
722    // v1.4.110 Phase 2 Slice 4: request_bits 是 trace 用, 不必含 broker_id;
723    // SecuritySubscribe.broker_id 在 build_subscribe_req_with_options 内部写.
724    let request_bits: Vec<(u64, Vec<(u32, i64)>)> = secs
725        .iter()
726        .map(|(stock_id, _broker_id, sub_types)| {
727            let bits = sub_types
728                .iter()
729                .flat_map(|(sub_type, opts)| sub_type_to_bits_with_options(*sub_type, *opts))
730                .collect();
731            (*stock_id, bits)
732        })
733        .collect();
734
735    tracing::info!(
736        mkt_type,
737        is_depth,
738        is_unsub_all,
739        is_timer_sub,
740        count = secs.len(),
741        body_len = body.len(),
742        request_bits = ?request_bits,
743        "v1.4.106 audit 1131 F1: sending CMD6211 subscribe (set-state)"
744    );
745
746    let resp = backend
747        .request_with_reserved(CMD_QOT_SUB, body, reserved)
748        .await
749        .map_err(QotSubError::Transport)?;
750
751    let parsed: ft_cmd_stock_quote_sub::SubscribeSetRsp = Message::decode(resp.body.as_ref())
752        .map_err(|e| QotSubError::DecodeFailed(format!("{e}")))?;
753
754    let result = parsed.result.unwrap_or(-1);
755    let warning = parsed.warning_code.unwrap_or(0);
756    let max_sub_count = parsed.max_sub_count.unwrap_or(0);
757
758    if result != 0 {
759        // **F1 P1**: backend reject → 让 caller 知道 (Err), 不 silent-warn.
760        tracing::warn!(
761            mkt_type,
762            is_depth,
763            is_timer_sub,
764            result,
765            warning,
766            request_bits = ?request_bits,
767            "v1.4.106 audit 1131 F1: CMD6211 backend rejected"
768        );
769        return Err(QotSubError::BackendRejected { result, warning });
770    }
771
772    tracing::info!(
773        mkt_type,
774        is_depth,
775        is_timer_sub,
776        max_sub_count,
777        "v1.4.106 audit 1131 F1: CMD6211 ok"
778    );
779    Ok(max_sub_count)
780}
781
782#[cfg(test)]
783mod tests;