1use dashmap::DashMap;
4use std::collections::HashSet;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, RwLock};
7
8mod types;
9
10pub use types::{
11 CachedPlateInfo, CachedSecurityInfo, CachedTradeDate, CryptoPairInfo, CryptoTradeConfig,
12 OptionContractInfo, SecurityInfoSource,
13};
14
15pub struct StaticDataCache {
17 securities: DashMap<String, Arc<CachedSecurityInfo>>,
23 id_to_key: DashMap<u64, String>,
26 future_main_link_aliases: DashMap<u64, HashSet<String>>,
33 option_contracts: DashMap<u64, OptionContractInfo>,
35 crypto_pairs: DashMap<String, CryptoPairInfo>,
37 crypto_trade_configs: DashMap<String, CryptoTradeConfig>,
39 pub trade_dates: DashMap<String, Vec<CachedTradeDate>>,
41 pub plates: DashMap<String, Vec<CachedPlateInfo>>,
43 owner_to_warrants: RwLock<std::collections::HashMap<u64, HashSet<u64>>>,
50
51 pub stale_mkt_ids: DashMap<String, ()>,
59
60 pub mkt_id_refresh_marked_total: AtomicU64,
66 pub mkt_id_refresh_done_total: AtomicU64,
67 pub mkt_id_refresh_failed_total: AtomicU64,
68}
69
70impl StaticDataCache {
71 pub fn new() -> Self {
72 Self {
73 securities: DashMap::new(),
74 id_to_key: DashMap::new(),
75 future_main_link_aliases: DashMap::new(),
76 option_contracts: DashMap::new(),
77 crypto_pairs: DashMap::new(),
78 crypto_trade_configs: DashMap::new(),
79 trade_dates: DashMap::new(),
80 plates: DashMap::new(),
81 owner_to_warrants: RwLock::new(std::collections::HashMap::new()),
82 stale_mkt_ids: DashMap::new(),
83 mkt_id_refresh_marked_total: AtomicU64::new(0),
84 mkt_id_refresh_done_total: AtomicU64::new(0),
85 mkt_id_refresh_failed_total: AtomicU64::new(0),
86 }
87 }
88
89 pub fn get_security_info_trigger_refresh(&self, key: &str) -> Option<CachedSecurityInfo> {
99 let info = self.get_security_info(key)?;
100 if info.needs_mkt_id_refresh() {
101 self.mark_stale_mkt_id(key);
102 }
103 Some(info)
104 }
105
106 pub fn mark_stale_mkt_id(&self, key: &str) {
111 self.stale_mkt_ids.insert(key.to_string(), ());
112 self.mkt_id_refresh_marked_total
113 .fetch_add(1, Ordering::Relaxed);
114 }
115
116 pub fn drain_stale_mkt_ids(&self) -> Vec<String> {
134 let keys: Vec<String> = self.stale_mkt_ids.iter().map(|e| e.key().clone()).collect();
135 for k in &keys {
136 self.stale_mkt_ids.remove(k);
137 }
138 keys
139 }
140
141 pub fn update_mkt_id(&self, key: &str, new_mkt_id: u32) -> bool {
146 if let Some(mut entry) = self.securities.get_mut(key) {
147 Arc::make_mut(&mut entry).mkt_id = new_mkt_id;
148 self.mkt_id_refresh_done_total
149 .fetch_add(1, Ordering::Relaxed);
150 true
151 } else {
152 false
153 }
154 }
155
156 pub fn record_mkt_id_refresh_failed(&self) {
158 self.mkt_id_refresh_failed_total
159 .fetch_add(1, Ordering::Relaxed);
160 }
161
162 #[must_use]
164 pub fn stale_mkt_ids_count(&self) -> usize {
165 self.stale_mkt_ids.len()
166 }
167
168 pub fn upsert_full_security_info(&self, key: &str, info: CachedSecurityInfo) {
179 debug_assert!(
180 info.source.is_complete(),
181 "upsert_full_security_info called with non-complete source ({:?})",
182 info.source
183 );
184 self.upsert_with_owner_index_maintenance(key, info);
185 }
186
187 pub fn upsert_crypto_pair_info(&self, key: &str, pair: CryptoPairInfo) {
189 if pair.cc_origin.is_empty() && pair.cc_destination.is_empty() {
190 self.crypto_pairs.remove(key);
191 } else {
192 self.crypto_pairs.insert(key.to_string(), pair);
193 }
194 }
195
196 pub fn set_option_contract_info(&self, stock_id: u64, info: OptionContractInfo) {
198 if stock_id == 0 {
199 return;
200 }
201 self.option_contracts.insert(stock_id, info);
202 }
203
204 pub fn get_option_contract_info_by_stock_id(
206 &self,
207 stock_id: u64,
208 ) -> Option<OptionContractInfo> {
209 self.option_contracts
210 .get(&stock_id)
211 .map(|entry| *entry.value())
212 }
213
214 pub fn get_crypto_pair_info(&self, key: &str) -> Option<CryptoPairInfo> {
216 self.crypto_pairs.get(key).map(|v| v.clone())
217 }
218
219 fn crypto_trade_config_key(broker_id: u32, symbol: &str, exchange: &str) -> String {
220 format!(
221 "{broker_id}:{}:{}",
222 symbol.trim().to_ascii_uppercase(),
223 exchange.trim().to_ascii_uppercase()
224 )
225 }
226
227 pub fn set_crypto_trade_configs_for_broker(
229 &self,
230 broker_id: u32,
231 configs: Vec<CryptoTradeConfig>,
232 ) {
233 let prefix = format!("{broker_id}:");
234 self.crypto_trade_configs
235 .retain(|key, _| !key.starts_with(&prefix));
236 for config in configs {
237 if config.symbol.trim().is_empty() || config.exchange.trim().is_empty() {
238 continue;
239 }
240 let key = Self::crypto_trade_config_key(broker_id, &config.symbol, &config.exchange);
241 self.crypto_trade_configs.insert(key, config);
242 }
243 }
244
245 pub fn get_crypto_trade_config(
247 &self,
248 broker_id: u32,
249 symbol: &str,
250 exchange: &str,
251 ) -> Option<CryptoTradeConfig> {
252 let key = Self::crypto_trade_config_key(broker_id, symbol, exchange);
253 self.crypto_trade_configs.get(&key).map(|v| v.clone())
254 }
255
256 pub fn upsert_basic_security_info(&self, key: &str, info: CachedSecurityInfo) {
262 debug_assert!(
263 !info.source.is_complete(),
264 "upsert_basic_security_info called with complete source ({:?}); use upsert_full",
265 info.source
266 );
267 debug_assert_eq!(
268 info.warrnt_stock_owner, 0,
269 "OnDemandBasic must have warrnt_stock_owner=0 (caller didn't query the field)"
270 );
271 let old_info = if let Some(existing) = self.securities.get(key) {
277 if existing.is_complete() {
278 tracing::debug!(
279 key,
280 "upsert_basic_security_info skipped: existing complete row prevails"
281 );
282 return;
283 }
284 Some(Arc::clone(existing.value()))
285 } else {
286 None
287 };
288 if let Some(old_info) = old_info {
289 self.remove_future_main_link_aliases(key, &old_info);
290 }
291 self.securities
292 .insert(key.to_string(), Arc::new(info.clone()));
293 self.id_to_key.insert(info.stock_id, key.to_string());
294 self.add_future_main_link_aliases(key, &info);
295 }
296
297 pub fn delete_security_info(&self, stock_id: u64) -> bool {
303 let Some((_, key)) = self.id_to_key.remove(&stock_id) else {
304 return false;
305 };
306 self.option_contracts.remove(&stock_id);
307 let old_info = self.securities.remove(&key).map(|(_, info)| info);
309 let old_owner = old_info.as_ref().map(|r| r.warrnt_stock_owner).unwrap_or(0);
310 if let Some(old_info) = old_info.as_ref() {
311 self.remove_future_main_link_aliases(&key, old_info);
312 }
313 self.crypto_pairs.remove(&key);
314 if old_owner != 0
316 && let Ok(mut map) = self.owner_to_warrants.write()
317 && let Some(set) = map.get_mut(&old_owner)
318 {
319 set.remove(&stock_id);
320 if set.is_empty() {
321 map.remove(&old_owner);
322 }
323 }
324 if let Ok(mut map) = self.owner_to_warrants.write() {
326 map.remove(&stock_id);
327 }
328 true
329 }
330
331 fn upsert_with_owner_index_maintenance(&self, key: &str, info: CachedSecurityInfo) {
333 let old_info = self.securities.get(key).map(|r| Arc::clone(r.value()));
335 let old_owner = old_info.as_ref().map(|r| r.warrnt_stock_owner).unwrap_or(0);
336 let new_owner = info.warrnt_stock_owner;
337
338 if let Some(old) = old_info.as_ref() {
339 self.remove_future_main_link_aliases(key, old);
340 }
341
342 let stock_id = info.stock_id;
344 self.securities
345 .insert(key.to_string(), Arc::new(info.clone()));
346 self.id_to_key.insert(stock_id, key.to_string());
347 self.add_future_main_link_aliases(key, &info);
348
349 if old_owner != new_owner {
351 if let Ok(mut map) = self.owner_to_warrants.write() {
353 if old_owner != 0
354 && let Some(set) = map.get_mut(&old_owner)
355 {
356 set.remove(&stock_id);
357 if set.is_empty() {
358 map.remove(&old_owner);
359 }
360 }
361 if new_owner != 0 {
362 map.entry(new_owner).or_default().insert(stock_id);
363 }
364 }
365 } else if new_owner != 0 {
366 if let Ok(mut map) = self.owner_to_warrants.write() {
368 map.entry(new_owner).or_default().insert(stock_id);
369 }
370 }
371 }
372
373 fn future_main_link_target_ids(info: &CachedSecurityInfo) -> Vec<u64> {
374 let mut ids = Vec::with_capacity(2);
375 for target in [info.future_origin_id, info.zhuli_id] {
376 if target != 0 && target != info.stock_id && !ids.contains(&target) {
377 ids.push(target);
378 }
379 }
380 ids
381 }
382
383 fn add_future_main_link_aliases(&self, key: &str, info: &CachedSecurityInfo) {
384 for target in Self::future_main_link_target_ids(info) {
385 self.future_main_link_aliases
386 .entry(target)
387 .or_default()
388 .insert(key.to_string());
389 }
390 }
391
392 fn remove_future_main_link_aliases(&self, key: &str, info: &CachedSecurityInfo) {
393 for target in Self::future_main_link_target_ids(info) {
394 if let Some(mut aliases) = self.future_main_link_aliases.get_mut(&target) {
395 aliases.remove(key);
396 let empty = aliases.is_empty();
397 drop(aliases);
398 if empty {
399 self.future_main_link_aliases.remove(&target);
400 }
401 }
402 }
403 }
404
405 #[must_use]
411 pub fn get_future_main_link_alias_keys(&self, stock_id: u64) -> Vec<String> {
412 let Some(aliases) = self.future_main_link_aliases.get(&stock_id) else {
413 return Vec::new();
414 };
415 let mut keys: Vec<String> = aliases.iter().cloned().collect();
416 keys.sort();
417 keys
418 }
419
420 #[must_use]
427 pub fn quote_push_targets_for_stock_id(
428 &self,
429 stock_id: u64,
430 ) -> Vec<(String, Arc<CachedSecurityInfo>)> {
431 let mut targets = Vec::new();
432
433 if let Some(sec_key_ref) = self.id_to_key.get(&stock_id) {
434 let sec_key = sec_key_ref.clone();
435 drop(sec_key_ref);
436 if let Some(info) = self.get_security_info_arc(&sec_key) {
437 targets.push((sec_key, info));
438 }
439 }
440
441 for alias_key in self.get_future_main_link_alias_keys(stock_id) {
442 if targets.iter().any(|(key, _)| key == &alias_key) {
443 continue;
444 }
445 if let Some(info) = self.get_security_info_arc(&alias_key) {
446 targets.push((alias_key, info));
447 }
448 }
449
450 targets
451 }
452
453 #[must_use]
472 pub fn quote_push_targets_for_stock_key(
473 &self,
474 stock_id: u64,
475 broker_id: Option<std::num::NonZeroU32>,
476 ) -> Vec<(
477 futu_core::qot_stock_key::QotSecurityKey,
478 Arc<CachedSecurityInfo>,
479 )> {
480 let bare = self.quote_push_targets_for_stock_id(stock_id);
484 bare.into_iter()
485 .map(|(public_sec_key, info)| {
486 let key = match broker_id {
487 Some(nz) => futu_core::qot_stock_key::QotSecurityKey::from_broker_id(
488 public_sec_key,
489 stock_id,
490 nz.get(),
491 ),
492 None => futu_core::qot_stock_key::QotSecurityKey::no_broker(
493 public_sec_key,
494 stock_id,
495 ),
496 };
497 (key, info)
498 })
499 .collect()
500 }
501
502 #[deprecated(
510 since = "1.4.106",
511 note = "use upsert_full_security_info / upsert_basic_security_info / delete_security_info"
512 )]
513 pub fn set_security_info(&self, key: &str, info: CachedSecurityInfo) {
514 if info.source.is_complete() {
515 self.upsert_full_security_info(key, info);
516 } else {
517 self.upsert_basic_security_info(key, info);
518 }
519 }
520
521 pub fn get_security_info(&self, key: &str) -> Option<CachedSecurityInfo> {
522 self.get_security_info_arc(key)
523 .map(|info| info.as_ref().clone())
524 }
525
526 pub fn get_security_info_arc(&self, key: &str) -> Option<Arc<CachedSecurityInfo>> {
527 self.securities.get(key).map(|v| Arc::clone(v.value()))
528 }
529
530 pub fn security_id_for_key(&self, key: &str) -> Option<u64> {
531 self.get_security_info(key)
532 .map(|info| info.stock_id)
533 .filter(|stock_id| *stock_id > 0)
534 }
535
536 pub fn security_info_snapshot(&self) -> Vec<CachedSecurityInfo> {
537 self.security_info_snapshot_matching(|_| true)
538 }
539
540 pub fn security_info_snapshot_matching(
541 &self,
542 mut predicate: impl FnMut(&CachedSecurityInfo) -> bool,
543 ) -> Vec<CachedSecurityInfo> {
544 self.securities
545 .iter()
546 .filter_map(|entry| {
547 let info = entry.value();
548 predicate(info.as_ref()).then(|| info.as_ref().clone())
549 })
550 .collect()
551 }
552
553 pub fn security_key_by_stock_id(&self, stock_id: u64) -> Option<String> {
554 self.id_to_key.get(&stock_id).map(|key| key.value().clone())
555 }
556
557 pub fn get_security_info_by_stock_id(&self, stock_id: u64) -> Option<CachedSecurityInfo> {
559 let key = self.security_key_by_stock_id(stock_id)?;
560 self.get_security_info(&key)
561 }
562
563 pub fn get_security_info_by_stock_id_trigger_refresh(
564 &self,
565 stock_id: u64,
566 ) -> Option<CachedSecurityInfo> {
567 let key = self.security_key_by_stock_id(stock_id)?;
568 self.get_security_info_trigger_refresh(&key)
569 }
570
571 pub fn add_warrant_owner(&self, warrant_stock_id: u64, owner_stock_id: u64) {
576 if owner_stock_id == 0 {
577 return;
578 }
579 if let Ok(mut map) = self.owner_to_warrants.write() {
580 map.entry(owner_stock_id)
581 .or_default()
582 .insert(warrant_stock_id);
583 }
584 }
585
586 #[must_use]
598 pub fn search_warrants_by_owner(&self, owner_stock_id: u64) -> Vec<u64> {
599 match self.owner_to_warrants.read() {
600 Ok(map) => map
601 .get(&owner_stock_id)
602 .map(|set| set.iter().copied().collect())
603 .unwrap_or_default(),
604 _ => Vec::new(),
605 }
606 }
607}
608
609impl Default for StaticDataCache {
610 fn default() -> Self {
611 Self::new()
612 }
613}
614
615#[cfg(test)]
616mod tests;