Skip to main content

futu_server/subscription/
views.rs

1use std::collections::{HashMap, HashSet};
2use std::hash::Hash;
3use std::time::Duration;
4
5use futu_core::qot_stock_key::QotSecurityKey;
6
7use super::SubscriptionManager;
8
9impl SubscriptionManager {
10    // ===== Quota =====
11
12    /// per-conn used quota — count 该 conn 的 (security_key, sub_type) 对数.
13    pub fn get_conn_used_quota(&self, conn_id: u64) -> u32 {
14        self.qot_subs
15            .read()
16            .iter()
17            .filter(|(_, set)| set.contains(&conn_id))
18            .count() as u32
19    }
20
21    /// per-conn used quota with a caller-provided key classifier.
22    ///
23    /// C++ 10.7 separates normal subscription quota from option-chain quota:
24    /// `QotSubscribe::GetUsedQuota` skips keys resolvable by
25    /// `ResolveOptionChainKey`; `GetUsedOptionQuota` handles those separately.
26    /// Rust keeps the option classifier outside `SubscriptionManager` because
27    /// it depends on `StaticDataCache` metadata.
28    pub fn get_conn_used_quota_by<F>(&self, conn_id: u64, include: F) -> u32
29    where
30        F: Fn(&QotSecurityKey) -> bool,
31    {
32        self.qot_subs
33            .read()
34            .iter()
35            .filter(|((key, _), set)| set.contains(&conn_id) && include(key))
36            .count() as u32
37    }
38
39    /// **全局 used quota** — 对齐 C++ `m_nAllUsedQuota` (累加全局唯一
40    /// SubKey count). 不重复计 conn — 多 conn 订同 (stock, sub_type) 全局算 1.
41    pub fn get_total_used_quota(&self) -> u32 {
42        self.qot_subs.read().len() as u32
43    }
44
45    /// Global used quota with a caller-provided key classifier.
46    pub fn get_total_used_quota_by<F>(&self, include: F) -> u32
47    where
48        F: Fn(&QotSecurityKey) -> bool,
49    {
50        self.qot_subs
51            .read()
52            .iter()
53            .filter(|((key, _), set)| !set.is_empty() && include(key))
54            .count() as u32
55    }
56
57    /// C++ `INNData_APIInterLimit::GetOptionSubQuota()` is `GetSubQuota()/5`.
58    pub fn get_option_total_quota(&self) -> u32 {
59        self.get_total_quota() / 5
60    }
61
62    /// Global option-chain quota, deduped by `(option_chain_key, sub_type)`.
63    ///
64    /// The supplied closure mirrors C++ `ResolveOptionChainKey`: `None` means
65    /// the key is a normal security; `Some(chain)` means it participates in the
66    /// option quota bill set.
67    pub fn get_total_used_option_quota_by<F, K>(&self, option_bill_key: F) -> u32
68    where
69        F: Fn(&QotSecurityKey) -> Option<K>,
70        K: Eq + Hash,
71    {
72        let mut bills = HashSet::new();
73        for ((key, sub_type), set) in self.qot_subs.read().iter() {
74            if set.is_empty() {
75                continue;
76            }
77            if let Some(chain) = option_bill_key(key) {
78                bills.insert((chain, *sub_type));
79            }
80        }
81        bills.len() as u32
82    }
83
84    /// Per-connection option-chain quota, deduped by `(option_chain_key, sub_type)`.
85    pub fn get_conn_used_option_quota_by<F, K>(&self, conn_id: u64, option_bill_key: F) -> u32
86    where
87        F: Fn(&QotSecurityKey) -> Option<K>,
88        K: Eq + Hash,
89    {
90        let mut bills = HashSet::new();
91        for ((key, sub_type), set) in self.qot_subs.read().iter() {
92            if !set.contains(&conn_id) {
93                continue;
94            }
95            if let Some(chain) = option_bill_key(key) {
96                bills.insert((chain, *sub_type));
97            }
98        }
99        bills.len() as u32
100    }
101
102    /// 获取 total quota (动态, backend 下发后更新).
103    pub fn get_total_quota(&self) -> u32 {
104        *self.total_quota.read()
105    }
106
107    /// **v1.4.106 codex 1131 F5 [P2]**: backend 下发的 quota 真值 setter.
108    pub fn set_total_quota_from_backend(&self, value: u32) {
109        let mut q = self.total_quota.write();
110        if *q != value {
111            tracing::info!(
112                old = *q,
113                new = value,
114                "v1.4.106 audit 1131 F5: total_quota updated from backend"
115            );
116            *q = value;
117        }
118    }
119
120    pub fn get_remain_quota(&self) -> u32 {
121        let total = self.get_total_quota();
122        let used = self.get_total_used_quota();
123        total.saturating_sub(used)
124    }
125
126    // ===== Conn-level views =====
127
128    pub fn get_conn_qot_subs(&self, conn_id: u64) -> HashMap<i32, Vec<String>> {
129        let qot = self.qot_subs.read();
130        let mut result: HashMap<i32, Vec<String>> = HashMap::new();
131        for ((key, sub_type), conn_ids) in qot.iter() {
132            if conn_ids.contains(&conn_id) {
133                result.entry(*sub_type).or_default().push(key.cache_key());
134            }
135        }
136        result
137    }
138
139    pub fn get_all_qot_conn_ids(&self) -> HashSet<u64> {
140        let qot = self.qot_subs.read();
141        let mut ids = HashSet::new();
142        for conn_ids in qot.values() {
143            ids.extend(conn_ids);
144        }
145        ids
146    }
147
148    /// v1.4.106 codex 0932 F5 [P2]: 获取所有连接 ID(有交易账户订阅的).
149    pub fn get_all_trd_conn_ids(&self) -> HashSet<u64> {
150        let trd = self.trd_acc_subs.read();
151        let mut ids = HashSet::new();
152        for conn_ids in trd.values() {
153            ids.extend(conn_ids);
154        }
155        ids
156    }
157
158    /// v1.4.106 codex 1131 F2 [P1]: 计算全局 desired set.
159    /// 返 (sec_key_display, sub_type), sec_key_display 是 cache_key 形态
160    /// (`"market_code"` or `"market_code@b{id}"`).
161    pub fn compute_global_desired_set(&self) -> Vec<(String, i32)> {
162        let qot = self.qot_subs.read();
163        let mut out = Vec::with_capacity(qot.len());
164        for (k, sub_type) in qot.keys() {
165            out.push((k.cache_key(), *sub_type));
166        }
167        out
168    }
169
170    /// **v1.4.106 codex 0631 F2 [P2]**: shared global desired-set keys helper.
171    ///
172    /// 返当前全集 `Vec<(sec_key, sub_type)>` (与 `compute_global_desired_set`
173    /// 等价, 命名对齐 codex 0631 audit 习惯). 共享给 SubHandler / unsub /
174    /// unsub_all / resubscribe_quotes 三路径, 不再每条 sub 单独发 delta —
175    /// 对齐 CMD 6211 set-state 协议 (per-market full set replaces).
176    ///
177    /// caller 应做的事:
178    /// 1. 调本 fn 拿当前全集
179    /// 2. 应用 delta (sub: 加; unsub: 减; unsub_all: 移除本 conn 独占的)
180    /// 3. resolve sec_key → stock_id (cache); cache miss → loud warn (本 fn 不
181    ///    替 caller 决定 — 由 caller 选择 fail-loud 还是 partial-submit)
182    /// 4. 调 `submit_global_desired_set(NEW set)` 让 backend ack
183    ///
184    /// 命名: keys = `(sec_key, sub_type)` 二元组, 不是 `(stock_id, market)` —
185    /// stock_id resolve 是 caller 责任 (因为依赖 StaticDataCache).
186    #[inline]
187    pub fn qot_global_desired_keys(&self) -> Vec<(String, i32)> {
188        self.compute_global_desired_set()
189    }
190
191    /// C++ delay-statistics gate equivalent for QOT push reporting.
192    ///
193    /// Ref: `APIServer_Qot_StockBasic.cpp:218-223`,
194    /// `APIServer_Qot_OrderBook.cpp:238-243`, and
195    /// `APIServer_Qot_Broker.cpp:194-199`: `IsHasSubFewTime(..., 3)` must be
196    /// true before `Push_Count_Add`.
197    pub fn qot_sub_elapsed_at_least_broker(
198        &self,
199        sec_key: &QotSecurityKey,
200        sub_type: i32,
201        elapsed: Duration,
202    ) -> bool {
203        self.qot_sub_times
204            .read()
205            .get(&(Self::broker_key(sec_key), sub_type))
206            .map(|instant| instant.elapsed() >= elapsed)
207            .unwrap_or(true)
208    }
209}