Skip to main content

futu_server/subscription/
push_regs.rs

1use futu_core::qot_stock_key::QotSecurityKey;
2
3use super::SubscriptionManager;
4
5impl SubscriptionManager {
6    // ===== 行情 push 注册 (qot_push_regs, F3 独立) =====
7
8    /// **v1.4.106 codex 1131 F3 [P1]**: 注册接收 push (对齐 C++ `RegPush`).
9    /// 仅写本 map, **不动 `qot_subs`**. caller (RegQotPushHandler) 必须在
10    /// 调本方法前确认已订阅 (`is_globally_subscribed_broker`).
11    pub fn register_push_broker(
12        &self,
13        conn_id: u64,
14        sec_key: &QotSecurityKey,
15        sub_type: i32,
16        rehab_type: i32,
17    ) {
18        self.register_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
19    }
20
21    fn register_push_inner(
22        &self,
23        conn_id: u64,
24        key: QotSecurityKey,
25        sub_type: i32,
26        rehab_type: i32,
27    ) {
28        let effective_rehab = if is_kl_sub_type(sub_type) {
29            rehab_type
30        } else {
31            0
32        };
33        let cache_key = key.cache_key();
34        let mut regs = self.qot_push_regs.write();
35        regs.by_tuple
36            .entry((key, sub_type, effective_rehab))
37            .or_default()
38            .insert(conn_id);
39        regs.qot_push_regs_by_cache_key
40            .entry(cache_key)
41            .or_default()
42            .entry((sub_type, effective_rehab))
43            .or_default()
44            .insert(conn_id);
45    }
46
47    /// **v1.4.106 codex 1131 F3 [P1]**: 取消 push 注册 — 不删 `qot_subs`.
48    pub fn unregister_push_broker(
49        &self,
50        conn_id: u64,
51        sec_key: &QotSecurityKey,
52        sub_type: i32,
53        rehab_type: i32,
54    ) {
55        self.unregister_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
56    }
57
58    fn unregister_push_inner(
59        &self,
60        conn_id: u64,
61        key: QotSecurityKey,
62        sub_type: i32,
63        rehab_type: i32,
64    ) {
65        let effective_rehab = if is_kl_sub_type(sub_type) {
66            rehab_type
67        } else {
68            0
69        };
70        let cache_key = key.cache_key();
71        let map_key = (key, sub_type, effective_rehab);
72        let mut regs = self.qot_push_regs.write();
73        if let Some(set) = regs.by_tuple.get_mut(&map_key) {
74            set.remove(&conn_id);
75            if set.is_empty() {
76                regs.by_tuple.remove(&map_key);
77            }
78        }
79        if let Some(by_sub) = regs.qot_push_regs_by_cache_key.get_mut(&cache_key) {
80            if let Some(set) = by_sub.get_mut(&(sub_type, effective_rehab)) {
81                set.remove(&conn_id);
82                if set.is_empty() {
83                    by_sub.remove(&(sub_type, effective_rehab));
84                }
85            }
86            if by_sub.is_empty() {
87                regs.qot_push_regs_by_cache_key.remove(&cache_key);
88            }
89        }
90    }
91
92    /// **v1.4.106 codex 1131 F4 [P1]**: push delivery filter. 对齐 C++
93    /// `QotSubscribe::GetPushConn`.
94    pub fn get_qot_push_subscribers_broker(
95        &self,
96        sec_key: &QotSecurityKey,
97        sub_type: i32,
98        rehab_type: i32,
99    ) -> Vec<u64> {
100        self.get_qot_push_subscribers_inner(Self::broker_key(sec_key), sub_type, rehab_type)
101    }
102
103    /// Lookup by internal cache-key display
104    /// string carried by push events (`"market_code"` or `"market_code@b{id}"`).
105    ///
106    /// `PushEvent::QuotePush` does not carry `stock_id`, so dispatch cannot
107    /// reconstruct a full `QotSecurityKey`. Instead, match the already stored
108    /// subscription keys by their cache-key display string.
109    pub fn get_qot_push_subscribers_by_cache_key(
110        &self,
111        cache_key: &str,
112        sub_type: i32,
113        rehab_type: i32,
114    ) -> Vec<u64> {
115        let effective_rehab = if is_kl_sub_type(sub_type) {
116            rehab_type
117        } else {
118            0
119        };
120        let regs = self.qot_push_regs.read();
121        let mut out: Vec<u64> = regs
122            .qot_push_regs_by_cache_key
123            .get(cache_key)
124            .and_then(|by_sub| by_sub.get(&(sub_type, effective_rehab)))
125            .map_or_else(Vec::new, |subscribers| {
126                subscribers.iter().copied().collect()
127            });
128
129        out.sort_unstable();
130        out.dedup();
131        out
132    }
133
134    fn get_qot_push_subscribers_inner(
135        &self,
136        key: QotSecurityKey,
137        sub_type: i32,
138        rehab_type: i32,
139    ) -> Vec<u64> {
140        let effective_rehab = if is_kl_sub_type(sub_type) {
141            rehab_type
142        } else {
143            0
144        };
145        match self
146            .qot_push_regs
147            .read()
148            .by_tuple
149            .get(&(key, sub_type, effective_rehab))
150        {
151            Some(subscribers) => subscribers.iter().copied().collect(),
152            None => Vec::new(),
153        }
154    }
155
156    /// **v1.4.106 codex 1131 F3 [P1]**: 是否注册过 push (任意 rehab).
157    pub fn is_push_registered_any_rehab_broker(
158        &self,
159        conn_id: u64,
160        sec_key: &QotSecurityKey,
161        sub_type: i32,
162    ) -> bool {
163        let broker_key = Self::broker_key(sec_key);
164        let pr = self.qot_push_regs.read();
165        pr.by_tuple.iter().any(|((k, st, _rehab), set)| {
166            k == &broker_key && *st == sub_type && set.contains(&conn_id)
167        })
168    }
169}
170
/// Whether `sub_type` is a K-line (KL) sub type — mirrors C++ `IsKLSubType`.
///
/// Accepts the codes 6 through 13 and 15 through 17; 14 is not in the set.
#[inline]
fn is_kl_sub_type(sub_type: i32) -> bool {
    matches!(sub_type, 6..=13 | 15..=17)
}