futu_backend/nn_codec.rs
1// NNProtoCenter 内部协议帧编解码
2//
3// 对齐 C++ f3clogin `ProtocolHeader`(`protocol_header.h` + `protocol_header.cpp`):
4// 32 字节 struct,多字节字段 **`hton_uint16/uint32` 大端网络字节序**
5//
6// offset 0: szMagicCode[2] "FT"
7// offset 2: nProtoVer u8 (当前 39)
8// offset 3: nClientType u8
9// offset 4: nClientVer u16 BE
10// offset 6: nLanguage u8
11// offset 7: nUserID u32 BE
12// offset 11: flags u8 (bit0=push, bit1=compress)
13// offset 12: nSerialNo u32 BE
14// offset 16: nCmdID u16 BE
15// offset 18: nExHeadBodyLen u32 BE (ex_head_len + body_len,**含扩展头**)
16// offset 22: szReserved[8]
17// offset 30: nExHeadLen u16 BE (紧接 reserved 后,**不是** reserved 的一部分)
18// Total: 32 bytes
19//
20// 收到的数据布局:
21// [32 字节 header] [ex_head_len 字节扩展头 FtnnExtendHead] [真实 body body_len 字节]
22// decoder 会剥掉扩展头,只把真实 body 暴露给上层。
23
24use crate::ftlogin_wire;
25use bytes::{Buf, BufMut, BytesMut};
26use tokio_util::codec::{Decoder, Encoder};
27
28/// 内部帧头大小
29pub const NN_HEADER_SIZE: usize = ftlogin_wire::HEADER_LEN;
30
31/// 协议版本 (C++ 实际使用 39,非注释中的 30)
32pub const NN_PROTO_VER: u8 = ftlogin_wire::PROTO_VERSION;
33
34/// Magic bytes
35pub const MAGIC: [u8; 2] = ftlogin_wire::MAGIC;
36
37/// 内部协议帧头
38#[derive(Debug, Clone)]
39pub struct NNHeader {
40 pub proto_ver: u8,
41 pub client_type: u8,
42 pub client_ver: u16,
43 pub lang_id: u8,
44 pub user_id: u32,
45 /// C++ `ProtocolHeader::flags_.push_`.
46 pub is_push: bool,
47 /// C++ `ProtocolHeader::flags_.compress_`.
48 pub is_compressed: bool,
49 pub serial_no: u32,
50 pub cmd_id: u16,
51 /// **真实 body 长度**(不含 ex_head)—— 对齐 C++ `GetBodyLen()` 语义
52 pub body_len: u32,
53 /// reserved 8 字节,行情命令用 [0]=market_type [1]=ex_type
54 /// 注意:C++ reserved 只有 8 字节,后 2 字节是 ex_head_len
55 pub reserved: [u8; 8],
56 /// 扩展头长度(FtnnExtendHead protobuf 序列化后字节数)
57 pub ex_head_len: u16,
58}
59
60impl NNHeader {
61 pub fn new(cmd_id: u16, serial_no: u32) -> Self {
62 Self {
63 proto_ver: NN_PROTO_VER,
64 client_type: 0,
65 client_ver: 0,
66 lang_id: 0,
67 user_id: 0,
68 is_push: false,
69 is_compressed: false,
70 serial_no,
71 cmd_id,
72 body_len: 0,
73 reserved: [0u8; 8],
74 ex_head_len: 0,
75 }
76 }
77
78 /// 编码到字节流。`body_len` 字段按**真实 body 长度**传入(不含 ex_head),
79 /// 实际上线时我们 **不发送 ex_head**(ex_head_len=0),所以 wire 上的
80 /// `ex_head_body_len = body_len`。
81 pub fn encode(&self, dst: &mut BytesMut) {
82 let wire = ftlogin_wire::ProtocolHeader {
83 proto_version: self.proto_ver,
84 client_type: self.client_type,
85 client_version: self.client_ver,
86 language: self.lang_id,
87 user_id: self.user_id,
88 is_push: self.is_push,
89 is_compressed: self.is_compressed,
90 serial_num: self.serial_no,
91 cmd: self.cmd_id,
92 ex_head_body_len: self.ex_head_len as u32 + self.body_len,
93 reserved: self.reserved,
94 ex_head_len: self.ex_head_len,
95 };
96 dst.reserve(NN_HEADER_SIZE);
97 dst.put_slice(&wire.encode());
98 }
99
100 pub fn decode(src: &BytesMut) -> Result<Option<Self>, futu_core::error::FutuError> {
101 if src.len() < NN_HEADER_SIZE {
102 return Ok(None);
103 }
104
105 let wire = ftlogin_wire::ProtocolHeader::decode(&src[..NN_HEADER_SIZE])?;
106 let total_len = wire.frame_len()?;
107 if total_len > ftlogin_wire::MAX_PACKAGE_SIZE {
108 return Err(futu_core::error::FutuError::Codec(format!(
109 "FTLogin frame package too large: actual={total_len} max={}",
110 ftlogin_wire::MAX_PACKAGE_SIZE
111 )));
112 }
113 let body_len = wire.body_len()? as u32;
114
115 Ok(Some(Self {
116 proto_ver: wire.proto_version,
117 client_type: wire.client_type,
118 client_ver: wire.client_version,
119 lang_id: wire.language,
120 user_id: wire.user_id,
121 is_push: wire.is_push,
122 is_compressed: wire.is_compressed,
123 serial_no: wire.serial_num,
124 cmd_id: wire.cmd,
125 body_len,
126 reserved: wire.reserved,
127 ex_head_len: wire.ex_head_len,
128 }))
129 }
130
131 #[inline]
132 pub fn is_compressed(&self) -> bool {
133 self.is_compressed
134 }
135
136 #[inline]
137 pub fn flags(&self) -> u8 {
138 u8::from(self.is_push) | (u8::from(self.is_compressed) << 1)
139 }
140}
141
142/// 内部协议帧
143#[derive(Debug, Clone)]
144pub struct NNFrame {
145 pub header: NNHeader,
146 pub body: bytes::Bytes,
147 /// 服务端下发的扩展头原始字节(FtnnExtendHead protobuf)
148 /// 关键用途:body 为空时,服务端通常把业务错误塞在 ex_head.err_info 里
149 pub ex_head: bytes::Bytes,
150}
151
152/// 对齐 C++ `FTConnExtHead.proto` 的 `ErrorInfo`(field 5 of FtnnExtendHead)。
153#[derive(Debug, Clone, Default)]
154pub struct ExHeadErrorInfo {
155 /// 对齐 `CmdResult` enum:0=成功,1=失败
156 pub cmd_result: i32,
157 pub source: String,
158 pub code: i32,
159 pub message: String,
160}
161
162impl NNFrame {
163 /// 从 ex_head 里提取 err_info(field 5 = ErrorInfo nested message)
164 /// 如果 ex_head 为空或没 err_info 字段,返回 None。
165 pub fn parse_ex_head_error(&self) -> Option<ExHeadErrorInfo> {
166 if self.ex_head.is_empty() {
167 return None;
168 }
169 parse_err_info_from_ex_head(&self.ex_head)
170 }
171}
172
173/// 从 FtnnExtendHead protobuf 字节里提取 field 5 (ErrorInfo)。
174/// 手写 protobuf 解析,避免依赖 prost 生成代码。
175fn parse_err_info_from_ex_head(data: &[u8]) -> Option<ExHeadErrorInfo> {
176 let mut pos = 0;
177 // 遍历 FtnnExtendHead 的字段,找 field 5
178 while pos < data.len() {
179 let (tag, new_pos) = decode_varint(data, pos)?;
180 pos = new_pos;
181 let field_num = (tag >> 3) as u32;
182 let wire_type = (tag & 0x07) as u8;
183
184 if wire_type == 2 {
185 let (len, new_pos) = decode_varint(data, pos)?;
186 pos = new_pos;
187 let len = len as usize;
188 if pos + len > data.len() {
189 return None;
190 }
191 if field_num == 5 {
192 // ErrorInfo 子消息
193 return Some(decode_error_info(&data[pos..pos + len]));
194 }
195 pos += len;
196 } else if wire_type == 0 {
197 let (_v, new_pos) = decode_varint(data, pos)?;
198 pos = new_pos;
199 } else if wire_type == 1 {
200 pos += 8;
201 } else if wire_type == 5 {
202 pos += 4;
203 } else {
204 return None;
205 }
206 }
207 None
208}
209
210fn decode_error_info(data: &[u8]) -> ExHeadErrorInfo {
211 let mut info = ExHeadErrorInfo::default();
212 let mut pos = 0;
213 while pos < data.len() {
214 let Some((tag, new_pos)) = decode_varint(data, pos) else {
215 break;
216 };
217 pos = new_pos;
218 let field_num = (tag >> 3) as u32;
219 let wire_type = (tag & 0x07) as u8;
220
221 match (field_num, wire_type) {
222 (1, 0) => {
223 // cmd_result (int enum)
224 if let Some((v, p)) = decode_varint(data, pos) {
225 info.cmd_result = v as i32;
226 pos = p;
227 } else {
228 break;
229 }
230 }
231 (3, 0) => {
232 // code (int32)
233 if let Some((v, p)) = decode_varint(data, pos) {
234 info.code = v as i32;
235 pos = p;
236 } else {
237 break;
238 }
239 }
240 (2, 2) | (4, 2) => {
241 // source (field 2) / message (field 4)
242 let Some((len, new_pos)) = decode_varint(data, pos) else {
243 break;
244 };
245 pos = new_pos;
246 let end = pos + len as usize;
247 if end > data.len() {
248 break;
249 }
250 let s = String::from_utf8_lossy(&data[pos..end]).to_string();
251 if field_num == 2 {
252 info.source = s;
253 } else {
254 info.message = s;
255 }
256 pos = end;
257 }
258 (_, 0) => {
259 let Some((_v, p)) = decode_varint(data, pos) else {
260 break;
261 };
262 pos = p;
263 }
264 (_, 2) => {
265 let Some((len, p)) = decode_varint(data, pos) else {
266 break;
267 };
268 pos = p + len as usize;
269 }
270 (_, 1) => pos += 8,
271 (_, 5) => pos += 4,
272 _ => break,
273 }
274 }
275 info
276}
277
278fn decode_varint(data: &[u8], start: usize) -> Option<(u64, usize)> {
279 let mut result: u64 = 0;
280 let mut shift = 0;
281 let mut pos = start;
282 loop {
283 if pos >= data.len() {
284 return None;
285 }
286 let byte = data[pos];
287 result |= ((byte & 0x7F) as u64) << shift;
288 pos += 1;
289 if byte & 0x80 == 0 {
290 return Some((result, pos));
291 }
292 shift += 7;
293 if shift >= 64 {
294 return None;
295 }
296 }
297}
298
299/// 内部协议编解码器
300pub struct NNCodec;
301
302impl Decoder for NNCodec {
303 type Item = NNFrame;
304 type Error = futu_core::error::FutuError;
305
306 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
307 let header = match NNHeader::decode(src)? {
308 Some(h) => h,
309 None => return Ok(None),
310 };
311
312 // 线上布局:[32 header][ex_head_len 扩展头][body_len 真实 body]
313 let ex_head_body_len = (header.ex_head_len as usize)
314 .checked_add(header.body_len as usize)
315 .ok_or_else(|| futu_core::error::FutuError::Codec("NN frame length overflow".into()))?;
316 let total = NN_HEADER_SIZE
317 .checked_add(ex_head_body_len)
318 .ok_or_else(|| futu_core::error::FutuError::Codec("NN frame length overflow".into()))?;
319 if total > ftlogin_wire::MAX_PACKAGE_SIZE {
320 return Err(futu_core::error::FutuError::Codec(format!(
321 "FTLogin frame package too large: actual={total} max={}",
322 ftlogin_wire::MAX_PACKAGE_SIZE
323 )));
324 }
325 if src.len() < total {
326 src.reserve(total - src.len());
327 return Ok(None);
328 }
329
330 src.advance(NN_HEADER_SIZE);
331 // 保留扩展头字节 —— NNFrame::parse_ex_head_error 可以读 err_info
332 let ex_head = if header.ex_head_len > 0 {
333 src.split_to(header.ex_head_len as usize).freeze()
334 } else {
335 bytes::Bytes::new()
336 };
337 // Do not decompress here. C++ `channel_impl.cpp:1873-1900` decrypts
338 // encrypted command bodies first, then applies `IsDataCompressed()`.
339 // `NNCodec` owns only frame slicing; `BackendConn` owns the body pipeline.
340 let body = src.split_to(header.body_len as usize).freeze();
341
342 Ok(Some(NNFrame {
343 header,
344 body,
345 ex_head,
346 }))
347 }
348}
349
350impl Encoder<NNFrame> for NNCodec {
351 type Error = futu_core::error::FutuError;
352
353 fn encode(&mut self, item: NNFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
354 let mut header = item.header;
355 header.body_len = item.body.len() as u32;
356 header.encode(dst);
357 dst.extend_from_slice(&item.body);
358 Ok(())
359 }
360}
361
362/// 判断是否为登录命令(登录命令不加密)
363pub fn is_login_cmd(cmd_id: u16) -> bool {
364 matches!(cmd_id, 1001 | 6001 | 2001 | 4001)
365}
366
367/// 判断是否为不加密的命令(行情命令等)
368/// 对应 C++ gs_arrUnencryptedProtoCmd
369pub fn is_unencrypted_proto(cmd_id: u16) -> bool {
370 matches!(
371 cmd_id,
372 1306 | // Pull_ErrorCode
373 1316 | // Ping
374 1321 | // Pull_ConnIp (获取连接点列表)
375 20147 | // Broker UpdateConnIp (FTLogin kCmdUpdateConnIpBroker, no encrypt)
376 5115 | // 自选股列表更新
377 5120 | // Pull_USGroupSecList
378 5121 | // Pull_USGroupInfo
379 6682 | // ModifyUSGroupSecList (修改自选股)
380 6032 | // Qot_Request_Right
381 6128 | // Qot_Pull_Ticker
382 6160 | // Qot_Pull_TimeShare
383 6161 | // Qot_Pull_Kline
384 6211 | // Qot_Push_Sub
385 6212 | // Qot_Push_Push
386 6301 | // Qot_Push_EventNotice
387 6304 | // Qot_ReSub
388 6311 | // Qot_Pull_OptionStrikeDate
389 6337 | // Qot_Pull_FutureInfo
390 6365 | // Qot_Pull_TickerStatistics
391 6366 | // Qot_Pull_TickerStatisticsDetail
392 6371 | // Qot_Pull_TenBrokerHold
393 6379 | // Qot_Pull_HK_DailyShortVolume
394 6387 | // Qot_Pull_BonusInfo
395 6389 | // Qot_Pull_US_ShortInterest
396 6398 | // Qot_Pull_MainIncome (financials revenue breakdown)
397 6503 | // Qot_Pull_SpreadInfo
398 6513 | // Qot_Pull_Warrant
399 6600 | // Qot_Pull_PlateOrSetID
400 6608 | // Qot_Pull_SecOwnerPlate
401 6621 | // Qot_Pull_BrokerMonitor
402 6693 | // Qot_Pull_CapitalDistribution
403 6694 | // Qot_Pull_CapitalFlow
404 6695 | // Qot_Pull_HistoryCapitalFlow
405 6699 | // Qot_Pull_US_DailyShortVolume
406 6701 | // Qot_Pull_FutureRelated
407 6731 | // Qot_Pull_HK_ShortInterest
408 6733 | // Pull_TradeDate
409 6735 | // Qot_Pull_OptionStrategySpread
410 6736 | // Qot_Pull_OptionChain
411 6745 | // Qot_SecListCheckSumDiffReport
412 6746 | // Qot_Update_SecList
413 6747 | // Qot_Pull_StaticInfo
414 6763 | // Qot_Pull_HoldingPeriod
415 6795 | // Qot_Pull_ShareholdersOverview
416 6796 | // Qot_Pull_ShareholdersInstitutional
417 6797 | // Qot_Pull_ShareholdersHolderDetail
418 6801 | 6802 | 6804 | // 股价提醒
419 6803 | // Push_PriceReminderChange
420 6808 | // Pull_PriceReminder
421 6809 | // Set_PriceReminder
422 6811 | // Qot_Pull_Rehab
423 6822 | // Qot_Push_EventNotice_Sub
424 6823 | // Qot_Pull_EventNotice
425 6824 | // Qot_Pull_SubData
426 6825 | // Qot_Pull_MarketState
427 6835 | // Qot_Pull_StockSplit
428 6952 | // Qot_Pull_CompanyExecutives
429 6953 | // Qot_Pull_CompanyExecutiveBackground
430 6956 | // Qot_Pull_IpoList
431 6957 | // Qot_Pull_IpoDetail
432 6964 | // Qot_Pull_CompanyProfile
433 6968 | // Qot_Pull_SecCapTrack (top-ten buy/sell brokers realtime)
434 8017 | // 牛牛圈新消息push
435 9008 | // Qot_Pull_OptionVolatility
436 9406 | // Qot_Pull_ShareholdersHoldingChanges
437 9410 | // Qot_Pull_Buyback
438 9415 | // Qot_Pull_HKStockSplit
439 9421 | // Qot_Pull_OptionExerciseProbability
440 9435 | // Qot_Pull_ETFProfile
441 18004 | // Qot_Pull_FinancialData
442 18005 | // Qot_Pull_FinancialStructure
443 18008 | // Qot_Pull_BrokerInfo
444 18012 | // Qot_Pull_GetLv2RelatedExchange (v1.4.110 codex QOT Phase 4 Slice 7)
445 20057 | // Qot_Pull_TargetEstimateInfo
446 20067 | // Qot_Pull_ConsensusRecommendationsInfo
447 20231 | // Qot_Pull_GetRiskFreeRate
448 20234 | // Qot_Pull_RatingSummary
449 20237 | // Qot_Pull_AnalystStockRating
450 20239 | // Qot_Pull_InstStockRating
451 20351 | // Qot_Pull_OwnerInsiderStatistics
452 20352 | // Qot_Pull_OwnerInsiderHolderList
453 20353 | // Qot_Pull_OwnerInsiderTradeList
454 20556 | // Qot_Pull_OptionScreen
455 20584 | // Qot_Pull_MorningstarResearch
456 20585 | // Qot_Pull_MorningstarSimpleRating
457 20675 | // Qot_Pull_ChainStatistic
458 21122 | // Qot_Pull_FinancialsEarningsPriceMove
459 21146 | // Qot_Pull_FinancialsEarningsPriceHistory
460 21242 | // Qot_Pull_WarrantScreen
461 65507 | // FTLogin WebTCP kCmdWebRequest
462 65509 | // FTLogin WebTCP kCmdSiteConfigRequest
463 20106 | // Qot_Pull_GetSecuritiesInfo
464 20287 | // ConfirmLogPush
465 20334 // Qot_New_Pull_Warrant
466 )
467}
468
469/// 判断命令是否需要跳过加密(登录命令或未加密行情命令)
470pub fn should_skip_encryption(cmd_id: u16) -> bool {
471 is_login_cmd(cmd_id) || is_unencrypted_proto(cmd_id)
472}
473
474#[cfg(test)]
475mod tests;