Skip to main content

futu_backend/
conn.rs

1// 后端 TCP 连接管理(直连富途后端服务器)
2//
3// 加密方式:
4// - 登录命令(1001/6001/26001): 不加密
5// - 其他命令: AES-128 加密,body = encrypt(sec_data(4B BE) + proto_data)
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU32, Ordering};
10use std::time::SystemTime;
11
12use bytes::Bytes;
13use futures::{SinkExt, StreamExt};
14use parking_lot::Mutex;
15use tokio::net::TcpStream;
16use tokio::sync::{mpsc, oneshot, watch};
17use tokio_util::codec::Framed;
18
19use futu_core::error::{FutuError, Result};
20use futu_core::log_redact::endpoint_log_fingerprint;
21use futu_net::encrypt::{aes_cbc_md5_decrypt_var, aes_cbc_md5_encrypt_var};
22
23use crate::nn_codec::{NNCodec, NNFrame, NNHeader, should_skip_encryption};
24
/// A connection to a Futu backend server.
///
/// Constructing a `BackendConn` spawns one receive task and one send task;
/// the `Arc`-wrapped fields below are shared with those tasks.
pub struct BackendConn {
    /// Counter behind outbound frame serial numbers (see `next_serial`).
    serial_no: AtomicU32,
    /// Counter whose incremented value is prepended (4 bytes, big-endian) to
    /// every encrypted outbound body.
    sec_data: AtomicU32,
    /// Liveness flag; starts `true` and is cleared by `mark_disconnected`.
    connected: Arc<std::sync::atomic::AtomicBool>,
    /// Watch channel used to publish liveness changes to subscribers.
    connected_tx: watch::Sender<bool>,
    /// Shared session key — the receive task and the sending side use the
    /// same `Arc`.
    /// Mirrors C++ `Logger::session_key_`, which is a `std::string`
    /// (`logger.h:152`): the length is whatever the server hands out.
    /// Platform keys are usually 16 bytes (AES-128); broker keys may be
    /// 32 bytes (AES-256). Before v1.4.7 this was a fixed `[u8; 16]`, which
    /// truncated broker session keys and made the server report
    /// `CONN decrypt failed`.
    session_key: Arc<Mutex<Option<Vec<u8>>>>,
    /// Queue feeding outbound frames to the send task.
    cmd_tx: mpsc::Sender<BackendCmd>,
    /// Requests awaiting a response, keyed by serial number.
    pending: PendingResponses,
    /// Signals both background tasks to stop (sent from `Drop`).
    shutdown_tx: watch::Sender<bool>,
    /// User id copied into every outbound frame header.
    pub user_id: AtomicU32,
    /// RspEncryptData.client_ip(field 14), set after TCP login.
    ///
    /// C++ `logger.cpp:511-516` stores this server-observed public IP on the
    /// working TCP client, then `logger.cpp:1122-1125` echoes it in CMD20147.
    client_ip: Mutex<String>,
    /// Client type copied into every outbound frame header.
    pub client_type: u8,
    /// Client version copied into every outbound frame header.
    pub client_ver: u16,
    /// Language id copied into every outbound frame header.
    pub lang_id: u8,
}
50
/// Commands consumed by the send task.
enum BackendCmd {
    /// Write this frame to the backend socket.
    Send(NNFrame),
}
54
/// In-flight requests: serial number → the oneshot sender that resolves the
/// waiting `request*` call. Shared between the connection and its tasks.
type PendingResponses = Arc<Mutex<HashMap<u32, oneshot::Sender<NNFrame>>>>;
56
/// Callback invoked for every backend push frame.
///
/// The receive task calls it with the frame's command id, the decoded body,
/// and the `SystemTime` at which the frame was dispatched.
pub type PushCallback = Arc<dyn Fn(u16, Bytes, SystemTime) + Send + Sync + 'static>;
59
/// Outcome of decoding an inbound frame body.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InboundFrameDecision {
    /// The body was decoded (or needed no decoding); hand the frame on.
    Deliver,
    /// Decryption or decompression failed; the frame must be discarded.
    Drop,
}
65
66fn decode_inbound_frame_body(
67    frame: &mut NNFrame,
68    session_key: Option<&[u8]>,
69) -> InboundFrameDecision {
70    if !should_skip_encryption(frame.header.cmd_id)
71        && let Some(key) = session_key
72    {
73        let body_len = frame.body.len();
74        // C++ `logger.cpp:1840-1855` 的约束:加密包 body_len 必须 >= 32
75        // 且是 16 字节对齐(AES-CBC-MD5 最小输出 32 字节)。
76        // C++ 还有特例:body_len == 32 认为是空包,直接清空。
77        if body_len >= 32 && body_len.is_multiple_of(16) {
78            match aes_cbc_md5_decrypt_var(key, &frame.body) {
79                Ok(decrypted) => {
80                    // 后端响应不含 sec_data 前缀(仅 client→server 方向有)
81                    frame.body = Bytes::from(decrypted);
82                }
83                Err(e) => {
84                    // C++ `NNTCPConnBase.cpp:347-351` 会先用 current session
85                    // key 解密,再用 old session key 重试;两次失败后
86                    // `If_OMWarn_ReturnVoid`,不会把原始密文交给业务层。
87                    tracing::warn!(
88                        cmd_id = frame.header.cmd_id,
89                        body_len = body_len,
90                        key_len = key.len(),
91                        error = %e,
92                        "decrypt failed, dropping inbound frame"
93                    );
94                    return InboundFrameDecision::Drop;
95                }
96            }
97        } else {
98            tracing::debug!(
99                cmd_id = frame.header.cmd_id,
100                body_len = body_len,
101                "body not encrypted (len not aligned to 16)"
102            );
103        }
104    }
105
106    if frame.header.is_compressed() {
107        let compressed_body_len = frame.body.len();
108        match crate::ftlogin_wire::decode_inbound_body(true, frame.body.as_ref()) {
109            Ok(decompressed) => {
110                frame.body = Bytes::from(decompressed);
111                frame.header.body_len = frame.body.len() as u32;
112                tracing::debug!(
113                    cmd_id = frame.header.cmd_id,
114                    serial_no = frame.header.serial_no,
115                    compressed_body_len,
116                    body_len = frame.body.len(),
117                    "decompressed inbound frame after decrypt"
118                );
119            }
120            Err(e) => {
121                // C++ FTLogin `channel_impl.cpp:1905-1954` marks
122                // decompression errors as `kSdkMsgRecvDataError`; it does not
123                // deliver the compressed raw body as a successful business
124                // payload.
125                tracing::warn!(
126                    cmd_id = frame.header.cmd_id,
127                    serial_no = frame.header.serial_no,
128                    body_len = compressed_body_len,
129                    error = %e,
130                    "decompress failed after decrypt, dropping inbound frame"
131                );
132                return InboundFrameDecision::Drop;
133            }
134        }
135    }
136
137    InboundFrameDecision::Deliver
138}
139
140fn release_pending_on_inbound_decode_error(frame: &NNFrame, pending: &PendingResponses) {
141    if !frame.header.is_push {
142        pending.lock().remove(&frame.header.serial_no);
143    }
144}
145
146fn mark_disconnected(
147    connected: &Arc<std::sync::atomic::AtomicBool>,
148    connected_tx: &watch::Sender<bool>,
149) {
150    let was_connected = connected.swap(false, Ordering::AcqRel);
151    if was_connected {
152        let _ = connected_tx.send(false);
153    }
154}
155
156impl BackendConn {
    /// Timeout applied to a TCP connect attempt.
    ///
    /// With the Linux default `tcp_syn_retries=6`, `TcpStream::connect` needs
    /// roughly 127 seconds to give up with `ETIMEDOUT` — a user whose opend
    /// picked an unreachable IP at startup was stuck for about two minutes
    /// before the offline-mode error appeared (a Rocky Linux user hit this).
    /// A 10 s timeout fails fast and gives the caller (the connect loop in
    /// `bridge.rs`) a chance to fall back to the next candidate IP.
    pub const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
164
    /// Client version number that Rust OpenD reports to the backend; used for
    /// backend-side diagnostics.
    ///
    /// The value is deliberately set apart from the regular C++ OpenD so that
    /// the Rust link can be told apart during backend troubleshooting. If the
    /// server requires version >= 800, lower versions are answered with
    /// `error_code=45 "当前应用版本过低"` (i.e. "application version too low").
    pub const CLIENT_VER_FTGTW: u16 = 1030;
172
173    /// 建立底层 TcpStream —— 带超时 + set_nodelay。不 spawn 任何 task。
174    async fn establish_stream(addr: &str, timeout: std::time::Duration) -> Result<TcpStream> {
175        let stream = match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
176            Ok(Ok(s)) => s,
177            Ok(Err(e)) => return Err(e.into()),
178            Err(_elapsed) => {
179                return Err(FutuError::Network(std::io::Error::new(
180                    std::io::ErrorKind::TimedOut,
181                    format!("connect to {addr} timed out after {}s", timeout.as_secs()),
182                )));
183            }
184        };
185        stream.set_nodelay(true)?;
186        Ok(stream)
187    }
188
189    /// 连接到后端服务器(带 10s 超时)
190    pub async fn connect(addr: &str, push_callback: PushCallback) -> Result<Self> {
191        let stream = Self::establish_stream(addr, Self::CONNECT_TIMEOUT).await?;
192        tracing::info!(
193            addr_fp = %endpoint_log_fingerprint(addr),
194            "connected to backend"
195        );
196        Ok(Self::from_stream(stream, push_callback))
197    }
198
199    /// 并发连接多个候选地址,谁先通用谁(对齐 C++ `connector.cpp:175-189`
200    /// `ConnectStrategyAddr` 的 concurrency_ip 语义)。
201    ///
202    /// - 每个候选独立带 `CONNECT_TIMEOUT`(10s)超时
203    /// - 第一个 `Ok(stream)` 胜出,其余 pending task drop 时会关闭半连接
204    /// - 全部失败返回最后一个错误
205    ///
206    /// 返回 `(BackendConn, winner_addr)`,调用方用 winner_addr 做登录协议里的
207    /// host_ip/host_port 字段。
208    pub async fn connect_race(
209        addrs: &[String],
210        push_callback: PushCallback,
211    ) -> Result<(Self, String)> {
212        use futures::stream::{FuturesUnordered, StreamExt};
213
214        if addrs.is_empty() {
215            return Err(FutuError::Network(std::io::Error::new(
216                std::io::ErrorKind::InvalidInput,
217                "connect_race: empty address list",
218            )));
219        }
220
221        tracing::info!(
222            candidates = addrs.len(),
223            candidate_fps = ?addrs
224                .iter()
225                .map(|addr| endpoint_log_fingerprint(addr))
226                .collect::<Vec<_>>(),
227            "racing parallel connects"
228        );
229
230        let mut attempts: FuturesUnordered<_> = addrs
231            .iter()
232            .cloned()
233            .map(|addr| async move {
234                let result = Self::establish_stream(&addr, Self::CONNECT_TIMEOUT).await;
235                (addr, result)
236            })
237            .collect();
238
239        let mut last_err: Option<FutuError> = None;
240        while let Some((addr, result)) = attempts.next().await {
241            match result {
242                Ok(stream) => {
243                    tracing::info!(
244                        addr_fp = %endpoint_log_fingerprint(&addr),
245                        remaining_losers = attempts.len(),
246                        "connect race winner"
247                    );
248                    drop(attempts); // 其他 FuturesUnordered 里的 future drop 即取消
249                    let conn = Self::from_stream(stream, push_callback);
250                    return Ok((conn, addr));
251                }
252                Err(e) => {
253                    tracing::debug!(
254                        addr_fp = %endpoint_log_fingerprint(&addr),
255                        error = %e,
256                        "candidate failed"
257                    );
258                    last_err = Some(e);
259                }
260            }
261        }
262
263        Err(last_err.unwrap_or_else(|| {
264            FutuError::Network(std::io::Error::other("connect_race: all candidates failed"))
265        }))
266    }
267
    /// v1.4.70 D1: test-only constructor that builds a `BackendConn` from a
    /// `tokio::io::DuplexStream`.
    ///
    /// Used by integration tests
    /// (`crates/futu-gateway/tests/common/mock_backend.rs`) in place of a real
    /// `TcpStream`. The production path (`connect()` → `from_stream()`) is
    /// unchanged. Only compiled with the `test-util` feature.
    #[cfg(feature = "test-util")]
    pub fn from_duplex(stream: tokio::io::DuplexStream, push_callback: PushCallback) -> Self {
        Self::from_stream_inner(stream, push_callback)
    }
276
    /// Builds a `BackendConn` from an already-established `TcpStream`
    /// (spawns the recv/send tasks via `from_stream_inner`).
    fn from_stream(stream: TcpStream, push_callback: PushCallback) -> Self {
        Self::from_stream_inner(stream, push_callback)
    }
281
282    /// v1.4.70 D1: 泛型化的 stream → BackendConn 构造(内部实现),
283    /// 生产路径用 `TcpStream`,test 用 `DuplexStream`。
284    pub(crate) fn from_stream_inner<S>(stream: S, push_callback: PushCallback) -> Self
285    where
286        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
287    {
288        let framed = Framed::new(stream, NNCodec);
289        let (mut sink, mut stream_rx) = framed.split();
290
291        let (cmd_tx, mut cmd_rx) = mpsc::channel::<BackendCmd>(256);
292
293        let pending: PendingResponses =
294            Arc::new(Mutex::new(HashMap::<u32, oneshot::Sender<NNFrame>>::new()));
295        let pending_recv = pending.clone();
296        let pending_send = pending.clone();
297        let connected = Arc::new(std::sync::atomic::AtomicBool::new(true));
298        let (connected_tx, _) = watch::channel(true);
299        let (shutdown_tx, mut shutdown_rx_recv) = watch::channel(false);
300        let mut shutdown_rx_send = shutdown_tx.subscribe();
301        let connected_recv = Arc::clone(&connected);
302        let connected_send = Arc::clone(&connected);
303        let connected_tx_recv = connected_tx.clone();
304        let connected_tx_send = connected_tx.clone();
305        let session_key: Arc<Mutex<Option<Vec<u8>>>> = Arc::new(Mutex::new(None));
306        let session_key_for_recv = session_key.clone();
307
308        // 接收任务
309        tokio::spawn(async move {
310            loop {
311                let next_frame = tokio::select! {
312                    _ = shutdown_rx_recv.changed() => {
313                        break;
314                    }
315                    next = stream_rx.next() => next,
316                };
317                let Some(Ok(mut frame)) = next_frame else {
318                    break;
319                };
320
321                // 解密(非登录命令)
322                tracing::debug!(
323                    cmd_id = frame.header.cmd_id,
324                    serial_no = frame.header.serial_no,
325                    is_push = frame.header.is_push,
326                    is_compressed = frame.header.is_compressed,
327                    ex_head_len = frame.header.ex_head_len,
328                    body_len = frame.header.body_len,
329                    actual_body_len = frame.body.len(),
330                    "recv frame"
331                );
332
333                // 如果 ex_head 里有业务错误(err_info.cmd_result != 0),
334                // 把它 log 出来 —— 服务端 body 空时会通过 ex_head 返回错误。
335                //
336                // 软失败特判:某些返回 code 是正常的"无此服务/无数据"信号(例如
337                // 某些账号在 Futu HK broker 上没有授权账户时 CMD 2298 会收到
338                // `code=-102 CONN can not find command service`)。这类日志降到
339                // DEBUG 避免噪声;其他错误保持 WARN。
340                if let Some(err) = frame.parse_ex_head_error()
341                    && (err.cmd_result != 0
342                        || err.code != 0
343                        || !err.message.is_empty()
344                        || !err.source.is_empty())
345                {
346                    // `code=-102` 是服务端的软失败信号:"此账户/通道不支持该 cmd"。
347                    // 服务端实际把 "CONN can not find command service" 放在 `source`
348                    // 字段里,`message` 为空(和 C++ ErrorInfo 字段 2/4 定义略有偏差)。
349                    // 例如:账户在 Futu HK broker 通道不认 CMD 2298 / CMD 1003 heartbeat
350                    // 都会走到这里。降级到 debug 避免日志噪声;其他 code 保持 warn。
351                    let is_soft_fail = err.code == -102;
352                    if is_soft_fail {
353                        tracing::debug!(
354                            cmd_id = frame.header.cmd_id,
355                            code = err.code,
356                            source = %err.source,
357                            message = %err.message,
358                            "server: cmd not available on this channel (soft-fail)"
359                        );
360                    } else {
361                        tracing::warn!(
362                            cmd_id = frame.header.cmd_id,
363                            cmd_result = err.cmd_result,
364                            code = err.code,
365                            source = %err.source,
366                            message = %err.message,
367                            "server returned err_info in ex_head"
368                        );
369                    }
370                }
371
372                let session_key = session_key_for_recv.lock().clone();
373                if decode_inbound_frame_body(&mut frame, session_key.as_deref())
374                    == InboundFrameDecision::Drop
375                {
376                    release_pending_on_inbound_decode_error(&frame, &pending_recv);
377                    continue;
378                }
379
380                // 判断是否为推送帧:
381                // 1. flags.push_ == 1 (标准推送)
382                // 2. flags.push_ == 0 但 serial_no == 0 (后端首次订阅快照,
383                //    以 Reply 形式发送, 如 CMD6212 的初始摆盘/报价)
384                let is_push = frame.header.is_push
385                    || (frame.header.serial_no == 0 && !pending_recv.lock().contains_key(&0));
386                if is_push {
387                    tracing::debug!(
388                        cmd_id = frame.header.cmd_id,
389                        body_len = frame.body.len(),
390                        is_push = frame.header.is_push,
391                        is_compressed = frame.header.is_compressed,
392                        reserved = ?frame.header.reserved,
393                        "backend push received"
394                    );
395                    push_callback(frame.header.cmd_id, frame.body, SystemTime::now());
396                } else {
397                    let tx = pending_recv.lock().remove(&frame.header.serial_no);
398                    if let Some(tx) = tx
399                        && let Err(frame) = tx.send(frame)
400                    {
401                        tracing::debug!(
402                            cmd_id = frame.header.cmd_id,
403                            serial_no = frame.header.serial_no,
404                            body_len = frame.body.len(),
405                            "backend response receiver dropped before frame delivery"
406                        );
407                    }
408                }
409            }
410            // 连接断开 — 清理所有 pending 请求 (C++ OnDisConnectRelpyAll)
411            mark_disconnected(&connected_recv, &connected_tx_recv);
412            tracing::warn!("backend connection closed");
413            let mut pending = pending_recv.lock();
414            let count = pending.len();
415            if count > 0 {
416                tracing::warn!(
417                    pending_count = count,
418                    "aborting pending requests due to disconnect"
419                );
420            }
421            // 清空 pending HashMap 会 drop 所有 oneshot::Sender
422            // 这会导致对应的 oneshot::Receiver 返回 RecvError
423            // 从而让 request() 方法返回 "response channel closed" 错误
424            pending.clear();
425        });
426
427        // 发送任务
428        tokio::spawn(async move {
429            loop {
430                let next_cmd = tokio::select! {
431                    _ = shutdown_rx_send.changed() => {
432                        break;
433                    }
434                    cmd = cmd_rx.recv() => cmd,
435                };
436                let Some(cmd) = next_cmd else {
437                    break;
438                };
439
440                match cmd {
441                    BackendCmd::Send(frame) => {
442                        if let Err(e) = sink.send(frame).await {
443                            mark_disconnected(&connected_send, &connected_tx_send);
444                            tracing::error!(error = %e, "backend send failed");
445                            let mut pending = pending_send.lock();
446                            let count = pending.len();
447                            if count > 0 {
448                                tracing::warn!(
449                                    pending_count = count,
450                                    "aborting pending requests due to send failure"
451                                );
452                            }
453                            pending.clear();
454                            break;
455                        }
456                    }
457                }
458            }
459        });
460
461        Self {
462            serial_no: AtomicU32::new(0),
463            sec_data: AtomicU32::new(1),
464            connected,
465            connected_tx,
466            session_key, // 与接收任务共享同一个 Arc
467            cmd_tx,
468            pending,
469            shutdown_tx,
470            user_id: AtomicU32::new(0),
471            client_ip: Mutex::new(String::new()),
472            client_type: 40, // NN_ClientType_ApiGateway
473            client_ver: Self::CLIENT_VER_FTGTW,
474            lang_id: 0,
475        }
476    }
477
478    /// 设置 session key(登录成功后调用)。接受变长字节,16/24/32 分别对应
479    /// AES-128/192/256。对齐 C++ `Logger::session_key_` 是 `std::string`,
480    /// 长度取决于服务端下发的 `RspEncryptData.session_key` 字段原始长度。
481    pub fn set_session_key(&self, key: Vec<u8>) {
482        *self.session_key.lock() = Some(key);
483    }
484
    /// Sets the initial `sec_data` value (call after a successful login).
    ///
    /// Each encrypted outbound body is prefixed with the value obtained by
    /// incrementing this counter.
    pub fn set_sec_data(&self, val: u32) {
        self.sec_data.store(val, Ordering::Relaxed);
    }
489
490    /// 设置登录响应里的客户端外网 IP。
491    pub fn set_client_ip(&self, ip: String) {
492        *self.client_ip.lock() = ip;
493    }
494
495    /// 读取登录响应里的客户端外网 IP。
496    pub fn client_ip(&self) -> String {
497        self.client_ip.lock().clone()
498    }
499
    /// Returns `true` until the connection has been marked disconnected
    /// (socket closed, send failure, closed command queue, or drop).
    pub fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }
503
    /// Subscribe to low-level connection liveness changes.
    ///
    /// C++ FTLogin publishes a connection-closed event immediately
    /// (`GTWCmdAndPushReply::OMEvProc_ConnClosed`) and lets the gateway enter
    /// reconnecting state without waiting for a later business request. Rust
    /// uses this watch channel to wake the reconnect monitor as soon as the
    /// recv/send task marks the TCP channel disconnected.
    ///
    /// Returns a new receiver on the connection-state watch channel; the
    /// channel is created holding `true` and `false` is published on
    /// disconnect.
    pub fn subscribe_connection_state(&self) -> watch::Receiver<bool> {
        self.connected_tx.subscribe()
    }
514
    /// Sends a request and waits for its response.
    ///
    /// Equivalent to `request_with_reserved` with an all-zero `reserved`
    /// header field.
    pub async fn request(&self, cmd_id: u16, body: Vec<u8>) -> Result<NNFrame> {
        self.request_with_reserved(cmd_id, body, [0u8; 10]).await
    }
519
520    /// 发送请求并等待响应(带 header reserved 字段,用于行情命令传递市场类型)
521    pub async fn request_with_reserved(
522        &self,
523        cmd_id: u16,
524        body: Vec<u8>,
525        reserved: [u8; 10],
526    ) -> Result<NNFrame> {
527        self.request_with_reserved_timeout(
528            cmd_id,
529            body,
530            reserved,
531            std::time::Duration::from_secs(10),
532        )
533        .await
534    }
535
536    pub(crate) async fn request_with_reserved_timeout(
537        &self,
538        cmd_id: u16,
539        body: Vec<u8>,
540        reserved: [u8; 10],
541        timeout: std::time::Duration,
542    ) -> Result<NNFrame> {
543        let deadline = tokio::time::Instant::now() + timeout;
544        let frame = self.build_outbound_frame(cmd_id, body, reserved)?;
545        let serial_no = frame.header.serial_no;
546
547        let (resp_tx, resp_rx) = oneshot::channel();
548        self.pending.lock().insert(serial_no, resp_tx);
549        match tokio::time::timeout_at(deadline, self.cmd_tx.send(BackendCmd::Send(frame))).await {
550            Ok(Ok(())) => {}
551            Ok(Err(_closed)) => {
552                self.pending.lock().remove(&serial_no);
553                mark_disconnected(&self.connected, &self.connected_tx);
554                return Err(FutuError::NotInitialized);
555            }
556            Err(_elapsed) => {
557                self.pending.lock().remove(&serial_no);
558                return Err(FutuError::Timeout);
559            }
560        }
561
562        let resp = crate::delay_stats::trace_backend_request(cmd_id, async {
563            match tokio::time::timeout_at(deadline, resp_rx).await {
564                Ok(Ok(resp)) => Ok(resp),
565                Ok(Err(_closed)) => {
566                    self.pending.lock().remove(&serial_no);
567                    Err(FutuError::Codec("response channel closed".into()))
568                }
569                Err(_elapsed) => {
570                    self.pending.lock().remove(&serial_no);
571                    Err(FutuError::Timeout)
572                }
573            }
574        })
575        .await?;
576
577        Ok(resp)
578    }
579
580    /// 发送无需等待响应的消息
581    pub async fn send_fire_and_forget(&self, cmd_id: u16, body: Vec<u8>) -> Result<()> {
582        let frame = self.build_outbound_frame(cmd_id, body, [0u8; 10])?;
583        if self.cmd_tx.send(BackendCmd::Send(frame)).await.is_err() {
584            mark_disconnected(&self.connected, &self.connected_tx);
585            return Err(FutuError::NotInitialized);
586        }
587
588        Ok(())
589    }
590
591    fn build_outbound_frame(
592        &self,
593        cmd_id: u16,
594        body: Vec<u8>,
595        reserved: [u8; 10],
596    ) -> Result<NNFrame> {
597        let serial = self.next_serial();
598        let mut header = NNHeader::new(cmd_id, serial);
599        header.user_id = self.user_id.load(Ordering::Relaxed);
600        header.client_type = self.client_type;
601        header.client_ver = self.client_ver;
602        header.lang_id = self.lang_id;
603        // 行情命令实际用到的只有 reserved[0..2](market_type + ex_type),
604        // 后 2 字节(原 [8..10])在 C++ protocol header 里是 ex_head_len 位置,
605        // 我们不发 ex_head 所以这 2 字节固定为 0。
606        header.reserved.copy_from_slice(&reserved[..8]);
607
608        let final_body = self.encode_outbound_body(cmd_id, body)?;
609        header.body_len = final_body.len() as u32;
610
611        Ok(NNFrame {
612            header,
613            body: Bytes::from(final_body),
614            ex_head: Bytes::new(),
615        })
616    }
617
618    fn encode_outbound_body(&self, cmd_id: u16, body: Vec<u8>) -> Result<Vec<u8>> {
619        if should_skip_encryption(cmd_id) {
620            return Ok(body);
621        }
622
623        let key = self.session_key.lock().clone();
624        match key {
625            Some(key) => {
626                // C++ 先 m_nSecData++ 再使用,所以要用 +1 后的值。
627                let sec = self.sec_data.fetch_add(1, Ordering::Relaxed) + 1;
628                let mut plaintext = Vec::with_capacity(4 + body.len());
629                plaintext.extend_from_slice(&sec.to_be_bytes());
630                plaintext.extend_from_slice(&body);
631                // var 版支持 16/24/32 字节 key —— broker session_key 可能是 32 字节。
632                aes_cbc_md5_encrypt_var(&key, &plaintext)
633            }
634            None => Ok(body),
635        }
636    }
637
638    fn next_serial(&self) -> u32 {
639        self.serial_no.fetch_add(1, Ordering::Relaxed) + 1
640    }
641}
642
/// Tears the connection down when the handle is dropped.
impl Drop for BackendConn {
    fn drop(&mut self) {
        // Ask both background tasks to stop; the send result is ignored
        // because the tasks may already have exited.
        let _ = self.shutdown_tx.send(true);
        mark_disconnected(&self.connected, &self.connected_tx);
        // Dropping the pending senders closes the response channel of every
        // request that is still waiting.
        self.pending.lock().clear();
    }
}
650
651#[cfg(test)]
652mod tests;