futu_gateway/
bridge.rs

1// 业务桥接层:直连富途后端的完整网关
2//
3// 启动流程:HTTP 认证 → TCP 连接后端 → 登录 → 获取 session_key → 拉取账户列表 → 启动 API 服务
4
5use std::sync::atomic::AtomicU32;
6use std::sync::Arc;
7
8use arc_swap::ArcSwap;
9use futu_backend::auth::{self, AuthConfig, AuthResult};
10use futu_backend::conn::{BackendConn, PushCallback};
11use futu_backend::login;
12use futu_cache::login_cache::LoginCache;
13use futu_cache::qot_cache::QotCache;
14use futu_cache::qot_right::QotRightCache;
15use futu_cache::static_data::StaticDataCache;
16use futu_cache::trd_cache::{CachedTrdAcc, TrdCache};
17use futu_core::error::{FutuError, Result};
18use futu_server::listener::ApiServer;
19use futu_server::metrics::GatewayMetrics;
20use futu_server::push::PushDispatcher;
21use futu_server::subscription::SubscriptionManager;
22
23use crate::handlers;
24
25/// 共享的后端连接引用,支持重连后原子替换。
26/// 所有 handler 持有同一个 Arc,重连时通过 ArcSwap::store 替换内部值,
27/// handler 下次 load() 即可获取新连接。
28pub type SharedBackend = Arc<ArcSwap<Option<Arc<BackendConn>>>>;
29
30/// listing_date (Unix timestamp 秒) → "YYYY-MM-DD" (东八区)
31fn listing_date_to_str(ts: u32) -> String {
32    // C++ 对 listing_date=0 也正常格式化为 "1970-01-01"(不跳过)
33    let utc_secs = ts as i64 + 8 * 3600;
34    let days = utc_secs.div_euclid(86400);
35    let (y, m, d) = days_to_ymd(days as i32 + 719468);
36    format!("{y:04}-{m:02}-{d:02}")
37}
38
39/// Unix 时间戳(秒) → "YYYY-MM-DD HH:MM:SS" (东八区)
40fn timestamp_to_datetime_str(ts: f64) -> String {
41    if ts <= 0.0 {
42        return String::new();
43    }
44    let secs = ts as i64;
45    // 东八区 offset = +8h
46    let utc_secs = secs + 8 * 3600;
47    let days = utc_secs.div_euclid(86400);
48    let day_secs = utc_secs.rem_euclid(86400);
49    let h = day_secs / 3600;
50    let m = (day_secs % 3600) / 60;
51    let s = day_secs % 60;
52    // 从 Unix epoch (1970-01-01) 计算日期
53    let (y, mo, d) = days_to_ymd(days as i32 + 719468); // 719468 = days from 0000-03-01 to 1970-01-01
54    format!("{y:04}-{mo:02}-{d:02} {h:02}:{m:02}:{s:02}")
55}
56
57/// 内部: 从 civil day number 转 (year, month, day)
58fn days_to_ymd(z: i32) -> (i32, u32, u32) {
59    let era = z.div_euclid(146097);
60    let doe = z.rem_euclid(146097) as u32;
61    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
62    let y = yoe as i32 + era * 400;
63    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
64    let mp = (5 * doy + 2) / 153;
65    let d = doy - (153 * mp + 2) / 5 + 1;
66    let m = if mp < 10 { mp + 3 } else { mp - 9 };
67    let y = if m <= 2 { y + 1 } else { y };
68    (y, m, d)
69}
70
71/// 后端 market_code (NN_QuoteMktID) → FTAPI QotMarket 映射
72/// 对齐 C++: NN_QuoteMktType_From_NN_QuoteMktID() → Market_NNToAPI()
73fn market_code_to_qot_market(market_code: u32) -> i32 {
74    match market_code {
75        // HK market: mainboard, gemboard, nasdaq, extend → QotMarket_HK_Security
76        1..=4 => 1,
77        // HK futures (old/new) → QotMarket_HK_Security (QotMarket_HK_Future=2 已废弃)
78        5 | 6 => 1,
79        // HK options (stock option, index option) → QotMarket_HK_Security
80        7 | 8 => 1,
81        // US market: NYSE, NASDAQ, AMEX, PINK, etc.
82        10..=29 => 11, // QotMarket_US_Security
83        // CN A-shares
84        30 | 32 | 33 | 34 | 36..=40 => 21, // SH, KCB → QotMarket_CNSH_Security
85        31 | 35 => 22,                     // SZ, CYB → QotMarket_CNSZ_Security
86        // US options
87        41..=49 => 11, // QotMarket_US_Security
88        // US futures: NYMEX, COMEX, CBOT, CME, CBOE
89        60..=109 => 11, // QotMarket_US_Security
90        // HK futures (new range)
91        110..=119 => 1, // QotMarket_HK_Security
92        // Forex
93        120..=123 => 81, // QotMarket_FX_Security
94        // SG futures
95        160..=179 => 31, // QotMarket_SG_Security
96        // JP futures
97        185..=194 => 41, // QotMarket_JP_Security
98        // HK index option
99        570 => 1, // QotMarket_HK_Security
100        // HK HSI index
101        1000..=1049 => 1, // QotMarket_HK_Security
102        // US new market codes
103        1200..=1249 => 11, // QotMarket_US_Security
104        _ => 0,            // Unknown
105    }
106}
107
108/// 后端 market_code → FTAPI ExchType 映射
109/// 参考 C++ Qot_Common.proto ExchType 枚举
110fn market_code_to_exch_type(market_code: u32) -> i32 {
111    match market_code {
112        1 => 1,     // HK MainBoard → ExchType_HK_MainBoard
113        2 => 2,     // HK GEM → ExchType_HK_GEMBoard
114        3..=9 => 3, // HK other → ExchType_HK_HKEX
115        10 => 4,    // US NYSE → ExchType_US_NYSE
116        11 => 5,    // US Nasdaq → ExchType_US_Nasdaq
117        12 => 7,    // US AMEX → ExchType_US_AMEX
118        13 => 6,    // US OTC → ExchType_US_Pink
119        30 => 14,   // CN SH → ExchType_CN_SH
120        31 => 15,   // CN SZ → ExchType_CN_SZ
121        41 => 8,    // US Option → ExchType_US_Option
122        _ => 0,     // Unknown
123    }
124}
125
126/// 网关配置
127#[derive(Debug, Clone)]
128pub struct GatewayConfig {
129    /// 认证服务器 URL
130    pub auth_server: String,
131    /// 登录账号
132    pub account: String,
133    /// 密码(明文,内部做 MD5)
134    pub password: String,
135    /// 后端服务器区域 (gz/sh/hk)
136    pub region: String,
137    /// API 服务监听地址
138    pub listen_addr: String,
139    /// 设备 ID
140    pub device_id: String,
141    /// App 语言: 0=简体 1=繁体 2=英文 (对应 C++ NN_AppLanguage)
142    pub app_lang: i32,
143}
144
145/// 推送事件 (从 push_callback 通过 channel 发送)
146pub enum PushEvent {
147    /// 行情推送 → 按订阅转发给客户端
148    QuotePush {
149        sec_key: String,
150        sub_type: i32,
151        proto_id: u32,
152        body: Vec<u8>,
153    },
154    /// 广播推送 → 发送给所有已连接的客户端 (到价提醒等)
155    BroadcastPush { proto_id: u32, body: Vec<u8> },
156    /// 交易数据重查 (收到 _UPDATE 通知后触发)
157    TradeReQuery {
158        acc_id: u64,
159        notice_type: u32,
160        /// CMD 4716 中携带的 order_ids (notice_type=4/100 有效)
161        order_ids: Vec<String>,
162        /// CMD 4716 中携带的 order_fill_ids (notice_type=6/101 有效)
163        order_fill_ids: Vec<String>,
164    },
165}
166
167/// 网关桥接器
168pub struct GatewayBridge {
169    pub qot_cache: Arc<QotCache>,
170    pub trd_cache: Arc<TrdCache>,
171    pub static_cache: Arc<StaticDataCache>,
172    pub login_cache: Arc<LoginCache>,
173    pub subscriptions: Arc<SubscriptionManager>,
174    pub backend: SharedBackend,
175    pub auth_result: Option<AuthResult>,
176    /// 推送事件 channel (push_callback → tokio 任务 → PushDispatcher)
177    pub push_tx: Option<tokio::sync::mpsc::Sender<PushEvent>>,
178    /// 停牌数据缓存 (stock_id → Vec<timestamp>)
179    pub suspend_cache: futu_backend::suspend_data::SuspendCache,
180    /// 代码变更数据缓存 (HTTP 下载的 CodeRelation + CodeTemp)
181    pub code_change_cache: futu_backend::code_change::CodeChangeCache,
182    /// App 语言: 0=简体 1=繁体 2=英文 (对应 C++ NN_AppLanguage)
183    pub app_lang: i32,
184    /// 历史 K 线请求次数计数器 (GetUsedQuota 返回)
185    pub kl_quota_counter: Arc<AtomicU32>,
186    /// 行情权限缓存 (CMD 6024 响应)
187    pub qot_right_cache: Arc<QotRightCache>,
188    /// 运行时监控指标
189    pub metrics: Arc<GatewayMetrics>,
190}
191
192impl GatewayBridge {
193    pub fn new() -> Self {
194        Self {
195            qot_cache: Arc::new(QotCache::new()),
196            trd_cache: Arc::new(TrdCache::new()),
197            static_cache: Arc::new(StaticDataCache::new()),
198            login_cache: Arc::new(LoginCache::new()),
199            subscriptions: Arc::new(SubscriptionManager::new()),
200            backend: Arc::new(ArcSwap::new(Arc::new(None))),
201            auth_result: None,
202            push_tx: None,
203            suspend_cache: Arc::new(dashmap::DashMap::new()),
204            code_change_cache: futu_backend::code_change::new_cache(),
205            app_lang: 0, // 默认简体中文,initialize() 时从 config 更新
206            kl_quota_counter: Arc::new(AtomicU32::new(0)),
207            qot_right_cache: Arc::new(QotRightCache::new()),
208            metrics: Arc::new(GatewayMetrics::new()),
209        }
210    }
211
212    /// 完整的启动流程: 认证 → 连接后端 → 登录 → 拉取数据
213    ///
214    /// `verify_cb`: 短信验证码回调(GUI 模式传入,CLI 模式传 None 使用 stdin)
215    pub async fn initialize(
216        &mut self,
217        config: &GatewayConfig,
218        verify_cb: Option<auth::VerifyCodeCallback>,
219    ) -> Result<tokio::sync::mpsc::Receiver<PushEvent>> {
220        self.app_lang = config.app_lang;
221
222        // Step 1: HTTP 认证
223        tracing::info!("step 1/4: HTTP authentication...");
224        let auth_config = AuthConfig {
225            auth_server: config.auth_server.clone(),
226            account: config.account.clone(),
227            password: config.password.clone(),
228            device_id: config.device_id.clone(),
229        };
230        let auth_result = auth::authenticate_with_callback(&auth_config, verify_cb).await?;
231        tracing::info!(user_id = auth_result.user_id, "authentication succeeded");
232
233        // Step 2+3: TCP 连接后端 + 登录(支持重定向)
234        let conn_points = auth::conn_points::get_by_region(&config.region);
235        // C++ f3clogin 从每个区域随机选一个 IP 并发尝试
236        // 我们按区域分组、每组随机选一个、顺序尝试
237        let mut backend_addr = {
238            use std::collections::HashMap;
239            let all = auth::conn_points::ALL;
240            let mut by_region: HashMap<&str, Vec<(&str, u16)>> = HashMap::new();
241            for (ip, port, region) in all {
242                by_region.entry(region).or_default().push((ip, *port));
243            }
244            // 从每个区域随机选一个
245            let mut candidates: Vec<String> = by_region
246                .values()
247                .map(|ips| {
248                    let idx = (std::time::SystemTime::now()
249                        .duration_since(std::time::UNIX_EPOCH)
250                        .unwrap_or_default()
251                        .subsec_nanos() as usize)
252                        % ips.len();
253                    let (ip, port) = ips[idx];
254                    format!("{ip}:{port}")
255                })
256                .collect();
257            // 随机排列
258            let seed = std::time::SystemTime::now()
259                .duration_since(std::time::UNIX_EPOCH)
260                .unwrap_or_default()
261                .subsec_nanos();
262            for i in (1..candidates.len()).rev() {
263                let j = (seed as usize + i) % (i + 1);
264                candidates.swap(i, j);
265            }
266            candidates
267                .first()
268                .cloned()
269                .unwrap_or_else(|| format!("{}:{}", conn_points[0].0, conn_points[0].1))
270        };
271
272        // 推送 channel: push_callback (同步) → tokio 任务 (异步) → PushDispatcher
273        let (push_tx, push_rx) = tokio::sync::mpsc::channel::<PushEvent>(4096);
274        self.push_tx = Some(push_tx.clone());
275
276        let push_cb: PushCallback = {
277            let static_cache = Arc::clone(&self.static_cache);
278            let qot_cache = Arc::clone(&self.qot_cache);
279            let push_metrics = Arc::clone(&self.metrics);
280            Arc::new(move |cmd_id, body| {
281                push_metrics
282                    .backend_pushes_received
283                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
284                match cmd_id {
285                    6212 => {
286                        // CMD 6212: 行情推送 → 解析 + 缓存 + 转发客户端
287                        Self::handle_quote_push(&body, &static_cache, &qot_cache, &push_tx);
288                    }
289                    4716 => {
290                        // CMD 4716: 交易通知 → 解析 + 触发重查
291                        Self::handle_trade_notify(&body, &push_tx);
292                    }
293                    5300 => {
294                        // CMD 5300: 消息中心推送(到价提醒等)
295                        Self::handle_msg_center_push(&body, &static_cache, &push_tx);
296                    }
297                    6803 => {
298                        // CMD 6803: 到价提醒设置变更通知(内部信号,不直接推送客户端)
299                        tracing::debug!(
300                            body_len = body.len(),
301                            "price reminder settings changed notification"
302                        );
303                    }
304                    _ => {
305                        tracing::debug!(cmd_id, body_len = body.len(), "backend push");
306                    }
307                }
308            })
309        };
310
311        let (mut backend, login_result) = {
312            const MAX_REDIRECTS: usize = 3;
313            let mut login_result = None;
314            let mut final_backend = None;
315
316            for attempt in 0..=MAX_REDIRECTS {
317                tracing::info!(addr = %backend_addr, attempt, "connecting to backend...");
318                let conn = BackendConn::connect(&backend_addr, push_cb.clone()).await?;
319
320                tracing::info!("TCP login...");
321                match login::tcp_login(
322                    &conn,
323                    &auth_result,
324                    &auth_result.client_key,
325                    true,
326                    attempt as u32,
327                )
328                .await
329                {
330                    Ok(result) => {
331                        conn.set_session_key(result.session_key);
332                        conn.set_sec_data(result.sec_data);
333                        conn.user_id
334                            .store(result.user_id as u32, std::sync::atomic::Ordering::Relaxed);
335                        login_result = Some(result);
336                        final_backend = Some(conn);
337                        break;
338                    }
339                    Err(FutuError::ServerError { ret_type: 1, msg }) => {
340                        // 重定向: 从错误消息提取新地址
341                        if let Some(new_addr) = msg.strip_prefix("redirect to ") {
342                            tracing::info!(redirect_to = new_addr, "login redirect");
343                            backend_addr = new_addr.to_string();
344                            continue;
345                        }
346                        return Err(FutuError::ServerError { ret_type: 1, msg });
347                    }
348                    Err(e) => return Err(e),
349                }
350            }
351
352            let backend = final_backend.ok_or(FutuError::Codec("too many redirects".into()))?;
353            let login_result =
354                login_result.expect("login_result must be Some after successful login");
355            (backend, login_result)
356        };
357
358        tracing::info!(
359            user_id = login_result.user_id,
360            addr = %backend_addr,
361            "login succeeded"
362        );
363
364        // Step 3b: 发送 CMD 1321 获取动态 IP 列表(对应 C++ UpdateConnIpAfterLogin)
365        tracing::info!("step 3b: fetching ConnIP list (CMD 1321)...");
366        let device_id_bytes = config.device_id.as_bytes().to_vec();
367        let conn_ip_list = futu_backend::conn_ip::fetch_conn_ip_list(
368            &backend,
369            login_result.user_id,
370            &device_id_bytes,
371        )
372        .await
373        .unwrap_or_else(|e| {
374            tracing::warn!(error = %e, "CMD1321 failed (non-fatal)");
375            vec![]
376        });
377
378        // 如果动态 IP 列表中有推荐地址(REGION_VIRTUAL_RECOMMANDED),
379        // 尝试重连到推荐的服务器(C++ 优先使用推荐地址)
380        if !conn_ip_list.is_empty() {
381            // 优先选推荐地址 (region >= 10001),其次选第一个
382            let best = conn_ip_list
383                .iter()
384                .find(|p| p.region >= 10001)
385                .or_else(|| conn_ip_list.first());
386
387            if let Some(best_point) = best {
388                let new_addr = format!("{}:{}", best_point.ip, best_point.port);
389                if new_addr != backend_addr {
390                    tracing::info!(
391                        old_addr = %backend_addr,
392                        new_addr = %new_addr,
393                        region = best_point.region,
394                        "ConnIP: switching to recommended server"
395                    );
396
397                    // 重连到新地址
398                    match BackendConn::connect(&new_addr, push_cb.clone()).await {
399                        Ok(new_conn) => {
400                            match login::tcp_login(
401                                &new_conn,
402                                &auth_result,
403                                &auth_result.client_key,
404                                true,
405                                0,
406                            )
407                            .await
408                            {
409                                Ok(new_result) => {
410                                    new_conn.set_session_key(new_result.session_key);
411                                    new_conn.set_sec_data(new_result.sec_data);
412                                    new_conn.user_id.store(
413                                        new_result.user_id as u32,
414                                        std::sync::atomic::Ordering::Relaxed,
415                                    );
416                                    backend_addr = new_addr;
417                                    backend = new_conn;
418                                    tracing::info!(
419                                        addr = %backend_addr,
420                                        "ConnIP: reconnected to recommended server"
421                                    );
422                                }
423                                Err(e) => {
424                                    tracing::warn!(
425                                        error = %e,
426                                        addr = %new_addr,
427                                        "ConnIP: reconnect login failed, keeping original"
428                                    );
429                                }
430                            }
431                        }
432                        Err(e) => {
433                            tracing::warn!(
434                                error = %e,
435                                addr = %new_addr,
436                                "ConnIP: connect failed, keeping original"
437                            );
438                        }
439                    }
440                }
441            }
442        }
443
444        // 保存登录状态
445        self.login_cache
446            .set_login_state(futu_cache::login_cache::LoginState {
447                user_id: login_result.user_id as u32,
448                is_logged_in: true,
449                login_account: config.account.clone(),
450                region: config.region.clone(),
451                server_addr: backend_addr.clone(),
452            });
453
454        let backend = Arc::new(backend);
455        self.backend.store(Arc::new(Some(Arc::clone(&backend))));
456        let auth_result_for_reconnect = auth_result.clone();
457        self.auth_result = Some(auth_result);
458
459        // Step 4: 拉取账户列表
460        tracing::info!("step 4/7: fetching account list...");
461        self.fetch_account_list(&backend).await?;
462
463        // Step 4b: 后台查询资金/持仓 (不阻塞启动)
464        {
465            let q_backend = Arc::clone(&backend);
466            let q_cache = Arc::clone(&self.trd_cache);
467            tokio::spawn(async move {
468                futu_backend::trade_query::init_trade_data(&q_backend, &q_cache).await;
469            });
470        }
471
472        // Step 4c: 请求行情权限 (CMD 6024, 对齐 C++ RequestAPIQuota)
473        {
474            let qr_backend = Arc::clone(&backend);
475            let qr_cache = Arc::clone(&self.qot_right_cache);
476            tokio::spawn(async move {
477                Self::request_qot_right(&qr_backend, &qr_cache).await;
478            });
479        }
480
481        // Step 5: 注册市场事件推送
482        tracing::info!("step 5/7: registering markets...");
483        if let Err(e) = futu_backend::stock_list::register_markets(&backend).await {
484            tracing::warn!(error = %e, "market registration failed (non-fatal)");
485        }
486
487        // Step 6: 拉取市场状态
488        tracing::info!("step 6/7: pulling market status...");
489        match futu_backend::stock_list::pull_market_status(&backend).await {
490            Ok(statuses) => {
491                for s in &statuses {
492                    tracing::info!(
493                        market_id = s.market_id,
494                        status = s.status,
495                        text = %s.status_text,
496                        "market status"
497                    );
498                }
499            }
500            Err(e) => tracing::warn!(error = %e, "market status pull failed (non-fatal)"),
501        }
502
503        // Step 7: 后台定期同步股票列表 (C++ SecListDBUpdater 对齐)
504        {
505            let sync_backend = Arc::clone(&backend);
506            let sync_cache = Arc::clone(&self.static_cache);
507            tokio::spawn(async move {
508                tracing::info!("stock list sync started (background)");
509
510                // 从 SQLite 加载上次同步的数据(对齐 C++ SecListDBHelper)
511                let stock_db = match crate::stock_db::StockDb::open() {
512                    Ok(db) => {
513                        let persisted_version = db.get_version();
514                        if persisted_version > 0 {
515                            // 从本地数据库恢复缓存
516                            match db.load_all() {
517                                Ok(items) => {
518                                    let mut loaded = 0usize;
519                                    for item in &items {
520                                        let qot_market =
521                                            market_code_to_qot_market(item.market_code);
522                                        let key = format!("{qot_market}_{}", item.code);
523                                        sync_cache.id_to_key.insert(item.stock_id, key.clone());
524                                        sync_cache.securities.insert(
525                                            key,
526                                            futu_cache::static_data::CachedSecurityInfo {
527                                                stock_id: item.stock_id,
528                                                market: qot_market,
529                                                code: item.code.clone(),
530                                                name: item.name_sc.clone(),
531                                                lot_size: item.lot_size as i32,
532                                                sec_type: item.instrument_type as i32,
533                                                list_time: listing_date_to_str(item.listing_date),
534                                                warrnt_stock_owner: item.warrnt_stock_owner,
535                                                delisting: item.delisting,
536                                                exch_type: market_code_to_exch_type(
537                                                    item.market_code,
538                                                ),
539                                                no_search: item.no_search,
540                                            },
541                                        );
542                                        if item.warrnt_stock_owner != 0 {
543                                            sync_cache.add_warrant_owner(
544                                                item.stock_id,
545                                                item.warrnt_stock_owner,
546                                            );
547                                        }
548                                        loaded += 1;
549                                    }
550                                    tracing::info!(
551                                        version = persisted_version,
552                                        loaded,
553                                        "restored stock list from SQLite"
554                                    );
555                                }
556                                Err(e) => {
557                                    tracing::warn!(error = %e, "failed to load stocks from SQLite")
558                                }
559                            }
560                        }
561                        Some(db)
562                    }
563                    Err(e) => {
564                        tracing::warn!(error = %e, "failed to open stock SQLite db");
565                        None
566                    }
567                };
568
569                let persisted_version = stock_db.as_ref().map(|db| db.get_version()).unwrap_or(0);
570                let version = std::sync::atomic::AtomicU64::new(persisted_version);
571
572                // 首次同步 + 按 next_interval 定期刷新
573                let mut retry_delay = 10u64; // 失败重试间隔(C++ UpdateFailRetryPause_S = 10)
574                loop {
575                    let sync_round = std::sync::atomic::AtomicUsize::new(0);
576                    let count = std::sync::atomic::AtomicUsize::new(0);
577                    let hk_eqty_count = std::sync::atomic::AtomicUsize::new(0);
578                    let deleted_count = std::sync::atomic::AtomicUsize::new(0);
579                    let unknown_market_count = std::sync::atomic::AtomicUsize::new(0);
580                    let cache_ref = &sync_cache;
581                    let db_ref = &stock_db;
582                    // 开始 SQLite 事务
583                    if let Some(db) = db_ref {
584                        let _ = db.begin_batch();
585                    }
586                    let mut callback = |info: futu_backend::stock_list::StockInfo| {
587                        if info.delete_flag {
588                            deleted_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
589                            if let Some(old_key) = cache_ref.id_to_key.remove(&info.stock_id) {
590                                cache_ref.securities.remove(&old_key.1);
591                            }
592                            // SQLite 也删除
593                            if let Some(db) = db_ref {
594                                let _ = db.delete(info.stock_id);
595                            }
596                            return;
597                        }
598                        let qot_market = market_code_to_qot_market(info.market_code);
599                        if qot_market == 0 {
600                            unknown_market_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
601                            if unknown_market_count.load(std::sync::atomic::Ordering::Relaxed) <= 5
602                            {
603                                tracing::warn!(
604                                    market_code = info.market_code,
605                                    code = %info.code,
606                                    instrument_type = info.instrument_type,
607                                    "unknown market_code"
608                                );
609                            }
610                        }
611                        let key = format!("{qot_market}_{}", info.code);
612                        cache_ref.id_to_key.insert(info.stock_id, key.clone());
613                        let sec_type = info.instrument_type as i32;
614                        if qot_market == 1 && sec_type == 3 {
615                            hk_eqty_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
616                        }
617                        cache_ref.securities.insert(
618                            key,
619                            futu_cache::static_data::CachedSecurityInfo {
620                                stock_id: info.stock_id,
621                                market: qot_market,
622                                code: info.code.clone(),
623                                name: info.name_sc.clone(),
624                                lot_size: info.lot_size as i32,
625                                sec_type,
626                                list_time: listing_date_to_str(info.listing_date),
627                                warrnt_stock_owner: info.warrnt_stock_owner,
628                                delisting: info.delisting,
629                                exch_type: market_code_to_exch_type(info.market_code),
630                                no_search: info.no_search,
631                            },
632                        );
633                        if info.warrnt_stock_owner != 0 {
634                            cache_ref.add_warrant_owner(info.stock_id, info.warrnt_stock_owner);
635                        }
636                        // 写入 SQLite
637                        if let Some(db) = db_ref {
638                            let _ = db.upsert(&crate::stock_db::DbStockItem {
639                                stock_id: info.stock_id,
640                                code: info.code.clone(),
641                                name_sc: info.name_sc.clone(),
642                                market_code: info.market_code,
643                                instrument_type: info.instrument_type,
644                                lot_size: info.lot_size,
645                                delisting: info.delisting,
646                                warrnt_stock_owner: info.warrnt_stock_owner,
647                                no_search: info.no_search,
648                                listing_date: info.listing_date,
649                            });
650                        }
651                        count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
652                    };
653                    match futu_backend::stock_list::sync_stock_list(
654                        &sync_backend,
655                        &version,
656                        &mut callback,
657                    )
658                    .await
659                    {
660                        Ok(result) => {
661                            let interval = result.next_interval_secs.max(30); // 至少 30 秒
662                            tracing::info!(
663                                total = result.total_stocks,
664                                cached = count.load(std::sync::atomic::Ordering::Relaxed),
665                                deleted = deleted_count.load(std::sync::atomic::Ordering::Relaxed),
666                                hk_eqty = hk_eqty_count.load(std::sync::atomic::Ordering::Relaxed),
667                                unknown_market =
668                                    unknown_market_count.load(std::sync::atomic::Ordering::Relaxed),
669                                next_interval = interval,
670                                "stock list sync complete"
671                            );
672                            retry_delay = 10; // 成功后重置重试间隔
673                                              // 提交 SQLite 事务 + 保存 version
674                            if let Some(db) = &stock_db {
675                                let _ = db.commit_batch();
676                                let cur_ver = version.load(std::sync::atomic::Ordering::Relaxed);
677                                let _ = db.set_version(cur_ver);
678                                tracing::debug!(
679                                    version = cur_ver,
680                                    db_count = db.count(),
681                                    "stock list version persisted to SQLite"
682                                );
683                            }
684                            let round =
685                                sync_round.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
686                            // 前几轮快速同步以加速数据收敛(对齐 C++ 长期运行的 SQLite 状态)
687                            let sleep_secs = if round < 5 { 5 } else { interval as u64 };
688                            tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
689                        }
690                        Err(e) => {
691                            tracing::warn!(
692                                error = %e,
693                                retry_secs = retry_delay,
694                                "stock list sync failed, retrying"
695                            );
696                            tokio::time::sleep(std::time::Duration::from_secs(retry_delay)).await;
697                            retry_delay = (retry_delay * 2).min(300); // 指数退避,最大 5 分钟
698                        }
699                    }
700                }
701            });
702        }
703
704        // 后台异步加载停牌数据 (不阻塞启动)
705        {
706            let suspend_cache = Arc::clone(&self.suspend_cache);
707            tokio::spawn(async move {
708                tracing::info!("suspend data download started (background)");
709                let loaded = futu_backend::suspend_data::load_suspend_data().await;
710                // 将下载的数据合并到 bridge 的 suspend_cache
711                for entry in loaded.iter() {
712                    suspend_cache.insert(*entry.key(), entry.value().clone());
713                }
714                tracing::info!(
715                    stocks = suspend_cache.len(),
716                    "suspend data download complete"
717                );
718            });
719        }
720
721        // 后台异步加载代码变更数据 (不阻塞启动)
722        {
723            let code_change_cache = Arc::clone(&self.code_change_cache);
724            tokio::spawn(async move {
725                tracing::info!("code change data download started (background)");
726                let loaded = futu_backend::code_change::load_code_change_data().await;
727                let count = loaded.len();
728                *code_change_cache.write() = loaded;
729                tracing::info!(records = count, "code change data download complete");
730            });
731        }
732
733        // 启动后端心跳 + 断连监控
734        let heartbeat_interval =
735            std::time::Duration::from_secs(login_result.keep_alive_interval as u64);
736        tracing::info!(interval_secs = ?heartbeat_interval, "starting backend heartbeat");
737        let heartbeat_handle =
738            futu_backend::heartbeat::start_heartbeat(Arc::clone(&backend), heartbeat_interval);
739
740        // 监控心跳任务——心跳退出后自动重连
741        // 持有 SharedBackend 的克隆,重连成功后原子替换,所有 handler 自动获取新连接
742        {
743            let reconnect_config = config.clone();
744            let reconnect_auth = auth_result_for_reconnect;
745            let reconnect_trd_cache = Arc::clone(&self.trd_cache);
746            let reconnect_static_cache = Arc::clone(&self.static_cache);
747            let reconnect_subscriptions = Arc::clone(&self.subscriptions);
748            let reconnect_metrics = Arc::clone(&self.metrics);
749            let shared_backend = Arc::clone(&self.backend);
750            let reconnect_push_cb = push_cb;
751            tokio::spawn(async move {
752                let mut current_heartbeat = heartbeat_handle;
753
754                loop {
755                    // 等待当前心跳退出
756                    let _ = current_heartbeat.await;
757                    tracing::error!("heartbeat exited — attempting reconnect...");
758
759                    // 标记后端离线
760                    reconnect_metrics
761                        .backend_online
762                        .store(0, std::sync::atomic::Ordering::Relaxed);
763
764                    // 立即清空共享连接,防止 handler 继续使用已断开的连接
765                    shared_backend.store(Arc::new(None));
766
767                    // 指数退避重连(无限重试,与 C++ OnDisConnectRelpyAll 对齐)
768                    let mut delay = 3u64;
769                    loop {
770                        tracing::info!(delay_secs = delay, "reconnecting to backend...");
771                        tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
772
773                        // 重新连接 + 登录
774                        let conn_points =
775                            auth::conn_points::get_by_region(&reconnect_config.region);
776                        let (addr, _port) = match conn_points.first() {
777                            Some(p) => p,
778                            None => {
779                                reconnect_metrics
780                                    .backend_reconnect_failures
781                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
782                                delay = (delay * 2).min(60);
783                                continue;
784                            }
785                        };
786                        let backend_addr = format!("{addr}:{_port}");
787
788                        let conn =
789                            match BackendConn::connect(&backend_addr, reconnect_push_cb.clone())
790                                .await
791                            {
792                                Ok(c) => c,
793                                Err(e) => {
794                                    reconnect_metrics
795                                        .backend_reconnect_failures
796                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
797                                    tracing::warn!(error = %e, "reconnect TCP failed");
798                                    delay = (delay * 2).min(60);
799                                    continue;
800                                }
801                            };
802
803                        match login::tcp_login(
804                            &conn,
805                            &reconnect_auth,
806                            &reconnect_auth.client_key,
807                            true,
808                            0,
809                        )
810                        .await
811                        {
812                            Ok(lr) => {
813                                conn.set_session_key(lr.session_key);
814                                conn.set_sec_data(lr.sec_data);
815                                conn.user_id
816                                    .store(lr.user_id as u32, std::sync::atomic::Ordering::Relaxed);
817                                tracing::info!(user_id = lr.user_id, "reconnect login succeeded");
818
819                                // 重新注册市场
820                                let _ = futu_backend::stock_list::register_markets(&conn).await;
821
822                                // 重新查询资金/持仓
823                                let conn_arc = Arc::new(conn);
824                                futu_backend::trade_query::init_trade_data(
825                                    &conn_arc,
826                                    &reconnect_trd_cache,
827                                )
828                                .await;
829
830                                // 原子替换共享连接——所有 handler 立刻可见
831                                shared_backend.store(Arc::new(Some(Arc::clone(&conn_arc))));
832                                tracing::info!("shared backend connection updated");
833
834                                // ===== 重新订阅行情 (reconnect hardening) =====
835                                // 收集所有客户端的行情订阅,重新发送 CMD 6211
836                                let resub_count = resubscribe_quotes(
837                                    &conn_arc,
838                                    &reconnect_subscriptions,
839                                    &reconnect_static_cache,
840                                )
841                                .await;
842                                if resub_count > 0 {
843                                    reconnect_metrics.resubscribe_ops.fetch_add(
844                                        resub_count as u64,
845                                        std::sync::atomic::Ordering::Relaxed,
846                                    );
847                                    tracing::info!(
848                                        securities = resub_count,
849                                        "quote re-subscription complete"
850                                    );
851                                }
852
853                                // 标记后端在线 + 更新指标
854                                reconnect_metrics
855                                    .backend_online
856                                    .store(1, std::sync::atomic::Ordering::Relaxed);
857                                reconnect_metrics
858                                    .backend_reconnects
859                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
860                                reconnect_metrics.last_reconnect_ms.store(
861                                    std::time::SystemTime::now()
862                                        .duration_since(std::time::UNIX_EPOCH)
863                                        .unwrap_or_default()
864                                        .as_millis() as u64,
865                                    std::sync::atomic::Ordering::Relaxed,
866                                );
867
868                                // 重启心跳——保存 handle 用于下次循环
869                                let hb_interval =
870                                    std::time::Duration::from_secs(lr.keep_alive_interval as u64);
871                                current_heartbeat = futu_backend::heartbeat::start_heartbeat(
872                                    Arc::clone(&conn_arc),
873                                    hb_interval,
874                                );
875
876                                tracing::info!("reconnect complete");
877                                break; // 跳出重试循环,回到外层 loop 等待新心跳退出
878                            }
879                            Err(e) => {
880                                reconnect_metrics
881                                    .backend_reconnect_failures
882                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
883                                tracing::warn!(error = %e, "reconnect login failed");
884                                delay = (delay * 2).min(60);
885                            }
886                        }
887                    }
888                }
889            });
890        }
891
892        tracing::info!("gateway initialization complete");
893        Ok(push_rx)
894    }
895
896    /// 处理后端 CMD 6212 行情推送
897    /// 解析 QuotePush → 更新 qot_cache → 通过 channel 发送 FTAPI 推送事件
898    fn handle_quote_push(
899        body: &[u8],
900        static_cache: &futu_cache::static_data::StaticDataCache,
901        qot_cache: &futu_cache::qot_cache::QotCache,
902        push_tx: &tokio::sync::mpsc::Sender<PushEvent>,
903    ) {
904        use futu_backend::proto_internal::ft_cmd_stock_quote_sub;
905
906        let push: ft_cmd_stock_quote_sub::QuotePush = match prost::Message::decode(body) {
907            Ok(p) => p,
908            Err(_) => {
909                // Reply-type CMD6212 (首次订阅快照) 格式不同于标准 QuotePush:
910                // 4字节前缀 + SecurityQuote-like protobuf (可能末尾截断)
911                // 直接从中提取 BitQuote 数据并处理
912                Self::handle_quote_push_reply(body, static_cache, qot_cache, push_tx);
913                return;
914            }
915        };
916
917        tracing::debug!(
918            sec_count = push.security_qta_list.len(),
919            "CMD6212 push parsed"
920        );
921
922        for sec_quote in &push.security_qta_list {
923            let stock_id = match sec_quote.security_id {
924                Some(id) => id,
925                None => continue,
926            };
927
928            // stock_id → security key + info
929            let sec_key = match static_cache.id_to_key.get(&stock_id) {
930                Some(k) => k.clone(),
931                None => {
932                    tracing::debug!(stock_id, "CMD6212: stock_id not found in id_to_key cache");
933                    continue;
934                }
935            };
936            let sec_info = static_cache.get_security_info(&sec_key);
937
938            for bit_quote in &sec_quote.bit_qta_list {
939                let bit = bit_quote.bit.unwrap_or(u32::MAX);
940                let data = match &bit_quote.data {
941                    Some(d) => d.as_slice(),
942                    None => continue,
943                };
944
945                tracing::debug!(
946                    stock_id,
947                    %sec_key,
948                    bit,
949                    data_len = data.len(),
950                    "CMD6212 processing bit"
951                );
952
953                match bit {
954                    0 => {
955                        // SBIT_PRICE: 解析 + 缓存 + 构建 FTAPI push
956                        if let Some(ref info) = sec_info {
957                            if let Some(event) =
958                                Self::parse_price_to_push(data, &sec_key, info, qot_cache)
959                            {
960                                let _ = push_tx.try_send(event);
961                            }
962                        }
963                    }
964                    1 => {
965                        // SBIT_STOCK_STATE: 停牌状态 → 更新缓存 + 发推送
966                        Self::parse_stock_state(data, &sec_key, qot_cache);
967                        // C++ 每个 bit 处理完都推送当前累积状态
968                        if let Some(ref info) = sec_info {
969                            if let Some(event) =
970                                Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
971                            {
972                                let _ = push_tx.try_send(event);
973                            }
974                        }
975                    }
976                    3 => {
977                        // SBIT_ORDER_BOOK: 买卖盘
978                        if let Some(ref info) = sec_info {
979                            if let Some(event) =
980                                Self::parse_order_book_to_push(data, &sec_key, info, qot_cache)
981                            {
982                                let _ = push_tx.try_send(event);
983                            }
984                        }
985                    }
986                    5 => {
987                        // SBIT_DEAL_STATISTICS: 成交量/额/OHLC → 更新缓存 + 发推送
988                        Self::parse_deal_statistics(data, &sec_key, qot_cache);
989                        // C++ 每个 bit 处理完都推送当前累积状态
990                        if let Some(ref info) = sec_info {
991                            if let Some(event) =
992                                Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
993                            {
994                                let _ = push_tx.try_send(event);
995                            }
996                        }
997                    }
998                    9 => {
999                        // SBIT_HK_BROKER_QUEUE: 港股经纪队列
1000                        if let Some(ref info) = sec_info {
1001                            if let Some(event) =
1002                                Self::parse_broker_queue_to_push(data, &sec_key, info, qot_cache)
1003                            {
1004                                let _ = push_tx.try_send(event);
1005                            }
1006                        }
1007                    }
1008                    35 => {
1009                        // SBIT_TICK: 逐笔成交
1010                        if let Some(ref info) = sec_info {
1011                            if let Some(event) =
1012                                Self::parse_ticker_to_push(data, &sec_key, info, qot_cache)
1013                            {
1014                                let _ = push_tx.try_send(event);
1015                            }
1016                        }
1017                    }
1018                    20 => {
1019                        // SBIT_TIME_SHARING: 分时数据
1020                        if let Some(ref info) = sec_info {
1021                            if let Some(event) =
1022                                Self::parse_rt_to_push(data, &sec_key, info, qot_cache)
1023                            {
1024                                let _ = push_tx.try_send(event);
1025                            }
1026                        }
1027                    }
1028                    21..=31 => {
1029                        // SBIT_KLINE_*: K 线数据 (1min=21, 3min=22, ..., year=31)
1030                        if let Some(ref info) = sec_info {
1031                            if let Some(event) =
1032                                Self::parse_kline_to_push(data, bit, &sec_key, info, qot_cache)
1033                            {
1034                                let _ = push_tx.try_send(event);
1035                            }
1036                        }
1037                    }
1038                    _ => {}
1039                }
1040            }
1041        }
1042    }
1043
1044    /// 处理 Reply-type CMD6212 帧
1045    ///
1046    /// 后端在首次订阅时发送 Reply-type (非 Push-type) 的 CMD6212 帧,
1047    /// 格式: 4字节 BE 前缀 + 1个 varint (变长) + QuotePush protobuf (可能末尾截断).
1048    /// 跳过前缀和 varint 后, 直接用 prost 解码为 QuotePush.
1049    fn handle_quote_push_reply(
1050        body: &[u8],
1051        static_cache: &futu_cache::static_data::StaticDataCache,
1052        qot_cache: &futu_cache::qot_cache::QotCache,
1053        push_tx: &tokio::sync::mpsc::Sender<PushEvent>,
1054    ) {
1055        use futu_backend::proto_internal::ft_cmd_stock_quote_sub;
1056
1057        if body.len() < 8 {
1058            return;
1059        }
1060
1061        // Reply 格式: 4字节 BE 前缀 + 1个 varint + QuotePush protobuf
1062        // 跳过 4 字节前缀, 然后读取并跳过一个 varint
1063        let data = &body[4..];
1064        let mut pos = 0;
1065        // 读取并跳过一个 varint
1066        while pos < data.len() {
1067            let b = data[pos];
1068            pos += 1;
1069            if b & 0x80 == 0 {
1070                break;
1071            }
1072        }
1073
1074        if pos >= data.len() {
1075            tracing::debug!(
1076                body_len = body.len(),
1077                "CMD6212 reply: too short after prefix"
1078            );
1079            return;
1080        }
1081
1082        let proto_data = &data[pos..];
1083
1084        // 尝试解码为 QuotePush (末尾可能截断, 所以用 decode_length_delimited 的方式手动处理)
1085        let push: ft_cmd_stock_quote_sub::QuotePush = match prost::Message::decode(proto_data) {
1086            Ok(p) => p,
1087            Err(_) => {
1088                // 末尾截断时 prost 可能失败, 尝试只解析到可用长度
1089                // QuotePush = { repeated SecurityQuote security_qta_list = 1; }
1090                // 手动解析 field 1 (length-delimited) 条目
1091                match Self::parse_quote_push_truncated(proto_data) {
1092                    Some(p) => p,
1093                    None => {
1094                        tracing::debug!(
1095                            body_len = body.len(),
1096                            proto_len = proto_data.len(),
1097                            "CMD6212 reply: decode failed"
1098                        );
1099                        return;
1100                    }
1101                }
1102            }
1103        };
1104
1105        tracing::debug!(
1106            sec_count = push.security_qta_list.len(),
1107            "CMD6212 reply parsed as QuotePush"
1108        );
1109
1110        // 复用标准 QuotePush 处理逻辑
1111        for sec_quote in &push.security_qta_list {
1112            let stock_id = match sec_quote.security_id {
1113                Some(id) => id,
1114                None => continue,
1115            };
1116
1117            let sec_key = match static_cache.id_to_key.get(&stock_id) {
1118                Some(k) => k.clone(),
1119                None => {
1120                    tracing::debug!(stock_id, "CMD6212 reply: stock_id not in cache");
1121                    continue;
1122                }
1123            };
1124            let sec_info = static_cache.get_security_info(&sec_key);
1125
1126            for bit_quote in &sec_quote.bit_qta_list {
1127                let bit = bit_quote.bit.unwrap_or(u32::MAX);
1128                let data = match &bit_quote.data {
1129                    Some(d) => d.as_slice(),
1130                    None => continue,
1131                };
1132
1133                tracing::debug!(
1134                    stock_id,
1135                    %sec_key,
1136                    bit,
1137                    data_len = data.len(),
1138                    "CMD6212 reply processing bit"
1139                );
1140
1141                match bit {
1142                    0 => {
1143                        if let Some(ref info) = sec_info {
1144                            if let Some(event) =
1145                                Self::parse_price_to_push(data, &sec_key, info, qot_cache)
1146                            {
1147                                let _ = push_tx.try_send(event);
1148                            }
1149                        }
1150                    }
1151                    1 => {
1152                        Self::parse_stock_state(data, &sec_key, qot_cache);
1153                        if let Some(ref info) = sec_info {
1154                            if let Some(event) =
1155                                Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
1156                            {
1157                                let _ = push_tx.try_send(event);
1158                            }
1159                        }
1160                    }
1161                    3 => {
1162                        if let Some(ref info) = sec_info {
1163                            if let Some(event) =
1164                                Self::parse_order_book_to_push(data, &sec_key, info, qot_cache)
1165                            {
1166                                let _ = push_tx.try_send(event);
1167                            }
1168                        }
1169                    }
1170                    5 => {
1171                        Self::parse_deal_statistics(data, &sec_key, qot_cache);
1172                        if let Some(ref info) = sec_info {
1173                            if let Some(event) =
1174                                Self::build_basic_qot_push_from_cache(&sec_key, info, qot_cache)
1175                            {
1176                                let _ = push_tx.try_send(event);
1177                            }
1178                        }
1179                    }
1180                    9 => {
1181                        if let Some(ref info) = sec_info {
1182                            if let Some(event) =
1183                                Self::parse_broker_queue_to_push(data, &sec_key, info, qot_cache)
1184                            {
1185                                let _ = push_tx.try_send(event);
1186                            }
1187                        }
1188                    }
1189                    35 => {
1190                        if let Some(ref info) = sec_info {
1191                            if let Some(event) =
1192                                Self::parse_ticker_to_push(data, &sec_key, info, qot_cache)
1193                            {
1194                                let _ = push_tx.try_send(event);
1195                            }
1196                        }
1197                    }
1198                    20 => {
1199                        if let Some(ref info) = sec_info {
1200                            if let Some(event) =
1201                                Self::parse_rt_to_push(data, &sec_key, info, qot_cache)
1202                            {
1203                                let _ = push_tx.try_send(event);
1204                            }
1205                        }
1206                    }
1207                    21..=31 => {
1208                        if let Some(ref info) = sec_info {
1209                            if let Some(event) =
1210                                Self::parse_kline_to_push(data, bit, &sec_key, info, qot_cache)
1211                            {
1212                                let _ = push_tx.try_send(event);
1213                            }
1214                        }
1215                    }
1216                    _ => {
1217                        tracing::debug!(bit, "CMD6212 reply: unhandled bit");
1218                    }
1219                }
1220            }
1221        }
1222    }
1223
1224    /// 解析可能截断的 QuotePush protobuf
1225    ///
1226    /// 当末尾截断时 prost::Message::decode 会失败,
1227    /// 这里手动解析 field 1 (repeated SecurityQuote) 并忽略截断的尾部.
1228    fn parse_quote_push_truncated(
1229        data: &[u8],
1230    ) -> Option<futu_backend::proto_internal::ft_cmd_stock_quote_sub::QuotePush> {
1231        use futu_backend::proto_internal::ft_cmd_stock_quote_sub;
1232
1233        let mut entries = Vec::new();
1234        let mut pos = 0;
1235
1236        while pos < data.len() {
1237            // 读取 tag
1238            let (tag, new_pos) = Self::read_varint(data, pos);
1239            if tag == 0 || new_pos >= data.len() {
1240                break;
1241            }
1242            pos = new_pos;
1243            let field_num = tag >> 3;
1244            let wire_type = tag & 7;
1245
1246            if wire_type != 2 {
1247                break; // 只处理 length-delimited
1248            }
1249
1250            let (length, new_pos) = Self::read_varint(data, pos);
1251            pos = new_pos;
1252            let length = length as usize;
1253
1254            if pos + length > data.len() {
1255                // 截断 — 尝试解析已有部分
1256                if field_num == 1 {
1257                    let partial = &data[pos..];
1258                    if let Ok(sq) = prost::Message::decode(partial) {
1259                        entries.push(sq);
1260                    }
1261                }
1262                break;
1263            }
1264
1265            if field_num == 1 {
1266                let content = &data[pos..pos + length];
1267                if let Ok(sq) = prost::Message::decode(content) {
1268                    entries.push(sq);
1269                }
1270            }
1271            pos += length;
1272        }
1273
1274        if entries.is_empty() {
1275            return None;
1276        }
1277
1278        Some(ft_cmd_stock_quote_sub::QuotePush {
1279            security_qta_list: entries,
1280        })
1281    }
1282
1283    /// 从 protobuf 字节流读取 varint
1284    fn read_varint(data: &[u8], mut pos: usize) -> (u64, usize) {
1285        let mut result: u64 = 0;
1286        let mut shift = 0;
1287        while pos < data.len() {
1288            let b = data[pos];
1289            result |= ((b & 0x7f) as u64) << shift;
1290            pos += 1;
1291            if b & 0x80 == 0 {
1292                break;
1293            }
1294            shift += 7;
1295            if shift >= 64 {
1296                break;
1297            }
1298        }
1299        (result, pos)
1300    }
1301
1302    /// 解析 SBIT_PRICE → 更新缓存 + 构建 Qot_UpdateBasicQot push event
1303    fn parse_price_to_push(
1304        data: &[u8],
1305        sec_key: &str,
1306        sec_info: &futu_cache::static_data::CachedSecurityInfo,
1307        qot_cache: &futu_cache::qot_cache::QotCache,
1308    ) -> Option<PushEvent> {
1309        use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1310
1311        let price: ft_cmd_stock_quote_coverage_data::Price = prost::Message::decode(data).ok()?;
1312
1313        let cur_price = price.price_nominal.unwrap_or(0) as f64 / 1_000_000_000.0;
1314        let last_close = price.price_last_close.unwrap_or(0) as f64 / 1_000_000_000.0;
1315        let update_time_ms = price.exchange_data_time_ms.unwrap_or(0);
1316        let update_ts = update_time_ms as f64 / 1000.0;
1317
1318        if cur_price <= 0.0 {
1319            return None;
1320        }
1321
1322        // 更新缓存 — 合并已有 OHLCV (来自 SBIT_DEAL_STATISTICS) + 停牌状态 (来自 SBIT_STOCK_STATE)
1323        let existing = qot_cache.get_basic_qot(sec_key);
1324        let (open, high, low, vol, tnov, tnov_rate, amp, suspended) = match &existing {
1325            Some(e) => (
1326                e.open_price,
1327                e.high_price,
1328                e.low_price,
1329                e.volume,
1330                e.turnover,
1331                e.turnover_rate,
1332                e.amplitude,
1333                e.is_suspended,
1334            ),
1335            None => (0.0, 0.0, 0.0, 0, 0.0, 0.0, 0.0, false),
1336        };
1337        let cached = futu_cache::qot_cache::CachedBasicQot {
1338            cur_price,
1339            last_close_price: last_close,
1340            open_price: open,
1341            high_price: high,
1342            low_price: low,
1343            volume: vol,
1344            turnover: tnov,
1345            turnover_rate: tnov_rate,
1346            amplitude: amp,
1347            is_suspended: suspended,
1348            update_time: timestamp_to_datetime_str(update_ts),
1349            update_timestamp: update_ts,
1350        };
1351        qot_cache.update_basic_qot(sec_key, cached.clone());
1352
1353        // 构建 FTAPI Qot_UpdateBasicQot::Response — 包含完整 OHLCV
1354        let security = futu_proto::qot_common::Security {
1355            market: sec_info.market,
1356            code: sec_info.code.clone(),
1357        };
1358        let basic_qot = futu_proto::qot_common::BasicQot {
1359            security,
1360            name: Some(sec_info.name.clone()),
1361            is_suspended: cached.is_suspended,
1362            list_time: sec_info.list_time.clone(),
1363            price_spread: 0.0,
1364            update_time: cached.update_time.clone(),
1365            high_price: cached.high_price,
1366            open_price: cached.open_price,
1367            low_price: cached.low_price,
1368            cur_price,
1369            last_close_price: last_close,
1370            volume: cached.volume,
1371            turnover: cached.turnover,
1372            turnover_rate: cached.turnover_rate,
1373            amplitude: cached.amplitude,
1374            dark_status: None,
1375            option_ex_data: None,
1376            list_timestamp: None,
1377            update_timestamp: Some(update_ts),
1378            pre_market: None,
1379            after_market: None,
1380            overnight: None,
1381            future_ex_data: None,
1382            sec_status: None,
1383            warrant_ex_data: None,
1384        };
1385
1386        let resp = futu_proto::qot_update_basic_qot::Response {
1387            ret_type: 0,
1388            ret_msg: None,
1389            err_code: None,
1390            s2c: Some(futu_proto::qot_update_basic_qot::S2c {
1391                basic_qot_list: vec![basic_qot],
1392            }),
1393        };
1394
1395        Some(PushEvent::QuotePush {
1396            sec_key: sec_key.to_string(),
1397            sub_type: 1, // SubType_Basic
1398            proto_id: futu_core::proto_id::QOT_UPDATE_BASIC_QOT,
1399            body: prost::Message::encode_to_vec(&resp),
1400        })
1401    }
1402
1403    /// 处理后端 CMD 4716 交易通知
1404    /// C++ NoticeType: 1=CASH, 2=PSTN, 3=PSTN_LIST, 4=ORDER, 5=ORDER_LIST,
1405    /// 6=FILL, 7=FILL_LIST, 9=OP_RESULT, 11=ASSET, 12=QUOTE, 100=ORDER_NTF, 101=FILL_NTF
1406    fn handle_trade_notify(body: &[u8], push_tx: &tokio::sync::mpsc::Sender<PushEvent>) {
1407        use futu_backend::proto_internal::order_sys_interface;
1408
1409        let notify: order_sys_interface::NotifyMsg = match prost::Message::decode(body) {
1410            Ok(n) => n,
1411            Err(e) => {
1412                tracing::debug!(error = %e, "CMD4716 decode failed");
1413                return;
1414            }
1415        };
1416
1417        let notice_type = notify.notice_type.unwrap_or(0);
1418        let acc_id = notify
1419            .msg_header
1420            .as_ref()
1421            .and_then(|h| h.account_id)
1422            .unwrap_or(0);
1423
1424        let order_ids = notify.order_ids.clone();
1425        let fill_ids = notify.order_fill_ids.clone();
1426
1427        match notice_type {
1428            1 => {
1429                // NOTICE_TYPE_CASH_UPDATE → 重查资金
1430                tracing::info!(acc_id, "cash update → re-query funds");
1431                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1432                    acc_id,
1433                    notice_type,
1434                    order_ids: vec![],
1435                    order_fill_ids: vec![],
1436                });
1437            }
1438            2 => {
1439                // NOTICE_TYPE_PSTN_UPDATE → 重查持仓
1440                tracing::info!(acc_id, symbols = ?notify.symbols, "position update");
1441                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1442                    acc_id,
1443                    notice_type,
1444                    order_ids: vec![],
1445                    order_fill_ids: vec![],
1446                });
1447            }
1448            3 => {
1449                // NOTICE_TYPE_PSTN_LIST_UPDATE → 重查持仓列表
1450                tracing::info!(acc_id, "position list update");
1451                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1452                    acc_id,
1453                    notice_type,
1454                    order_ids: vec![],
1455                    order_fill_ids: vec![],
1456                });
1457            }
1458            4 => {
1459                // NOTICE_TYPE_ORDER_UPDATE → 重查订单 + 推送
1460                tracing::info!(acc_id, orders = ?order_ids, "order update");
1461                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1462                    acc_id,
1463                    notice_type,
1464                    order_ids: order_ids.clone(),
1465                    order_fill_ids: vec![],
1466                });
1467            }
1468            5 => {
1469                // NOTICE_TYPE_ORDER_LIST_UPDATE → 重查订单列表
1470                tracing::info!(acc_id, "order list update");
1471                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1472                    acc_id,
1473                    notice_type,
1474                    order_ids: vec![],
1475                    order_fill_ids: vec![],
1476                });
1477            }
1478            6 => {
1479                // NOTICE_TYPE_ORDER_FILL_UPDATE → 重查成交 + 推送
1480                tracing::info!(acc_id, fills = ?fill_ids, "fill update");
1481                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1482                    acc_id,
1483                    notice_type,
1484                    order_ids: vec![],
1485                    order_fill_ids: fill_ids.clone(),
1486                });
1487            }
1488            7 => {
1489                // NOTICE_TYPE_ORDER_FILL_LIST_UPDATE → 重查成交列表
1490                tracing::info!(acc_id, "fill list update");
1491                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1492                    acc_id,
1493                    notice_type,
1494                    order_ids: vec![],
1495                    order_fill_ids: vec![],
1496                });
1497            }
1498            9 => {
1499                // NOTICE_TYPE_ORDER_OP_RESULT → 重查订单
1500                tracing::info!(acc_id, req_ids = ?notify.order_op_req_ids, "order op result");
1501                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1502                    acc_id,
1503                    notice_type,
1504                    order_ids: order_ids.clone(),
1505                    order_fill_ids: vec![],
1506                });
1507            }
1508            11 | 12 => {
1509                // NOTICE_TYPE_ASSET_UPDATE / QUOTE_UPDATE → 重查资金
1510                tracing::info!(acc_id, notice_type, "asset/quote update");
1511                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1512                    acc_id,
1513                    notice_type,
1514                    order_ids: vec![],
1515                    order_fill_ids: vec![],
1516                });
1517            }
1518            100 => {
1519                // NOTICE_TYPE_ORDER_NTF → 订单推送 + 重查
1520                tracing::info!(acc_id, orders = ?order_ids, "order notification");
1521                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1522                    acc_id,
1523                    notice_type: 4,
1524                    order_ids: order_ids.clone(),
1525                    order_fill_ids: vec![],
1526                });
1527            }
1528            101 => {
1529                // NOTICE_TYPE_ORDER_FILL_NTF → 成交推送 + 重查
1530                tracing::info!(acc_id, fills = ?fill_ids, "fill notification");
1531                let _ = push_tx.try_send(PushEvent::TradeReQuery {
1532                    acc_id,
1533                    notice_type: 6,
1534                    order_ids: vec![],
1535                    order_fill_ids: fill_ids.clone(),
1536                });
1537            }
1538            _ => {
1539                tracing::debug!(acc_id, notice_type, "unhandled trade notification");
1540            }
1541        }
1542    }
1543
1544    /// 处理后端 CMD 5300 消息中心推送
1545    ///
1546    /// C++ 流程: MsgCenterNotify → 按 msg_category 分类处理
1547    /// category=1 (MSG_CATEGORY_PRICEWARN): 解析 PriceWarnMsg → 转换为 Qot_UpdatePriceReminder 推送
1548    fn handle_msg_center_push(
1549        body: &[u8],
1550        static_cache: &Arc<StaticDataCache>,
1551        push_tx: &tokio::sync::mpsc::Sender<PushEvent>,
1552    ) {
1553        use futu_backend::proto_internal::msgcenter;
1554        use futu_backend::proto_internal::msgdefine;
1555
1556        let notify: msgcenter::MsgCenterNotify = match prost::Message::decode(body) {
1557            Ok(n) => n,
1558            Err(e) => {
1559                tracing::debug!(error = %e, "CMD5300 MsgCenterNotify decode failed");
1560                return;
1561            }
1562        };
1563
1564        let category = notify.msg_category.unwrap_or(0);
1565        tracing::debug!(
1566            user_id = notify.user_id.unwrap_or(0),
1567            category,
1568            "CMD5300 message center push"
1569        );
1570
1571        // 目前只处理到价提醒 (category=1)
1572        if category != 1 {
1573            tracing::debug!(category, "CMD5300: unhandled msg category");
1574            return;
1575        }
1576
1577        // 解析消息体
1578        let msg_item = match &notify.notify_msg {
1579            Some(item) => item,
1580            None => {
1581                tracing::debug!("CMD5300: no notify_msg");
1582                return;
1583            }
1584        };
1585
1586        let msg_body = match &msg_item.body {
1587            Some(b) => b,
1588            None => {
1589                tracing::debug!("CMD5300: no msg body");
1590                return;
1591            }
1592        };
1593
1594        let price_warn: msgdefine::PriceWarnMsg = match prost::Message::decode(msg_body.as_ref()) {
1595            Ok(m) => m,
1596            Err(e) => {
1597                tracing::debug!(error = %e, "CMD5300: PriceWarnMsg decode failed");
1598                return;
1599            }
1600        };
1601
1602        let stock_id = price_warn.stock_id.unwrap_or(0);
1603
1604        // 从 static_cache 中查找 stock_id 对应的 security
1605        let sec_info = static_cache.get_security_info_by_stock_id(stock_id);
1606        let (market, code) = match &sec_info {
1607            Some(info) => (info.market, info.code.clone()),
1608            None => {
1609                tracing::debug!(stock_id, "CMD5300: stock_id not found in cache");
1610                return;
1611            }
1612        };
1613
1614        let stock_name = sec_info
1615            .as_ref()
1616            .map(|i| i.name.clone())
1617            .unwrap_or_default();
1618
1619        // 价格: 后端 price * 1e3 → 真实价格
1620        let price = price_warn.price.unwrap_or(0) as f64 / 1000.0;
1621        // 涨跌幅: 后端 * 1e3 → 百分比
1622        let change_rate = price_warn.price_change_ratio.unwrap_or(0) as f64 / 1000.0;
1623        let market_status = price_warn.market_status.unwrap_or(0);
1624
1625        // 消息内容(根据语言选择,默认简体)
1626        let content = price_warn.msg_content.clone().unwrap_or_default();
1627        let note = price_warn.note.clone().unwrap_or_default();
1628
1629        // 解析 warn_detail (含市场类型相关的单位换算)
1630        let (key, reminder_type, set_value, cur_value) =
1631            if let Some(detail) = &price_warn.warn_detail {
1632                let warn_type = detail.r#type.unwrap_or(0);
1633                let api_type = Self::warn_type_to_api_type(warn_type);
1634                let sv_raw = detail.set_value.unwrap_or(0);
1635                let cv_raw = detail.present_value.unwrap_or(0);
1636                // 对齐 C++ GetPriceReminderCltValue: 按市场类型和提醒类型做单位换算
1637                let sv = Self::price_reminder_clt_value(sv_raw, warn_type, sec_info.as_ref());
1638                let cv = Self::price_reminder_clt_value(cv_raw, warn_type, sec_info.as_ref());
1639                (detail.key.unwrap_or(0) as i64, api_type, sv, cv)
1640            } else {
1641                (0i64, 0i32, 0.0f64, 0.0f64)
1642            };
1643
1644        tracing::info!(
1645            stock_id,
1646            market,
1647            code = %code,
1648            price,
1649            change_rate,
1650            reminder_type,
1651            "price reminder push → 3019"
1652        );
1653
1654        // 构建 Qot_UpdatePriceReminder::Response
1655        let s2c = futu_proto::qot_update_price_reminder::S2c {
1656            security: futu_proto::qot_common::Security { market, code },
1657            name: Some(stock_name),
1658            price,
1659            change_rate,
1660            market_status,
1661            content,
1662            note,
1663            key: Some(key),
1664            r#type: Some(reminder_type),
1665            set_value: Some(set_value),
1666            cur_value: Some(cur_value),
1667        };
1668
1669        let resp = futu_proto::qot_update_price_reminder::Response {
1670            ret_type: 0, // RetType_Succeed
1671            ret_msg: None,
1672            err_code: None,
1673            s2c: Some(s2c),
1674        };
1675
1676        let push_body = prost::Message::encode_to_vec(&resp);
1677
1678        let _ = push_tx.try_send(PushEvent::BroadcastPush {
1679            proto_id: futu_core::proto_id::QOT_UPDATE_PRICE_REMINDER,
1680            body: push_body,
1681        });
1682    }
1683
1684    /// 后端 WarnType → API PriceReminderType 映射
1685    fn warn_type_to_api_type(warn_type: u32) -> i32 {
1686        match warn_type {
1687            4 => 1,   // UP_PRICE -> PriceUp
1688            8 => 2,   // DOWN_PRICE -> PriceDown
1689            1 => 3,   // DAY_UP_RATIO -> ChangeRateUp
1690            2 => 4,   // DAY_DOWN_RATIO -> ChangeRateDown
1691            9 => 5,   // FIVE_MIN_UP_RATIO
1692            10 => 6,  // FIVE_MIN_DOWN_RATIO
1693            11 => 7,  // UP_VOLUME
1694            12 => 8,  // UP_TURNOVER
1695            13 => 9,  // UP_TOR
1696            14 => 10, // UP_BUY_PRICE_1
1697            15 => 11, // DOWN_SELL_PRICE_1
1698            16 => 12, // UP_BUY_VOLUME_1
1699            17 => 13, // DOWN_SELL_VOLUME_1
1700            19 => 14, // THREE_MIN_UP_RATIO
1701            20 => 15, // THREE_MIN_DOWN_RATIO
1702            _ => 0,
1703        }
1704    }
1705
1706    /// 后端提醒值 → 客户端显示值的单位换算
1707    ///
1708    /// 对齐 C++ GetPriceReminderCltValue():
1709    /// - 基础: nValue / 1000.0
1710    /// - VolumeUp (11): 非期权/期货 × 10000; A股额外 × 100
1711    /// - TurnOverUp (12): × 10000
1712    /// - BuyVolumeUp (16) / SellVolumeUp (17): 非期权/期货 × 1000; A股额外 × 100
1713    fn price_reminder_clt_value(
1714        raw_value: i64,
1715        warn_type: u32,
1716        sec_info: Option<&futu_cache::static_data::CachedSecurityInfo>,
1717    ) -> f64 {
1718        let mut value = raw_value as f64 / 1000.0;
1719
1720        // 判断市场类型
1721        let (is_option, is_future, is_cn) = if let Some(info) = sec_info {
1722            let is_opt = info.sec_type == 6 || info.sec_type == 8; // OPTION, INDEX_OPTION
1723            let is_fut = info.sec_type == 5; // FUTURE
1724            let is_cn_market = info.market == 21 || info.market == 22; // CNSH, CNSZ
1725            (is_opt, is_fut, is_cn_market)
1726        } else {
1727            (false, false, false)
1728        };
1729
1730        match warn_type {
1731            11 => {
1732                // VolumeUp (成交量): 非期权/期货 ×10000; A股额外 ×100
1733                if !is_option && !is_future {
1734                    value *= 10000.0;
1735                }
1736                if is_cn {
1737                    value *= 100.0;
1738                }
1739            }
1740            12 => {
1741                // TurnOverUp (成交额): ×10000
1742                value *= 10000.0;
1743            }
1744            16 | 17 => {
1745                // BuyVolumeUp / SellVolumeUp: 非期权/期货 ×1000; A股额外 ×100
1746                if !is_option && !is_future {
1747                    value *= 1000.0;
1748                }
1749                if is_cn {
1750                    value *= 100.0;
1751                }
1752            }
1753            _ => {
1754                // 价格类型的提醒: 不需要额外换算,/ 1000.0 已足够
1755            }
1756        }
1757
1758        value
1759    }
1760
1761    /// 请求行情权限 (CMD 6024)
1762    /// 对齐 C++ RequestAPIQuota / RequestMkrQotRightWithPushedQotRight
1763    async fn request_qot_right(
1764        backend: &futu_backend::conn::BackendConn,
1765        qot_right_cache: &QotRightCache,
1766    ) {
1767        use futu_backend::proto_internal::qta_auth;
1768
1769        tracing::info!("CMD6024: requesting qot right...");
1770
1771        // C++ 发送空请求(不设置任何字段)
1772        let req = qta_auth::QtaAuth6024Req {
1773            req_hk_auth: None,
1774            req_us_auth: None,
1775            req_cn_auth: None,
1776        };
1777
1778        let body = prost::Message::encode_to_vec(&req);
1779
1780        // C++ CMD ID = 6032 (NN_ProtoCmd_Qot_Request_Right)
1781        // proto 文件名虽然叫 FTCmdQtaAuth6024,但实际 CMD ID 是 6032
1782        match backend.request(6032, body).await {
1783            Ok(frame) => {
1784                let body_bytes = frame.body.as_ref();
1785
1786                // CMD 6032 响应是裸 protobuf (已验证不需要 SRPC 解包)
1787                let rsp: qta_auth::QtaAuth6024Rsp = crate::handlers::decode_srpc_or_direct(
1788                    body_bytes,
1789                    |r: &qta_auth::QtaAuth6024Rsp| {
1790                        // 只要有任何权限字段就认为有效
1791                        r.hk_qut_got_auth.is_some()
1792                            || r.us_qut_got_auth.is_some()
1793                            || r.open_api_auth.is_some()
1794                    },
1795                );
1796
1797                // 提取 OpenAPI 额度
1798                let (sub_limit, kl_limit) = if let Some(api_auth) = &rsp.open_api_auth {
1799                    (api_auth.sub_limit, api_auth.history_k_line_limit)
1800                } else {
1801                    (None, None)
1802                };
1803
1804                // 提取 US 期货细分权限
1805                let us_future_detail = rsp.us_future_auth.as_ref().map(|f| {
1806                    (
1807                        f.open_api_cme_auth.unwrap_or(0),
1808                        f.open_api_cbot_auth.unwrap_or(0),
1809                        f.open_api_nymex_auth.unwrap_or(0),
1810                        f.open_api_comex_auth.unwrap_or(0),
1811                    )
1812                });
1813
1814                qot_right_cache.update_from_backend(
1815                    rsp.hk_qut_got_auth,
1816                    rsp.us_qut_got_auth,
1817                    rsp.cn_qut_got_auth,
1818                    rsp.shanghai_qut_auth,
1819                    rsp.shenzhen_qut_auth,
1820                    rsp.hk_option_auth,
1821                    rsp.hk_future_auth,
1822                    rsp.us_option_flag,
1823                    rsp.us_future_cme_cboe_auth,
1824                    us_future_detail,
1825                    rsp.sgx_future_auth,
1826                    rsp.jp_future_auth,
1827                    sub_limit,
1828                    kl_limit,
1829                );
1830
1831                tracing::info!(
1832                    hk_got = rsp.hk_qut_got_auth.unwrap_or(0),
1833                    us_got = rsp.us_qut_got_auth.unwrap_or(0),
1834                    cn_got = rsp.cn_qut_got_auth.unwrap_or(0),
1835                    sub_limit = sub_limit.unwrap_or(0),
1836                    kl_limit = kl_limit.unwrap_or(0),
1837                    "CMD6024 qot right updated"
1838                );
1839            }
1840            Err(e) => {
1841                tracing::warn!(error = %e, "CMD6024 request failed (using defaults)");
1842            }
1843        }
1844    }
1845
1846    /// 解析 SBIT_DEAL_STATISTICS(5) → 更新 BasicQot 缓存的成交量/额/OHLC
1847    fn parse_deal_statistics(
1848        data: &[u8],
1849        sec_key: &str,
1850        qot_cache: &futu_cache::qot_cache::QotCache,
1851    ) {
1852        use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1853
1854        let stats: ft_cmd_stock_quote_coverage_data::DealStatistics =
1855            match prost::Message::decode(data) {
1856                Ok(s) => s,
1857                Err(_) => return,
1858            };
1859
1860        // 价格放大 10^9, turnover 放大 10^3, turnover_rate 放大 10^5, amplitude 放大 10^5
1861        let open = stats.price_open.unwrap_or(0) as f64 / 1_000_000_000.0;
1862        let high = stats.price_highest.unwrap_or(0) as f64 / 1_000_000_000.0;
1863        let low = stats.price_lowest.unwrap_or(0) as f64 / 1_000_000_000.0;
1864        let volume = stats.volume.unwrap_or(0);
1865        let turnover = stats.turnover.unwrap_or(0) as f64 / 1_000.0;
1866        let turnover_rate = stats.ratio_turnover.unwrap_or(0) as f64 / 100_000.0;
1867        let amplitude = stats.amplitude_price.unwrap_or(0) as f64 / 100_000.0;
1868
1869        // 合并更新:读取已有缓存,只更新非零字段
1870        if let Some(mut existing) = qot_cache.basic_qot.get_mut(sec_key) {
1871            if open > 0.0 {
1872                existing.open_price = open;
1873            }
1874            if high > 0.0 {
1875                existing.high_price = high;
1876            }
1877            if low > 0.0 {
1878                existing.low_price = low;
1879            }
1880            if volume > 0 {
1881                existing.volume = volume;
1882            }
1883            if turnover > 0.0 {
1884                existing.turnover = turnover;
1885            }
1886            if turnover_rate > 0.0 {
1887                existing.turnover_rate = turnover_rate;
1888            }
1889            if amplitude > 0.0 {
1890                existing.amplitude = amplitude;
1891            }
1892        } else if volume > 0 || open > 0.0 {
1893            // 还没有 price 数据,先创建
1894            qot_cache.update_basic_qot(
1895                sec_key,
1896                futu_cache::qot_cache::CachedBasicQot {
1897                    cur_price: 0.0,
1898                    last_close_price: 0.0,
1899                    open_price: open,
1900                    high_price: high,
1901                    low_price: low,
1902                    volume,
1903                    turnover,
1904                    turnover_rate,
1905                    amplitude,
1906                    is_suspended: false,
1907                    update_time: String::new(),
1908                    update_timestamp: 0.0,
1909                },
1910            );
1911        }
1912    }
1913
1914    /// 解析 SBIT_STOCK_STATE(1) → 更新 BasicQot 停牌状态
1915    fn parse_stock_state(data: &[u8], sec_key: &str, qot_cache: &futu_cache::qot_cache::QotCache) {
1916        use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1917
1918        let state: ft_cmd_stock_quote_coverage_data::StockState = match prost::Message::decode(data)
1919        {
1920            Ok(s) => s,
1921            Err(_) => return,
1922        };
1923
1924        let state_type = state.state_type.unwrap_or(0);
1925        let is_suspended = state_type == 8; // STOCK_STATE_SUSPENDED = 8
1926
1927        if let Some(mut existing) = qot_cache.basic_qot.get_mut(sec_key) {
1928            existing.is_suspended = is_suspended;
1929        }
1930    }
1931
1932    /// 从缓存构建 Qot_UpdateBasicQot push event(供 bit 1/5 处理后触发推送)
1933    /// C++ 中每个 bit 处理完都会调用 UpdateRealTimeData 推送累积状态
1934    fn build_basic_qot_push_from_cache(
1935        sec_key: &str,
1936        sec_info: &futu_cache::static_data::CachedSecurityInfo,
1937        qot_cache: &futu_cache::qot_cache::QotCache,
1938    ) -> Option<PushEvent> {
1939        let cached = qot_cache.get_basic_qot(sec_key)?;
1940
1941        let security = futu_proto::qot_common::Security {
1942            market: sec_info.market,
1943            code: sec_info.code.clone(),
1944        };
1945        let basic_qot = futu_proto::qot_common::BasicQot {
1946            security,
1947            name: Some(sec_info.name.clone()),
1948            is_suspended: cached.is_suspended,
1949            list_time: sec_info.list_time.clone(),
1950            price_spread: 0.0,
1951            update_time: cached.update_time.clone(),
1952            high_price: cached.high_price,
1953            open_price: cached.open_price,
1954            low_price: cached.low_price,
1955            cur_price: cached.cur_price,
1956            last_close_price: cached.last_close_price,
1957            volume: cached.volume,
1958            turnover: cached.turnover,
1959            turnover_rate: cached.turnover_rate,
1960            amplitude: cached.amplitude,
1961            dark_status: None,
1962            option_ex_data: None,
1963            list_timestamp: None,
1964            update_timestamp: Some(cached.update_timestamp),
1965            pre_market: None,
1966            after_market: None,
1967            overnight: None,
1968            future_ex_data: None,
1969            sec_status: None,
1970            warrant_ex_data: None,
1971        };
1972
1973        let resp = futu_proto::qot_update_basic_qot::Response {
1974            ret_type: 0,
1975            ret_msg: None,
1976            err_code: None,
1977            s2c: Some(futu_proto::qot_update_basic_qot::S2c {
1978                basic_qot_list: vec![basic_qot],
1979            }),
1980        };
1981
1982        Some(PushEvent::QuotePush {
1983            sec_key: sec_key.to_string(),
1984            sub_type: 1, // SubType_Basic
1985            proto_id: futu_core::proto_id::QOT_UPDATE_BASIC_QOT,
1986            body: prost::Message::encode_to_vec(&resp),
1987        })
1988    }
1989
1990    /// 解析 SBIT_ORDER_BOOK(3) → 更新缓存 + 构建 Qot_UpdateOrderBook push
1991    fn parse_order_book_to_push(
1992        data: &[u8],
1993        sec_key: &str,
1994        sec_info: &futu_cache::static_data::CachedSecurityInfo,
1995        qot_cache: &futu_cache::qot_cache::QotCache,
1996    ) -> Option<PushEvent> {
1997        use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
1998
1999        let ob: ft_cmd_stock_quote_coverage_data::OrderBook = prost::Message::decode(data).ok()?;
2000
2001        let convert_levels =
2002            |items: &[ft_cmd_stock_quote_coverage_data::OrderBookItem]|
2003             -> Vec<futu_cache::qot_cache::CachedOrderBookLevel> {
2004                items
2005                    .iter()
2006                    .map(|item| futu_cache::qot_cache::CachedOrderBookLevel {
2007                        price: item.price.unwrap_or(0) as f64 / 1_000_000_000.0,
2008                        volume: item.volume.unwrap_or(0),
2009                        order_count: item.order_count.unwrap_or(0) as i32,
2010                    })
2011                    .collect()
2012            };
2013
2014        let bid_list = convert_levels(&ob.bid);
2015        let ask_list = convert_levels(&ob.ask);
2016
2017        let svr_recv_bid_ts = ob
2018            .server_recv_bid_from_exchange_time_ms
2019            .map(|t| t as f64 / 1000.0);
2020        let svr_recv_ask_ts = ob
2021            .server_recv_ask_from_exchange_time_ms
2022            .map(|t| t as f64 / 1000.0);
2023
2024        // 更新缓存
2025        qot_cache.order_books.insert(
2026            sec_key.to_string(),
2027            futu_cache::qot_cache::CachedOrderBook {
2028                bid_list: bid_list.clone(),
2029                ask_list: ask_list.clone(),
2030                svr_recv_time_bid: None,
2031                svr_recv_time_bid_timestamp: svr_recv_bid_ts,
2032                svr_recv_time_ask: None,
2033                svr_recv_time_ask_timestamp: svr_recv_ask_ts,
2034            },
2035        );
2036
2037        // 构建 FTAPI push
2038        let security = futu_proto::qot_common::Security {
2039            market: sec_info.market,
2040            code: sec_info.code.clone(),
2041        };
2042        let to_proto = |levels: &[futu_cache::qot_cache::CachedOrderBookLevel]|
2043         -> Vec<futu_proto::qot_common::OrderBook> {
2044            levels.iter().map(|l| futu_proto::qot_common::OrderBook {
2045                price: l.price,
2046                volume: l.volume,
2047                oreder_count: l.order_count,
2048                detail_list: vec![],
2049            }).collect()
2050        };
2051
2052        let resp = futu_proto::qot_update_order_book::Response {
2053            ret_type: 0,
2054            ret_msg: None,
2055            err_code: None,
2056            s2c: Some(futu_proto::qot_update_order_book::S2c {
2057                security,
2058                name: Some(sec_info.name.clone()),
2059                order_book_ask_list: to_proto(&ask_list),
2060                order_book_bid_list: to_proto(&bid_list),
2061                svr_recv_time_bid: None,
2062                svr_recv_time_bid_timestamp: svr_recv_bid_ts,
2063                svr_recv_time_ask: None,
2064                svr_recv_time_ask_timestamp: svr_recv_ask_ts,
2065            }),
2066        };
2067
2068        Some(PushEvent::QuotePush {
2069            sec_key: sec_key.to_string(),
2070            sub_type: 2, // SubType_OrderBook
2071            proto_id: futu_core::proto_id::QOT_UPDATE_ORDER_BOOK,
2072            body: prost::Message::encode_to_vec(&resp),
2073        })
2074    }
2075
2076    /// 解析 SBIT_HK_BROKER_QUEUE(9) → 更新缓存 + 构建 Qot_UpdateBroker push
2077    fn parse_broker_queue_to_push(
2078        data: &[u8],
2079        sec_key: &str,
2080        sec_info: &futu_cache::static_data::CachedSecurityInfo,
2081        qot_cache: &futu_cache::qot_cache::QotCache,
2082    ) -> Option<PushEvent> {
2083        use futu_backend::proto_internal::ft_cmd_stock_quote_coverage_data;
2084
2085        let bq: ft_cmd_stock_quote_coverage_data::HkBrokerQueue =
2086            prost::Message::decode(data).ok()?;
2087
2088        // 转换内部 HKBrokerQueueItem → CachedBrokerItem
2089        // 每个 rank 中的每个 broker_id 各生成一条 CachedBrokerItem
2090        let convert_side =
2091            |items: &[ft_cmd_stock_quote_coverage_data::HkBrokerQueueItem]|
2092             -> Vec<futu_cache::qot_cache::CachedBrokerItem> {
2093                let mut result = Vec::new();
2094                for item in items {
2095                    let pos = item.rank_id.unwrap_or(0);
2096                    for &bid in &item.broker_id {
2097                        result.push(futu_cache::qot_cache::CachedBrokerItem {
2098                            id: bid as i64,
2099                            name: format!("Broker#{bid}"),
2100                            pos,
2101                        });
2102                    }
2103                }
2104                result
2105            };
2106
2107        let bid_list = convert_side(&bq.bid);
2108        let ask_list = convert_side(&bq.ask);
2109
2110        // 更新缓存
2111        qot_cache.update_broker(
2112            sec_key,
2113            futu_cache::qot_cache::CachedBroker {
2114                bid_list: bid_list.clone(),
2115                ask_list: ask_list.clone(),
2116            },
2117        );
2118
2119        // 构建 FTAPI push
2120        let security = futu_proto::qot_common::Security {
2121            market: sec_info.market,
2122            code: sec_info.code.clone(),
2123        };
2124        let to_proto =
2125            |items: &[futu_cache::qot_cache::CachedBrokerItem]|
2126             -> Vec<futu_proto::qot_common::Broker> {
2127                items
2128                    .iter()
2129                    .map(|b| futu_proto::qot_common::Broker {
2130                        id: b.id,
2131                        name: b.name.clone(),
2132                        pos: b.pos,
2133                        order_id: None,
2134                        volume: None,
2135                    })
2136                    .collect()
2137            };
2138
2139        let resp = futu_proto::qot_update_broker::Response {
2140            ret_type: 0,
2141            ret_msg: None,
2142            err_code: None,
2143            s2c: Some(futu_proto::qot_update_broker::S2c {
2144                security,
2145                name: Some(sec_info.name.clone()),
2146                broker_ask_list: to_proto(&ask_list),
2147                broker_bid_list: to_proto(&bid_list),
2148            }),
2149        };
2150
2151        Some(PushEvent::QuotePush {
2152            sec_key: sec_key.to_string(),
2153            sub_type: 14, // SubType_Broker
2154            proto_id: futu_core::proto_id::QOT_UPDATE_BROKER,
2155            body: prost::Message::encode_to_vec(&resp),
2156        })
2157    }
2158
2159    /// 解析 SBIT_TICK(35) → 更新缓存 + 构建 Qot_UpdateTicker push
2160    fn parse_ticker_to_push(
2161        data: &[u8],
2162        sec_key: &str,
2163        sec_info: &futu_cache::static_data::CachedSecurityInfo,
2164        qot_cache: &futu_cache::qot_cache::QotCache,
2165    ) -> Option<PushEvent> {
2166        // Tick 在 FTCmdStockQuoteAccumulateData.proto 中定义
2167        use futu_backend::proto_internal::ft_cmd_stock_quote_accumulate_data;
2168
2169        let tick: ft_cmd_stock_quote_accumulate_data::Tick = prost::Message::decode(data).ok()?;
2170
2171        let mut ticker_list = Vec::new();
2172        let mut cached_tickers = Vec::new();
2173
2174        for item in &tick.items {
2175            let price = item.price.unwrap_or(0) as f64 / 1_000_000_000.0;
2176            let volume = item.volume.unwrap_or(0) as i64;
2177            let timestamp_ms = item.exchange_data_time_ms.unwrap_or(0);
2178            let timestamp = timestamp_ms as f64 / 1000.0;
2179            let sequence = item.tick_key.unwrap_or(0) as i64;
2180            // C++ TickItemType: SELL=1→dir=1(Bid), BUY=2→dir=2(Ask), NEUTRAL=3→dir=3
2181            let dir = item.r#type.unwrap_or(0);
2182            let turnover = price * volume as f64;
2183
2184            cached_tickers.push(futu_cache::qot_cache::CachedTicker {
2185                time: String::new(),
2186                sequence,
2187                dir,
2188                price,
2189                volume,
2190                turnover,
2191                recv_time: None,
2192                ticker_type: None,
2193                push_data_type: Some(1), // push
2194                timestamp: Some(timestamp),
2195            });
2196
2197            ticker_list.push(futu_proto::qot_common::Ticker {
2198                time: String::new(),
2199                sequence,
2200                dir,
2201                price,
2202                volume,
2203                turnover,
2204                recv_time: None,
2205                r#type: None,
2206                type_sign: None,
2207                push_data_type: Some(1),
2208                timestamp: Some(timestamp),
2209            });
2210        }
2211
2212        if ticker_list.is_empty() {
2213            return None;
2214        }
2215
2216        // 缓存 — 累加 (C++ 是 AddTickerList,不是替换)
2217        let max_cached = 1000;
2218        let mut entry = qot_cache.tickers.entry(sec_key.to_string()).or_default();
2219        entry.extend(cached_tickers);
2220        if entry.len() > max_cached {
2221            let drain_count = entry.len() - max_cached;
2222            entry.drain(0..drain_count);
2223        }
2224
2225        // 构建 FTAPI push
2226        let security = futu_proto::qot_common::Security {
2227            market: sec_info.market,
2228            code: sec_info.code.clone(),
2229        };
2230        let resp = futu_proto::qot_update_ticker::Response {
2231            ret_type: 0,
2232            ret_msg: None,
2233            err_code: None,
2234            s2c: Some(futu_proto::qot_update_ticker::S2c {
2235                security,
2236                name: Some(sec_info.name.clone()),
2237                ticker_list,
2238            }),
2239        };
2240
2241        Some(PushEvent::QuotePush {
2242            sec_key: sec_key.to_string(),
2243            sub_type: 4, // SubType_Ticker
2244            proto_id: futu_core::proto_id::QOT_UPDATE_TICKER,
2245            body: prost::Message::encode_to_vec(&resp),
2246        })
2247    }
2248
2249    /// SBIT 值 → FTAPI KLType 枚举
2250    fn sbit_to_kl_type(bit: u32) -> i32 {
2251        match bit {
2252            21 => 1,  // KLType_1Min
2253            22 => 10, // KLType_3Min
2254            23 => 6,  // KLType_5Min
2255            24 => 7,  // KLType_15Min
2256            25 => 8,  // KLType_30Min
2257            26 => 9,  // KLType_60Min
2258            27 => 2,  // KLType_Day
2259            28 => 3,  // KLType_Week
2260            29 => 4,  // KLType_Month
2261            30 => 11, // KLType_Quarter
2262            31 => 5,  // KLType_Year
2263            _ => 0,
2264        }
2265    }
2266
2267    /// 解析 SBIT_KLINE(21-31) → 构建 Qot_UpdateKL push
2268    fn parse_kline_to_push(
2269        data: &[u8],
2270        bit: u32,
2271        sec_key: &str,
2272        sec_info: &futu_cache::static_data::CachedSecurityInfo,
2273        qot_cache: &futu_cache::qot_cache::QotCache,
2274    ) -> Option<PushEvent> {
2275        use futu_backend::proto_internal::ft_cmd_stock_quote_accumulate_data;
2276
2277        let kline: ft_cmd_stock_quote_accumulate_data::Kline = prost::Message::decode(data).ok()?;
2278
2279        let mut kl_list = Vec::new();
2280        let mut cached_klines = Vec::new();
2281        let mut rehab_type = 0i32;
2282
2283        for point in &kline.point_list {
2284            rehab_type = point.exright_type.unwrap_or(0) as i32;
2285            if let Some(ref item) = point.item {
2286                let close = item.close_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2287                let open = item.open_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2288                let high = item.highest_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2289                let low = item.lowest_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2290                let last_close = item.last_close_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2291                let volume = item.volume.unwrap_or(0) as i64;
2292                let turnover = item.turnover.unwrap_or(0) as f64 / 1_000.0;
2293                let turnover_rate = item.turnover_rate.unwrap_or(0) as f64 / 100_000.0;
2294                let pe = item.pe.unwrap_or(0) as f64 / 1_000.0;
2295                let timestamp = item.time.unwrap_or(0) as f64;
2296
2297                let is_blank = close <= 0.0 && open <= 0.0;
2298
2299                kl_list.push(futu_proto::qot_common::KLine {
2300                    time: String::new(),
2301                    is_blank,
2302                    high_price: Some(high),
2303                    open_price: Some(open),
2304                    low_price: Some(low),
2305                    close_price: Some(close),
2306                    last_close_price: Some(last_close),
2307                    volume: Some(volume),
2308                    turnover: Some(turnover),
2309                    turnover_rate: Some(turnover_rate),
2310                    pe: Some(pe),
2311                    change_rate: if last_close > 0.0 {
2312                        Some((close - last_close) / last_close * 100.0)
2313                    } else {
2314                        None
2315                    },
2316                    timestamp: Some(timestamp),
2317                });
2318
2319                if !is_blank {
2320                    cached_klines.push(futu_cache::qot_cache::CachedKLine {
2321                        time: String::new(),
2322                        open_price: open,
2323                        high_price: high,
2324                        low_price: low,
2325                        close_price: close,
2326                        volume,
2327                        turnover,
2328                    });
2329                }
2330            }
2331        }
2332
2333        if kl_list.is_empty() {
2334            return None;
2335        }
2336
2337        // 缓存 K 线
2338        let kl_type = Self::sbit_to_kl_type(bit);
2339        let cache_key = format!("{sec_key}:{kl_type}");
2340        let mut entry = qot_cache.klines.entry(cache_key).or_default();
2341        entry.extend(cached_klines);
2342        // 限制缓存大小
2343        if entry.len() > 2000 {
2344            let drain = entry.len() - 2000;
2345            entry.drain(0..drain);
2346        }
2347
2348        let security = futu_proto::qot_common::Security {
2349            market: sec_info.market,
2350            code: sec_info.code.clone(),
2351        };
2352        let resp = futu_proto::qot_update_kl::Response {
2353            ret_type: 0,
2354            ret_msg: None,
2355            err_code: None,
2356            s2c: Some(futu_proto::qot_update_kl::S2c {
2357                rehab_type,
2358                kl_type,
2359                security,
2360                name: Some(sec_info.name.clone()),
2361                kl_list,
2362            }),
2363        };
2364
2365        // SubType 映射: KLType → SubType
2366        let sub_type = match kl_type {
2367            1 => 11,  // KL_1Min → SubType_KL_1Min
2368            2 => 6,   // KL_Day → SubType_KL_Day
2369            3 => 12,  // KL_Week
2370            4 => 13,  // KL_Month
2371            5 => 16,  // KL_Year
2372            6 => 7,   // KL_5Min
2373            7 => 8,   // KL_15Min
2374            8 => 9,   // KL_30Min
2375            9 => 10,  // KL_60Min
2376            10 => 17, // KL_3Min
2377            11 => 15, // KL_Quarter
2378            _ => 6,
2379        };
2380
2381        Some(PushEvent::QuotePush {
2382            sec_key: sec_key.to_string(),
2383            sub_type,
2384            proto_id: futu_core::proto_id::QOT_UPDATE_KL,
2385            body: prost::Message::encode_to_vec(&resp),
2386        })
2387    }
2388
2389    /// 解析 SBIT_TIME_SHARING(20) → 构建 Qot_UpdateRT push
2390    fn parse_rt_to_push(
2391        data: &[u8],
2392        sec_key: &str,
2393        sec_info: &futu_cache::static_data::CachedSecurityInfo,
2394        qot_cache: &futu_cache::qot_cache::QotCache,
2395    ) -> Option<PushEvent> {
2396        use futu_backend::proto_internal::ft_cmd_stock_quote_accumulate_data;
2397
2398        let ts: ft_cmd_stock_quote_accumulate_data::TimeSharingPlans =
2399            prost::Message::decode(data).ok()?;
2400
2401        let point = ts.point.as_ref()?;
2402        let price = point.close_price.unwrap_or(0) as f64 / 1_000_000_000.0;
2403        let volume = point.volume.unwrap_or(0) as i64;
2404        let turnover = point.turnover.unwrap_or(0) as f64 / 1_000.0;
2405        let timestamp = point.time.unwrap_or(0) as f64;
2406
2407        if price <= 0.0 {
2408            return None;
2409        }
2410
2411        // 缓存分时数据 — 累加模式
2412        let mut entry = qot_cache.rt_data.entry(sec_key.to_string()).or_default();
2413        entry.push(futu_cache::qot_cache::CachedTimeShare {
2414            time: String::new(),
2415            minute: 0,
2416            price,
2417            last_close_price: 0.0,
2418            avg_price: 0.0,
2419            volume,
2420            turnover,
2421            timestamp,
2422        });
2423        if entry.len() > 500 {
2424            let drain = entry.len() - 500;
2425            entry.drain(0..drain);
2426        }
2427
2428        let security = futu_proto::qot_common::Security {
2429            market: sec_info.market,
2430            code: sec_info.code.clone(),
2431        };
2432
2433        let rt = futu_proto::qot_common::TimeShare {
2434            time: String::new(),
2435            minute: 0,
2436            is_blank: false,
2437            price: Some(price),
2438            last_close_price: None,
2439            avg_price: None,
2440            volume: Some(volume),
2441            turnover: Some(turnover),
2442            timestamp: Some(timestamp),
2443        };
2444
2445        let resp = futu_proto::qot_update_rt::Response {
2446            ret_type: 0,
2447            ret_msg: None,
2448            err_code: None,
2449            s2c: Some(futu_proto::qot_update_rt::S2c {
2450                security,
2451                name: Some(sec_info.name.clone()),
2452                rt_list: vec![rt],
2453            }),
2454        };
2455
2456        Some(PushEvent::QuotePush {
2457            sec_key: sec_key.to_string(),
2458            sub_type: 5, // SubType_RT
2459            proto_id: futu_core::proto_id::QOT_UPDATE_RT,
2460            body: prost::Message::encode_to_vec(&resp),
2461        })
2462    }
2463
2464    /// C++ IsValidTrdMarket for Real env
2465    fn is_valid_real_trd_market(market: i32) -> bool {
2466        matches!(market, 1 | 2 | 4 | 5 | 6 | 8 | 15 | 111 | 112)
2467        // HK, US, HKCC, Futures, SG, AU, JP, MY, CA
2468    }
2469
2470    /// broker_id → SecurityFirm 枚举 (C++ SecurityFirm_NNToAPI)
2471    fn broker_to_security_firm(broker_id: u32) -> i32 {
2472        match broker_id {
2473            1001 => 1, // NN_BrokerID_Futu → FutuSecurities
2474            1007 => 2, // NN_BrokerID_FutuUS (MooMoo) → FutuInc
2475            1008 => 3, // NN_BrokerID_FutuSG → FutuSG
2476            1009 => 4, // NN_BrokerID_FutuAU → FutuAU
2477            1019 => 5, // NN_BrokerID_FutuCA → FutuCA
2478            1017 => 6, // NN_BrokerID_FutuMY → FutuMY
2479            1012 => 7, // NN_BrokerID_FutuJP → FutuJP
2480            _ => 0,    // Unknown
2481        }
2482    }
2483
2484    /// 从 Account proto 获取 acc_type (C++ GetAccTypeFromAccItem)
2485    fn get_acc_type(acc: &futu_backend::proto_internal::ft_usr_trd_acc::Account) -> Option<i32> {
2486        // Priority 1: IRA type (CA)
2487        if let Some(ira) = acc.ira_type {
2488            match ira {
2489                1 => return Some(3), // CA_TFSA → TrdAccType_TFSA
2490                2 => return Some(4), // CA_RRSP → TrdAccType_RRSP
2491                3 => return Some(5), // CA_SRRSP → TrdAccType_SRRSP
2492                _ => {}
2493            }
2494        }
2495        // Priority 2: Kouza type (JP)
2496        if let Some(kouza) = acc.kouza_type {
2497            match kouza {
2498                1 => return Some(1), // Cash → TrdAccType_Cash
2499                2 => return Some(2), // Margin → TrdAccType_Margin
2500                3 => return Some(6), // Derivative → TrdAccType_Derivatives
2501                _ => {}
2502            }
2503        }
2504        // Priority 3: General type (C++ does type() + 1 unconditionally, default CASH=0→1)
2505        Some(acc.r#type.unwrap_or(0) + 1) // CASH=0→1, MARGIN=1→2
2506    }
2507
2508    /// 从 Account proto 获取 acc_role (C++ GetAccRoleFromAccItem)
2509    fn get_acc_role(acc: &futu_backend::proto_internal::ft_usr_trd_acc::Account) -> Option<i32> {
2510        // IPO check (MY broker)
2511        if acc.is_esop.unwrap_or(false) {
2512            return Some(3); // TrdAccRole_IPO
2513        }
2514        // Role mapping: ORDINARY=0→Normal(1), MASTER=1→Master(2)
2515        // C++ NN_AccRole: Ordinary=0, Master=1
2516        // FTAPI TrdAccRole: Unknown=0, Normal=1, Master=2, IPO=3
2517        // 从 UniversalAccount.role 或 Account 层面获取
2518        Some(1) // Default: Normal
2519    }
2520
2521    /// 从 Account proto 提取 CachedTrdAcc
2522    fn account_to_cached(
2523        acc: &futu_backend::proto_internal::ft_usr_trd_acc::Account,
2524        uni_card_num: Option<&str>,
2525        uni_role: Option<u32>,
2526    ) -> Option<CachedTrdAcc> {
2527        let acc_id = acc.id?;
2528        let broker_id = acc.broker.as_ref().map(|b| b.id).unwrap_or(0);
2529
2530        let acc_type = Self::get_acc_type(acc);
2531        let card_num = acc.card_number.as_ref().filter(|s| !s.is_empty()).cloned();
2532        let security_firm = Self::broker_to_security_firm(broker_id);
2533        // acc_status: state OPENED=1 → Active(0), 其他 → Disabled(1)
2534        let acc_status = acc.state.map(|s| if s == 1 { 0 } else { 1 });
2535
2536        // acc_role: 从 UniversalAccount.role 或 Account 自身推断
2537        let acc_role = match uni_role {
2538            Some(1) => Some(2), // MASTER
2539            _ => Self::get_acc_role(acc),
2540        };
2541
2542        // enable_market 数组 → trd_market_auth_list
2543        // C++ SubAccToAccItem: ConvS2C + IsValidTrdMarket + Fund 展开
2544        let mut trd_market_auth_list: Vec<i32> = Vec::new();
2545        let add_unique = |list: &mut Vec<i32>, val: i32| {
2546            if !list.contains(&val) {
2547                list.push(val);
2548            }
2549        };
2550        for &m in &acc.enable_market {
2551            // C++ NN_TrdMarket_ConvS2C for real env
2552            // 后端 enable_market 值 → 客户端 TrdMarket 值
2553            let market = match m as i32 {
2554                11 => 111, // 后端 11 → NN_TrdMarket_MY(111)
2555                12 => 112, // 后端 12 → NN_TrdMarket_CA(112)
2556                13 => 113, // 后端 13 → NN_TrdMarket_HK_Fund(113)
2557                14 => 114, // 后端 14 → NN_TrdMarket_Fund(114)
2558                15 => 15,  // 后端 15 → NN_TrdMarket_JP(15)
2559                23 => 123, // 后端 23 → NN_TrdMarket_US_Fund(123)
2560                24 => 124, // 后端 24 → NN_TrdMarket_SG_Fund(124)
2561                other => other,
2562            };
2563
2564            if market == 114 {
2565                // NN_TrdMarket_Fund → 按券商展开
2566                match broker_id {
2567                    1001 => {
2568                        add_unique(&mut trd_market_auth_list, 113);
2569                        add_unique(&mut trd_market_auth_list, 123);
2570                    }
2571                    1008 => {
2572                        add_unique(&mut trd_market_auth_list, 124);
2573                        add_unique(&mut trd_market_auth_list, 123);
2574                    }
2575                    1017 => {
2576                        add_unique(&mut trd_market_auth_list, 125);
2577                        add_unique(&mut trd_market_auth_list, 123);
2578                    }
2579                    1012 => {
2580                        add_unique(&mut trd_market_auth_list, 126);
2581                        add_unique(&mut trd_market_auth_list, 123);
2582                    }
2583                    _ => {
2584                        add_unique(&mut trd_market_auth_list, 113);
2585                        add_unique(&mut trd_market_auth_list, 123);
2586                    }
2587                }
2588            } else if Self::is_valid_real_trd_market(market)
2589                || market == 113
2590                || market == 123
2591                || market == 124
2592                || market == 125
2593                || market == 126
2594            {
2595                // C++ IsValidTrdMarket for Real: HK(1), US(2), HKCC(4), Futures(5), SG(6), AU(8), JP(15), MY(111), CA(112)
2596                // 或 Fund 子市场
2597                add_unique(&mut trd_market_auth_list, market);
2598            }
2599        }
2600
2601        // JP sub-account types (SubAccount.type)
2602        let jp_acc_type: Vec<i32> = acc
2603            .sub_accounts
2604            .iter()
2605            .filter_map(|sub| sub.r#type)
2606            .map(|t| t as i32)
2607            .collect();
2608
2609        // C++ 排序 key: (BrokerID << 48) | (TrdMkt << 32) | IntraAccID
2610        let backend_market = acc.market.unwrap_or(0) as u64;
2611        let intra_acc_id = acc.acc_id.unwrap_or(0) as u64;
2612        let sort_key = ((broker_id as u64) << 48) | (backend_market << 32) | intra_acc_id;
2613
2614        Some(CachedTrdAcc {
2615            acc_id,
2616            trd_env: 1, // TrdEnv_Real
2617            trd_market_auth_list,
2618            acc_type,
2619            card_num,
2620            security_firm: Some(security_firm),
2621            sim_acc_type: None,
2622            uni_card_num: uni_card_num.map(|s| s.to_string()),
2623            acc_status,
2624            acc_role,
2625            jp_acc_type,
2626            // 审计补全字段
2627            owner_uid: acc.owner_uid,
2628            opr_uid: acc.opr_uid,
2629            mixed_state: acc.mixed_state,
2630            ira_type: acc.ira_type.map(|t| t as i32),
2631            grant_state: acc.grant_state,
2632            kouza_type: acc.kouza_type.map(|k| k as i32),
2633            trd_market: acc.market.map(|m| m as i32),
2634            association_acc_id: None, // 后续从 fund_acc_list 填充
2635            acc_flag: None,           // 后续从 universal account 标记
2636            order_index: 0,           // set_accounts 会重新赋值
2637            sort_key,
2638        })
2639    }
2640
2641    /// 从后端拉取真实账户列表 (CMD 2282 + CMD 2298)
2642    async fn fetch_real_accounts(&self, backend: &BackendConn) -> Result<Vec<CachedTrdAcc>> {
2643        use futu_backend::proto_internal::ft_cmd2282;
2644        use prost::Message;
2645
2646        let mut all_accounts = Vec::new();
2647
2648        // CMD 2282: 自有账户
2649        let req = ft_cmd2282::Request {
2650            page: Some(0),
2651            need_close: Some(true),
2652            roa_cid: None,
2653        };
2654        // CMD 2282 用裸 protobuf (SendTCPProto_ProtoBuf_WithAccID)
2655        let resp = backend.request(2282, req.encode_to_vec()).await?;
2656        let parsed: ft_cmd2282::Response = Message::decode(resp.body.as_ref())
2657            .map_err(|e| FutuError::Codec(format!("CMD2282 decode failed: {e}")))?;
2658
2659        if parsed.result.unwrap_or(-1) != 0 {
2660            let err = parsed.err_text.as_deref().unwrap_or("unknown");
2661            return Err(FutuError::Codec(format!("CMD2282 error: {err}")));
2662        }
2663
2664        // 从 universal_account_list 提取 (综合账户: 证券+期货子账户)
2665        for uni in &parsed.universal_account_list {
2666            let uni_card = uni.card_number.as_deref();
2667            let uni_role = uni.role;
2668            // 证券子账户
2669            if let Some(sec_acc) = &uni.sec_account {
2670                if let Some(cached) = Self::account_to_cached(sec_acc, uni_card, uni_role) {
2671                    all_accounts.push(cached);
2672                }
2673            }
2674            // 期货子账户
2675            if let Some(fut_acc) = &uni.futures_account {
2676                if let Some(cached) = Self::account_to_cached(fut_acc, uni_card, uni_role) {
2677                    all_accounts.push(cached);
2678                }
2679            }
2680            // 加密货币子账户
2681            if let Some(crypto_acc) = &uni.crypto_account {
2682                if let Some(cached) = Self::account_to_cached(crypto_acc, uni_card, uni_role) {
2683                    all_accounts.push(cached);
2684                }
2685            }
2686            // 外汇子账户
2687            if let Some(forex_acc) = &uni.forex_account {
2688                if let Some(cached) = Self::account_to_cached(forex_acc, uni_card, uni_role) {
2689                    all_accounts.push(cached);
2690                }
2691            }
2692        }
2693
2694        // 从 account_list 提取 (单市场独立账户)
2695        for acc in &parsed.account_list {
2696            if let Some(cached) = Self::account_to_cached(acc, None, None) {
2697                all_accounts.push(cached);
2698            }
2699        }
2700
2701        // 处理 fund_acc_list: 建立基金账户 ↔ 证券账户的关联
2702        for fund in &parsed.fund_acc_list {
2703            let fund_id = fund.fund_acc_id.as_ref().and_then(|f| f.id);
2704            let assoc_id = fund.association_acc_id.as_ref().and_then(|a| a.id);
2705            if let (Some(fid), Some(aid)) = (fund_id, assoc_id) {
2706                // 在已有账户中找到 fund_acc_id 匹配的,设置关联
2707                for acc in all_accounts.iter_mut() {
2708                    if acc.acc_id == fid {
2709                        acc.association_acc_id = Some(aid);
2710                    }
2711                }
2712            }
2713        }
2714
2715        // 标记综合账户子账户的 acc_flag
2716        // uni_card_num 非空的就是综合账户下的子账户
2717        for acc in all_accounts.iter_mut() {
2718            if acc.uni_card_num.is_some() {
2719                acc.acc_flag = Some(1);
2720            }
2721        }
2722
2723        tracing::info!(count = all_accounts.len(), "CMD2282 accounts fetched");
2724
2725        // CMD 2298: 授权账户 — C++ 只对 Futu(1001) 和 FutuSG(1008) 查询
2726        // 检查是否有 Futu/FutuSG 账户来决定是否查询 2298
2727        let has_futu_or_sg = all_accounts.iter().any(|a| {
2728            matches!(a.security_firm, Some(1) | Some(2) | Some(3)) // FutuSec/FutuInc/FutuSG
2729        });
2730        if has_futu_or_sg {
2731            self.fetch_granted_accounts(backend, &mut all_accounts)
2732                .await;
2733        }
2734
2735        tracing::info!(
2736            count = all_accounts.len(),
2737            "real accounts fetched (2282+2298)"
2738        );
2739        Ok(all_accounts)
2740    }
2741
2742    /// CMD 2298: 拉取授权账户
2743    async fn fetch_granted_accounts(
2744        &self,
2745        backend: &BackendConn,
2746        accounts: &mut Vec<CachedTrdAcc>,
2747    ) {
2748        use futu_backend::proto_internal::ft_cmd2298;
2749        use prost::Message;
2750
2751        let req = ft_cmd2298::Request { page: Some(0) };
2752        let resp = match backend.request(2298, req.encode_to_vec()).await {
2753            Ok(r) => r,
2754            Err(e) => {
2755                tracing::debug!(error = %e, "CMD2298 query skipped");
2756                return;
2757            }
2758        };
2759
2760        let parsed: ft_cmd2298::Response = match Message::decode(resp.body.as_ref()) {
2761            Ok(p) => p,
2762            Err(e) => {
2763                tracing::debug!(error = %e, "CMD2298 decode failed");
2764                return;
2765            }
2766        };
2767
2768        if parsed.result.unwrap_or(-1) != 0 {
2769            return;
2770        }
2771
2772        let mut count = 0;
2773        for uni in &parsed.authorized_account_list {
2774            let uni_card = uni.card_number.as_deref();
2775            let uni_role = uni.role;
2776            if let Some(sec_acc) = &uni.sec_account {
2777                if let Some(cached) = Self::account_to_cached(sec_acc, uni_card, uni_role) {
2778                    accounts.push(cached);
2779                    count += 1;
2780                }
2781            }
2782            if let Some(fut_acc) = &uni.futures_account {
2783                if let Some(cached) = Self::account_to_cached(fut_acc, uni_card, uni_role) {
2784                    accounts.push(cached);
2785                    count += 1;
2786                }
2787            }
2788        }
2789
2790        if count > 0 {
2791            tracing::info!(count, "CMD2298 granted accounts added");
2792        }
2793    }
2794
2795    /// 从后端拉取模拟账户列表 (Proto 14800)
2796    async fn fetch_sim_accounts(&self, backend: &BackendConn) -> Result<Vec<CachedTrdAcc>> {
2797        use futu_backend::proto_internal::{
2798            sim_odr_sys_cmn, user_account_info_manager_service as uaims,
2799        };
2800        use prost::Message;
2801
2802        let user_id = backend.user_id.load(std::sync::atomic::Ordering::Relaxed) as u64;
2803
2804        let req = uaims::AccountListReq {
2805            msg_header: Some(sim_odr_sys_cmn::MsgHeader {
2806                req_id: Some(format!("{user_id}_sim")),
2807                account_id: Some(0),
2808                cipher: None,
2809                security_type: None,
2810                exchange_code: None,
2811                input_source: None,
2812                market: None,
2813            }),
2814            nn_uid: Some(user_id),
2815        };
2816
2817        // Proto 14800 不用 OMBinSrz 包装,直接发 protobuf
2818        let resp = match backend.request(14800, req.encode_to_vec()).await {
2819            Ok(r) => r,
2820            Err(e) => {
2821                tracing::warn!(error = %e, "sim account query failed, skipping");
2822                return Ok(vec![]);
2823            }
2824        };
2825
2826        tracing::info!(
2827            body_len = resp.body.len(),
2828            body_hex = ?&resp.body[..resp.body.len().min(64)],
2829            "proto14800 response"
2830        );
2831        // Proto 14800 不用 OMBinSrz,直接解析 protobuf
2832        let parsed: uaims::AccountListRsp = Message::decode(resp.body.as_ref()).map_err(|e| {
2833            FutuError::Codec(format!(
2834                "Proto14800 decode failed (body_len={}): {e}",
2835                resp.body.len()
2836            ))
2837        })?;
2838
2839        if parsed.result.unwrap_or(-1) != 0 {
2840            let err_msg = parsed.err_msg.as_deref().unwrap_or("unknown");
2841            tracing::warn!(
2842                result = parsed.result,
2843                err = err_msg,
2844                "sim account query error"
2845            );
2846            return Ok(vec![]);
2847        }
2848
2849        // 模拟账户: market_id 已是 TrdMarket 值
2850        // C++ 特殊映射: Sim_HK_Option → HK, Sim_US_Option/Margin → US
2851        let accounts: Vec<CachedTrdAcc> = parsed
2852            .accouts // proto 中的拼写错误 (accouts)
2853            .iter()
2854            .filter_map(|acc| {
2855                let acc_id = acc.account_id?;
2856                let raw_market = acc.market_id.unwrap_or(0) as i32;
2857
2858                // 特殊市场映射 (C++ APIServer_Trd_GetAccList.cpp:138-150)
2859                // Sim_HK_Option(9) → HK(1), Sim_US_Option(16)/Sim_US_Margin → US(2)
2860                let trd_market = match raw_market {
2861                    9 => 1,  // HK option → TrdMarket_HK
2862                    16 => 2, // US option → TrdMarket_US
2863                    _ => raw_market,
2864                };
2865
2866                // 从 market_id 推断 sim_acc_type 和 acc_type
2867                // C++ NNProto_Trd_SimAccList::UnPackAccList
2868                let (sim_acc_type, acc_type) = match raw_market {
2869                    9 | 16 => (Some(2), Some(2)), // 期权 → SimType_Option, AccType_Margin
2870                    _ if raw_market == 100 => (Some(3), Some(2)), // 期货 → SimType_Futures, AccType_Margin
2871                    _ if raw_market == 2 => (Some(1), Some(2)), // US 模拟 → SimType_Stock, AccType_Margin (卖空支持)
2872                    _ => (Some(1), Some(1)), // 其他股票 → SimType_Stock, AccType_Cash
2873                };
2874
2875                // C++ sim: enBrokerID=NN_BrokerID_Unknown(0), nIntraAccID=0
2876                let sort_key = (raw_market as u64) << 32;
2877
2878                Some(CachedTrdAcc {
2879                    acc_id,
2880                    trd_env: 0, // TrdEnv_Simulate
2881                    trd_market_auth_list: vec![trd_market],
2882                    acc_type,
2883                    card_num: None,
2884                    security_firm: Some(0), // SecurityFirm_Unknown
2885                    sim_acc_type,
2886                    uni_card_num: None,
2887                    acc_status: Some(0), // Active
2888                    acc_role: None,
2889                    jp_acc_type: vec![],
2890                    owner_uid: None,
2891                    opr_uid: None,
2892                    mixed_state: None,
2893                    ira_type: None,
2894                    grant_state: None,
2895                    kouza_type: None,
2896                    trd_market: Some(raw_market),
2897                    association_acc_id: None,
2898                    acc_flag: None,
2899                    order_index: 0, // set_accounts 会重新赋值
2900                    sort_key,
2901                })
2902            })
2903            .collect();
2904
2905        tracing::info!(count = accounts.len(), "sim accounts fetched");
2906        Ok(accounts)
2907    }
2908
2909    /// 拉取所有账户(真实 + 模拟)并填充缓存
2910    async fn fetch_account_list(&self, backend: &BackendConn) -> Result<()> {
2911        let mut all = self.fetch_real_accounts(backend).await?;
2912        let sim = self.fetch_sim_accounts(backend).await?;
2913        all.extend(sim);
2914
2915        tracing::info!(
2916            total = all.len(),
2917            accounts = ?all.iter().map(|a| (a.acc_id, a.trd_env, &a.trd_market_auth_list)).collect::<Vec<_>>(),
2918            "all accounts cached"
2919        );
2920        self.trd_cache.set_accounts(all);
2921        Ok(())
2922    }
2923
2924    /// 注册所有业务处理器
2925    pub fn register_handlers(&self, server: &ApiServer) {
2926        let router = server.router();
2927        handlers::qot::register_handlers(router, self);
2928        handlers::trd::register_handlers(router, self);
2929        handlers::sys::register_handlers(router, self);
2930        tracing::info!("all business handlers registered");
2931    }
2932
2933    /// 创建推送分发器
2934    pub fn create_push_dispatcher(
2935        &self,
2936        server: &ApiServer,
2937        external_sinks: Vec<Arc<dyn futu_server::push::ExternalPushSink>>,
2938    ) -> PushDispatcher {
2939        let mut dispatcher = PushDispatcher::new(
2940            Arc::clone(server.connections()),
2941            Arc::clone(&self.subscriptions),
2942        )
2943        .with_metrics(Arc::clone(&self.metrics));
2944        for sink in external_sinks {
2945            dispatcher = dispatcher.with_external_sink(sink);
2946        }
2947        dispatcher
2948    }
2949
2950    /// 启动推送分发任务: 消费 push_rx channel,处理行情推送和交易重查
2951    ///
2952    /// `external_sinks`: 外部推送接收器列表 (REST WebSocket, gRPC 等)
2953    pub fn start_push_dispatcher(
2954        &self,
2955        server: &ApiServer,
2956        mut push_rx: tokio::sync::mpsc::Receiver<PushEvent>,
2957        external_sinks: Vec<Arc<dyn futu_server::push::ExternalPushSink>>,
2958    ) {
2959        let dispatcher = self.create_push_dispatcher(server, external_sinks);
2960        let shared_backend = Arc::clone(&self.backend);
2961        let trd_cache = Arc::clone(&self.trd_cache);
2962        tokio::spawn(async move {
2963            tracing::info!("push dispatcher task started");
2964            while let Some(event) = push_rx.recv().await {
2965                match event {
2966                    PushEvent::QuotePush {
2967                        sec_key,
2968                        sub_type,
2969                        proto_id,
2970                        body,
2971                    } => {
2972                        dispatcher
2973                            .push_qot(&sec_key, sub_type, proto_id, body)
2974                            .await;
2975                    }
2976                    PushEvent::TradeReQuery {
2977                        acc_id,
2978                        notice_type,
2979                        order_ids,
2980                        order_fill_ids,
2981                    } => {
2982                        // 收到交易通知后重查对应数据 + 推送给订阅客户端
2983                        let be_opt = crate::handlers::load_backend(&shared_backend);
2984                        if let Some(ref be) = be_opt {
2985                            match notice_type {
2986                                1 | 11 | 12 => {
2987                                    // CASH_UPDATE / ASSET_UPDATE / QUOTE_UPDATE → 重查资金
2988                                    let _ = futu_backend::trade_query::query_account_info(
2989                                        be, acc_id, &trd_cache,
2990                                    )
2991                                    .await;
2992                                }
2993                                2 | 3 => {
2994                                    // PSTN_UPDATE / PSTN_LIST_UPDATE → 重查持仓+资金
2995                                    let _ = futu_backend::trade_query::query_account_info(
2996                                        be, acc_id, &trd_cache,
2997                                    )
2998                                    .await;
2999                                }
3000                                4 | 5 | 9 => {
3001                                    // ORDER_UPDATE / ORDER_LIST_UPDATE / ORDER_OP_RESULT
3002                                    // → 重查订单 + 资金 + 持仓,推送 Trd_UpdateOrder
3003                                    if let Ok(orders) = futu_backend::trade_query::query_orders(
3004                                        be, acc_id, &trd_cache,
3005                                    )
3006                                    .await
3007                                    {
3008                                        // 推送变动的订单 (如有 order_ids 则只推对应订单,否则推全部)
3009                                        for order in &orders {
3010                                            let should_push = order_ids.is_empty()
3011                                                || order_ids
3012                                                    .iter()
3013                                                    .any(|id| id == &order.order_id_ex);
3014                                            if should_push {
3015                                                let push_body =
3016                                                    Self::build_order_update_push(acc_id, order);
3017                                                dispatcher
3018                                                    .push_trd_acc(
3019                                                        acc_id,
3020                                                        futu_core::proto_id::TRD_UPDATE_ORDER,
3021                                                        push_body,
3022                                                    )
3023                                                    .await;
3024                                            }
3025                                        }
3026                                    }
3027                                    let _ = futu_backend::trade_query::query_account_info(
3028                                        be, acc_id, &trd_cache,
3029                                    )
3030                                    .await;
3031                                    // query_account_info already fetched positions too
3032                                }
3033                                6 | 7 => {
3034                                    // ORDER_FILL_UPDATE / ORDER_FILL_LIST_UPDATE
3035                                    // → 查成交列表 + 推送 Trd_UpdateOrderFill
3036                                    if let Ok(fills) =
3037                                        futu_backend::trade_query::query_order_fills(be, acc_id)
3038                                            .await
3039                                    {
3040                                        for fill in &fills {
3041                                            let should_push = order_fill_ids.is_empty()
3042                                                || order_fill_ids
3043                                                    .iter()
3044                                                    .any(|id| id == &fill.fill_id_ex);
3045                                            if should_push {
3046                                                let push_body = Self::build_order_fill_update_push(
3047                                                    acc_id, fill,
3048                                                );
3049                                                dispatcher
3050                                                    .push_trd_acc(
3051                                                        acc_id,
3052                                                        futu_core::proto_id::TRD_UPDATE_ORDER_FILL,
3053                                                        push_body,
3054                                                    )
3055                                                    .await;
3056                                            }
3057                                        }
3058                                    }
3059                                    // 成交变动通常伴随资金/持仓变化
3060                                    let _ = futu_backend::trade_query::query_account_info(
3061                                        be, acc_id, &trd_cache,
3062                                    )
3063                                    .await;
3064                                    // query_account_info already fetched positions too
3065                                }
3066                                _ => {}
3067                            }
3068                        }
3069                    }
3070                    PushEvent::BroadcastPush { proto_id, body } => {
3071                        // 广播推送: 发送给所有已注册推送通知的客户端
3072                        dispatcher.push_broadcast(proto_id, body).await;
3073                    }
3074                }
3075            }
3076            tracing::warn!("push dispatcher task ended");
3077        });
3078    }
3079
3080    /// 构建 Trd_UpdateOrder 推送 body (C++ APIServer_Trd_UpdateOrder 对齐)
3081    fn build_order_update_push(acc_id: u64, order: &futu_cache::trd_cache::CachedOrder) -> Vec<u8> {
3082        let header = futu_proto::trd_common::TrdHeader {
3083            trd_env: 1,
3084            acc_id,
3085            trd_market: order.trd_market.unwrap_or(1),
3086            jp_acc_type: None,
3087        };
3088        let api_order = futu_proto::trd_common::Order {
3089            trd_side: order.trd_side,
3090            order_type: order.order_type,
3091            order_status: order.order_status,
3092            order_id: order.order_id,
3093            order_id_ex: order.order_id_ex.clone(),
3094            code: order.code.clone(),
3095            name: order.name.clone(),
3096            qty: order.qty,
3097            price: Some(order.price),
3098            create_time: order.create_time.clone(),
3099            update_time: order.update_time.clone(),
3100            fill_qty: Some(order.fill_qty),
3101            fill_avg_price: Some(order.fill_avg_price),
3102            last_err_msg: order.last_err_msg.clone(),
3103            sec_market: order.sec_market,
3104            create_timestamp: order.create_timestamp,
3105            update_timestamp: order.update_timestamp,
3106            remark: order.remark.clone(),
3107            time_in_force: order.time_in_force,
3108            fill_outside_rth: order.fill_outside_rth,
3109            aux_price: order.aux_price,
3110            trail_type: order.trail_type,
3111            trail_value: order.trail_value,
3112            trail_spread: order.trail_spread,
3113            currency: order.currency,
3114            trd_market: order.trd_market,
3115            session: None,
3116            jp_acc_type: None,
3117        };
3118        let resp = futu_proto::trd_update_order::Response {
3119            ret_type: 0,
3120            ret_msg: None,
3121            err_code: None,
3122            s2c: Some(futu_proto::trd_update_order::S2c {
3123                header,
3124                order: api_order,
3125            }),
3126        };
3127        prost::Message::encode_to_vec(&resp)
3128    }
3129
3130    /// 构建 Trd_UpdateOrderFill 推送 body (C++ APIServer_Trd_UpdateOrderFill 对齐)
3131    fn build_order_fill_update_push(
3132        acc_id: u64,
3133        fill: &futu_backend::trade_query::OrderFillInfo,
3134    ) -> Vec<u8> {
3135        let header = futu_proto::trd_common::TrdHeader {
3136            trd_env: 1,
3137            acc_id,
3138            trd_market: fill.trd_market.unwrap_or(1),
3139            jp_acc_type: None,
3140        };
3141        let api_fill = futu_proto::trd_common::OrderFill {
3142            trd_side: fill.trd_side,
3143            fill_id: fill.fill_id,
3144            fill_id_ex: fill.fill_id_ex.clone(),
3145            order_id: Some(fill.order_id),
3146            order_id_ex: Some(fill.order_id_ex.clone()),
3147            code: fill.code.clone(),
3148            name: fill.name.clone(),
3149            qty: fill.qty,
3150            price: fill.price,
3151            create_time: String::new(),
3152            counter_broker_id: fill.counter_broker_id,
3153            counter_broker_name: None,
3154            sec_market: None,
3155            create_timestamp: fill.create_timestamp,
3156            update_timestamp: fill.update_timestamp,
3157            status: fill.status,
3158            trd_market: fill.trd_market,
3159            jp_acc_type: None,
3160        };
3161        let resp = futu_proto::trd_update_order_fill::Response {
3162            ret_type: 0,
3163            ret_msg: None,
3164            err_code: None,
3165            s2c: Some(futu_proto::trd_update_order_fill::S2c {
3166                header,
3167                order_fill: api_fill,
3168            }),
3169        };
3170        prost::Message::encode_to_vec(&resp)
3171    }
3172}
3173
3174impl Default for GatewayBridge {
3175    fn default() -> Self {
3176        Self::new()
3177    }
3178}
3179
3180/// 重连后重新订阅行情
3181///
3182/// 遍历 SubscriptionManager 中所有客户端的行情订阅,
3183/// 聚合后发送 CMD 6211 到新的后端连接。
3184/// 返回重新订阅的证券数量。
3185async fn resubscribe_quotes(
3186    backend: &BackendConn,
3187    subscriptions: &SubscriptionManager,
3188    static_cache: &StaticDataCache,
3189) -> usize {
3190    use std::collections::{HashMap, HashSet};
3191
3192    // 收集所有唯一的 (security_key, sub_type) 对
3193    let all_conn_ids = subscriptions.get_all_qot_conn_ids();
3194    let mut unique_subs: HashMap<String, HashSet<i32>> = HashMap::new();
3195
3196    for conn_id in &all_conn_ids {
3197        let conn_subs = subscriptions.get_conn_qot_subs(*conn_id);
3198        for (sub_type, sec_keys) in conn_subs {
3199            for sec_key in sec_keys {
3200                unique_subs.entry(sec_key).or_default().insert(sub_type);
3201            }
3202        }
3203    }
3204
3205    if unique_subs.is_empty() {
3206        tracing::debug!("no quote subscriptions to re-subscribe");
3207        return 0;
3208    }
3209
3210    // 构建 (stock_id, ftapi_market, sub_types) 列表
3211    let mut backend_subs: Vec<(u64, i32, Vec<i32>)> = Vec::new();
3212    for (sec_key, sub_types) in &unique_subs {
3213        // sec_key 格式: "market_code"
3214        if let Some(info) = static_cache.get_security_info(sec_key) {
3215            if info.stock_id > 0 {
3216                let market = sec_key
3217                    .split('_')
3218                    .next()
3219                    .and_then(|s| s.parse::<i32>().ok())
3220                    .unwrap_or(0);
3221                backend_subs.push((info.stock_id, market, sub_types.iter().copied().collect()));
3222            }
3223        }
3224    }
3225
3226    let count = backend_subs.len();
3227    if count == 0 {
3228        tracing::debug!("no valid stock_ids found for re-subscription");
3229        return 0;
3230    }
3231
3232    tracing::info!(
3233        securities = count,
3234        connections = all_conn_ids.len(),
3235        "re-subscribing quotes after reconnect"
3236    );
3237
3238    match futu_backend::quote_sub::subscribe_to_backend(backend, &backend_subs).await {
3239        Ok(()) => {
3240            tracing::info!(securities = count, "quote re-subscription succeeded");
3241        }
3242        Err(e) => {
3243            tracing::error!(error = %e, "quote re-subscription failed");
3244        }
3245    }
3246
3247    count
3248}