Skip to main content

futu_backend/
nn_codec.rs

1// NNProtoCenter 内部协议帧编解码
2//
3// 对齐 C++ f3clogin `ProtocolHeader`(`protocol_header.h` + `protocol_header.cpp`):
4// 32 字节 struct,多字节字段 **`hton_uint16/uint32` 大端网络字节序**
5//
6// offset 0:  szMagicCode[2]       "FT"
7// offset 2:  nProtoVer            u8   (当前 39)
8// offset 3:  nClientType          u8
9// offset 4:  nClientVer           u16 BE
10// offset 6:  nLanguage            u8
11// offset 7:  nUserID              u32 BE
12// offset 11: flags                u8   (bit0=push, bit1=compress)
13// offset 12: nSerialNo            u32 BE
14// offset 16: nCmdID               u16 BE
15// offset 18: nExHeadBodyLen       u32 BE  (ex_head_len + body_len,**含扩展头**)
16// offset 22: szReserved[8]
17// offset 30: nExHeadLen           u16 BE  (紧接 reserved 后,**不是** reserved 的一部分)
18// Total: 32 bytes
19//
20// 收到的数据布局:
21//   [32 字节 header] [ex_head_len 字节扩展头 FtnnExtendHead] [真实 body body_len 字节]
22// decoder 会剥掉扩展头,只把真实 body 暴露给上层。
23
24use crate::ftlogin_wire;
25use bytes::{Buf, BufMut, BytesMut};
26use tokio_util::codec::{Decoder, Encoder};
27
/// Size in bytes of the internal frame header (re-exported from `ftlogin_wire`).
pub const NN_HEADER_SIZE: usize = ftlogin_wire::HEADER_LEN;

/// Protocol version (the C++ client actually uses 39, not the 30 mentioned in its comments).
pub const NN_PROTO_VER: u8 = ftlogin_wire::PROTO_VERSION;

/// Magic bytes found at offset 0 of every frame header.
pub const MAGIC: [u8; 2] = ftlogin_wire::MAGIC;
36
/// Header of an internal protocol frame, in host representation.
///
/// Multi-byte fields are stored here as native integers; conversion to and
/// from the wire layout is delegated to `ftlogin_wire::ProtocolHeader`.
#[derive(Clone, Debug)]
pub struct NNHeader {
    /// Protocol version byte (`nProtoVer`).
    pub proto_ver: u8,
    /// Client type byte (`nClientType`).
    pub client_type: u8,
    /// Client version (`nClientVer`).
    pub client_ver: u16,
    /// Language id (`nLanguage`).
    pub lang_id: u8,
    /// User id (`nUserID`).
    pub user_id: u32,
    /// Push flag; mirrors C++ `ProtocolHeader::flags_.push_`.
    pub is_push: bool,
    /// Compression flag; mirrors C++ `ProtocolHeader::flags_.compress_`.
    pub is_compressed: bool,
    /// Serial number (`nSerialNo`).
    pub serial_no: u32,
    /// Command id (`nCmdID`).
    pub cmd_id: u16,
    /// Length of the **real body only** (extension head excluded), matching
    /// the semantics of C++ `GetBodyLen()`.
    pub body_len: u32,
    /// The 8 reserved bytes. Quote commands use `[0]` = market_type and
    /// `[1]` = ex_type. Note that the C++ reserved area is only 8 bytes; the
    /// 2 bytes that follow it are `ex_head_len`.
    pub reserved: [u8; 8],
    /// Length of the extension head (byte count of the serialized
    /// FtnnExtendHead protobuf).
    pub ex_head_len: u16,
}
59
60impl NNHeader {
61    pub fn new(cmd_id: u16, serial_no: u32) -> Self {
62        Self {
63            proto_ver: NN_PROTO_VER,
64            client_type: 0,
65            client_ver: 0,
66            lang_id: 0,
67            user_id: 0,
68            is_push: false,
69            is_compressed: false,
70            serial_no,
71            cmd_id,
72            body_len: 0,
73            reserved: [0u8; 8],
74            ex_head_len: 0,
75        }
76    }
77
78    /// 编码到字节流。`body_len` 字段按**真实 body 长度**传入(不含 ex_head),
79    /// 实际上线时我们 **不发送 ex_head**(ex_head_len=0),所以 wire 上的
80    /// `ex_head_body_len = body_len`。
81    pub fn encode(&self, dst: &mut BytesMut) {
82        let wire = ftlogin_wire::ProtocolHeader {
83            proto_version: self.proto_ver,
84            client_type: self.client_type,
85            client_version: self.client_ver,
86            language: self.lang_id,
87            user_id: self.user_id,
88            is_push: self.is_push,
89            is_compressed: self.is_compressed,
90            serial_num: self.serial_no,
91            cmd: self.cmd_id,
92            ex_head_body_len: self.ex_head_len as u32 + self.body_len,
93            reserved: self.reserved,
94            ex_head_len: self.ex_head_len,
95        };
96        dst.reserve(NN_HEADER_SIZE);
97        dst.put_slice(&wire.encode());
98    }
99
100    pub fn decode(src: &BytesMut) -> Result<Option<Self>, futu_core::error::FutuError> {
101        if src.len() < NN_HEADER_SIZE {
102            return Ok(None);
103        }
104
105        let wire = ftlogin_wire::ProtocolHeader::decode(&src[..NN_HEADER_SIZE])?;
106        let total_len = wire.frame_len()?;
107        if total_len > ftlogin_wire::MAX_PACKAGE_SIZE {
108            return Err(futu_core::error::FutuError::Codec(format!(
109                "FTLogin frame package too large: actual={total_len} max={}",
110                ftlogin_wire::MAX_PACKAGE_SIZE
111            )));
112        }
113        let body_len = wire.body_len()? as u32;
114
115        Ok(Some(Self {
116            proto_ver: wire.proto_version,
117            client_type: wire.client_type,
118            client_ver: wire.client_version,
119            lang_id: wire.language,
120            user_id: wire.user_id,
121            is_push: wire.is_push,
122            is_compressed: wire.is_compressed,
123            serial_no: wire.serial_num,
124            cmd_id: wire.cmd,
125            body_len,
126            reserved: wire.reserved,
127            ex_head_len: wire.ex_head_len,
128        }))
129    }
130
131    #[inline]
132    pub fn is_compressed(&self) -> bool {
133        self.is_compressed
134    }
135
136    #[inline]
137    pub fn flags(&self) -> u8 {
138        u8::from(self.is_push) | (u8::from(self.is_compressed) << 1)
139    }
140}
141
/// A single internal protocol frame: header, body and optional extension head.
#[derive(Debug, Clone)]
pub struct NNFrame {
    /// Frame header.
    pub header: NNHeader,
    /// Frame body bytes.
    pub body: bytes::Bytes,
    /// Raw bytes of the extension head sent by the server (FtnnExtendHead protobuf).
    /// Key use: when the body is empty, the server usually carries the
    /// business error in `ex_head.err_info`.
    pub ex_head: bytes::Bytes,
}
151
/// Mirror of the `ErrorInfo` message from C++ `FTConnExtHead.proto`
/// (field 5 of FtnnExtendHead).
#[derive(Clone, Debug, Default)]
pub struct ExHeadErrorInfo {
    /// Matches the `CmdResult` enum: 0 = success, 1 = failure.
    pub cmd_result: i32,
    /// Error source string.
    pub source: String,
    /// Error code.
    pub code: i32,
    /// Error message text.
    pub message: String,
}
161
162impl NNFrame {
163    /// 从 ex_head 里提取 err_info(field 5 = ErrorInfo nested message)
164    /// 如果 ex_head 为空或没 err_info 字段,返回 None。
165    pub fn parse_ex_head_error(&self) -> Option<ExHeadErrorInfo> {
166        if self.ex_head.is_empty() {
167            return None;
168        }
169        parse_err_info_from_ex_head(&self.ex_head)
170    }
171}
172
173/// 从 FtnnExtendHead protobuf 字节里提取 field 5 (ErrorInfo)。
174/// 手写 protobuf 解析,避免依赖 prost 生成代码。
175fn parse_err_info_from_ex_head(data: &[u8]) -> Option<ExHeadErrorInfo> {
176    let mut pos = 0;
177    // 遍历 FtnnExtendHead 的字段,找 field 5
178    while pos < data.len() {
179        let (tag, new_pos) = decode_varint(data, pos)?;
180        pos = new_pos;
181        let field_num = (tag >> 3) as u32;
182        let wire_type = (tag & 0x07) as u8;
183
184        if wire_type == 2 {
185            let (len, new_pos) = decode_varint(data, pos)?;
186            pos = new_pos;
187            let len = len as usize;
188            if pos + len > data.len() {
189                return None;
190            }
191            if field_num == 5 {
192                // ErrorInfo 子消息
193                return Some(decode_error_info(&data[pos..pos + len]));
194            }
195            pos += len;
196        } else if wire_type == 0 {
197            let (_v, new_pos) = decode_varint(data, pos)?;
198            pos = new_pos;
199        } else if wire_type == 1 {
200            pos += 8;
201        } else if wire_type == 5 {
202            pos += 4;
203        } else {
204            return None;
205        }
206    }
207    None
208}
209
210fn decode_error_info(data: &[u8]) -> ExHeadErrorInfo {
211    let mut info = ExHeadErrorInfo::default();
212    let mut pos = 0;
213    while pos < data.len() {
214        let Some((tag, new_pos)) = decode_varint(data, pos) else {
215            break;
216        };
217        pos = new_pos;
218        let field_num = (tag >> 3) as u32;
219        let wire_type = (tag & 0x07) as u8;
220
221        match (field_num, wire_type) {
222            (1, 0) => {
223                // cmd_result (int enum)
224                if let Some((v, p)) = decode_varint(data, pos) {
225                    info.cmd_result = v as i32;
226                    pos = p;
227                } else {
228                    break;
229                }
230            }
231            (3, 0) => {
232                // code (int32)
233                if let Some((v, p)) = decode_varint(data, pos) {
234                    info.code = v as i32;
235                    pos = p;
236                } else {
237                    break;
238                }
239            }
240            (2, 2) | (4, 2) => {
241                // source (field 2) / message (field 4)
242                let Some((len, new_pos)) = decode_varint(data, pos) else {
243                    break;
244                };
245                pos = new_pos;
246                let end = pos + len as usize;
247                if end > data.len() {
248                    break;
249                }
250                let s = String::from_utf8_lossy(&data[pos..end]).to_string();
251                if field_num == 2 {
252                    info.source = s;
253                } else {
254                    info.message = s;
255                }
256                pos = end;
257            }
258            (_, 0) => {
259                let Some((_v, p)) = decode_varint(data, pos) else {
260                    break;
261                };
262                pos = p;
263            }
264            (_, 2) => {
265                let Some((len, p)) = decode_varint(data, pos) else {
266                    break;
267                };
268                pos = p + len as usize;
269            }
270            (_, 1) => pos += 8,
271            (_, 5) => pos += 4,
272            _ => break,
273        }
274    }
275    info
276}
277
/// Decodes a protobuf base-128 varint from `data` beginning at `start`.
///
/// Returns the decoded value together with the index of the first byte after
/// the varint. Returns `None` when `start` is at or past the end of `data`,
/// when the input ends before a terminating byte (high bit clear) is found,
/// or when the varint is still unterminated after 10 bytes.
fn decode_varint(data: &[u8], start: usize) -> Option<(u64, usize)> {
    let tail = data.get(start..)?;
    let mut value = 0u64;
    // Ten 7-bit groups (shifts 0..=63) is the most a u64 can take.
    for (idx, &byte) in tail.iter().take(10).enumerate() {
        value |= u64::from(byte & 0x7F) << (7 * idx);
        if byte & 0x80 == 0 {
            return Some((value, start + idx + 1));
        }
    }
    None
}
298
/// Codec for the internal protocol; it carries no state of its own.
pub struct NNCodec;
301
302impl Decoder for NNCodec {
303    type Item = NNFrame;
304    type Error = futu_core::error::FutuError;
305
306    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
307        let header = match NNHeader::decode(src)? {
308            Some(h) => h,
309            None => return Ok(None),
310        };
311
312        // 线上布局:[32 header][ex_head_len 扩展头][body_len 真实 body]
313        let ex_head_body_len = (header.ex_head_len as usize)
314            .checked_add(header.body_len as usize)
315            .ok_or_else(|| futu_core::error::FutuError::Codec("NN frame length overflow".into()))?;
316        let total = NN_HEADER_SIZE
317            .checked_add(ex_head_body_len)
318            .ok_or_else(|| futu_core::error::FutuError::Codec("NN frame length overflow".into()))?;
319        if total > ftlogin_wire::MAX_PACKAGE_SIZE {
320            return Err(futu_core::error::FutuError::Codec(format!(
321                "FTLogin frame package too large: actual={total} max={}",
322                ftlogin_wire::MAX_PACKAGE_SIZE
323            )));
324        }
325        if src.len() < total {
326            src.reserve(total - src.len());
327            return Ok(None);
328        }
329
330        src.advance(NN_HEADER_SIZE);
331        // 保留扩展头字节 —— NNFrame::parse_ex_head_error 可以读 err_info
332        let ex_head = if header.ex_head_len > 0 {
333            src.split_to(header.ex_head_len as usize).freeze()
334        } else {
335            bytes::Bytes::new()
336        };
337        // Do not decompress here. C++ `channel_impl.cpp:1873-1900` decrypts
338        // encrypted command bodies first, then applies `IsDataCompressed()`.
339        // `NNCodec` owns only frame slicing; `BackendConn` owns the body pipeline.
340        let body = src.split_to(header.body_len as usize).freeze();
341
342        Ok(Some(NNFrame {
343            header,
344            body,
345            ex_head,
346        }))
347    }
348}
349
350impl Encoder<NNFrame> for NNCodec {
351    type Error = futu_core::error::FutuError;
352
353    fn encode(&mut self, item: NNFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
354        let mut header = item.header;
355        header.body_len = item.body.len() as u32;
356        header.encode(dst);
357        dst.extend_from_slice(&item.body);
358        Ok(())
359    }
360}
361
/// Returns `true` when `cmd_id` is one of the login commands
/// (login commands are not encrypted).
pub fn is_login_cmd(cmd_id: u16) -> bool {
    const LOGIN_CMDS: [u16; 4] = [1001, 6001, 2001, 4001];
    LOGIN_CMDS.contains(&cmd_id)
}
366
/// Returns `true` when `cmd_id` belongs to the set of commands that are sent
/// unencrypted (quote commands and the like).
///
/// Corresponds to the C++ table `gs_arrUnencryptedProtoCmd`.
pub fn is_unencrypted_proto(cmd_id: u16) -> bool {
    const UNENCRYPTED_CMDS: &[u16] = &[
        1306,  // Pull_ErrorCode
        1316,  // Ping
        1321,  // Pull_ConnIp (fetch the connection point list)
        20147, // Broker UpdateConnIp (FTLogin kCmdUpdateConnIpBroker, no encrypt)
        5115,  // Watchlist update
        5120,  // Pull_USGroupSecList
        5121,  // Pull_USGroupInfo
        6682,  // ModifyUSGroupSecList (modify the watchlist)
        6032,  // Qot_Request_Right
        6128,  // Qot_Pull_Ticker
        6160,  // Qot_Pull_TimeShare
        6161,  // Qot_Pull_Kline
        6211,  // Qot_Push_Sub
        6212,  // Qot_Push_Push
        6301,  // Qot_Push_EventNotice
        6304,  // Qot_ReSub
        6311,  // Qot_Pull_OptionStrikeDate
        6337,  // Qot_Pull_FutureInfo
        6365,  // Qot_Pull_TickerStatistics
        6366,  // Qot_Pull_TickerStatisticsDetail
        6371,  // Qot_Pull_TenBrokerHold
        6379,  // Qot_Pull_HK_DailyShortVolume
        6387,  // Qot_Pull_BonusInfo
        6389,  // Qot_Pull_US_ShortInterest
        6398,  // Qot_Pull_MainIncome (financials revenue breakdown)
        6503,  // Qot_Pull_SpreadInfo
        6513,  // Qot_Pull_Warrant
        6600,  // Qot_Pull_PlateOrSetID
        6608,  // Qot_Pull_SecOwnerPlate
        6621,  // Qot_Pull_BrokerMonitor
        6693,  // Qot_Pull_CapitalDistribution
        6694,  // Qot_Pull_CapitalFlow
        6695,  // Qot_Pull_HistoryCapitalFlow
        6699,  // Qot_Pull_US_DailyShortVolume
        6701,  // Qot_Pull_FutureRelated
        6731,  // Qot_Pull_HK_ShortInterest
        6733,  // Pull_TradeDate
        6735,  // Qot_Pull_OptionStrategySpread
        6736,  // Qot_Pull_OptionChain
        6745,  // Qot_SecListCheckSumDiffReport
        6746,  // Qot_Update_SecList
        6747,  // Qot_Pull_StaticInfo
        6763,  // Qot_Pull_HoldingPeriod
        6795,  // Qot_Pull_ShareholdersOverview
        6796,  // Qot_Pull_ShareholdersInstitutional
        6797,  // Qot_Pull_ShareholdersHolderDetail
        6801, 6802, 6804, // Price reminders
        6803,  // Push_PriceReminderChange
        6808,  // Pull_PriceReminder
        6809,  // Set_PriceReminder
        6811,  // Qot_Pull_Rehab
        6822,  // Qot_Push_EventNotice_Sub
        6823,  // Qot_Pull_EventNotice
        6824,  // Qot_Pull_SubData
        6825,  // Qot_Pull_MarketState
        6835,  // Qot_Pull_StockSplit
        6952,  // Qot_Pull_CompanyExecutives
        6953,  // Qot_Pull_CompanyExecutiveBackground
        6956,  // Qot_Pull_IpoList
        6957,  // Qot_Pull_IpoDetail
        6964,  // Qot_Pull_CompanyProfile
        6968,  // Qot_Pull_SecCapTrack (top-ten buy/sell brokers realtime)
        8017,  // NiuNiu community new-message push
        9008,  // Qot_Pull_OptionVolatility
        9406,  // Qot_Pull_ShareholdersHoldingChanges
        9410,  // Qot_Pull_Buyback
        9415,  // Qot_Pull_HKStockSplit
        9421,  // Qot_Pull_OptionExerciseProbability
        9435,  // Qot_Pull_ETFProfile
        18004, // Qot_Pull_FinancialData
        18005, // Qot_Pull_FinancialStructure
        18008, // Qot_Pull_BrokerInfo
        18012, // Qot_Pull_GetLv2RelatedExchange (v1.4.110 codex QOT Phase 4 Slice 7)
        20057, // Qot_Pull_TargetEstimateInfo
        20067, // Qot_Pull_ConsensusRecommendationsInfo
        20231, // Qot_Pull_GetRiskFreeRate
        20234, // Qot_Pull_RatingSummary
        20237, // Qot_Pull_AnalystStockRating
        20239, // Qot_Pull_InstStockRating
        20351, // Qot_Pull_OwnerInsiderStatistics
        20352, // Qot_Pull_OwnerInsiderHolderList
        20353, // Qot_Pull_OwnerInsiderTradeList
        20556, // Qot_Pull_OptionScreen
        20584, // Qot_Pull_MorningstarResearch
        20585, // Qot_Pull_MorningstarSimpleRating
        20675, // Qot_Pull_ChainStatistic
        21122, // Qot_Pull_FinancialsEarningsPriceMove
        21146, // Qot_Pull_FinancialsEarningsPriceHistory
        21242, // Qot_Pull_WarrantScreen
        65507, // FTLogin WebTCP kCmdWebRequest
        65509, // FTLogin WebTCP kCmdSiteConfigRequest
        20106, // Qot_Pull_GetSecuritiesInfo
        20287, // ConfirmLogPush
        20334, // Qot_New_Pull_Warrant
    ];
    UNENCRYPTED_CMDS.contains(&cmd_id)
}
468
469/// 判断命令是否需要跳过加密(登录命令或未加密行情命令)
470pub fn should_skip_encryption(cmd_id: u16) -> bool {
471    is_login_cmd(cmd_id) || is_unencrypted_proto(cmd_id)
472}
473
474#[cfg(test)]
475mod tests;