Skip to main content

futu_server/
subscription.rs

1// 订阅管理:行情订阅 + 交易账户推送订阅 + 通知订阅
2//
3// v1.4.106 codex 1131 F3 [P1] (BLOCKER fix): C++ 把"backend 订阅状态"
4// (`m_setSub`) 与"push 注册状态" (`m_mapRegPush`) 分开维护. Rust 之前
5// 把两者塞同一 `qot_subs` map → `Qot_RegQotPush(register=true)` 制造
6// 假订阅, `Qot_RegQotPush(register=false)` 误删真订阅, GetSubInfo 把
7// push 注册当订阅项报告. 本版彻底拆开:
8//   - `qot_subs: HashMap<(SecurityKey, SubType), HashSet<ConnId>>`
9//     = desired sub state, 决定 backend CMD 6211 desired set + GetSubInfo
10//   - `qot_push_regs: HashMap<(SecurityKey, SubType, RehabType), HashSet<ConnId>>`
11//     = push 注册 state, 决定 PushDispatcher 路由对象, 不影响订阅
12// C++ 对照:
13//   - QotSubscribe.cpp:107-108 m_setSub[(stockID, subType)].insert(connID)
14//   - QotSubscribe.cpp:489-490 m_mapRegPush[(stockID, subType, rehabType)].insert(connID)
15//   - QotSubscribe.cpp:639-649 GetPushConn(stockID, subType, rehabType)
16//
17// v1.4.110 codex Phase 3 closeout (broker-aware key model): C++ QOT 模块
18// (subscription / cache / push / quota) 以 `StockKey` (=stockID + 可选
19// brokerID) 为 first-class identity. Rust 旧 String facade 已从生产
20// `SubscriptionManager` 删除;内部状态直接以 `QotSecurityKey` keyed.
21//
22// C++ 对照 (QotSubscribe.h):
23//   - Map_t<SubType, Set_t<StockKey>> m_mapSub  (per-conn)
24//   - Set_t<(StockKey, SubType)>      m_setSub  (global)
25//   - Map_t<(StockKey, SubType, RehabType), Set<ConnID>> m_mapRegPush
26//   - Map_t<(ConnID, StockKey), bool> m_mapConnOrderBookDetail / BrokerDetail
27
28mod push_regs;
29mod views;
30
31use std::collections::{HashMap, HashSet};
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::time::{Duration, Instant};
34
35use futu_core::qot_stock_key::QotSecurityKey;
36use parking_lot::RwLock;
37
/// Subscription manager: tracks notification subscriptions, trade-account
/// push subscriptions, and quote (QOT) subscription / push-registration state.
pub struct SubscriptionManager {
    /// Notification subscriptions: the connections subscribed to system notifications.
    notify_subs: RwLock<HashSet<u64>>,

    /// Trade-account push subscriptions: acc_id → Set<conn_id>.
    trd_acc_subs: RwLock<HashMap<u64, HashSet<u64>>>,

    /// **Quote subscriptions** (desired sub state, mirrors C++ `m_setSub`):
    ///   key = (QotSecurityKey, sub_type), val = set of subscribed conns.
    qot_subs: RwLock<HashMap<(QotSecurityKey, i32), HashSet<u64>>>,

    /// **Quote push registrations** (mirrors C++ `m_mapRegPush`):
    ///   key = (QotSecurityKey, sub_type, rehab_type), val = set of conns registered to receive push.
    /// rehab_type is only meaningful for KL types (None=0 / Forward=1 / Backward=2 / N/A=0 for non-KL).
    qot_push_regs: RwLock<QotPushRegistrations>,

    /// **Desired session per (security, sub_type)** (mirrors the global view of C++
    /// `m_mapConnTickerSession` / `m_mapConnKLRTSession`):
    /// max(per-conn session) determines the backend desired session.
    /// session: 0=Unknown / 1=RTH / 2=ETH / 3=ALL / 5=OVERNIGHT (rejected).
    qot_sub_sessions: RwLock<HashMap<(QotSecurityKey, i32), HashMap<u64, i32>>>,

    /// **OrderBook detail flag per security** (mirrors C++
    /// `m_mapConnOrderBookDetail`): once any conn wants detail, the global view uses detail.
    qot_orderbook_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,

    /// **Broker detail flag per security** (mirrors C++
    /// `m_mapConnBrokerDetail`).
    qot_broker_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,

    /// **Total quota cap** (mirrors C++ `INNData_APIInterLimit::GetSubQuota()`):
    /// starts from the hard-coded 4000 fallback and is updated through a setter once
    /// the backend delivers `SubscribeSetRsp.max_sub_count`. No longer a static const.
    total_quota: RwLock<u32>,

    /// Global subscribe time (key = (QotSecurityKey, sub_type)).
    /// Mirrors C++ `m_mapSubTime`: refreshed only when a SubKey is subscribed globally
    /// for the first time, or when a backend attribute upgrade forces a re-fetch; an
    /// unsubscribe must first wait at least `QOT_MIN_UNSUB_ELAPSED_SECS`.
    qot_sub_times: RwLock<HashMap<(QotSecurityKey, i32), Instant>>,

    /// conn_ids that have disconnected but whose QOT subscriptions have not yet
    /// reached the C++ minimum unsubscribe window.
    ///
    /// Mirrors C++ `QotSubscribe::ClearConnSubInfo`: push registrations are cleared
    /// immediately on disconnect, but `m_setSub` entries are removed only after
    /// `IsSubTimeEnoughToUnSub`; connections still inside the window are retried by
    /// a later periodic cleanup.
    qot_disconnected_conns: RwLock<HashSet<u64>>,

    /// Generation counter for global desired-set changes caused by delayed
    /// disconnect cleanup.
    ///
    /// The server layer has no backend handle, so it cannot send CMD6211 directly
    /// from `on_disconnect`. This only records a monotonic "gateway sync needed"
    /// counter; the gateway background task sends the current desired set once it
    /// observes a change.
    qot_disconnect_sync_generation: AtomicU64,
}
93
/// Storage behind `SubscriptionManager::qot_push_regs`: two maps of
/// connection-id sets describing quote push registrations.
#[derive(Default)]
struct QotPushRegistrations {
    /// Registered conn ids keyed by a `(QotSecurityKey, i32, i32)` tuple; per the
    /// `qot_push_regs` field docs this is (security, sub_type, rehab_type).
    by_tuple: HashMap<(QotSecurityKey, i32, i32), HashSet<u64>>,
    /// Registered conn ids grouped by a `String` key, then by an `(i32, i32)` pair.
    /// NOTE(review): presumably a secondary index keyed by the security's cache-key
    /// string with (sub_type, rehab_type) inner keys — confirm against the
    /// `push_regs` module, which is not visible here.
    qot_push_regs_by_cache_key: HashMap<String, HashMap<(i32, i32), HashSet<u64>>>,
}
99
/// Fallback for the total subscription quota (used at startup until the real
/// value has been fetched from the backend).
///
/// Mirrors the 4000 default of C++ `QotSubscribe.cpp:1132` `GetUserSubQuota()`.
/// The real quota is delivered by the backend as `SubscribeSetRsp.max_sub_count`
/// in the CMD 6211 response; on receipt the daemon calls
/// `set_total_quota_from_backend(value)` to update `total_quota`.
pub const TOTAL_QUOTA: u32 = 4000;
107
/// Minimum age, in seconds, a global subscription must reach before the
/// min-unsub checks in this file treat it as eligible for unsubscribe.
///
/// C++ `QotSubscribe.cpp:9` uses 60 seconds and
/// `IsSubTimeEnoughToUnSub` checks `>= SubAtLeastTime - 1`, hence 59.
pub const QOT_MIN_UNSUB_ELAPSED_SECS: u64 = 59;
111
/// **Commit result returned by subscribe_qot** (used for exact quota accounting —
/// mirrors C++ counting quota once per globally unique `m_setSub` entry).
///
/// C++ reference: QotSubscribe.cpp:84-111. `bNoSub = m_setSub.count(pairSubKey) == 0`;
/// `UseQuota()` is called only when the global set does not yet contain the key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubResult {
    /// First global subscription of this (security, sub_type) — quota should be +1.
    NewGlobal,
    /// A global subscription already exists and this conn newly joined — quota unchanged.
    AlreadyGlobal,
    /// (conn_id, security, sub_type) was already in the set, i.e. a repeated
    /// subscription — quota unchanged.
    NoChange,
}
126
/// **Commit result returned by unsubscribe_qot**.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnsubResult {
    /// The last conn unsubscribed → the key is removed from the global set, and the
    /// **caller must send the backend a fresh CMD 6211 with the new desired set**
    /// (dropping this (stock_id, sub_type)).
    LastSubscriber,
    /// Other conns still subscribe to this (security, sub_type); no backend
    /// unsubscribe is needed.
    StillSubscribed,
    /// (conn_id, security, sub_type) was not subscribed before; silent no-op (the
    /// caller decides whether to reject loudly).
    NotSubscribed,
}
139
140impl SubscriptionManager {
141    pub fn new() -> Self {
142        Self {
143            notify_subs: RwLock::new(HashSet::new()),
144            trd_acc_subs: RwLock::new(HashMap::new()),
145            qot_subs: RwLock::new(HashMap::new()),
146            qot_push_regs: RwLock::new(QotPushRegistrations::default()),
147            qot_sub_sessions: RwLock::new(HashMap::new()),
148            qot_orderbook_detail: RwLock::new(HashMap::new()),
149            qot_broker_detail: RwLock::new(HashMap::new()),
150            total_quota: RwLock::new(TOTAL_QUOTA),
151            qot_sub_times: RwLock::new(HashMap::new()),
152            qot_disconnected_conns: RwLock::new(HashSet::new()),
153            qot_disconnect_sync_generation: AtomicU64::new(0),
154        }
155    }
156
157    // ===== 通知订阅 =====
158
159    pub fn subscribe_notify(&self, conn_id: u64) {
160        self.notify_subs.write().insert(conn_id);
161    }
162
163    pub fn unsubscribe_notify(&self, conn_id: u64) {
164        self.notify_subs.write().remove(&conn_id);
165    }
166
167    pub fn is_subscribed_notify(&self, conn_id: u64) -> bool {
168        self.notify_subs.read().contains(&conn_id)
169    }
170
171    // ===== 交易账户推送 =====
172
173    pub fn subscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
174        self.trd_acc_subs
175            .write()
176            .entry(acc_id)
177            .or_default()
178            .insert(conn_id);
179    }
180
181    pub fn unsubscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
182        if let Some(subs) = self.trd_acc_subs.write().get_mut(&acc_id) {
183            subs.remove(&conn_id);
184        }
185    }
186
187    pub fn get_acc_subscribers(&self, acc_id: u64) -> Vec<u64> {
188        match self.trd_acc_subs.read().get(&acc_id) {
189            Some(subscribers) => subscribers.iter().copied().collect(),
190            None => Vec::new(),
191        }
192    }
193
194    // ===== 行情订阅 (subscribers, F3 split-state) =====
195
196    /// 生成行情订阅 key.
197    pub fn make_qot_key(market: i32, code: &str, sub_type: i32) -> String {
198        format!("{market}_{code}:{sub_type}")
199    }
200
201    #[inline]
202    fn broker_key(sec_key: &QotSecurityKey) -> QotSecurityKey {
203        sec_key.clone()
204    }
205
206    /// **v1.4.106 codex 1131 F1+F5 [P1+P2]**: 订阅行情. 返 [`SubResult`]
207    /// 表示是否新加全局订阅 (caller 据此累 quota).
208    /// 重复订阅 (同 conn_id 同 key) 不影响 set, 不影响 quota.
209    /// **NOTE**: caller 必须先 backend ack-then-commit (F1) — 本方法仅写
210    /// local state. 失败 caller 应调 `unsubscribe_qot_broker` 回滚.
211    pub fn subscribe_qot_broker(
212        &self,
213        conn_id: u64,
214        sec_key: &QotSecurityKey,
215        sub_type: i32,
216    ) -> SubResult {
217        self.subscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
218    }
219
220    fn subscribe_qot_inner(&self, conn_id: u64, key: QotSecurityKey, sub_type: i32) -> SubResult {
221        self.qot_disconnected_conns.write().remove(&conn_id);
222        let mut qot = self.qot_subs.write();
223        let map_key = (key.clone(), sub_type);
224        let entry = qot.entry(map_key.clone()).or_default();
225        let was_empty_global = entry.is_empty();
226        let inserted = entry.insert(conn_id);
227        if was_empty_global && inserted {
228            self.qot_sub_times.write().insert(map_key, Instant::now());
229        }
230        if !inserted {
231            SubResult::NoChange
232        } else if was_empty_global {
233            SubResult::NewGlobal
234        } else {
235            SubResult::AlreadyGlobal
236        }
237    }
238
239    /// 退订并返结构化结果. caller 据 `LastSubscriber` 决定是否发 backend
240    /// fresh CMD 6211 with new desired set.
241    pub fn unsubscribe_qot_broker(
242        &self,
243        conn_id: u64,
244        sec_key: &QotSecurityKey,
245        sub_type: i32,
246    ) -> UnsubResult {
247        self.unsubscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
248    }
249
250    fn unsubscribe_qot_inner(
251        &self,
252        conn_id: u64,
253        key: QotSecurityKey,
254        sub_type: i32,
255    ) -> UnsubResult {
256        let map_key = (key.clone(), sub_type);
257        let became_empty;
258        let was_member;
259        {
260            let mut qot = self.qot_subs.write();
261            let entry = match qot.get_mut(&map_key) {
262                Some(e) => e,
263                None => return UnsubResult::NotSubscribed,
264            };
265            was_member = entry.remove(&conn_id);
266            became_empty = entry.is_empty();
267            if became_empty {
268                qot.remove(&map_key);
269            }
270        }
271        if !was_member {
272            return UnsubResult::NotSubscribed;
273        }
274        if became_empty {
275            self.qot_sub_times.write().remove(&map_key);
276        }
277        // 清 session/detail per-conn entry (对齐 C++ QotSubscribe::UnSub line 251-292).
278        {
279            let mut sess = self.qot_sub_sessions.write();
280            if let Some(e) = sess.get_mut(&map_key) {
281                e.remove(&conn_id);
282                if e.is_empty() {
283                    sess.remove(&map_key);
284                }
285            }
286        }
287        if sub_type == sub_type_orderbook() {
288            let mut ob = self.qot_orderbook_detail.write();
289            if let Some(e) = ob.get_mut(&key) {
290                e.remove(&conn_id);
291                if e.is_empty() {
292                    ob.remove(&key);
293                }
294            }
295        }
296        if sub_type == sub_type_broker() {
297            let mut br = self.qot_broker_detail.write();
298            if let Some(e) = br.get_mut(&key) {
299                e.remove(&conn_id);
300                if e.is_empty() {
301                    br.remove(&key);
302                }
303            }
304        }
305        if became_empty {
306            UnsubResult::LastSubscriber
307        } else {
308            UnsubResult::StillSubscribed
309        }
310    }
311
312    /// 是否 (conn_id, key, sub_type) 已订阅.
313    pub fn is_qot_subscribed_broker(
314        &self,
315        conn_id: u64,
316        sec_key: &QotSecurityKey,
317        sub_type: i32,
318    ) -> bool {
319        self.qot_subs
320            .read()
321            .get(&(Self::broker_key(sec_key), sub_type))
322            .is_some_and(|subs| subs.contains(&conn_id))
323    }
324
325    /// v1.4.106 codex 1131 F3 [P1]: 全局 (ignore conn) 是否有订阅 — RegQotPush
326    /// 的 precondition. 对齐 C++ `QotSubscribe::IsSub(stockID, subType)`.
327    pub fn is_globally_subscribed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
328        self.qot_subs
329            .read()
330            .get(&(Self::broker_key(sec_key), sub_type))
331            .is_some_and(|subs| !subs.is_empty())
332    }
333
334    /// min-unsub window for broker-aware subscription keys.
335    pub fn qot_min_unsub_elapsed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
336        self.qot_sub_times
337            .read()
338            .get(&(Self::broker_key(sec_key), sub_type))
339            .map(|instant| instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS))
340            .unwrap_or(true)
341    }
342
343    /// remaining min-unsub window for broker-aware keys.
344    pub fn qot_min_unsub_remaining_secs_broker(
345        &self,
346        sec_key: &QotSecurityKey,
347        sub_type: i32,
348    ) -> u64 {
349        self.qot_sub_times
350            .read()
351            .get(&(Self::broker_key(sec_key), sub_type))
352            .map(|instant| QOT_MIN_UNSUB_ELAPSED_SECS.saturating_sub(instant.elapsed().as_secs()))
353            .unwrap_or(0)
354    }
355
356    /// 断线延迟清理后需要 gateway 同步 CMD6211 的 generation。
357    pub fn qot_disconnect_sync_generation(&self) -> u64 {
358        self.qot_disconnect_sync_generation.load(Ordering::SeqCst)
359    }
360
361    fn bump_qot_disconnect_sync_generation(&self) {
362        self.qot_disconnect_sync_generation
363            .fetch_add(1, Ordering::SeqCst);
364    }
365
366    fn conn_has_qot_subs(&self, conn_id: u64) -> bool {
367        self.qot_subs
368            .read()
369            .values()
370            .any(|subs| subs.contains(&conn_id))
371    }
372
373    #[doc(hidden)]
374    pub fn backdate_qot_sub_time_broker_for_test(
375        &self,
376        sec_key: &QotSecurityKey,
377        sub_type: i32,
378        elapsed: Duration,
379    ) {
380        let map_key = (Self::broker_key(sec_key), sub_type);
381        let instant = Instant::now()
382            .checked_sub(elapsed)
383            .unwrap_or_else(Instant::now);
384        self.qot_sub_times.write().insert(map_key, instant);
385    }
386
387    /// v1.4.106 codex 1131 F2: clear all qot subs for a single conn_id.
388    /// 返 (sec_key, sub_type) 列表 of "本 conn 退订后变成全局空的" — caller
389    /// 据此构 backend new desired set. 返的 sec_key 是 cache_key display string
390    /// (`"market_code"` or `"market_code@b{id}"`).
391    pub fn unsubscribe_all_qot_collect_global_empty(&self, conn_id: u64) -> Vec<(String, i32)> {
392        let mut became_empty: Vec<(String, i32)> = Vec::new();
393        let keys_to_check: Vec<(QotSecurityKey, i32)> = self
394            .qot_subs
395            .read()
396            .iter()
397            .filter(|(_, set)| set.contains(&conn_id))
398            .map(|(k, _)| k.clone())
399            .collect();
400        {
401            let mut qot = self.qot_subs.write();
402            for k in keys_to_check {
403                if let Some(set) = qot.get_mut(&k) {
404                    set.remove(&conn_id);
405                    if set.is_empty() {
406                        became_empty.push((k.0.cache_key(), k.1));
407                        let removed_key = k.clone();
408                        qot.remove(&k);
409                        self.qot_sub_times.write().remove(&removed_key);
410                    }
411                }
412            }
413        }
414        // session/detail/push_regs cleanup for this conn.
415        {
416            let mut sess = self.qot_sub_sessions.write();
417            sess.retain(|_, m| {
418                m.remove(&conn_id);
419                !m.is_empty()
420            });
421        }
422        {
423            let mut ob = self.qot_orderbook_detail.write();
424            ob.retain(|_, m| {
425                m.remove(&conn_id);
426                !m.is_empty()
427            });
428        }
429        {
430            let mut br = self.qot_broker_detail.write();
431            br.retain(|_, m| {
432                m.remove(&conn_id);
433                !m.is_empty()
434            });
435        }
436        {
437            let mut pr = self.qot_push_regs.write();
438            pr.by_tuple.retain(|_, set| {
439                set.remove(&conn_id);
440                !set.is_empty()
441            });
442            pr.qot_push_regs_by_cache_key.retain(|_, by_sub| {
443                by_sub.retain(|_, set| {
444                    set.remove(&conn_id);
445                    !set.is_empty()
446                });
447                !by_sub.is_empty()
448            });
449        }
450        became_empty
451    }
452
453    /// 清理已断开且已满足 C++ 最短退订窗口的 QOT conn。
454    ///
455    /// 返回本次清理后 global desired set 变空的 `(sec_key, sub_type)` 列表
456    /// (sec_key 是 cache_key display string: `"market_code"` or
457    /// `"market_code@b{id}"`).
458    /// 若列表非空,会 bump `qot_disconnect_sync_generation`,由 gateway 后台
459    /// 任务负责把新的全局 desired set 发到 backend。
460    pub fn cleanup_due_disconnected_qot(&self) -> Vec<(String, i32)> {
461        let disconnected: Vec<u64> = self.qot_disconnected_conns.read().iter().copied().collect();
462        if disconnected.is_empty() {
463            return Vec::new();
464        }
465
466        let mut to_remove: Vec<(u64, QotSecurityKey, i32)> = Vec::new();
467        {
468            let qot = self.qot_subs.read();
469            let sub_times = self.qot_sub_times.read();
470            for conn_id in &disconnected {
471                for ((key, sub_type), subs) in qot.iter() {
472                    if !subs.contains(conn_id) {
473                        continue;
474                    }
475                    let elapsed_ok = sub_times
476                        .get(&(key.clone(), *sub_type))
477                        .map(|instant| {
478                            instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS)
479                        })
480                        .unwrap_or(true);
481                    if elapsed_ok {
482                        to_remove.push((*conn_id, key.clone(), *sub_type));
483                    }
484                }
485            }
486        }
487
488        let mut became_empty = Vec::new();
489        for (conn_id, key, sub_type) in to_remove {
490            let display_key = key.cache_key();
491            if matches!(
492                self.unsubscribe_qot_inner(conn_id, key, sub_type),
493                UnsubResult::LastSubscriber
494            ) {
495                became_empty.push((display_key, sub_type));
496            }
497        }
498
499        {
500            let mut disconnected = self.qot_disconnected_conns.write();
501            disconnected.retain(|conn_id| self.conn_has_qot_subs(*conn_id));
502        }
503
504        if !became_empty.is_empty() {
505            self.bump_qot_disconnect_sync_generation();
506        }
507        became_empty
508    }
509
510    /// **v1.4.106 codex 0631 F1 [P1]**: ack-then-commit `unsub_all` 的"干跑"半段.
511    ///
512    /// 计算: **若**本 conn 退订全部, 哪些 `(sec_key, sub_type)` 在 global
513    /// desired set 中**变空** (= backend 该真退). **不修 state, 不动 detail
514    /// flag, 不动 push_regs**. 用在 ack-then-commit pipeline:
515    ///
516    /// `dry_run -> submit_global_desired_set (backend ack) -> commit (清 state)`
517    ///
518    /// backend reject → caller 不调 `commit`, state 保留 → 客户端可重试幂等.
519    /// 老 `unsubscribe_all_qot_collect_global_empty` 是先清后算 — 失败时
520    /// state 已 mutate, 不能 rollback (split-brain 风险). 本 helper 替代.
521    pub fn unsubscribe_all_qot_dry_run(&self, conn_id: u64) -> Vec<(String, i32)> {
522        let mut became_empty: Vec<(String, i32)> = Vec::new();
523        let qot = self.qot_subs.read();
524        for ((k, sub_type), set) in qot.iter() {
525            if !set.contains(&conn_id) {
526                continue;
527            }
528            // 退订本 conn 后该 set 是否变空? set.len() == 1 + contains(&conn_id) → 退后空.
529            if set.len() == 1 {
530                became_empty.push((k.cache_key(), *sub_type));
531            }
532        }
533        became_empty
534    }
535
536    /// **v1.4.106 codex 0631 F1 [P1]**: ack-then-commit `unsub_all` 的"提交"半段.
537    /// 等价于老 `unsubscribe_all_qot_collect_global_empty` (语义不变, 仅在
538    /// backend ack OK 后才调). 同时清 session / detail / push_regs.
539    pub fn unsubscribe_all_qot_commit(&self, conn_id: u64) -> Vec<(String, i32)> {
540        self.unsubscribe_all_qot_collect_global_empty(conn_id)
541    }
542
543    /// 获取订阅了指定行情的连接列表 (subscribers, **不**用作 push 路由).
544    /// 用于 `apply_unsubscribe_delta` 判断 broker-aware key 上是否还有其他
545    /// conn 订阅 (last-subscriber gate for desired-set remove).
546    pub fn get_qot_subscribers_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> Vec<u64> {
547        match self
548            .qot_subs
549            .read()
550            .get(&(Self::broker_key(sec_key), sub_type))
551        {
552            Some(subscribers) => subscribers.iter().copied().collect(),
553            None => Vec::new(),
554        }
555    }
556
557    /// **v1.4.110 codex audit Round3 P2 #21**: 给定 `stock_id`, 判断该 stock 是否
558    /// **全局**已无任何 conn 订阅 (跨所有 broker_id + 所有 sub_type).
559    ///
560    /// 用途: `Qot_Sub` 退订路径在 commit 之后判断 crypto symbol 是否真正全空,
561    /// 决定是否调 `CryptoExchangeCache::clear_stock(stock_id)` 清 stale entry —
562    /// 因为 `crypto_exchange_cache` 按 `stock_id` keyed (broker 无关), 只有该
563    /// stock 全 broker 全 sub_type 都退掉才能安全清.
564    ///
565    /// 扫 `qot_subs` 找任意 key 满足 `QotStockKey.stock_id == stock_id`;
566    /// broker 1007 退订但 1008 仍订阅时返 `false` (不能清).
567    ///
568    /// 返 `true` ⟺ 该 stock_id 在 `qot_subs` 中无任何带 subscriber 的 entry.
569    pub fn crypto_stock_globally_unsubscribed(&self, stock_id: u64) -> bool {
570        let qot = self.qot_subs.read();
571        !qot.iter()
572            .any(|((key, _sub_type), subs)| !subs.is_empty() && key.stock_key.stock_id == stock_id)
573    }
574
575    /// v1.4.110 R6-8: `(stock_id, broker_id)`-级版 `crypto_stock_globally_unsubscribed`.
576    ///
577    /// 用途: 部分 broker 退订时, 判断某具体 `(stock_id, broker_id)` 是否已无任何
578    /// conn 订阅 → 决定是否调 `CryptoExchangeCache::clear_stock_broker` 清该
579    /// broker 的 stale `by_broker` entry (整 stock 仍有别的 broker 在订时
580    /// `clear_stock` 不适用).
581    ///
582    /// 按 `stock_key.stock_id` + `stock_key.broker_id` 双字段匹配.
583    ///
584    /// 返 `true` ⟺ 该 `(stock_id, broker_id)` 在 `qot_subs` 无任何带 subscriber 的 entry.
585    pub fn crypto_stock_broker_globally_unsubscribed(&self, stock_id: u64, broker_id: u32) -> bool {
586        let target_broker = std::num::NonZeroU32::new(broker_id);
587        let qot = self.qot_subs.read();
588        !qot.iter().any(|((key, _sub_type), subs)| {
589            !subs.is_empty()
590                && key.stock_key.stock_id == stock_id
591                && key.stock_key.broker_id == target_broker
592        })
593    }
594
595    // ===== Session / Detail (per-(security, sub_type) global aggregator) =====
596
597    pub fn set_conn_session_broker(
598        &self,
599        conn_id: u64,
600        sec_key: &QotSecurityKey,
601        sub_type: i32,
602        session: i32,
603    ) {
604        self.qot_sub_sessions
605            .write()
606            .entry((Self::broker_key(sec_key), sub_type))
607            .or_default()
608            .insert(conn_id, session);
609    }
610
611    pub fn get_global_session_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> i32 {
612        self.qot_sub_sessions
613            .read()
614            .get(&(Self::broker_key(sec_key), sub_type))
615            .map(|m| m.values().copied().max().unwrap_or(1))
616            .unwrap_or(1)
617    }
618
619    /// 获取单连接订阅 session(没有显式记录时按 C++ 默认 RTH)。
620    pub fn get_conn_session_broker(
621        &self,
622        conn_id: u64,
623        sec_key: &QotSecurityKey,
624        sub_type: i32,
625    ) -> i32 {
626        self.qot_sub_sessions
627            .read()
628            .get(&(Self::broker_key(sec_key), sub_type))
629            .and_then(|m| m.get(&conn_id).copied())
630            .unwrap_or(1)
631    }
632
633    pub fn set_conn_orderbook_detail_broker(
634        &self,
635        conn_id: u64,
636        sec_key: &QotSecurityKey,
637        detail: bool,
638    ) {
639        self.qot_orderbook_detail
640            .write()
641            .entry(Self::broker_key(sec_key))
642            .or_default()
643            .insert(conn_id, detail);
644    }
645
646    pub fn is_global_orderbook_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
647        self.qot_orderbook_detail
648            .read()
649            .get(&Self::broker_key(sec_key))
650            .map(|m| m.values().any(|&d| d))
651            .unwrap_or(false)
652    }
653
654    pub fn set_conn_broker_detail_broker(
655        &self,
656        conn_id: u64,
657        sec_key: &QotSecurityKey,
658        detail: bool,
659    ) {
660        self.qot_broker_detail
661            .write()
662            .entry(Self::broker_key(sec_key))
663            .or_default()
664            .insert(conn_id, detail);
665    }
666
667    pub fn is_global_broker_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
668        self.qot_broker_detail
669            .read()
670            .get(&Self::broker_key(sec_key))
671            .map(|m| m.values().any(|&d| d))
672            .unwrap_or(false)
673    }
674
675    // ===== 连接断开清理 =====
676
677    pub fn on_disconnect(&self, conn_id: u64) -> Vec<(String, i32)> {
678        self.notify_subs.write().remove(&conn_id);
679
680        {
681            let mut trd = self.trd_acc_subs.write();
682            for subs in trd.values_mut() {
683                subs.remove(&conn_id);
684            }
685        }
686
687        {
688            let mut pr = self.qot_push_regs.write();
689            pr.by_tuple.retain(|_, set| {
690                set.remove(&conn_id);
691                !set.is_empty()
692            });
693            pr.qot_push_regs_by_cache_key.retain(|_, by_sub| {
694                by_sub.retain(|_, set| {
695                    set.remove(&conn_id);
696                    !set.is_empty()
697                });
698                !by_sub.is_empty()
699            });
700        }
701
702        if self.conn_has_qot_subs(conn_id) {
703            self.qot_disconnected_conns.write().insert(conn_id);
704        }
705        self.cleanup_due_disconnected_qot()
706    }
707}
708
709impl Default for SubscriptionManager {
710    fn default() -> Self {
711        Self::new()
712    }
713}
714
/// Numeric `sub_type` value this module treats as the order-book subscription.
#[inline]
fn sub_type_orderbook() -> i32 {
    const ORDER_BOOK: i32 = 2;
    ORDER_BOOK
}
719
/// Numeric `sub_type` value this module treats as the broker subscription.
#[inline]
fn sub_type_broker() -> i32 {
    const BROKER: i32 = 14;
    BROKER
}
724
725#[cfg(test)]
726mod tests;