futu_server/subscription/
push_regs.rs1use futu_core::qot_stock_key::QotSecurityKey;
2
3use super::SubscriptionManager;
4
5impl SubscriptionManager {
6 pub fn register_push_broker(
12 &self,
13 conn_id: u64,
14 sec_key: &QotSecurityKey,
15 sub_type: i32,
16 rehab_type: i32,
17 ) {
18 self.register_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
19 }
20
21 fn register_push_inner(
22 &self,
23 conn_id: u64,
24 key: QotSecurityKey,
25 sub_type: i32,
26 rehab_type: i32,
27 ) {
28 let effective_rehab = if is_kl_sub_type(sub_type) {
29 rehab_type
30 } else {
31 0
32 };
33 let cache_key = key.cache_key();
34 let mut regs = self.qot_push_regs.write();
35 regs.by_tuple
36 .entry((key, sub_type, effective_rehab))
37 .or_default()
38 .insert(conn_id);
39 regs.qot_push_regs_by_cache_key
40 .entry(cache_key)
41 .or_default()
42 .entry((sub_type, effective_rehab))
43 .or_default()
44 .insert(conn_id);
45 }
46
47 pub fn unregister_push_broker(
49 &self,
50 conn_id: u64,
51 sec_key: &QotSecurityKey,
52 sub_type: i32,
53 rehab_type: i32,
54 ) {
55 self.unregister_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
56 }
57
58 fn unregister_push_inner(
59 &self,
60 conn_id: u64,
61 key: QotSecurityKey,
62 sub_type: i32,
63 rehab_type: i32,
64 ) {
65 let effective_rehab = if is_kl_sub_type(sub_type) {
66 rehab_type
67 } else {
68 0
69 };
70 let cache_key = key.cache_key();
71 let map_key = (key, sub_type, effective_rehab);
72 let mut regs = self.qot_push_regs.write();
73 if let Some(set) = regs.by_tuple.get_mut(&map_key) {
74 set.remove(&conn_id);
75 if set.is_empty() {
76 regs.by_tuple.remove(&map_key);
77 }
78 }
79 if let Some(by_sub) = regs.qot_push_regs_by_cache_key.get_mut(&cache_key) {
80 if let Some(set) = by_sub.get_mut(&(sub_type, effective_rehab)) {
81 set.remove(&conn_id);
82 if set.is_empty() {
83 by_sub.remove(&(sub_type, effective_rehab));
84 }
85 }
86 if by_sub.is_empty() {
87 regs.qot_push_regs_by_cache_key.remove(&cache_key);
88 }
89 }
90 }
91
92 pub fn get_qot_push_subscribers_broker(
95 &self,
96 sec_key: &QotSecurityKey,
97 sub_type: i32,
98 rehab_type: i32,
99 ) -> Vec<u64> {
100 self.get_qot_push_subscribers_inner(Self::broker_key(sec_key), sub_type, rehab_type)
101 }
102
103 pub fn get_qot_push_subscribers_by_cache_key(
110 &self,
111 cache_key: &str,
112 sub_type: i32,
113 rehab_type: i32,
114 ) -> Vec<u64> {
115 let effective_rehab = if is_kl_sub_type(sub_type) {
116 rehab_type
117 } else {
118 0
119 };
120 let regs = self.qot_push_regs.read();
121 let mut out: Vec<u64> = regs
122 .qot_push_regs_by_cache_key
123 .get(cache_key)
124 .and_then(|by_sub| by_sub.get(&(sub_type, effective_rehab)))
125 .map_or_else(Vec::new, |subscribers| {
126 subscribers.iter().copied().collect()
127 });
128
129 out.sort_unstable();
130 out.dedup();
131 out
132 }
133
134 fn get_qot_push_subscribers_inner(
135 &self,
136 key: QotSecurityKey,
137 sub_type: i32,
138 rehab_type: i32,
139 ) -> Vec<u64> {
140 let effective_rehab = if is_kl_sub_type(sub_type) {
141 rehab_type
142 } else {
143 0
144 };
145 match self
146 .qot_push_regs
147 .read()
148 .by_tuple
149 .get(&(key, sub_type, effective_rehab))
150 {
151 Some(subscribers) => subscribers.iter().copied().collect(),
152 None => Vec::new(),
153 }
154 }
155
156 pub fn is_push_registered_any_rehab_broker(
158 &self,
159 conn_id: u64,
160 sec_key: &QotSecurityKey,
161 sub_type: i32,
162 ) -> bool {
163 let broker_key = Self::broker_key(sec_key);
164 let pr = self.qot_push_regs.read();
165 pr.by_tuple.iter().any(|((k, st, _rehab), set)| {
166 k == &broker_key && *st == sub_type && set.contains(&conn_id)
167 })
168 }
169}
170
171#[inline]
173fn is_kl_sub_type(sub_type: i32) -> bool {
174 matches!(sub_type, 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 15 | 16 | 17)
175}