1use std::sync::atomic::AtomicU32;
6use std::sync::Arc;
7
8use arc_swap::ArcSwap;
9use futu_backend::auth::{self, AuthConfig, AuthResult};
10use futu_backend::conn::{BackendConn, PushCallback};
11use futu_backend::login;
12use futu_cache::login_cache::LoginCache;
13use futu_cache::qot_cache::QotCache;
14use futu_cache::qot_right::QotRightCache;
15use futu_cache::static_data::StaticDataCache;
16use futu_cache::trd_cache::{CachedTrdAcc, TrdCache};
17use futu_core::error::{FutuError, Result};
18use futu_server::listener::ApiServer;
19use futu_server::metrics::GatewayMetrics;
20use futu_server::push::PushDispatcher;
21use futu_server::subscription::SubscriptionManager;
22
23use crate::handlers;
24
25pub type SharedBackend = Arc<ArcSwap<Option<Arc<BackendConn>>>>;
29
30fn listing_date_to_str(ts: u32) -> String {
32 let utc_secs = ts as i64 + 8 * 3600;
34 let days = utc_secs.div_euclid(86400);
35 let (y, m, d) = days_to_ymd(days as i32 + 719468);
36 format!("{y:04}-{m:02}-{d:02}")
37}
38
39fn timestamp_to_datetime_str(ts: f64) -> String {
41 if ts <= 0.0 {
42 return String::new();
43 }
44 let secs = ts as i64;
45 let utc_secs = secs + 8 * 3600;
47 let days = utc_secs.div_euclid(86400);
48 let day_secs = utc_secs.rem_euclid(86400);
49 let h = day_secs / 3600;
50 let m = (day_secs % 3600) / 60;
51 let s = day_secs % 60;
52 let (y, mo, d) = days_to_ymd(days as i32 + 719468); format!("{y:04}-{mo:02}-{d:02} {h:02}:{m:02}:{s:02}")
55}
56
57fn days_to_ymd(z: i32) -> (i32, u32, u32) {
59 let era = z.div_euclid(146097);
60 let doe = z.rem_euclid(146097) as u32;
61 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
62 let y = yoe as i32 + era * 400;
63 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
64 let mp = (5 * doy + 2) / 153;
65 let d = doy - (153 * mp + 2) / 5 + 1;
66 let m = if mp < 10 { mp + 3 } else { mp - 9 };
67 let y = if m <= 2 { y + 1 } else { y };
68 (y, m, d)
69}
70
71fn market_code_to_qot_market(market_code: u32) -> i32 {
74 match market_code {
75 1..=4 => 1,
77 5 | 6 => 1,
79 7 | 8 => 1,
81 10..=29 => 11, 30 | 32 | 33 | 34 | 36..=40 => 21, 31 | 35 => 22, 41..=49 => 11, 60..=109 => 11, 110..=119 => 1, 120..=123 => 81, 160..=179 => 31, 185..=194 => 41, 570 => 1, 1000..=1049 => 1, 1200..=1249 => 11, _ => 0, }
106}
107
108fn market_code_to_exch_type(market_code: u32) -> i32 {
111 match market_code {
112 1 => 1, 2 => 2, 3..=9 => 3, 10 => 4, 11 => 5, 12 => 7, 13 => 6, 30 => 14, 31 => 15, 41 => 8, _ => 0, }
124}
125
126#[derive(Debug, Clone)]
128pub struct GatewayConfig {
129 pub auth_server: String,
131 pub account: String,
133 pub password: String,
135 pub region: String,
137 pub listen_addr: String,
139 pub device_id: String,
141 pub app_lang: i32,
143}
144
145pub enum PushEvent {
147 QuotePush {
149 sec_key: String,
150 sub_type: i32,
151 proto_id: u32,
152 body: Vec<u8>,
153 },
154 BroadcastPush { proto_id: u32, body: Vec<u8> },
156 TradeReQuery {
158 acc_id: u64,
159 notice_type: u32,
160 order_ids: Vec<String>,
162 order_fill_ids: Vec<String>,
164 },
165}
166
167pub struct GatewayBridge {
169 pub qot_cache: Arc<QotCache>,
170 pub trd_cache: Arc<TrdCache>,
171 pub static_cache: Arc<StaticDataCache>,
172 pub login_cache: Arc<LoginCache>,
173 pub subscriptions: Arc<SubscriptionManager>,
174 pub backend: SharedBackend,
175 pub auth_result: Option<AuthResult>,
176 pub push_tx: Option<tokio::sync::mpsc::Sender<PushEvent>>,
178 pub suspend_cache: futu_backend::suspend_data::SuspendCache,
180 pub code_change_cache: futu_backend::code_change::CodeChangeCache,
182 pub app_lang: i32,
184 pub kl_quota_counter: Arc<AtomicU32>,
186 pub qot_right_cache: Arc<QotRightCache>,
188 pub metrics: Arc<GatewayMetrics>,
190}
191
192impl GatewayBridge {
193 pub fn new() -> Self {
194 Self {
195 qot_cache: Arc::new(QotCache::new()),
196 trd_cache: Arc::new(TrdCache::new()),
197 static_cache: Arc::new(StaticDataCache::new()),
198 login_cache: Arc::new(LoginCache::new()),
199 subscriptions: Arc::new(SubscriptionManager::new()),
200 backend: Arc::new(ArcSwap::new(Arc::new(None))),
201 auth_result: None,
202 push_tx: None,
203 suspend_cache: Arc::new(dashmap::DashMap::new()),
204 code_change_cache: futu_backend::code_change::new_cache(),
205 app_lang: 0, kl_quota_counter: Arc::new(AtomicU32::new(0)),
207 qot_right_cache: Arc::new(QotRightCache::new()),
208 metrics: Arc::new(GatewayMetrics::new()),
209 }
210 }
211
212 pub async fn initialize(
216 &mut self,
217 config: &GatewayConfig,
218 verify_cb: Option<auth::VerifyCodeCallback>,
219 ) -> Result<tokio::sync::mpsc::Receiver<PushEvent>> {
220 self.app_lang = config.app_lang;
221
222 tracing::info!("step 1/4: HTTP authentication...");
224 let auth_config = AuthConfig {
225 auth_server: config.auth_server.clone(),
226 account: config.account.clone(),
227 password: config.password.clone(),
228 device_id: config.device_id.clone(),
229 };
230 let auth_result = auth::authenticate_with_callback(&auth_config, verify_cb).await?;
231 tracing::info!(user_id = auth_result.user_id, "authentication succeeded");
232
233 let conn_points = auth::conn_points::get_by_region(&config.region);
235 let mut backend_addr = {
238 use std::collections::HashMap;
239 let all = auth::conn_points::ALL;
240 let mut by_region: HashMap<&str, Vec<(&str, u16)>> = HashMap::new();
241 for (ip, port, region) in all {
242 by_region.entry(region).or_default().push((ip, *port));
243 }
244 let mut candidates: Vec<String> = by_region
246 .values()
247 .map(|ips| {
248 let idx = (std::time::SystemTime::now()
249 .duration_since(std::time::UNIX_EPOCH)
250 .unwrap_or_default()
251 .subsec_nanos() as usize)
252 % ips.len();
253 let (ip, port) = ips[idx];
254 format!("{ip}:{port}")
255 })
256 .collect();
257 let seed = std::time::SystemTime::now()
259 .duration_since(std::time::UNIX_EPOCH)
260 .unwrap_or_default()
261 .subsec_nanos();
262 for i in (1..candidates.len()).rev() {
263 let j = (seed as usize + i) % (i + 1);
264 candidates.swap(i, j);
265 }
266 candidates
267 .first()
268 .cloned()
269 .unwrap_or_else(|| format!("{}:{}", conn_points[0].0, conn_points[0].1))
270 };
271
272 let (push_tx, push_rx) = tokio::sync::mpsc::channel::<PushEvent>(4096);
274 self.push_tx = Some(push_tx.clone());
275
276 let push_cb: PushCallback = {
277 let static_cache = Arc::clone(&self.static_cache);
278 let qot_cache = Arc::clone(&self.qot_cache);
279 let push_metrics = Arc::clone(&self.metrics);
280 Arc::new(move |cmd_id, body| {
281 push_metrics
282 .backend_pushes_received
283 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
284 match cmd_id {
285 6212 => {
286 Self::handle_quote_push(&body, &static_cache, &qot_cache, &push_tx);
288 }
289 4716 => {
290 Self::handle_trade_notify(&body, &push_tx);
292 }
293 5300 => {
294 Self::handle_msg_center_push(&body, &static_cache, &push_tx);
296 }
297 6803 => {
298 tracing::debug!(
300 body_len = body.len(),
301 "price reminder settings changed notification"
302 );
303 }
304 _ => {
305 tracing::debug!(cmd_id, body_len = body.len(), "backend push");
306 }
307 }
308 })
309 };
310
311 let (mut backend, login_result) = {
312 const MAX_REDIRECTS: usize = 3;
313 let mut login_result = None;
314 let mut final_backend = None;
315
316 for attempt in 0..=MAX_REDIRECTS {
317 tracing::info!(addr = %backend_addr, attempt, "connecting to backend...");
318 let conn = BackendConn::connect(&backend_addr, push_cb.clone()).await?;
319
320 tracing::info!("TCP login...");
321 match login::tcp_login(
322 &conn,
323 &auth_result,
324 &auth_result.client_key,
325 true,
326 attempt as u32,
327 )
328 .await
329 {
330 Ok(result) => {
331 conn.set_session_key(result.session_key);
332 conn.set_sec_data(result.sec_data);
333 conn.user_id
334 .store(result.user_id as u32, std::sync::atomic::Ordering::Relaxed);
335 login_result = Some(result);
336 final_backend = Some(conn);
337 break;
338 }
339 Err(FutuError::ServerError { ret_type: 1, msg }) => {
340 if let Some(new_addr) = msg.strip_prefix("redirect to ") {
342 tracing::info!(redirect_to = new_addr, "login redirect");
343 backend_addr = new_addr.to_string();
344 continue;
345 }
346 return Err(FutuError::ServerError { ret_type: 1, msg });
347 }
348 Err(e) => return Err(e),
349 }
350 }
351
352 let backend = final_backend.ok_or(FutuError::Codec("too many redirects".into()))?;
353 let login_result =
354 login_result.expect("login_result must be Some after successful login");
355 (backend, login_result)
356 };
357
358 tracing::info!(
359 user_id = login_result.user_id,
360 addr = %backend_addr,
361 "login succeeded"
362 );
363
364 tracing::info!("step 3b: fetching ConnIP list (CMD 1321)...");
366 let device_id_bytes = config.device_id.as_bytes().to_vec();
367 let conn_ip_list = futu_backend::conn_ip::fetch_conn_ip_list(
368 &backend,
369 login_result.user_id,
370 &device_id_bytes,
371 )
372 .await
373 .unwrap_or_else(|e| {
374 tracing::warn!(error = %e, "CMD1321 failed (non-fatal)");
375 vec![]
376 });
377
378 if !conn_ip_list.is_empty() {
381 let best = conn_ip_list
383 .iter()
384 .find(|p| p.region >= 10001)
385 .or_else(|| conn_ip_list.first());
386
387 if let Some(best_point) = best {
388 let new_addr = format!("{}:{}", best_point.ip, best_point.port);
389 if new_addr != backend_addr {
390 tracing::info!(
391 old_addr = %backend_addr,
392 new_addr = %new_addr,
393 region = best_point.region,
394 "ConnIP: switching to recommended server"
395 );
396
397 match BackendConn::connect(&new_addr, push_cb.clone()).await {
399 Ok(new_conn) => {
400 match login::tcp_login(
401 &new_conn,
402 &auth_result,
403 &auth_result.client_key,
404 true,
405 0,
406 )
407 .await
408 {
409 Ok(new_result) => {
410 new_conn.set_session_key(new_result.session_key);
411 new_conn.set_sec_data(new_result.sec_data);
412 new_conn.user_id.store(
413 new_result.user_id as u32,
414 std::sync::atomic::Ordering::Relaxed,
415 );
416 backend_addr = new_addr;
417 backend = new_conn;
418 tracing::info!(
419 addr = %backend_addr,
420 "ConnIP: reconnected to recommended server"
421 );
422 }
423 Err(e) => {
424 tracing::warn!(
425 error = %e,
426 addr = %new_addr,
427 "ConnIP: reconnect login failed, keeping original"
428 );
429 }
430 }
431 }
432 Err(e) => {
433 tracing::warn!(
434 error = %e,
435 addr = %new_addr,
436 "ConnIP: connect failed, keeping original"
437 );
438 }
439 }
440 }
441 }
442 }
443
444 self.login_cache
446 .set_login_state(futu_cache::login_cache::LoginState {
447 user_id: login_result.user_id as u32,
448 is_logged_in: true,
449 login_account: config.account.clone(),
450 region: config.region.clone(),
451 server_addr: backend_addr.clone(),
452 });
453
454 let backend = Arc::new(backend);
455 self.backend.store(Arc::new(Some(Arc::clone(&backend))));
456 let auth_result_for_reconnect = auth_result.clone();
457 self.auth_result = Some(auth_result);
458
459 tracing::info!("step 4/7: fetching account list...");
461 self.fetch_account_list(&backend).await?;
462
463 {
465 let q_backend = Arc::clone(&backend);
466 let q_cache = Arc::clone(&self.trd_cache);
467 tokio::spawn(async move {
468 futu_backend::trade_query::init_trade_data(&q_backend, &q_cache).await;
469 });
470 }
471
472 {
474 let qr_backend = Arc::clone(&backend);
475 let qr_cache = Arc::clone(&self.qot_right_cache);
476 tokio::spawn(async move {
477 Self::request_qot_right(&qr_backend, &qr_cache).await;
478 });
479 }
480
481 tracing::info!("step 5/7: registering markets...");
483 if let Err(e) = futu_backend::stock_list::register_markets(&backend).await {
484 tracing::warn!(error = %e, "market registration failed (non-fatal)");
485 }
486
487 tracing::info!("step 6/7: pulling market status...");
489 match futu_backend::stock_list::pull_market_status(&backend).await {
490 Ok(statuses) => {
491 for s in &statuses {
492 tracing::info!(
493 market_id = s.market_id,
494 status = s.status,
495 text = %s.status_text,
496 "market status"
497 );
498 }
499 }
500 Err(e) => tracing::warn!(error = %e, "market status pull failed (non-fatal)"),
501 }
502
503 {
505 let sync_backend = Arc::clone(&backend);
506 let sync_cache = Arc::clone(&self.static_cache);
507 tokio::spawn(async move {
508 tracing::info!("stock list sync started (background)");
509
510 let stock_db = match crate::stock_db::StockDb::open() {
512 Ok(db) => {
513 let persisted_version = db.get_version();
514 if persisted_version > 0 {
515 match db.load_all() {
517 Ok(items) => {
518 let mut loaded = 0usize;
519 for item in &items {
520 let qot_market =
521 market_code_to_qot_market(item.market_code);
522 let key = format!("{qot_market}_{}", item.code);
523 sync_cache.id_to_key.insert(item.stock_id, key.clone());
524 sync_cache.securities.insert(
525 key,
526 futu_cache::static_data::CachedSecurityInfo {
527 stock_id: item.stock_id,
528 market: qot_market,
529 code: item.code.clone(),
530 name: item.name_sc.clone(),
531 lot_size: item.lot_size as i32,
532 sec_type: item.instrument_type as i32,
533 list_time: listing_date_to_str(item.listing_date),
534 warrnt_stock_owner: item.warrnt_stock_owner,
535 delisting: item.delisting,
536 exch_type: market_code_to_exch_type(
537 item.market_code,
538 ),
539 no_search: item.no_search,
540 },
541 );
542 if item.warrnt_stock_owner != 0 {
543 sync_cache.add_warrant_owner(
544 item.stock_id,
545 item.warrnt_stock_owner,
546 );
547 }
548 loaded += 1;
549 }
550 tracing::info!(
551 version = persisted_version,
552 loaded,
553 "restored stock list from SQLite"
554 );
555 }
556 Err(e) => {
557 tracing::warn!(error = %e, "failed to load stocks from SQLite")
558 }
559 }
560 }
561 Some(db)
562 }
563 Err(e) => {
564 tracing::warn!(error = %e, "failed to open stock SQLite db");
565 None
566 }
567 };
568
569 let persisted_version = stock_db.as_ref().map(|db| db.get_version()).unwrap_or(0);
570 let version = std::sync::atomic::AtomicU64::new(persisted_version);
571
572 let mut retry_delay = 10u64; loop {
575 let sync_round = std::sync::atomic::AtomicUsize::new(0);
576 let count = std::sync::atomic::AtomicUsize::new(0);
577 let hk_eqty_count = std::sync::atomic::AtomicUsize::new(0);
578 let deleted_count = std::sync::atomic::AtomicUsize::new(0);
579 let unknown_market_count = std::sync::atomic::AtomicUsize::new(0);
580 let cache_ref = &sync_cache;
581 let db_ref = &stock_db;
582 if let Some(db) = db_ref {
584 let _ = db.begin_batch();
585 }
586 let mut callback = |info: futu_backend::stock_list::StockInfo| {
587 if info.delete_flag {
588 deleted_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
589 if let Some(old_key) = cache_ref.id_to_key.remove(&info.stock_id) {
590 cache_ref.securities.remove(&old_key.1);
591 }
592 if let Some(db) = db_ref {
594 let _ = db.delete(info.stock_id);
595 }
596 return;
597 }
598 let qot_market = market_code_to_qot_market(info.market_code);
599 if qot_market == 0 {
600 unknown_market_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
601 if unknown_market_count.load(std::sync::atomic::Ordering::Relaxed) <= 5
602 {
603 tracing::warn!(
604 market_code = info.market_code,
605 code = %info.code,
606 instrument_type = info.instrument_type,
607 "unknown market_code"
608 );
609 }
610 }
611 let key = format!("{qot_market}_{}", info.code);
612 cache_ref.id_to_key.insert(info.stock_id, key.clone());
613 let sec_type = info.instrument_type as i32;
614 if qot_market == 1 && sec_type == 3 {
615 hk_eqty_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
616 }
617 cache_ref.securities.insert(
618 key,
619 futu_cache::static_data::CachedSecurityInfo {
620 stock_id: info.stock_id,
621 market: qot_market,
622 code: info.code.clone(),
623 name: info.name_sc.clone(),
624 lot_size: info.lot_size as i32,
625 sec_type,
626 list_time: listing_date_to_str(info.listing_date),
627 warrnt_stock_owner: info.warrnt_stock_owner,
628 delisting: info.delisting,
629 exch_type: market_code_to_exch_type(info.market_code),
630 no_search: info.no_search,
631 },
632 );
633 if info.warrnt_stock_owner != 0 {
634 cache_ref.add_warrant_owner(info.stock_id, info.warrnt_stock_owner);
635 }
636 if let Some(db) = db_ref {
638 let _ = db.upsert(&crate::stock_db::DbStockItem {
639 stock_id: info.stock_id,
640 code: info.code.clone(),
641 name_sc: info.name_sc.clone(),
642 market_code: info.market_code,
643 instrument_type: info.instrument_type,
644 lot_size: info.lot_size,
645 delisting: info.delisting,
646 warrnt_stock_owner: info.warrnt_stock_owner,
647 no_search: info.no_search,
648 listing_date: info.listing_date,
649 });
650 }
651 count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
652 };
653 match futu_backend::stock_list::sync_stock_list(
654 &sync_backend,
655 &version,
656 &mut callback,
657 )
658 .await
659 {
660 Ok(result) => {
661 let interval = result.next_interval_secs.max(30); tracing::info!(
663 total = result.total_stocks,
664 cached = count.load(std::sync::atomic::Ordering::Relaxed),
665 deleted = deleted_count.load(std::sync::atomic::Ordering::Relaxed),
666 hk_eqty = hk_eqty_count.load(std::sync::atomic::Ordering::Relaxed),
667 unknown_market =
668 unknown_market_count.load(std::sync::atomic::Ordering::Relaxed),
669 next_interval = interval,
670 "stock list sync complete"
671 );
672 retry_delay = 10; if let Some(db) = &stock_db {
675 let _ = db.commit_batch();
676 let cur_ver = version.load(std::sync::atomic::Ordering::Relaxed);
677 let _ = db.set_version(cur_ver);
678 tracing::debug!(
679 version = cur_ver,
680 db_count = db.count(),
681 "stock list version persisted to SQLite"
682 );
683 }
684 let round =
685 sync_round.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
686 let sleep_secs = if round < 5 { 5 } else { interval as u64 };
688 tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
689 }
690 Err(e) => {
691 tracing::warn!(
692 error = %e,
693 retry_secs = retry_delay,
694 "stock list sync failed, retrying"
695 );
696 tokio::time::sleep(std::time::Duration::from_secs(retry_delay)).await;
697 retry_delay = (retry_delay * 2).min(300); }
699 }
700 }
701 });
702 }
703
704 {
706 let suspend_cache = Arc::clone(&self.suspend_cache);
707 tokio::spawn(async move {
708 tracing::info!("suspend data download started (background)");
709 let loaded = futu_backend::suspend_data::load_suspend_data().await;
710 for entry in loaded.iter() {
712 suspend_cache.insert(*entry.key(), entry.value().clone());
713 }
714 tracing::info!(
715 stocks = suspend_cache.len(),
716 "suspend data download complete"
717 );
718 });
719 }
720
721 {
723 let code_change_cache = Arc::clone(&self.code_change_cache);
724 tokio::spawn(async move {
725 tracing::info!("code change data download started (background)");
726 let loaded = futu_backend::code_change::load_code_change_data().await;
727 let count = loaded.len();
728 *code_change_cache.write() = loaded;
729 tracing::info!(records = count, "code change data download complete");
730 });
731 }
732
733 let heartbeat_interval =
735 std::time::Duration::from_secs(login_result.keep_alive_interval as u64);
736 tracing::info!(interval_secs = ?heartbeat_interval, "starting backend heartbeat");
737 let heartbeat_handle =
738 futu_backend::heartbeat::start_heartbeat(Arc::clone(&backend), heartbeat_interval);
739
740 {
743 let reconnect_config = config.clone();
744 let reconnect_auth = auth_result_for_reconnect;
745 let reconnect_trd_cache = Arc::clone(&self.trd_cache);
746 let reconnect_static_cache = Arc::clone(&self.static_cache);
747 let reconnect_subscriptions = Arc::clone(&self.subscriptions);
748 let reconnect_metrics = Arc::clone(&self.metrics);
749 let shared_backend = Arc::clone(&self.backend);
750 let reconnect_push_cb = push_cb;
751 tokio::spawn(async move {
752 let mut current_heartbeat = heartbeat_handle;
753
754 loop {
755 let _ = current_heartbeat.await;
757 tracing::error!("heartbeat exited — attempting reconnect...");
758
759 reconnect_metrics
761 .backend_online
762 .store(0, std::sync::atomic::Ordering::Relaxed);
763
764 shared_backend.store(Arc::new(None));
766
767 let mut delay = 3u64;
769 loop {
770 tracing::info!(delay_secs = delay, "reconnecting to backend...");
771 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
772
773 let conn_points =
775 auth::conn_points::get_by_region(&reconnect_config.region);
776 let (addr, _port) = match conn_points.first() {
777 Some(p) => p,
778 None => {
779 reconnect_metrics
780 .backend_reconnect_failures
781 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
782 delay = (delay * 2).min(60);
783 continue;
784 }
785 };
786 let backend_addr = format!("{addr}:{_port}");
787
788 let conn =
789 match BackendConn::connect(&backend_addr, reconnect_push_cb.clone())
790 .await
791 {
792 Ok(c) => c,
793 Err(e) => {
794 reconnect_metrics
795 .backend_reconnect_failures
796 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
797 tracing::warn!(error = %e, "reconnect TCP failed");
798 delay = (delay * 2).min(60);
799 continue;
800 }
801 };
802
803 match login::tcp_login(
804 &conn,
805 &reconnect_auth,
806 &reconnect_auth.client_key,
807 true,
808 0,
809 )
810 .await
811 {
812 Ok(lr) => {
813 conn.set_session_key(lr.session_key);
814 conn.set_sec_data(lr.sec_data);
815 conn.user_id
816 .store(lr.user_id as u32, std::sync::atomic::Ordering::Relaxed);
817 tracing::info!(user_id = lr.user_id, "reconnect login succeeded");
818
819 let _ = futu_backend::stock_list::register_markets(&conn).await;
821
822 let conn_arc = Arc::new(conn);
824 futu_backend::trade_query::init_trade_data(
825 &conn_arc,
826 &reconnect_trd_cache,
827 )
828 .await;
829
830 shared_backend.store(Arc::new(Some(Arc::clone(&conn_arc))));
832 tracing::info!("shared backend connection updated");
833
834 let resub_count = resubscribe_quotes(
837 &conn_arc,
838 &reconnect_subscriptions,
839 &reconnect_static_cache,
840 )
841 .await;
842 if resub_count > 0 {
843 reconnect_metrics.resubscribe_ops.fetch_add(
844 resub_count as u64,
845 std::sync::atomic::Ordering::Relaxed,
846 );
847 tracing::info!(
848 securities = resub_count,
849 "quote re-subscription complete"
850 );
851 }
852
853 reconnect_metrics
855 .backend_online
856 .store(1, std::sync::atomic::Ordering::Relaxed);
857 reconnect_metrics
858 .backend_reconnects
859 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
860 reconnect_metrics.last_reconnect_ms.store(
861 std::time::SystemTime::now()
862 .duration_since(std::time::UNIX_EPOCH)
863 .unwrap_or_default()
864 .as_millis() as u64,
865 std::sync::atomic::Ordering::Relaxed,
866 );
867
868 let hb_interval =
870 std::time::Duration::from_secs(lr.keep_alive_interval as u64);
871 current_heartbeat = futu_backend::heartbeat::start_heartbeat(
872 Arc::clone(&conn_arc),
873 hb_interval,
874 );
875
876 tracing::info!("reconnect complete");
877 break; }
879 Err(e) => {
880 reconnect_metrics
881 .backend_reconnect_failures
882 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
883 tracing::warn!(error = %e, "reconnect login failed");
884 delay = (delay * 2).min(60);
885 }
886 }
887 }
888 }
889 });
890 }
891
892 tracing::info!("gateway initialization complete");
893 Ok(push_rx)
894 }
895
896 fn handle_quote_push(
899 body: &[u8],
900 static_cache: &futu_cache::static_data::StaticDataCache,
901 qot_cache: &futu_cache::qot_cache::QotCache,
902 push_tx: &tokio::sync::mpsc::Sender<PushEvent>,
903 ) {
904 use futu_backend::proto_internal::ft_cmd_stock_quote_sub;
905
906 let push: ft_cmd_stock_quote_sub::QuotePush = match prost::Message::decode(body) {
907 Ok(p) => p,
908 Err(_) => {
909 Self::handle_quote_push_reply(body, static_cache, qot_cache, push_tx);
913 return;
914 }
915 };
916
917 tracing::debug!(
918 sec_count = push.security_qta_list.len(),
919 "CMD6212 push parsed"
920 );
921
922 for sec_quote in &push.security_qta_list {
923 let stock_id = match sec_quote.security_id {
924 Some(id) => id,
925 None => continue,
926 };
927
928 let sec_key = match static_cache.id_to_key.get(&stock_id) {
930 Some(k) => k.clone(),
931 None => {
932 tracing::debug!(stock_id, "CMD6212: stock_id not found in id_to_key cache");
933 continue;
934 }
935 };
936 let sec_info = static_cache.get_security_info(&sec_key);
937
938 for bit_quote in &sec_quote.bit_qta_list {
939 let bit = bit_quote.bit.unwrap_or(u32::MAX);
940 let data = match &bit_quote.data {
941 Some(d) => d.as_slice(),
942 None => continue,
943 };
944
945 tracing::debug!(
946 stock_id,
947 %sec_key,
948 bit,
949 data_len = data.len(),
950 "CMD6212 processing bit"
951 );
952
953 match bit {
954 0 => {
955 if let Some(ref info) = sec_info {
957 if let Some(event) =
958 Self::parse_price_to_push(data, &sec_key, info, qot_cache)
959 {
960 let _ = push_tx.try_send(event);
961 }
962 }
963 }
964 1 => {
965 Self::parse_stock_state(data, &sec_key, qot_cache);
967 if let Some(ref info) = sec_info {
969 if let Some(event) =
970 Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
971 {
972 let _ = push_tx.try_send(event);
973 }
974 }
975 }
976 3 => {
977 if let Some(ref info) = sec_info {
979 if let Some(event) =
980 Self::parse_order_book_to_push(data, &sec_key, info, qot_cache)
981 {
982 let _ = push_tx.try_send(event);
983 }
984 }
985 }
986 5 => {
987 Self::parse_deal_statistics(data, &sec_key, qot_cache);
989 if let Some(ref info) = sec_info {
991 if let Some(event) =
992 Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
993 {
994 let _ = push_tx.try_send(event);
995 }
996 }
997 }
998 9 => {
999 if let Some(ref info) = sec_info {
1001 if let Some(event) =
1002 Self::parse_broker_queue_to_push(data, &sec_key, info, qot_cache)
1003 {
1004 let _ = push_tx.try_send(event);
1005 }
1006 }
1007 }
1008 35 => {
1009 if let Some(ref info) = sec_info {
1011 if let Some(event) =
1012 Self::parse_ticker_to_push(data, &sec_key, info, qot_cache)
1013 {
1014 let _ = push_tx.try_send(event);
1015 }
1016 }
1017 }
1018 20 => {
1019 if let Some(ref info) = sec_info {
1021 if let Some(event) =
1022 Self::parse_rt_to_push(data, &sec_key, info, qot_cache)
1023 {
1024 let _ = push_tx.try_send(event);
1025 }
1026 }
1027 }
1028 21..=31 => {
1029 if let Some(ref info) = sec_info {
1031 if let Some(event) =
1032 Self::parse_kline_to_push(data, bit, &sec_key, info, qot_cache)
1033 {
1034 let _ = push_tx.try_send(event);
1035 }
1036 }
1037 }
1038 _ => {}
1039 }
1040 }
1041 }
1042 }
1043
1044 fn handle_quote_push_reply(
1050 body: &[u8],
1051 static_cache: &futu_cache::static_data::StaticDataCache,
1052 qot_cache: &futu_cache::qot_cache::QotCache,
1053 push_tx: &tokio::sync::mpsc::Sender<PushEvent>,
1054 ) {
1055 use futu_backend::proto_internal::ft_cmd_stock_quote_sub;
1056
1057 if body.len() < 8 {
1058 return;
1059 }
1060
1061 let data = &body[4..];
1064 let mut pos = 0;
1065 while pos < data.len() {
1067 let b = data[pos];
1068 pos += 1;
1069 if b & 0x80 == 0 {
1070 break;
1071 }
1072 }
1073
1074 if pos >= data.len() {
1075 tracing::debug!(
1076 body_len = body.len(),
1077 "CMD6212 reply: too short after prefix"
1078 );
1079 return;
1080 }
1081
1082 let proto_data = &data[pos..];
1083
1084 let push: ft_cmd_stock_quote_sub::QuotePush = match prost::Message::decode(proto_data) {
1086 Ok(p) => p,
1087 Err(_) => {
1088 match Self::parse_quote_push_truncated(proto_data) {
1092 Some(p) => p,
1093 None => {
1094 tracing::debug!(
1095 body_len = body.len(),
1096 proto_len = proto_data.len(),
1097 "CMD6212 reply: decode failed"
1098 );
1099 return;
1100 }
1101 }
1102 }
1103 };
1104
1105 tracing::debug!(
1106 sec_count = push.security_qta_list.len(),
1107 "CMD6212 reply parsed as QuotePush"
1108 );
1109
1110 for sec_quote in &push.security_qta_list {
1112 let stock_id = match sec_quote.security_id {
1113 Some(id) => id,
1114 None => continue,
1115 };
1116
1117 let sec_key = match static_cache.id_to_key.get(&stock_id) {
1118 Some(k) => k.clone(),
1119 None => {
1120 tracing::debug!(stock_id, "CMD6212 reply: stock_id not in cache");
1121 continue;
1122 }
1123 };
1124 let sec_info = static_cache.get_security_info(&sec_key);
1125
1126 for bit_quote in &sec_quote.bit_qta_list {
1127 let bit = bit_quote.bit.unwrap_or(u32::MAX);
1128 let data = match &bit_quote.data {
1129 Some(d) => d.as_slice(),
1130 None => continue,
1131 };
1132
1133 tracing::debug!(
1134 stock_id,
1135 %sec_key,
1136 bit,
1137 data_len = data.len(),
1138 "CMD6212 reply processing bit"
1139 );
1140
1141 match bit {
1142 0 => {
1143 if let Some(ref info) = sec_info {
1144 if let Some(event) =
1145 Self::parse_price_to_push(data, &sec_key, info, qot_cache)
1146 {
1147 let _ = push_tx.try_send(event);
1148 }
1149 }
1150 }
1151 1 => {
1152 Self::parse_stock_state(data, &sec_key, qot_cache);
1153 if let Some(ref info) = sec_info {
1154 if let Some(event) =
1155 Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
1156 {
1157 let _ = push_tx.try_send(event);
1158 }
1159 }
1160 }
1161 3 => {
1162 if let Some(ref info) = sec_info {
1163 if let Some(event) =
1164 Self::parse_order_book_to_push(data, &sec_key, info, qot_cache)
1165 {
1166 let _ = push_tx.try_send(event);
1167 }
1168 }
1169 }
1170 5 => {
1171 Self::parse_deal_statistics(data, &sec_key, qot_cache);
1172 if let Some(ref info) = sec_info {
1173 if let Some(event) =
1174 Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
1175 {
1176 let _ = push_tx.try_send(event);
1177 }
1178 }
1179 }
1180 9 => {
1181 if let Some(ref info) = sec_info {
1182 if let Some(event) =
1183 Self::parse_broker_queue_to_push(data, &sec_key, info, qot_cache)
1184 {
1185 let _ = push_tx.try_send(event);
1186 }
1187 }
1188 }
1189 35 => {
1190 if let Some(ref info) = sec_info {
1191 if let Some(event) =
1192 Self::parse_ticker_to_push(data, &sec_key, info, qot_cache)
1193 {
1194 let _ = push_tx.try_send(event);
1195 }
1196 }
1197 }
1198 20 => {
1199 if let Some(ref info) = sec_info {
1200 if let Some(event) =
1201 Self::parse_rt_to_push(data, &sec_key, info, qot_cache)
1202 {
1203 let _ = push_tx.try_send(event);
1204 }
1205 }
1206 }
1207 21..=31 => {
1208 if let Some(ref info) = sec_info {
1209 if let Some(event) =
1210 Self::parse_kline_to_push(data, bit, &sec_key, info, qot_cache)
1211 {
1212 let _ = push_tx.try_send(event);
1213 }
1214 }
1215 }
1216 _ => {
1217 tracing::debug!(bit, "CMD6212 reply: unhandled bit");
1218 }
1219 }
1220 }
1221 }
1222 }
1223
1224 fn parse_quote_push_truncated(
1229 data: &[u8],
1230 ) -> Option<futu_backend::proto_internal::ft_cmd_stock_quote_sub::QuotePush> {
1231 use futu_backend::proto_internal::ft_cmd_stock_quote_sub;
1232
1233 let mut entries = Vec::new();
1234 let mut pos = 0;
1235
1236 while pos < data.len() {
1237 let (tag, new_pos) = Self::read_varint(data, pos);
1239 if tag == 0 || new_pos >= data.len() {
1240 break;
1241 }
1242 pos = new_pos;
1243 let field_num = tag >> 3;
1244 let wire_type = tag & 7;
1245
1246 if wire_type != 2 {
1247 break; }
1249
1250 let (length, new_pos) = Self::read_varint(data, pos);
1251 pos = new_pos;
1252 let length = length as usize;
1253
1254 if pos + length > data.len() {
1255 if field_num == 1 {
1257 let partial = &data[pos..];
1258 if let Ok(sq) = prost::Message::decode(partial) {
1259 entries.push(sq);
1260 }
1261 }
1262 break;
1263 }
1264
1265 if field_num == 1 {
1266 let content = &data[pos..pos + length];
1267 if let Ok(sq) = prost::Message::decode(content) {
1268 entries.push(sq);
1269 }
1270 }
1271 pos += length;
1272 }
1273
1274 if entries.is_empty() {
1275 return None;
1276 }
1277
1278 Some(ft_cmd_stock_quote_sub::QuotePush {
1279 security_qta_list: entries,
1280 })
1281 }
1282
1283 fn read_varint(data: &[u8], mut pos: usize) -> (u64, usize) {
1285 let mut result: u64 = 0;
1286 let mut shift = 0;
1287 while pos < data.len() {
1288 let b = data[pos];
1289 result |= ((b & 0x7f) as u64) << shift;
1290 pos += 1;
1291 if b & 0x80 == 0 {
1292 break;
1293 }
1294 shift += 7;
1295 if shift >= 64 {
1296 break;
1297 }
1298 }
1299 (result, pos)
1300 }
1301
1302 fn parse_price_to_push(
1304 data: &[u8],
1305 sec_key: &str,
1306 sec_info: &futu_cache::static_data::CachedSecurityInfo,
1307 qot_cache: &futu_cache::qot_cache::QotCache,
1308 ) -> Option<PushEvent> {
1309 use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1310
1311 let price: ft_cmd_stock_quote_coverage_data::Price = prost::Message::decode(data).ok()?;
1312
1313 let cur_price = price.price_nominal.unwrap_or(0) as f64 / 1_000_000_000.0;
1314 let last_close = price.price_last_close.unwrap_or(0) as f64 / 1_000_000_000.0;
1315 let update_time_ms = price.exchange_data_time_ms.unwrap_or(0);
1316 let update_ts = update_time_ms as f64 / 1000.0;
1317
1318 if cur_price <= 0.0 {
1319 return None;
1320 }
1321
1322 let existing = qot_cache.get_basic_qot(sec_key);
1324 let (open, high, low, vol, tnov, tnov_rate, amp, suspended) = match &existing {
1325 Some(e) => (
1326 e.open_price,
1327 e.high_price,
1328 e.low_price,
1329 e.volume,
1330 e.turnover,
1331 e.turnover_rate,
1332 e.amplitude,
1333 e.is_suspended,
1334 ),
1335 None => (0.0, 0.0, 0.0, 0, 0.0, 0.0, 0.0, false),
1336 };
1337 let cached = futu_cache::qot_cache::CachedBasicQot {
1338 cur_price,
1339 last_close_price: last_close,
1340 open_price: open,
1341 high_price: high,
1342 low_price: low,
1343 volume: vol,
1344 turnover: tnov,
1345 turnover_rate: tnov_rate,
1346 amplitude: amp,
1347 is_suspended: suspended,
1348 update_time: timestamp_to_datetime_str(update_ts),
1349 update_timestamp: update_ts,
1350 };
1351 qot_cache.update_basic_qot(sec_key, cached.clone());
1352
1353 let security = futu_proto::qot_common::Security {
1355 market: sec_info.market,
1356 code: sec_info.code.clone(),
1357 };
1358 let basic_qot = futu_proto::qot_common::BasicQot {
1359 security,
1360 name: Some(sec_info.name.clone()),
1361 is_suspended: cached.is_suspended,
1362 list_time: sec_info.list_time.clone(),
1363 price_spread: 0.0,
1364 update_time: cached.update_time.clone(),
1365 high_price: cached.high_price,
1366 open_price: cached.open_price,
1367 low_price: cached.low_price,
1368 cur_price,
1369 last_close_price: last_close,
1370 volume: cached.volume,
1371 turnover: cached.turnover,
1372 turnover_rate: cached.turnover_rate,
1373 amplitude: cached.amplitude,
1374 dark_status: None,
1375 option_ex_data: None,
1376 list_timestamp: None,
1377 update_timestamp: Some(update_ts),
1378 pre_market: None,
1379 after_market: None,
1380 overnight: None,
1381 future_ex_data: None,
1382 sec_status: None,
1383 warrant_ex_data: None,
1384 };
1385
1386 let resp = futu_proto::qot_update_basic_qot::Response {
1387 ret_type: 0,
1388 ret_msg: None,
1389 err_code: None,
1390 s2c: Some(futu_proto::qot_update_basic_qot::S2c {
1391 basic_qot_list: vec![basic_qot],
1392 }),
1393 };
1394
1395 Some(PushEvent::QuotePush {
1396 sec_key: sec_key.to_string(),
1397 sub_type: 1, proto_id: futu_core::proto_id::QOT_UPDATE_BASIC_QOT,
1399 body: prost::Message::encode_to_vec(&resp),
1400 })
1401 }
1402
1403 fn handle_trade_notify(body: &[u8], push_tx: &tokio::sync::mpsc::Sender<PushEvent>) {
1407 use futu_backend::proto_internal::order_sys_interface;
1408
1409 let notify: order_sys_interface::NotifyMsg = match prost::Message::decode(body) {
1410 Ok(n) => n,
1411 Err(e) => {
1412 tracing::debug!(error = %e, "CMD4716 decode failed");
1413 return;
1414 }
1415 };
1416
1417 let notice_type = notify.notice_type.unwrap_or(0);
1418 let acc_id = notify
1419 .msg_header
1420 .as_ref()
1421 .and_then(|h| h.account_id)
1422 .unwrap_or(0);
1423
1424 let order_ids = notify.order_ids.clone();
1425 let fill_ids = notify.order_fill_ids.clone();
1426
1427 match notice_type {
1428 1 => {
1429 tracing::info!(acc_id, "cash update → re-query funds");
1431 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1432 acc_id,
1433 notice_type,
1434 order_ids: vec![],
1435 order_fill_ids: vec![],
1436 });
1437 }
1438 2 => {
1439 tracing::info!(acc_id, symbols = ?notify.symbols, "position update");
1441 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1442 acc_id,
1443 notice_type,
1444 order_ids: vec![],
1445 order_fill_ids: vec![],
1446 });
1447 }
1448 3 => {
1449 tracing::info!(acc_id, "position list update");
1451 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1452 acc_id,
1453 notice_type,
1454 order_ids: vec![],
1455 order_fill_ids: vec![],
1456 });
1457 }
1458 4 => {
1459 tracing::info!(acc_id, orders = ?order_ids, "order update");
1461 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1462 acc_id,
1463 notice_type,
1464 order_ids: order_ids.clone(),
1465 order_fill_ids: vec![],
1466 });
1467 }
1468 5 => {
1469 tracing::info!(acc_id, "order list update");
1471 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1472 acc_id,
1473 notice_type,
1474 order_ids: vec![],
1475 order_fill_ids: vec![],
1476 });
1477 }
1478 6 => {
1479 tracing::info!(acc_id, fills = ?fill_ids, "fill update");
1481 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1482 acc_id,
1483 notice_type,
1484 order_ids: vec![],
1485 order_fill_ids: fill_ids.clone(),
1486 });
1487 }
1488 7 => {
1489 tracing::info!(acc_id, "fill list update");
1491 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1492 acc_id,
1493 notice_type,
1494 order_ids: vec![],
1495 order_fill_ids: vec![],
1496 });
1497 }
1498 9 => {
1499 tracing::info!(acc_id, req_ids = ?notify.order_op_req_ids, "order op result");
1501 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1502 acc_id,
1503 notice_type,
1504 order_ids: order_ids.clone(),
1505 order_fill_ids: vec![],
1506 });
1507 }
1508 11 | 12 => {
1509 tracing::info!(acc_id, notice_type, "asset/quote update");
1511 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1512 acc_id,
1513 notice_type,
1514 order_ids: vec![],
1515 order_fill_ids: vec![],
1516 });
1517 }
1518 100 => {
1519 tracing::info!(acc_id, orders = ?order_ids, "order notification");
1521 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1522 acc_id,
1523 notice_type: 4,
1524 order_ids: order_ids.clone(),
1525 order_fill_ids: vec![],
1526 });
1527 }
1528 101 => {
1529 tracing::info!(acc_id, fills = ?fill_ids, "fill notification");
1531 let _ = push_tx.try_send(PushEvent::TradeReQuery {
1532 acc_id,
1533 notice_type: 6,
1534 order_ids: vec![],
1535 order_fill_ids: fill_ids.clone(),
1536 });
1537 }
1538 _ => {
1539 tracing::debug!(acc_id, notice_type, "unhandled trade notification");
1540 }
1541 }
1542 }
1543
1544 fn handle_msg_center_push(
1549 body: &[u8],
1550 static_cache: &Arc<StaticDataCache>,
1551 push_tx: &tokio::sync::mpsc::Sender<PushEvent>,
1552 ) {
1553 use futu_backend::proto_internal::msgcenter;
1554 use futu_backend::proto_internal::msgdefine;
1555
1556 let notify: msgcenter::MsgCenterNotify = match prost::Message::decode(body) {
1557 Ok(n) => n,
1558 Err(e) => {
1559 tracing::debug!(error = %e, "CMD5300 MsgCenterNotify decode failed");
1560 return;
1561 }
1562 };
1563
1564 let category = notify.msg_category.unwrap_or(0);
1565 tracing::debug!(
1566 user_id = notify.user_id.unwrap_or(0),
1567 category,
1568 "CMD5300 message center push"
1569 );
1570
1571 if category != 1 {
1573 tracing::debug!(category, "CMD5300: unhandled msg category");
1574 return;
1575 }
1576
1577 let msg_item = match ¬ify.notify_msg {
1579 Some(item) => item,
1580 None => {
1581 tracing::debug!("CMD5300: no notify_msg");
1582 return;
1583 }
1584 };
1585
1586 let msg_body = match &msg_item.body {
1587 Some(b) => b,
1588 None => {
1589 tracing::debug!("CMD5300: no msg body");
1590 return;
1591 }
1592 };
1593
1594 let price_warn: msgdefine::PriceWarnMsg = match prost::Message::decode(msg_body.as_ref()) {
1595 Ok(m) => m,
1596 Err(e) => {
1597 tracing::debug!(error = %e, "CMD5300: PriceWarnMsg decode failed");
1598 return;
1599 }
1600 };
1601
1602 let stock_id = price_warn.stock_id.unwrap_or(0);
1603
1604 let sec_info = static_cache.get_security_info_by_stock_id(stock_id);
1606 let (market, code) = match &sec_info {
1607 Some(info) => (info.market, info.code.clone()),
1608 None => {
1609 tracing::debug!(stock_id, "CMD5300: stock_id not found in cache");
1610 return;
1611 }
1612 };
1613
1614 let stock_name = sec_info
1615 .as_ref()
1616 .map(|i| i.name.clone())
1617 .unwrap_or_default();
1618
1619 let price = price_warn.price.unwrap_or(0) as f64 / 1000.0;
1621 let change_rate = price_warn.price_change_ratio.unwrap_or(0) as f64 / 1000.0;
1623 let market_status = price_warn.market_status.unwrap_or(0);
1624
1625 let content = price_warn.msg_content.clone().unwrap_or_default();
1627 let note = price_warn.note.clone().unwrap_or_default();
1628
1629 let (key, reminder_type, set_value, cur_value) =
1631 if let Some(detail) = &price_warn.warn_detail {
1632 let warn_type = detail.r#type.unwrap_or(0);
1633 let api_type = Self::warn_type_to_api_type(warn_type);
1634 let sv_raw = detail.set_value.unwrap_or(0);
1635 let cv_raw = detail.present_value.unwrap_or(0);
1636 let sv = Self::price_reminder_clt_value(sv_raw, warn_type, sec_info.as_ref());
1638 let cv = Self::price_reminder_clt_value(cv_raw, warn_type, sec_info.as_ref());
1639 (detail.key.unwrap_or(0) as i64, api_type, sv, cv)
1640 } else {
1641 (0i64, 0i32, 0.0f64, 0.0f64)
1642 };
1643
1644 tracing::info!(
1645 stock_id,
1646 market,
1647 code = %code,
1648 price,
1649 change_rate,
1650 reminder_type,
1651 "price reminder push → 3019"
1652 );
1653
1654 let s2c = futu_proto::qot_update_price_reminder::S2c {
1656 security: futu_proto::qot_common::Security { market, code },
1657 name: Some(stock_name),
1658 price,
1659 change_rate,
1660 market_status,
1661 content,
1662 note,
1663 key: Some(key),
1664 r#type: Some(reminder_type),
1665 set_value: Some(set_value),
1666 cur_value: Some(cur_value),
1667 };
1668
1669 let resp = futu_proto::qot_update_price_reminder::Response {
1670 ret_type: 0, ret_msg: None,
1672 err_code: None,
1673 s2c: Some(s2c),
1674 };
1675
1676 let push_body = prost::Message::encode_to_vec(&resp);
1677
1678 let _ = push_tx.try_send(PushEvent::BroadcastPush {
1679 proto_id: futu_core::proto_id::QOT_UPDATE_PRICE_REMINDER,
1680 body: push_body,
1681 });
1682 }
1683
1684 fn warn_type_to_api_type(warn_type: u32) -> i32 {
1686 match warn_type {
1687 4 => 1, 8 => 2, 1 => 3, 2 => 4, 9 => 5, 10 => 6, 11 => 7, 12 => 8, 13 => 9, 14 => 10, 15 => 11, 16 => 12, 17 => 13, 19 => 14, 20 => 15, _ => 0,
1703 }
1704 }
1705
1706 fn price_reminder_clt_value(
1714 raw_value: i64,
1715 warn_type: u32,
1716 sec_info: Option<&futu_cache::static_data::CachedSecurityInfo>,
1717 ) -> f64 {
1718 let mut value = raw_value as f64 / 1000.0;
1719
1720 let (is_option, is_future, is_cn) = if let Some(info) = sec_info {
1722 let is_opt = info.sec_type == 6 || info.sec_type == 8; let is_fut = info.sec_type == 5; let is_cn_market = info.market == 21 || info.market == 22; (is_opt, is_fut, is_cn_market)
1726 } else {
1727 (false, false, false)
1728 };
1729
1730 match warn_type {
1731 11 => {
1732 if !is_option && !is_future {
1734 value *= 10000.0;
1735 }
1736 if is_cn {
1737 value *= 100.0;
1738 }
1739 }
1740 12 => {
1741 value *= 10000.0;
1743 }
1744 16 | 17 => {
1745 if !is_option && !is_future {
1747 value *= 1000.0;
1748 }
1749 if is_cn {
1750 value *= 100.0;
1751 }
1752 }
1753 _ => {
1754 }
1756 }
1757
1758 value
1759 }
1760
1761 async fn request_qot_right(
1764 backend: &futu_backend::conn::BackendConn,
1765 qot_right_cache: &QotRightCache,
1766 ) {
1767 use futu_backend::proto_internal::qta_auth;
1768
1769 tracing::info!("CMD6024: requesting qot right...");
1770
1771 let req = qta_auth::QtaAuth6024Req {
1773 req_hk_auth: None,
1774 req_us_auth: None,
1775 req_cn_auth: None,
1776 };
1777
1778 let body = prost::Message::encode_to_vec(&req);
1779
1780 match backend.request(6032, body).await {
1783 Ok(frame) => {
1784 let body_bytes = frame.body.as_ref();
1785
1786 let rsp: qta_auth::QtaAuth6024Rsp = crate::handlers::decode_srpc_or_direct(
1788 body_bytes,
1789 |r: &qta_auth::QtaAuth6024Rsp| {
1790 r.hk_qut_got_auth.is_some()
1792 || r.us_qut_got_auth.is_some()
1793 || r.open_api_auth.is_some()
1794 },
1795 );
1796
1797 let (sub_limit, kl_limit) = if let Some(api_auth) = &rsp.open_api_auth {
1799 (api_auth.sub_limit, api_auth.history_k_line_limit)
1800 } else {
1801 (None, None)
1802 };
1803
1804 let us_future_detail = rsp.us_future_auth.as_ref().map(|f| {
1806 (
1807 f.open_api_cme_auth.unwrap_or(0),
1808 f.open_api_cbot_auth.unwrap_or(0),
1809 f.open_api_nymex_auth.unwrap_or(0),
1810 f.open_api_comex_auth.unwrap_or(0),
1811 )
1812 });
1813
1814 qot_right_cache.update_from_backend(
1815 rsp.hk_qut_got_auth,
1816 rsp.us_qut_got_auth,
1817 rsp.cn_qut_got_auth,
1818 rsp.shanghai_qut_auth,
1819 rsp.shenzhen_qut_auth,
1820 rsp.hk_option_auth,
1821 rsp.hk_future_auth,
1822 rsp.us_option_flag,
1823 rsp.us_future_cme_cboe_auth,
1824 us_future_detail,
1825 rsp.sgx_future_auth,
1826 rsp.jp_future_auth,
1827 sub_limit,
1828 kl_limit,
1829 );
1830
1831 tracing::info!(
1832 hk_got = rsp.hk_qut_got_auth.unwrap_or(0),
1833 us_got = rsp.us_qut_got_auth.unwrap_or(0),
1834 cn_got = rsp.cn_qut_got_auth.unwrap_or(0),
1835 sub_limit = sub_limit.unwrap_or(0),
1836 kl_limit = kl_limit.unwrap_or(0),
1837 "CMD6024 qot right updated"
1838 );
1839 }
1840 Err(e) => {
1841 tracing::warn!(error = %e, "CMD6024 request failed (using defaults)");
1842 }
1843 }
1844 }
1845
1846 fn parse_deal_statistics(
1848 data: &[u8],
1849 sec_key: &str,
1850 qot_cache: &futu_cache::qot_cache::QotCache,
1851 ) {
1852 use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1853
1854 let stats: ft_cmd_stock_quote_coverage_data::DealStatistics =
1855 match prost::Message::decode(data) {
1856 Ok(s) => s,
1857 Err(_) => return,
1858 };
1859
1860 let open = stats.price_open.unwrap_or(0) as f64 / 1_000_000_000.0;
1862 let high = stats.price_highest.unwrap_or(0) as f64 / 1_000_000_000.0;
1863 let low = stats.price_lowest.unwrap_or(0) as f64 / 1_000_000_000.0;
1864 let volume = stats.volume.unwrap_or(0);
1865 let turnover = stats.turnover.unwrap_or(0) as f64 / 1_000.0;
1866 let turnover_rate = stats.ratio_turnover.unwrap_or(0) as f64 / 100_000.0;
1867 let amplitude = stats.amplitude_price.unwrap_or(0) as f64 / 100_000.0;
1868
1869 if let Some(mut existing) = qot_cache.basic_qot.get_mut(sec_key) {
1871 if open > 0.0 {
1872 existing.open_price = open;
1873 }
1874 if high > 0.0 {
1875 existing.high_price = high;
1876 }
1877 if low > 0.0 {
1878 existing.low_price = low;
1879 }
1880 if volume > 0 {
1881 existing.volume = volume;
1882 }
1883 if turnover > 0.0 {
1884 existing.turnover = turnover;
1885 }
1886 if turnover_rate > 0.0 {
1887 existing.turnover_rate = turnover_rate;
1888 }
1889 if amplitude > 0.0 {
1890 existing.amplitude = amplitude;
1891 }
1892 } else if volume > 0 || open > 0.0 {
1893 qot_cache.update_basic_qot(
1895 sec_key,
1896 futu_cache::qot_cache::CachedBasicQot {
1897 cur_price: 0.0,
1898 last_close_price: 0.0,
1899 open_price: open,
1900 high_price: high,
1901 low_price: low,
1902 volume,
1903 turnover,
1904 turnover_rate,
1905 amplitude,
1906 is_suspended: false,
1907 update_time: String::new(),
1908 update_timestamp: 0.0,
1909 },
1910 );
1911 }
1912 }
1913
1914 fn parse_stock_state(data: &[u8], sec_key: &str, qot_cache: &futu_cache::qot_cache::QotCache) {
1916 use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1917
1918 let state: ft_cmd_stock_quote_coverage_data::StockState = match prost::Message::decode(data)
1919 {
1920 Ok(s) => s,
1921 Err(_) => return,
1922 };
1923
1924 let state_type = state.state_type.unwrap_or(0);
1925 let is_suspended = state_type == 8; if let Some(mut existing) = qot_cache.basic_qot.get_mut(sec_key) {
1928 existing.is_suspended = is_suspended;
1929 }
1930 }
1931
1932 fn build_basic_qot_push_from_cache(
1935 sec_key: &str,
1936 sec_info: &futu_cache::static_data::CachedSecurityInfo,
1937 qot_cache: &futu_cache::qot_cache::QotCache,
1938 ) -> Option<PushEvent> {
1939 let cached = qot_cache.get_basic_qot(sec_key)?;
1940
1941 let security = futu_proto::qot_common::Security {
1942 market: sec_info.market,
1943 code: sec_info.code.clone(),
1944 };
1945 let basic_qot = futu_proto::qot_common::BasicQot {
1946 security,
1947 name: Some(sec_info.name.clone()),
1948 is_suspended: cached.is_suspended,
1949 list_time: sec_info.list_time.clone(),
1950 price_spread: 0.0,
1951 update_time: cached.update_time.clone(),
1952 high_price: cached.high_price,
1953 open_price: cached.open_price,
1954 low_price: cached.low_price,
1955 cur_price: cached.cur_price,
1956 last_close_price: cached.last_close_price,
1957 volume: cached.volume,
1958 turnover: cached.turnover,
1959 turnover_rate: cached.turnover_rate,
1960 amplitude: cached.amplitude,
1961 dark_status: None,
1962 option_ex_data: None,
1963 list_timestamp: None,
1964 update_timestamp: Some(cached.update_timestamp),
1965 pre_market: None,
1966 after_market: None,
1967 overnight: None,
1968 future_ex_data: None,
1969 sec_status: None,
1970 warrant_ex_data: None,
1971 };
1972
1973 let resp = futu_proto::qot_update_basic_qot::Response {
1974 ret_type: 0,
1975 ret_msg: None,
1976 err_code: None,
1977 s2c: Some(futu_proto::qot_update_basic_qot::S2c {
1978 basic_qot_list: vec![basic_qot],
1979 }),
1980 };
1981
1982 Some(PushEvent::QuotePush {
1983 sec_key: sec_key.to_string(),
1984 sub_type: 1, proto_id: futu_core::proto_id::QOT_UPDATE_BASIC_QOT,
1986 body: prost::Message::encode_to_vec(&resp),
1987 })
1988 }
1989
1990 fn parse_order_book_to_push(
1992 data: &[u8],
1993 sec_key: &str,
1994 sec_info: &futu_cache::static_data::CachedSecurityInfo,
1995 qot_cache: &futu_cache::qot_cache::QotCache,
1996 ) -> Option<PushEvent> {
1997 use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1998
1999 let ob: ft_cmd_stock_quote_coverage_data::OrderBook = prost::Message::decode(data).ok()?;
2000
2001 let convert_levels =
2002 |items: &[ft_cmd_stock_quote_coverage_data::OrderBookItem]|
2003 -> Vec<futu_cache::qot_cache::CachedOrderBookLevel> {
2004 items
2005 .iter()
2006 .map(|item| futu_cache::qot_cache::CachedOrderBookLevel {
2007 price: item.price.unwrap_or(0) as f64 / 1_000_000_000.0,
2008 volume: item.volume.unwrap_or(0),
2009 order_count: item.order_count.unwrap_or(0) as i32,
2010 })
2011 .collect()
2012 };
2013
2014 let bid_list = convert_levels(&ob.bid);
2015 let ask_list = convert_levels(&ob.ask);
2016
2017 let svr_recv_bid_ts = ob
2018 .server_recv_bid_from_exchange_time_ms
2019 .map(|t| t as f64 / 1000.0);
2020 let svr_recv_ask_ts = ob
2021 .server_recv_ask_from_exchange_time_ms
2022 .map(|t| t as f64 / 1000.0);
2023
2024 qot_cache.order_books.insert(
2026 sec_key.to_string(),
2027 futu_cache::qot_cache::CachedOrderBook {
2028 bid_list: bid_list.clone(),
2029 ask_list: ask_list.clone(),
2030 svr_recv_time_bid: None,
2031 svr_recv_time_bid_timestamp: svr_recv_bid_ts,
2032 svr_recv_time_ask: None,
2033 svr_recv_time_ask_timestamp: svr_recv_ask_ts,
2034 },
2035 );
2036
2037 let security = futu_proto::qot_common::Security {
2039 market: sec_info.market,
2040 code: sec_info.code.clone(),
2041 };
2042 let to_proto = |levels: &[futu_cache::qot_cache::CachedOrderBookLevel]|
2043 -> Vec<futu_proto::qot_common::OrderBook> {
2044 levels.iter().map(|l| futu_proto::qot_common::OrderBook {
2045 price: l.price,
2046 volume: l.volume,
2047 oreder_count: l.order_count,
2048 detail_list: vec![],
2049 }).collect()
2050 };
2051
2052 let resp = futu_proto::qot_update_order_book::Response {
2053 ret_type: 0,
2054 ret_msg: None,
2055 err_code: None,
2056 s2c: Some(futu_proto::qot_update_order_book::S2c {
2057 security,
2058 name: Some(sec_info.name.clone()),
2059 order_book_ask_list: to_proto(&ask_list),
2060 order_book_bid_list: to_proto(&bid_list),
2061 svr_recv_time_bid: None,
2062 svr_recv_time_bid_timestamp: svr_recv_bid_ts,
2063 svr_recv_time_ask: None,
2064 svr_recv_time_ask_timestamp: svr_recv_ask_ts,
2065 }),
2066 };
2067
2068 Some(PushEvent::QuotePush {
2069 sec_key: sec_key.to_string(),
2070 sub_type: 2, proto_id: futu_core::proto_id::QOT_UPDATE_ORDER_BOOK,
2072 body: prost::Message::encode_to_vec(&resp),
2073 })
2074 }
2075
2076 fn parse_broker_queue_to_push(
2078 data: &[u8],
2079 sec_key: &str,
2080 sec_info: &futu_cache::static_data::CachedSecurityInfo,
2081 qot_cache: &futu_cache::qot_cache::QotCache,
2082 ) -> Option<PushEvent> {
2083 use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
2084
2085 let bq: ft_cmd_stock_quote_coverage_data::HkBrokerQueue =
2086 prost::Message::decode(data).ok()?;
2087
2088 let convert_side =
2091 |items: &[ft_cmd_stock_quote_coverage_data::HkBrokerQueueItem]|
2092 -> Vec<futu_cache::qot_cache::CachedBrokerItem> {
2093 let mut result = Vec::new();
2094 for item in items {
2095 let pos = item.rank_id.unwrap_or(0);
2096 for &bid in &item.broker_id {
2097 result.push(futu_cache::qot_cache::CachedBrokerItem {
2098 id: bid as i64,
2099 name: format!("Broker#{bid}"),
2100 pos,
2101 });
2102 }
2103 }
2104 result
2105 };
2106
2107 let bid_list = convert_side(&bq.bid);
2108 let ask_list = convert_side(&bq.ask);
2109
2110 qot_cache.update_broker(
2112 sec_key,
2113 futu_cache::qot_cache::CachedBroker {
2114 bid_list: bid_list.clone(),
2115 ask_list: ask_list.clone(),
2116 },
2117 );
2118
2119 let security = futu_proto::qot_common::Security {
2121 market: sec_info.market,
2122 code: sec_info.code.clone(),
2123 };
2124 let to_proto =
2125 |items: &[futu_cache::qot_cache::CachedBrokerItem]|
2126 -> Vec<futu_proto::qot_common::Broker> {
2127 items
2128 .iter()
2129 .map(|b| futu_proto::qot_common::Broker {
2130 id: b.id,
2131 name: b.name.clone(),
2132 pos: b.pos,
2133 order_id: None,
2134 volume: None,
2135 })
2136 .collect()
2137 };
2138
2139 let resp = futu_proto::qot_update_broker::Response {
2140 ret_type: 0,
2141 ret_msg: None,
2142 err_code: None,
2143 s2c: Some(futu_proto::qot_update_broker::S2c {
2144 security,
2145 name: Some(sec_info.name.clone()),
2146 broker_ask_list: to_proto(&ask_list),
2147 broker_bid_list: to_proto(&bid_list),
2148 }),
2149 };
2150
2151 Some(PushEvent::QuotePush {
2152 sec_key: sec_key.to_string(),
2153 sub_type: 14, proto_id: futu_core::proto_id::QOT_UPDATE_BROKER,
2155 body: prost::Message::encode_to_vec(&resp),
2156 })
2157 }
2158
2159 fn parse_ticker_to_push(
2161 data: &[u8],
2162 sec_key: &str,
2163 sec_info: &futu_cache::static_data::CachedSecurityInfo,
2164 qot_cache: &futu_cache::qot_cache::QotCache,
2165 ) -> Option<PushEvent> {
2166 use futu_backend::proto_internal::ft_cmd_stock_quote_accumulate_data;
2168
2169 let tick: ft_cmd_stock_quote_accumulate_data::Tick = prost::Message::decode(data).ok()?;
2170
2171 let mut ticker_list = Vec::new();
2172 let mut cached_tickers = Vec::new();
2173
2174 for item in &tick.items {
2175 let price = item.price.unwrap_or(0) as f64 / 1_000_000_000.0;
2176 let volume = item.volume.unwrap_or(0) as i64;
2177 let timestamp_ms = item.exchange_data_time_ms.unwrap_or(0);
2178 let timestamp = timestamp_ms as f64 / 1000.0;
2179 let sequence = item.tick_key.unwrap_or(0) as i64;
2180 let dir = item.r#type.unwrap_or(0);
2182 let turnover = price * volume as f64;
2183
2184 cached_tickers.push(futu_cache::qot_cache::CachedTicker {
2185 time: String::new(),
2186 sequence,
2187 dir,
2188 price,
2189 volume,
2190 turnover,
2191 recv_time: None,
2192 ticker_type: None,
2193 push_data_type: Some(1), timestamp: Some(timestamp),
2195 });
2196
2197 ticker_list.push(futu_proto::qot_common::Ticker {
2198 time: String::new(),
2199 sequence,
2200 dir,
2201 price,
2202 volume,
2203 turnover,
2204 recv_time: None,
2205 r#type: None,
2206 type_sign: None,
2207 push_data_type: Some(1),
2208 timestamp: Some(timestamp),
2209 });
2210 }
2211
2212 if ticker_list.is_empty() {
2213 return None;
2214 }
2215
2216 let max_cached = 1000;
2218 let mut entry = qot_cache.tickers.entry(sec_key.to_string()).or_default();
2219 entry.extend(cached_tickers);
2220 if entry.len() > max_cached {
2221 let drain_count = entry.len() - max_cached;
2222 entry.drain(0..drain_count);
2223 }
2224
2225 let security = futu_proto::qot_common::Security {
2227 market: sec_info.market,
2228 code: sec_info.code.clone(),
2229 };
2230 let resp = futu_proto::qot_update_ticker::Response {
2231 ret_type: 0,
2232 ret_msg: None,
2233 err_code: None,
2234 s2c: Some(futu_proto::qot_update_ticker::S2c {
2235 security,
2236 name: Some(sec_info.name.clone()),
2237 ticker_list,
2238 }),
2239 };
2240
2241 Some(PushEvent::QuotePush {
2242 sec_key: sec_key.to_string(),
2243 sub_type: 4, proto_id: futu_core::proto_id::QOT_UPDATE_TICKER,
2245 body: prost::Message::encode_to_vec(&resp),
2246 })
2247 }
2248
2249 fn sbit_to_kl_type(bit: u32) -> i32 {
2251 match bit {
2252 21 => 1, 22 => 10, 23 => 6, 24 => 7, 25 => 8, 26 => 9, 27 => 2, 28 => 3, 29 => 4, 30 => 11, 31 => 5, _ => 0,
2264 }
2265 }
2266
2267 fn parse_kline_to_push(
2269 data: &[u8],
2270 bit: u32,
2271 sec_key: &str,
2272 sec_info: &futu_cache::static_data::CachedSecurityInfo,
2273 qot_cache: &futu_cache::qot_cache::QotCache,
2274 ) -> Option<PushEvent> {
2275 use futu_backend::proto_internal::ft_cmd_stock_quote_accumulate_data;
2276
2277 let kline: ft_cmd_stock_quote_accumulate_data::Kline = prost::Message::decode(data).ok()?;
2278
2279 let mut kl_list = Vec::new();
2280 let mut cached_klines = Vec::new();
2281 let mut rehab_type = 0i32;
2282
2283 for point in &kline.point_list {
2284 rehab_type = point.exright_type.unwrap_or(0) as i32;
2285 if let Some(ref item) = point.item {
2286 let close = item.close_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2287 let open = item.open_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2288 let high = item.highest_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2289 let low = item.lowest_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2290 let last_close = item.last_close_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2291 let volume = item.volume.unwrap_or(0) as i64;
2292 let turnover = item.turnover.unwrap_or(0) as f64 / 1_000.0;
2293 let turnover_rate = item.turnover_rate.unwrap_or(0) as f64 / 100_000.0;
2294 let pe = item.pe.unwrap_or(0) as f64 / 1_000.0;
2295 let timestamp = item.time.unwrap_or(0) as f64;
2296
2297 let is_blank = close <= 0.0 && open <= 0.0;
2298
2299 kl_list.push(futu_proto::qot_common::KLine {
2300 time: String::new(),
2301 is_blank,
2302 high_price: Some(high),
2303 open_price: Some(open),
2304 low_price: Some(low),
2305 close_price: Some(close),
2306 last_close_price: Some(last_close),
2307 volume: Some(volume),
2308 turnover: Some(turnover),
2309 turnover_rate: Some(turnover_rate),
2310 pe: Some(pe),
2311 change_rate: if last_close > 0.0 {
2312 Some((close - last_close) / last_close * 100.0)
2313 } else {
2314 None
2315 },
2316 timestamp: Some(timestamp),
2317 });
2318
2319 if !is_blank {
2320 cached_klines.push(futu_cache::qot_cache::CachedKLine {
2321 time: String::new(),
2322 open_price: open,
2323 high_price: high,
2324 low_price: low,
2325 close_price: close,
2326 volume,
2327 turnover,
2328 });
2329 }
2330 }
2331 }
2332
2333 if kl_list.is_empty() {
2334 return None;
2335 }
2336
2337 let kl_type = Self::sbit_to_kl_type(bit);
2339 let cache_key = format!("{sec_key}:{kl_type}");
2340 let mut entry = qot_cache.klines.entry(cache_key).or_default();
2341 entry.extend(cached_klines);
2342 if entry.len() > 2000 {
2344 let drain = entry.len() - 2000;
2345 entry.drain(0..drain);
2346 }
2347
2348 let security = futu_proto::qot_common::Security {
2349 market: sec_info.market,
2350 code: sec_info.code.clone(),
2351 };
2352 let resp = futu_proto::qot_update_kl::Response {
2353 ret_type: 0,
2354 ret_msg: None,
2355 err_code: None,
2356 s2c: Some(futu_proto::qot_update_kl::S2c {
2357 rehab_type,
2358 kl_type,
2359 security,
2360 name: Some(sec_info.name.clone()),
2361 kl_list,
2362 }),
2363 };
2364
2365 let sub_type = match kl_type {
2367 1 => 11, 2 => 6, 3 => 12, 4 => 13, 5 => 16, 6 => 7, 7 => 8, 8 => 9, 9 => 10, 10 => 17, 11 => 15, _ => 6,
2379 };
2380
2381 Some(PushEvent::QuotePush {
2382 sec_key: sec_key.to_string(),
2383 sub_type,
2384 proto_id: futu_core::proto_id::QOT_UPDATE_KL,
2385 body: prost::Message::encode_to_vec(&resp),
2386 })
2387 }
2388
2389 fn parse_rt_to_push(
2391 data: &[u8],
2392 sec_key: &str,
2393 sec_info: &futu_cache::static_data::CachedSecurityInfo,
2394 qot_cache: &futu_cache::qot_cache::QotCache,
2395 ) -> Option<PushEvent> {
2396 use futu_backend::proto_internal::ft_cmd_stock_quote_accumulate_data;
2397
2398 let ts: ft_cmd_stock_quote_accumulate_data::TimeSharingPlans =
2399 prost::Message::decode(data).ok()?;
2400
2401 let point = ts.point.as_ref()?;
2402 let price = point.close_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2403 let volume = point.volume.unwrap_or(0) as i64;
2404 let turnover = point.turnover.unwrap_or(0) as f64 / 1_000.0;
2405 let timestamp = point.time.unwrap_or(0) as f64;
2406
2407 if price <= 0.0 {
2408 return None;
2409 }
2410
2411 let mut entry = qot_cache.rt_data.entry(sec_key.to_string()).or_default();
2413 entry.push(futu_cache::qot_cache::CachedTimeShare {
2414 time: String::new(),
2415 minute: 0,
2416 price,
2417 last_close_price: 0.0,
2418 avg_price: 0.0,
2419 volume,
2420 turnover,
2421 timestamp,
2422 });
2423 if entry.len() > 500 {
2424 let drain = entry.len() - 500;
2425 entry.drain(0..drain);
2426 }
2427
2428 let security = futu_proto::qot_common::Security {
2429 market: sec_info.market,
2430 code: sec_info.code.clone(),
2431 };
2432
2433 let rt = futu_proto::qot_common::TimeShare {
2434 time: String::new(),
2435 minute: 0,
2436 is_blank: false,
2437 price: Some(price),
2438 last_close_price: None,
2439 avg_price: None,
2440 volume: Some(volume),
2441 turnover: Some(turnover),
2442 timestamp: Some(timestamp),
2443 };
2444
2445 let resp = futu_proto::qot_update_rt::Response {
2446 ret_type: 0,
2447 ret_msg: None,
2448 err_code: None,
2449 s2c: Some(futu_proto::qot_update_rt::S2c {
2450 security,
2451 name: Some(sec_info.name.clone()),
2452 rt_list: vec![rt],
2453 }),
2454 };
2455
2456 Some(PushEvent::QuotePush {
2457 sec_key: sec_key.to_string(),
2458 sub_type: 5, proto_id: futu_core::proto_id::QOT_UPDATE_RT,
2460 body: prost::Message::encode_to_vec(&resp),
2461 })
2462 }
2463
2464 fn is_valid_real_trd_market(market: i32) -> bool {
2466 matches!(market, 1 | 2 | 4 | 5 | 6 | 8 | 15 | 111 | 112)
2467 }
2469
2470 fn broker_to_security_firm(broker_id: u32) -> i32 {
2472 match broker_id {
2473 1001 => 1, 1007 => 2, 1008 => 3, 1009 => 4, 1019 => 5, 1017 => 6, 1012 => 7, _ => 0, }
2482 }
2483
2484 fn get_acc_type(acc: &futu_backend::proto_internal::ft_usr_trd_acc::Account) -> Option<i32> {
2486 if let Some(ira) = acc.ira_type {
2488 match ira {
2489 1 => return Some(3), 2 => return Some(4), 3 => return Some(5), _ => {}
2493 }
2494 }
2495 if let Some(kouza) = acc.kouza_type {
2497 match kouza {
2498 1 => return Some(1), 2 => return Some(2), 3 => return Some(6), _ => {}
2502 }
2503 }
2504 Some(acc.r#type.unwrap_or(0) + 1) }
2507
2508 fn get_acc_role(acc: &futu_backend::proto_internal::ft_usr_trd_acc::Account) -> Option<i32> {
2510 if acc.is_esop.unwrap_or(false) {
2512 return Some(3); }
2514 Some(1) }
2520
2521 fn account_to_cached(
2523 acc: &futu_backend::proto_internal::ft_usr_trd_acc::Account,
2524 uni_card_num: Option<&str>,
2525 uni_role: Option<u32>,
2526 ) -> Option<CachedTrdAcc> {
2527 let acc_id = acc.id?;
2528 let broker_id = acc.broker.as_ref().map(|b| b.id).unwrap_or(0);
2529
2530 let acc_type = Self::get_acc_type(acc);
2531 let card_num = acc.card_number.as_ref().filter(|s| !s.is_empty()).cloned();
2532 let security_firm = Self::broker_to_security_firm(broker_id);
2533 let acc_status = acc.state.map(|s| if s == 1 { 0 } else { 1 });
2535
2536 let acc_role = match uni_role {
2538 Some(1) => Some(2), _ => Self::get_acc_role(acc),
2540 };
2541
2542 let mut trd_market_auth_list: Vec<i32> = Vec::new();
2545 let add_unique = |list: &mut Vec<i32>, val: i32| {
2546 if !list.contains(&val) {
2547 list.push(val);
2548 }
2549 };
2550 for &m in &acc.enable_market {
2551 let market = match m as i32 {
2554 11 => 111, 12 => 112, 13 => 113, 14 => 114, 15 => 15, 23 => 123, 24 => 124, other => other,
2562 };
2563
2564 if market == 114 {
2565 match broker_id {
2567 1001 => {
2568 add_unique(&mut trd_market_auth_list, 113);
2569 add_unique(&mut trd_market_auth_list, 123);
2570 }
2571 1008 => {
2572 add_unique(&mut trd_market_auth_list, 124);
2573 add_unique(&mut trd_market_auth_list, 123);
2574 }
2575 1017 => {
2576 add_unique(&mut trd_market_auth_list, 125);
2577 add_unique(&mut trd_market_auth_list, 123);
2578 }
2579 1012 => {
2580 add_unique(&mut trd_market_auth_list, 126);
2581 add_unique(&mut trd_market_auth_list, 123);
2582 }
2583 _ => {
2584 add_unique(&mut trd_market_auth_list, 113);
2585 add_unique(&mut trd_market_auth_list, 123);
2586 }
2587 }
2588 } else if Self::is_valid_real_trd_market(market)
2589 || market == 113
2590 || market == 123
2591 || market == 124
2592 || market == 125
2593 || market == 126
2594 {
2595 add_unique(&mut trd_market_auth_list, market);
2598 }
2599 }
2600
2601 let jp_acc_type: Vec<i32> = acc
2603 .sub_accounts
2604 .iter()
2605 .filter_map(|sub| sub.r#type)
2606 .map(|t| t as i32)
2607 .collect();
2608
2609 let backend_market = acc.market.unwrap_or(0) as u64;
2611 let intra_acc_id = acc.acc_id.unwrap_or(0) as u64;
2612 let sort_key = ((broker_id as u64) << 48) | (backend_market << 32) | intra_acc_id;
2613
2614 Some(CachedTrdAcc {
2615 acc_id,
2616 trd_env: 1, trd_market_auth_list,
2618 acc_type,
2619 card_num,
2620 security_firm: Some(security_firm),
2621 sim_acc_type: None,
2622 uni_card_num: uni_card_num.map(|s| s.to_string()),
2623 acc_status,
2624 acc_role,
2625 jp_acc_type,
2626 owner_uid: acc.owner_uid,
2628 opr_uid: acc.opr_uid,
2629 mixed_state: acc.mixed_state,
2630 ira_type: acc.ira_type.map(|t| t as i32),
2631 grant_state: acc.grant_state,
2632 kouza_type: acc.kouza_type.map(|k| k as i32),
2633 trd_market: acc.market.map(|m| m as i32),
2634 association_acc_id: None, acc_flag: None, order_index: 0, sort_key,
2638 })
2639 }
2640
2641 async fn fetch_real_accounts(&self, backend: &BackendConn) -> Result<Vec<CachedTrdAcc>> {
2643 use futu_backend::proto_internal::ft_cmd2282;
2644 use prost::Message;
2645
2646 let mut all_accounts = Vec::new();
2647
2648 let req = ft_cmd2282::Request {
2650 page: Some(0),
2651 need_close: Some(true),
2652 roa_cid: None,
2653 };
2654 let resp = backend.request(2282, req.encode_to_vec()).await?;
2656 let parsed: ft_cmd2282::Response = Message::decode(resp.body.as_ref())
2657 .map_err(|e| FutuError::Codec(format!("CMD2282 decode failed: {e}")))?;
2658
2659 if parsed.result.unwrap_or(-1) != 0 {
2660 let err = parsed.err_text.as_deref().unwrap_or("unknown");
2661 return Err(FutuError::Codec(format!("CMD2282 error: {err}")));
2662 }
2663
2664 for uni in &parsed.universal_account_list {
2666 let uni_card = uni.card_number.as_deref();
2667 let uni_role = uni.role;
2668 if let Some(sec_acc) = &uni.sec_account {
2670 if let Some(cached) = Self::account_to_cached(sec_acc, uni_card, uni_role) {
2671 all_accounts.push(cached);
2672 }
2673 }
2674 if let Some(fut_acc) = &uni.futures_account {
2676 if let Some(cached) = Self::account_to_cached(fut_acc, uni_card, uni_role) {
2677 all_accounts.push(cached);
2678 }
2679 }
2680 if let Some(crypto_acc) = &uni.crypto_account {
2682 if let Some(cached) = Self::account_to_cached(crypto_acc, uni_card, uni_role) {
2683 all_accounts.push(cached);
2684 }
2685 }
2686 if let Some(forex_acc) = &uni.forex_account {
2688 if let Some(cached) = Self::account_to_cached(forex_acc, uni_card, uni_role) {
2689 all_accounts.push(cached);
2690 }
2691 }
2692 }
2693
2694 for acc in &parsed.account_list {
2696 if let Some(cached) = Self::account_to_cached(acc, None, None) {
2697 all_accounts.push(cached);
2698 }
2699 }
2700
2701 for fund in &parsed.fund_acc_list {
2703 let fund_id = fund.fund_acc_id.as_ref().and_then(|f| f.id);
2704 let assoc_id = fund.association_acc_id.as_ref().and_then(|a| a.id);
2705 if let (Some(fid), Some(aid)) = (fund_id, assoc_id) {
2706 for acc in all_accounts.iter_mut() {
2708 if acc.acc_id == fid {
2709 acc.association_acc_id = Some(aid);
2710 }
2711 }
2712 }
2713 }
2714
2715 for acc in all_accounts.iter_mut() {
2718 if acc.uni_card_num.is_some() {
2719 acc.acc_flag = Some(1);
2720 }
2721 }
2722
2723 tracing::info!(count = all_accounts.len(), "CMD2282 accounts fetched");
2724
2725 let has_futu_or_sg = all_accounts.iter().any(|a| {
2728 matches!(a.security_firm, Some(1) | Some(2) | Some(3)) });
2730 if has_futu_or_sg {
2731 self.fetch_granted_accounts(backend, &mut all_accounts)
2732 .await;
2733 }
2734
2735 tracing::info!(
2736 count = all_accounts.len(),
2737 "real accounts fetched (2282+2298)"
2738 );
2739 Ok(all_accounts)
2740 }
2741
2742 async fn fetch_granted_accounts(
2744 &self,
2745 backend: &BackendConn,
2746 accounts: &mut Vec<CachedTrdAcc>,
2747 ) {
2748 use futu_backend::proto_internal::ft_cmd2298;
2749 use prost::Message;
2750
2751 let req = ft_cmd2298::Request { page: Some(0) };
2752 let resp = match backend.request(2298, req.encode_to_vec()).await {
2753 Ok(r) => r,
2754 Err(e) => {
2755 tracing::debug!(error = %e, "CMD2298 query skipped");
2756 return;
2757 }
2758 };
2759
2760 let parsed: ft_cmd2298::Response = match Message::decode(resp.body.as_ref()) {
2761 Ok(p) => p,
2762 Err(e) => {
2763 tracing::debug!(error = %e, "CMD2298 decode failed");
2764 return;
2765 }
2766 };
2767
2768 if parsed.result.unwrap_or(-1) != 0 {
2769 return;
2770 }
2771
2772 let mut count = 0;
2773 for uni in &parsed.authorized_account_list {
2774 let uni_card = uni.card_number.as_deref();
2775 let uni_role = uni.role;
2776 if let Some(sec_acc) = &uni.sec_account {
2777 if let Some(cached) = Self::account_to_cached(sec_acc, uni_card, uni_role) {
2778 accounts.push(cached);
2779 count += 1;
2780 }
2781 }
2782 if let Some(fut_acc) = &uni.futures_account {
2783 if let Some(cached) = Self::account_to_cached(fut_acc, uni_card, uni_role) {
2784 accounts.push(cached);
2785 count += 1;
2786 }
2787 }
2788 }
2789
2790 if count > 0 {
2791 tracing::info!(count, "CMD2298 granted accounts added");
2792 }
2793 }
2794
2795 async fn fetch_sim_accounts(&self, backend: &BackendConn) -> Result<Vec<CachedTrdAcc>> {
2797 use futu_backend::proto_internal::{
2798 sim_odr_sys_cmn, user_account_info_manager_service as uaims,
2799 };
2800 use prost::Message;
2801
2802 let user_id = backend.user_id.load(std::sync::atomic::Ordering::Relaxed) as u64;
2803
2804 let req = uaims::AccountListReq {
2805 msg_header: Some(sim_odr_sys_cmn::MsgHeader {
2806 req_id: Some(format!("{user_id}_sim")),
2807 account_id: Some(0),
2808 cipher: None,
2809 security_type: None,
2810 exchange_code: None,
2811 input_source: None,
2812 market: None,
2813 }),
2814 nn_uid: Some(user_id),
2815 };
2816
2817 let resp = match backend.request(14800, req.encode_to_vec()).await {
2819 Ok(r) => r,
2820 Err(e) => {
2821 tracing::warn!(error = %e, "sim account query failed, skipping");
2822 return Ok(vec![]);
2823 }
2824 };
2825
2826 tracing::info!(
2827 body_len = resp.body.len(),
2828 body_hex = ?&resp.body[..resp.body.len().min(64)],
2829 "proto14800 response"
2830 );
2831 let parsed: uaims::AccountListRsp = Message::decode(resp.body.as_ref()).map_err(|e| {
2833 FutuError::Codec(format!(
2834 "Proto14800 decode failed (body_len={}): {e}",
2835 resp.body.len()
2836 ))
2837 })?;
2838
2839 if parsed.result.unwrap_or(-1) != 0 {
2840 let err_msg = parsed.err_msg.as_deref().unwrap_or("unknown");
2841 tracing::warn!(
2842 result = parsed.result,
2843 err = err_msg,
2844 "sim account query error"
2845 );
2846 return Ok(vec![]);
2847 }
2848
2849 let accounts: Vec<CachedTrdAcc> = parsed
2852 .accouts .iter()
2854 .filter_map(|acc| {
2855 let acc_id = acc.account_id?;
2856 let raw_market = acc.market_id.unwrap_or(0) as i32;
2857
2858 let trd_market = match raw_market {
2861 9 => 1, 16 => 2, _ => raw_market,
2864 };
2865
2866 let (sim_acc_type, acc_type) = match raw_market {
2869 9 | 16 => (Some(2), Some(2)), _ if raw_market == 100 => (Some(3), Some(2)), _ if raw_market == 2 => (Some(1), Some(2)), _ => (Some(1), Some(1)), };
2874
2875 let sort_key = (raw_market as u64) << 32;
2877
2878 Some(CachedTrdAcc {
2879 acc_id,
2880 trd_env: 0, trd_market_auth_list: vec![trd_market],
2882 acc_type,
2883 card_num: None,
2884 security_firm: Some(0), sim_acc_type,
2886 uni_card_num: None,
2887 acc_status: Some(0), acc_role: None,
2889 jp_acc_type: vec![],
2890 owner_uid: None,
2891 opr_uid: None,
2892 mixed_state: None,
2893 ira_type: None,
2894 grant_state: None,
2895 kouza_type: None,
2896 trd_market: Some(raw_market),
2897 association_acc_id: None,
2898 acc_flag: None,
2899 order_index: 0, sort_key,
2901 })
2902 })
2903 .collect();
2904
2905 tracing::info!(count = accounts.len(), "sim accounts fetched");
2906 Ok(accounts)
2907 }
2908
2909 async fn fetch_account_list(&self, backend: &BackendConn) -> Result<()> {
2911 let mut all = self.fetch_real_accounts(backend).await?;
2912 let sim = self.fetch_sim_accounts(backend).await?;
2913 all.extend(sim);
2914
2915 tracing::info!(
2916 total = all.len(),
2917 accounts = ?all.iter().map(|a| (a.acc_id, a.trd_env, &a.trd_market_auth_list)).collect::<Vec<_>>(),
2918 "all accounts cached"
2919 );
2920 self.trd_cache.set_accounts(all);
2921 Ok(())
2922 }
2923
2924 pub fn register_handlers(&self, server: &ApiServer) {
2926 let router = server.router();
2927 handlers::qot::register_handlers(router, self);
2928 handlers::trd::register_handlers(router, self);
2929 handlers::sys::register_handlers(router, self);
2930 tracing::info!("all business handlers registered");
2931 }
2932
2933 pub fn create_push_dispatcher(
2935 &self,
2936 server: &ApiServer,
2937 external_sinks: Vec<Arc<dyn futu_server::push::ExternalPushSink>>,
2938 ) -> PushDispatcher {
2939 let mut dispatcher = PushDispatcher::new(
2940 Arc::clone(server.connections()),
2941 Arc::clone(&self.subscriptions),
2942 )
2943 .with_metrics(Arc::clone(&self.metrics));
2944 for sink in external_sinks {
2945 dispatcher = dispatcher.with_external_sink(sink);
2946 }
2947 dispatcher
2948 }
2949
2950 pub fn start_push_dispatcher(
2954 &self,
2955 server: &ApiServer,
2956 mut push_rx: tokio::sync::mpsc::Receiver<PushEvent>,
2957 external_sinks: Vec<Arc<dyn futu_server::push::ExternalPushSink>>,
2958 ) {
2959 let dispatcher = self.create_push_dispatcher(server, external_sinks);
2960 let shared_backend = Arc::clone(&self.backend);
2961 let trd_cache = Arc::clone(&self.trd_cache);
2962 tokio::spawn(async move {
2963 tracing::info!("push dispatcher task started");
2964 while let Some(event) = push_rx.recv().await {
2965 match event {
2966 PushEvent::QuotePush {
2967 sec_key,
2968 sub_type,
2969 proto_id,
2970 body,
2971 } => {
2972 dispatcher
2973 .push_qot(&sec_key, sub_type, proto_id, body)
2974 .await;
2975 }
2976 PushEvent::TradeReQuery {
2977 acc_id,
2978 notice_type,
2979 order_ids,
2980 order_fill_ids,
2981 } => {
2982 let be_opt = crate::handlers::load_backend(&shared_backend);
2984 if let Some(ref be) = be_opt {
2985 match notice_type {
2986 1 | 11 | 12 => {
2987 let _ = futu_backend::trade_query::query_account_info(
2989 be, acc_id, &trd_cache,
2990 )
2991 .await;
2992 }
2993 2 | 3 => {
2994 let _ = futu_backend::trade_query::query_account_info(
2996 be, acc_id, &trd_cache,
2997 )
2998 .await;
2999 }
3000 4 | 5 | 9 => {
3001 if let Ok(orders) = futu_backend::trade_query::query_orders(
3004 be, acc_id, &trd_cache,
3005 )
3006 .await
3007 {
3008 for order in &orders {
3010 let should_push = order_ids.is_empty()
3011 || order_ids
3012 .iter()
3013 .any(|id| id == &order.order_id_ex);
3014 if should_push {
3015 let push_body =
3016 Self::build_order_update_push(acc_id, order);
3017 dispatcher
3018 .push_trd_acc(
3019 acc_id,
3020 futu_core::proto_id::TRD_UPDATE_ORDER,
3021 push_body,
3022 )
3023 .await;
3024 }
3025 }
3026 }
3027 let _ = futu_backend::trade_query::query_account_info(
3028 be, acc_id, &trd_cache,
3029 )
3030 .await;
3031 }
3033 6 | 7 => {
3034 if let Ok(fills) =
3037 futu_backend::trade_query::query_order_fills(be, acc_id)
3038 .await
3039 {
3040 for fill in &fills {
3041 let should_push = order_fill_ids.is_empty()
3042 || order_fill_ids
3043 .iter()
3044 .any(|id| id == &fill.fill_id_ex);
3045 if should_push {
3046 let push_body = Self::build_order_fill_update_push(
3047 acc_id, fill,
3048 );
3049 dispatcher
3050 .push_trd_acc(
3051 acc_id,
3052 futu_core::proto_id::TRD_UPDATE_ORDER_FILL,
3053 push_body,
3054 )
3055 .await;
3056 }
3057 }
3058 }
3059 let _ = futu_backend::trade_query::query_account_info(
3061 be, acc_id, &trd_cache,
3062 )
3063 .await;
3064 }
3066 _ => {}
3067 }
3068 }
3069 }
3070 PushEvent::BroadcastPush { proto_id, body } => {
3071 dispatcher.push_broadcast(proto_id, body).await;
3073 }
3074 }
3075 }
3076 tracing::warn!("push dispatcher task ended");
3077 });
3078 }
3079
3080 fn build_order_update_push(acc_id: u64, order: &futu_cache::trd_cache::CachedOrder) -> Vec<u8> {
3082 let header = futu_proto::trd_common::TrdHeader {
3083 trd_env: 1,
3084 acc_id,
3085 trd_market: order.trd_market.unwrap_or(1),
3086 jp_acc_type: None,
3087 };
3088 let api_order = futu_proto::trd_common::Order {
3089 trd_side: order.trd_side,
3090 order_type: order.order_type,
3091 order_status: order.order_status,
3092 order_id: order.order_id,
3093 order_id_ex: order.order_id_ex.clone(),
3094 code: order.code.clone(),
3095 name: order.name.clone(),
3096 qty: order.qty,
3097 price: Some(order.price),
3098 create_time: order.create_time.clone(),
3099 update_time: order.update_time.clone(),
3100 fill_qty: Some(order.fill_qty),
3101 fill_avg_price: Some(order.fill_avg_price),
3102 last_err_msg: order.last_err_msg.clone(),
3103 sec_market: order.sec_market,
3104 create_timestamp: order.create_timestamp,
3105 update_timestamp: order.update_timestamp,
3106 remark: order.remark.clone(),
3107 time_in_force: order.time_in_force,
3108 fill_outside_rth: order.fill_outside_rth,
3109 aux_price: order.aux_price,
3110 trail_type: order.trail_type,
3111 trail_value: order.trail_value,
3112 trail_spread: order.trail_spread,
3113 currency: order.currency,
3114 trd_market: order.trd_market,
3115 session: None,
3116 jp_acc_type: None,
3117 };
3118 let resp = futu_proto::trd_update_order::Response {
3119 ret_type: 0,
3120 ret_msg: None,
3121 err_code: None,
3122 s2c: Some(futu_proto::trd_update_order::S2c {
3123 header,
3124 order: api_order,
3125 }),
3126 };
3127 prost::Message::encode_to_vec(&resp)
3128 }
3129
3130 fn build_order_fill_update_push(
3132 acc_id: u64,
3133 fill: &futu_backend::trade_query::OrderFillInfo,
3134 ) -> Vec<u8> {
3135 let header = futu_proto::trd_common::TrdHeader {
3136 trd_env: 1,
3137 acc_id,
3138 trd_market: fill.trd_market.unwrap_or(1),
3139 jp_acc_type: None,
3140 };
3141 let api_fill = futu_proto::trd_common::OrderFill {
3142 trd_side: fill.trd_side,
3143 fill_id: fill.fill_id,
3144 fill_id_ex: fill.fill_id_ex.clone(),
3145 order_id: Some(fill.order_id),
3146 order_id_ex: Some(fill.order_id_ex.clone()),
3147 code: fill.code.clone(),
3148 name: fill.name.clone(),
3149 qty: fill.qty,
3150 price: fill.price,
3151 create_time: String::new(),
3152 counter_broker_id: fill.counter_broker_id,
3153 counter_broker_name: None,
3154 sec_market: None,
3155 create_timestamp: fill.create_timestamp,
3156 update_timestamp: fill.update_timestamp,
3157 status: fill.status,
3158 trd_market: fill.trd_market,
3159 jp_acc_type: None,
3160 };
3161 let resp = futu_proto::trd_update_order_fill::Response {
3162 ret_type: 0,
3163 ret_msg: None,
3164 err_code: None,
3165 s2c: Some(futu_proto::trd_update_order_fill::S2c {
3166 header,
3167 order_fill: api_fill,
3168 }),
3169 };
3170 prost::Message::encode_to_vec(&resp)
3171 }
3172}
3173
3174impl Default for GatewayBridge {
3175 fn default() -> Self {
3176 Self::new()
3177 }
3178}
3179
3180async fn resubscribe_quotes(
3186 backend: &BackendConn,
3187 subscriptions: &SubscriptionManager,
3188 static_cache: &StaticDataCache,
3189) -> usize {
3190 use std::collections::{HashMap, HashSet};
3191
3192 let all_conn_ids = subscriptions.get_all_qot_conn_ids();
3194 let mut unique_subs: HashMap<String, HashSet<i32>> = HashMap::new();
3195
3196 for conn_id in &all_conn_ids {
3197 let conn_subs = subscriptions.get_conn_qot_subs(*conn_id);
3198 for (sub_type, sec_keys) in conn_subs {
3199 for sec_key in sec_keys {
3200 unique_subs.entry(sec_key).or_default().insert(sub_type);
3201 }
3202 }
3203 }
3204
3205 if unique_subs.is_empty() {
3206 tracing::debug!("no quote subscriptions to re-subscribe");
3207 return 0;
3208 }
3209
3210 let mut backend_subs: Vec<(u64, i32, Vec<i32>)> = Vec::new();
3212 for (sec_key, sub_types) in &unique_subs {
3213 if let Some(info) = static_cache.get_security_info(sec_key) {
3215 if info.stock_id > 0 {
3216 let market = sec_key
3217 .split('_')
3218 .next()
3219 .and_then(|s| s.parse::<i32>().ok())
3220 .unwrap_or(0);
3221 backend_subs.push((info.stock_id, market, sub_types.iter().copied().collect()));
3222 }
3223 }
3224 }
3225
3226 let count = backend_subs.len();
3227 if count == 0 {
3228 tracing::debug!("no valid stock_ids found for re-subscription");
3229 return 0;
3230 }
3231
3232 tracing::info!(
3233 securities = count,
3234 connections = all_conn_ids.len(),
3235 "re-subscribing quotes after reconnect"
3236 );
3237
3238 match futu_backend::quote_sub::subscribe_to_backend(backend, &backend_subs).await {
3239 Ok(()) => {
3240 tracing::info!(securities = count, "quote re-subscription succeeded");
3241 }
3242 Err(e) => {
3243 tracing::error!(error = %e, "quote re-subscription failed");
3244 }
3245 }
3246
3247 count
3248}