1mod push_regs;
29mod views;
30
31use std::collections::{HashMap, HashSet};
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::time::{Duration, Instant};
34
35use futu_core::qot_stock_key::QotSecurityKey;
36use parking_lot::RwLock;
37
38pub struct SubscriptionManager {
40 notify_subs: RwLock<HashSet<u64>>,
42
43 trd_acc_subs: RwLock<HashMap<u64, HashSet<u64>>>,
45
46 qot_subs: RwLock<HashMap<(QotSecurityKey, i32), HashSet<u64>>>,
49
50 qot_push_regs: RwLock<QotPushRegistrations>,
54
55 qot_sub_sessions: RwLock<HashMap<(QotSecurityKey, i32), HashMap<u64, i32>>>,
60
61 qot_orderbook_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,
64
65 qot_broker_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,
68
69 total_quota: RwLock<u32>,
73
74 qot_sub_times: RwLock<HashMap<(QotSecurityKey, i32), Instant>>,
78
79 qot_disconnected_conns: RwLock<HashSet<u64>>,
85
86 qot_disconnect_sync_generation: AtomicU64,
92}
93
94#[derive(Default)]
95struct QotPushRegistrations {
96 by_tuple: HashMap<(QotSecurityKey, i32, i32), HashSet<u64>>,
97 qot_push_regs_by_cache_key: HashMap<String, HashMap<(i32, i32), HashSet<u64>>>,
98}
99
100pub const TOTAL_QUOTA: u32 = 4000;
107
108pub const QOT_MIN_UNSUB_ELAPSED_SECS: u64 = 59;
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum SubResult {
119 NewGlobal,
121 AlreadyGlobal,
123 NoChange,
125}
126
127#[derive(Debug, Clone, Copy, PartialEq, Eq)]
129pub enum UnsubResult {
130 LastSubscriber,
133 StillSubscribed,
135 NotSubscribed,
138}
139
140impl SubscriptionManager {
141 pub fn new() -> Self {
142 Self {
143 notify_subs: RwLock::new(HashSet::new()),
144 trd_acc_subs: RwLock::new(HashMap::new()),
145 qot_subs: RwLock::new(HashMap::new()),
146 qot_push_regs: RwLock::new(QotPushRegistrations::default()),
147 qot_sub_sessions: RwLock::new(HashMap::new()),
148 qot_orderbook_detail: RwLock::new(HashMap::new()),
149 qot_broker_detail: RwLock::new(HashMap::new()),
150 total_quota: RwLock::new(TOTAL_QUOTA),
151 qot_sub_times: RwLock::new(HashMap::new()),
152 qot_disconnected_conns: RwLock::new(HashSet::new()),
153 qot_disconnect_sync_generation: AtomicU64::new(0),
154 }
155 }
156
157 pub fn subscribe_notify(&self, conn_id: u64) {
160 self.notify_subs.write().insert(conn_id);
161 }
162
163 pub fn unsubscribe_notify(&self, conn_id: u64) {
164 self.notify_subs.write().remove(&conn_id);
165 }
166
167 pub fn is_subscribed_notify(&self, conn_id: u64) -> bool {
168 self.notify_subs.read().contains(&conn_id)
169 }
170
171 pub fn subscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
174 self.trd_acc_subs
175 .write()
176 .entry(acc_id)
177 .or_default()
178 .insert(conn_id);
179 }
180
181 pub fn unsubscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
182 if let Some(subs) = self.trd_acc_subs.write().get_mut(&acc_id) {
183 subs.remove(&conn_id);
184 }
185 }
186
187 pub fn get_acc_subscribers(&self, acc_id: u64) -> Vec<u64> {
188 match self.trd_acc_subs.read().get(&acc_id) {
189 Some(subscribers) => subscribers.iter().copied().collect(),
190 None => Vec::new(),
191 }
192 }
193
194 pub fn make_qot_key(market: i32, code: &str, sub_type: i32) -> String {
198 format!("{market}_{code}:{sub_type}")
199 }
200
201 #[inline]
202 fn broker_key(sec_key: &QotSecurityKey) -> QotSecurityKey {
203 sec_key.clone()
204 }
205
206 pub fn subscribe_qot_broker(
212 &self,
213 conn_id: u64,
214 sec_key: &QotSecurityKey,
215 sub_type: i32,
216 ) -> SubResult {
217 self.subscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
218 }
219
220 fn subscribe_qot_inner(&self, conn_id: u64, key: QotSecurityKey, sub_type: i32) -> SubResult {
221 self.qot_disconnected_conns.write().remove(&conn_id);
222 let mut qot = self.qot_subs.write();
223 let map_key = (key.clone(), sub_type);
224 let entry = qot.entry(map_key.clone()).or_default();
225 let was_empty_global = entry.is_empty();
226 let inserted = entry.insert(conn_id);
227 if was_empty_global && inserted {
228 self.qot_sub_times.write().insert(map_key, Instant::now());
229 }
230 if !inserted {
231 SubResult::NoChange
232 } else if was_empty_global {
233 SubResult::NewGlobal
234 } else {
235 SubResult::AlreadyGlobal
236 }
237 }
238
239 pub fn unsubscribe_qot_broker(
242 &self,
243 conn_id: u64,
244 sec_key: &QotSecurityKey,
245 sub_type: i32,
246 ) -> UnsubResult {
247 self.unsubscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
248 }
249
250 fn unsubscribe_qot_inner(
251 &self,
252 conn_id: u64,
253 key: QotSecurityKey,
254 sub_type: i32,
255 ) -> UnsubResult {
256 let map_key = (key.clone(), sub_type);
257 let became_empty;
258 let was_member;
259 {
260 let mut qot = self.qot_subs.write();
261 let entry = match qot.get_mut(&map_key) {
262 Some(e) => e,
263 None => return UnsubResult::NotSubscribed,
264 };
265 was_member = entry.remove(&conn_id);
266 became_empty = entry.is_empty();
267 if became_empty {
268 qot.remove(&map_key);
269 }
270 }
271 if !was_member {
272 return UnsubResult::NotSubscribed;
273 }
274 if became_empty {
275 self.qot_sub_times.write().remove(&map_key);
276 }
277 {
279 let mut sess = self.qot_sub_sessions.write();
280 if let Some(e) = sess.get_mut(&map_key) {
281 e.remove(&conn_id);
282 if e.is_empty() {
283 sess.remove(&map_key);
284 }
285 }
286 }
287 if sub_type == sub_type_orderbook() {
288 let mut ob = self.qot_orderbook_detail.write();
289 if let Some(e) = ob.get_mut(&key) {
290 e.remove(&conn_id);
291 if e.is_empty() {
292 ob.remove(&key);
293 }
294 }
295 }
296 if sub_type == sub_type_broker() {
297 let mut br = self.qot_broker_detail.write();
298 if let Some(e) = br.get_mut(&key) {
299 e.remove(&conn_id);
300 if e.is_empty() {
301 br.remove(&key);
302 }
303 }
304 }
305 if became_empty {
306 UnsubResult::LastSubscriber
307 } else {
308 UnsubResult::StillSubscribed
309 }
310 }
311
312 pub fn is_qot_subscribed_broker(
314 &self,
315 conn_id: u64,
316 sec_key: &QotSecurityKey,
317 sub_type: i32,
318 ) -> bool {
319 self.qot_subs
320 .read()
321 .get(&(Self::broker_key(sec_key), sub_type))
322 .is_some_and(|subs| subs.contains(&conn_id))
323 }
324
325 pub fn is_globally_subscribed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
328 self.qot_subs
329 .read()
330 .get(&(Self::broker_key(sec_key), sub_type))
331 .is_some_and(|subs| !subs.is_empty())
332 }
333
334 pub fn qot_min_unsub_elapsed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
336 self.qot_sub_times
337 .read()
338 .get(&(Self::broker_key(sec_key), sub_type))
339 .map(|instant| instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS))
340 .unwrap_or(true)
341 }
342
343 pub fn qot_min_unsub_remaining_secs_broker(
345 &self,
346 sec_key: &QotSecurityKey,
347 sub_type: i32,
348 ) -> u64 {
349 self.qot_sub_times
350 .read()
351 .get(&(Self::broker_key(sec_key), sub_type))
352 .map(|instant| QOT_MIN_UNSUB_ELAPSED_SECS.saturating_sub(instant.elapsed().as_secs()))
353 .unwrap_or(0)
354 }
355
356 pub fn qot_disconnect_sync_generation(&self) -> u64 {
358 self.qot_disconnect_sync_generation.load(Ordering::SeqCst)
359 }
360
361 fn bump_qot_disconnect_sync_generation(&self) {
362 self.qot_disconnect_sync_generation
363 .fetch_add(1, Ordering::SeqCst);
364 }
365
366 fn conn_has_qot_subs(&self, conn_id: u64) -> bool {
367 self.qot_subs
368 .read()
369 .values()
370 .any(|subs| subs.contains(&conn_id))
371 }
372
373 #[doc(hidden)]
374 pub fn backdate_qot_sub_time_broker_for_test(
375 &self,
376 sec_key: &QotSecurityKey,
377 sub_type: i32,
378 elapsed: Duration,
379 ) {
380 let map_key = (Self::broker_key(sec_key), sub_type);
381 let instant = Instant::now()
382 .checked_sub(elapsed)
383 .unwrap_or_else(Instant::now);
384 self.qot_sub_times.write().insert(map_key, instant);
385 }
386
387 pub fn unsubscribe_all_qot_collect_global_empty(&self, conn_id: u64) -> Vec<(String, i32)> {
392 let mut became_empty: Vec<(String, i32)> = Vec::new();
393 let keys_to_check: Vec<(QotSecurityKey, i32)> = self
394 .qot_subs
395 .read()
396 .iter()
397 .filter(|(_, set)| set.contains(&conn_id))
398 .map(|(k, _)| k.clone())
399 .collect();
400 {
401 let mut qot = self.qot_subs.write();
402 for k in keys_to_check {
403 if let Some(set) = qot.get_mut(&k) {
404 set.remove(&conn_id);
405 if set.is_empty() {
406 became_empty.push((k.0.cache_key(), k.1));
407 let removed_key = k.clone();
408 qot.remove(&k);
409 self.qot_sub_times.write().remove(&removed_key);
410 }
411 }
412 }
413 }
414 {
416 let mut sess = self.qot_sub_sessions.write();
417 sess.retain(|_, m| {
418 m.remove(&conn_id);
419 !m.is_empty()
420 });
421 }
422 {
423 let mut ob = self.qot_orderbook_detail.write();
424 ob.retain(|_, m| {
425 m.remove(&conn_id);
426 !m.is_empty()
427 });
428 }
429 {
430 let mut br = self.qot_broker_detail.write();
431 br.retain(|_, m| {
432 m.remove(&conn_id);
433 !m.is_empty()
434 });
435 }
436 {
437 let mut pr = self.qot_push_regs.write();
438 pr.by_tuple.retain(|_, set| {
439 set.remove(&conn_id);
440 !set.is_empty()
441 });
442 pr.qot_push_regs_by_cache_key.retain(|_, by_sub| {
443 by_sub.retain(|_, set| {
444 set.remove(&conn_id);
445 !set.is_empty()
446 });
447 !by_sub.is_empty()
448 });
449 }
450 became_empty
451 }
452
453 pub fn cleanup_due_disconnected_qot(&self) -> Vec<(String, i32)> {
461 let disconnected: Vec<u64> = self.qot_disconnected_conns.read().iter().copied().collect();
462 if disconnected.is_empty() {
463 return Vec::new();
464 }
465
466 let mut to_remove: Vec<(u64, QotSecurityKey, i32)> = Vec::new();
467 {
468 let qot = self.qot_subs.read();
469 let sub_times = self.qot_sub_times.read();
470 for conn_id in &disconnected {
471 for ((key, sub_type), subs) in qot.iter() {
472 if !subs.contains(conn_id) {
473 continue;
474 }
475 let elapsed_ok = sub_times
476 .get(&(key.clone(), *sub_type))
477 .map(|instant| {
478 instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS)
479 })
480 .unwrap_or(true);
481 if elapsed_ok {
482 to_remove.push((*conn_id, key.clone(), *sub_type));
483 }
484 }
485 }
486 }
487
488 let mut became_empty = Vec::new();
489 for (conn_id, key, sub_type) in to_remove {
490 let display_key = key.cache_key();
491 if matches!(
492 self.unsubscribe_qot_inner(conn_id, key, sub_type),
493 UnsubResult::LastSubscriber
494 ) {
495 became_empty.push((display_key, sub_type));
496 }
497 }
498
499 {
500 let mut disconnected = self.qot_disconnected_conns.write();
501 disconnected.retain(|conn_id| self.conn_has_qot_subs(*conn_id));
502 }
503
504 if !became_empty.is_empty() {
505 self.bump_qot_disconnect_sync_generation();
506 }
507 became_empty
508 }
509
510 pub fn unsubscribe_all_qot_dry_run(&self, conn_id: u64) -> Vec<(String, i32)> {
522 let mut became_empty: Vec<(String, i32)> = Vec::new();
523 let qot = self.qot_subs.read();
524 for ((k, sub_type), set) in qot.iter() {
525 if !set.contains(&conn_id) {
526 continue;
527 }
528 if set.len() == 1 {
530 became_empty.push((k.cache_key(), *sub_type));
531 }
532 }
533 became_empty
534 }
535
536 pub fn unsubscribe_all_qot_commit(&self, conn_id: u64) -> Vec<(String, i32)> {
540 self.unsubscribe_all_qot_collect_global_empty(conn_id)
541 }
542
543 pub fn get_qot_subscribers_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> Vec<u64> {
547 match self
548 .qot_subs
549 .read()
550 .get(&(Self::broker_key(sec_key), sub_type))
551 {
552 Some(subscribers) => subscribers.iter().copied().collect(),
553 None => Vec::new(),
554 }
555 }
556
557 pub fn crypto_stock_globally_unsubscribed(&self, stock_id: u64) -> bool {
570 let qot = self.qot_subs.read();
571 !qot.iter()
572 .any(|((key, _sub_type), subs)| !subs.is_empty() && key.stock_key.stock_id == stock_id)
573 }
574
575 pub fn crypto_stock_broker_globally_unsubscribed(&self, stock_id: u64, broker_id: u32) -> bool {
586 let target_broker = std::num::NonZeroU32::new(broker_id);
587 let qot = self.qot_subs.read();
588 !qot.iter().any(|((key, _sub_type), subs)| {
589 !subs.is_empty()
590 && key.stock_key.stock_id == stock_id
591 && key.stock_key.broker_id == target_broker
592 })
593 }
594
595 pub fn set_conn_session_broker(
598 &self,
599 conn_id: u64,
600 sec_key: &QotSecurityKey,
601 sub_type: i32,
602 session: i32,
603 ) {
604 self.qot_sub_sessions
605 .write()
606 .entry((Self::broker_key(sec_key), sub_type))
607 .or_default()
608 .insert(conn_id, session);
609 }
610
611 pub fn get_global_session_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> i32 {
612 self.qot_sub_sessions
613 .read()
614 .get(&(Self::broker_key(sec_key), sub_type))
615 .map(|m| m.values().copied().max().unwrap_or(1))
616 .unwrap_or(1)
617 }
618
619 pub fn get_conn_session_broker(
621 &self,
622 conn_id: u64,
623 sec_key: &QotSecurityKey,
624 sub_type: i32,
625 ) -> i32 {
626 self.qot_sub_sessions
627 .read()
628 .get(&(Self::broker_key(sec_key), sub_type))
629 .and_then(|m| m.get(&conn_id).copied())
630 .unwrap_or(1)
631 }
632
633 pub fn set_conn_orderbook_detail_broker(
634 &self,
635 conn_id: u64,
636 sec_key: &QotSecurityKey,
637 detail: bool,
638 ) {
639 self.qot_orderbook_detail
640 .write()
641 .entry(Self::broker_key(sec_key))
642 .or_default()
643 .insert(conn_id, detail);
644 }
645
646 pub fn is_global_orderbook_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
647 self.qot_orderbook_detail
648 .read()
649 .get(&Self::broker_key(sec_key))
650 .map(|m| m.values().any(|&d| d))
651 .unwrap_or(false)
652 }
653
654 pub fn set_conn_broker_detail_broker(
655 &self,
656 conn_id: u64,
657 sec_key: &QotSecurityKey,
658 detail: bool,
659 ) {
660 self.qot_broker_detail
661 .write()
662 .entry(Self::broker_key(sec_key))
663 .or_default()
664 .insert(conn_id, detail);
665 }
666
667 pub fn is_global_broker_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
668 self.qot_broker_detail
669 .read()
670 .get(&Self::broker_key(sec_key))
671 .map(|m| m.values().any(|&d| d))
672 .unwrap_or(false)
673 }
674
675 pub fn on_disconnect(&self, conn_id: u64) -> Vec<(String, i32)> {
678 self.notify_subs.write().remove(&conn_id);
679
680 {
681 let mut trd = self.trd_acc_subs.write();
682 for subs in trd.values_mut() {
683 subs.remove(&conn_id);
684 }
685 }
686
687 {
688 let mut pr = self.qot_push_regs.write();
689 pr.by_tuple.retain(|_, set| {
690 set.remove(&conn_id);
691 !set.is_empty()
692 });
693 pr.qot_push_regs_by_cache_key.retain(|_, by_sub| {
694 by_sub.retain(|_, set| {
695 set.remove(&conn_id);
696 !set.is_empty()
697 });
698 !by_sub.is_empty()
699 });
700 }
701
702 if self.conn_has_qot_subs(conn_id) {
703 self.qot_disconnected_conns.write().insert(conn_id);
704 }
705 self.cleanup_due_disconnected_qot()
706 }
707}
708
709impl Default for SubscriptionManager {
710 fn default() -> Self {
711 Self::new()
712 }
713}
714
715#[inline]
716fn sub_type_orderbook() -> i32 {
717 2
718}
719
720#[inline]
721fn sub_type_broker() -> i32 {
722 14
723}
724
725#[cfg(test)]
726mod tests;