Skip to main content

futu_backend/
ftlogin_wire.rs

1//! FTLogin/F3CLogin channel wire layer.
2//!
3//! C++ Futu_OpenD receives already-unwrapped protobuf bodies from
4//! F3CLogin.framework. This Rust daemon does not link that framework, so this
5//! module implements the equivalent inbound wire layer from FTLogin source.
6//!
7//! Hardcoded / assumption ledger:
8//! - Header layout and protocol version are protocol constants from
9//!   `FTLogin/Src/ftlogin/channel/impl/protocol_header.h`
10//!   and `protocol_header.cpp` (`kProtoVersion = 39`).
11//!   We emit v39, but inbound non-v39 frames are decoded with the legacy byte-11
12//!   `cmd_type` interpretation used by the pre-v39 Rust decoder. This prevents a
13//!   broker-side mixed-version response from tearing down the whole channel.
14//! - Compressed bodies reserve the first 4 body bytes for BE uncompressed size,
15//!   then carry an LZ4 raw block. Ref:
16//!   `FTLogin/Src/ftlogin/channel/impl/channel_impl.cpp`
17//!   (`kBodyFrontReservedBytes = 4`, `LZ4_decompress_safe`).
18//! - Decompressed length limit is 12 MiB per the same C++ file
19//!   (`kDecompressedDataLengthLimit = 12 * 1024 * 1024`).
20
21use futu_core::error::FutuError;
22
23pub const MAGIC: [u8; 2] = [b'F', b'T'];
24pub const PROTO_VERSION: u8 = 39;
25pub const HEADER_LEN: usize = 32;
26pub const RESERVED_LEN: usize = 8;
27pub const BODY_FRONT_RESERVED_BYTES: usize = 4;
28/// C++ `FTLogin/Src/ftlogin/channel/impl/channel_impl.h:91`
29/// `kMaxPackageSize = 12 * 1024 * 1024`, enforced in `decoder.cpp:54-61`
30/// before waiting for a sticky package body.
31pub const MAX_PACKAGE_SIZE: usize = 12 * 1024 * 1024;
32pub const DECOMPRESSED_DATA_LEN_LIMIT: usize = 12 * 1024 * 1024;
33
34const FLAG_PUSH: u8 = 0b0000_0001;
35const FLAG_COMPRESS: u8 = 0b0000_0010;
36
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct ProtocolHeader {
39    pub proto_version: u8,
40    pub client_type: u8,
41    pub client_version: u16,
42    pub language: u8,
43    pub user_id: u32,
44    pub is_push: bool,
45    pub is_compressed: bool,
46    pub serial_num: u32,
47    pub cmd: u16,
48    pub ex_head_body_len: u32,
49    pub reserved: [u8; RESERVED_LEN],
50    pub ex_head_len: u16,
51}
52
53impl ProtocolHeader {
54    pub fn decode(bytes: &[u8]) -> Result<Self, FutuError> {
55        if bytes.len() < HEADER_LEN {
56            return Err(FutuError::Codec(format!(
57                "FTLogin header too short: actual={} expected={HEADER_LEN}",
58                bytes.len()
59            )));
60        }
61        if bytes[0..2] != MAGIC {
62            return Err(FutuError::InvalidHeader);
63        }
64        let proto_version = bytes[2];
65        let flags = bytes[11];
66        let ex_head_body_len = read_be_u32(bytes, 18, "ex_head_body_len")?;
67        let ex_head_len = read_be_u16(bytes, 30, "ex_head_len")?;
68        if u32::from(ex_head_len) > ex_head_body_len {
69            return Err(FutuError::Codec(format!(
70                "FTLogin header ex_head_len > ex_head_body_len: ex_head_len={ex_head_len} ex_head_body_len={ex_head_body_len}"
71            )));
72        }
73        let cmd = read_be_u16(bytes, 16, "cmd")?;
74        let (is_push, is_compressed) = if proto_version == PROTO_VERSION {
75            (flags & FLAG_PUSH != 0, flags & FLAG_COMPRESS != 0)
76        } else {
77            // v1.4.107 tolerated older NN-style frames on broker channels because
78            // byte 11 was treated as cmd_type (1 = push, other = reply). Keep that
79            // compatibility while still decoding v39 flags for compressed pushes.
80            tracing::warn!(
81                actual = proto_version,
82                expected = PROTO_VERSION,
83                cmd,
84                "FTLogin protocol version mismatch; falling back to legacy cmd_type semantics"
85            );
86            (flags == FLAG_PUSH, false)
87        };
88
89        Ok(Self {
90            proto_version,
91            client_type: bytes[3],
92            client_version: read_be_u16(bytes, 4, "client_version")?,
93            language: bytes[6],
94            user_id: read_be_u32(bytes, 7, "user_id")?,
95            is_push,
96            is_compressed,
97            serial_num: read_be_u32(bytes, 12, "serial_num")?,
98            cmd,
99            ex_head_body_len,
100            reserved: read_exact::<RESERVED_LEN>(bytes, 22, "reserved")?,
101            ex_head_len,
102        })
103    }
104
105    pub fn encode(&self) -> [u8; HEADER_LEN] {
106        let mut out = [0u8; HEADER_LEN];
107        out[0..2].copy_from_slice(&MAGIC);
108        out[2] = self.proto_version;
109        out[3] = self.client_type;
110        out[4..6].copy_from_slice(&self.client_version.to_be_bytes());
111        out[6] = self.language;
112        out[7..11].copy_from_slice(&self.user_id.to_be_bytes());
113        out[11] = encode_flags(self.is_push, self.is_compressed);
114        out[12..16].copy_from_slice(&self.serial_num.to_be_bytes());
115        out[16..18].copy_from_slice(&self.cmd.to_be_bytes());
116        out[18..22].copy_from_slice(&self.ex_head_body_len.to_be_bytes());
117        out[22..30].copy_from_slice(&self.reserved);
118        out[30..32].copy_from_slice(&self.ex_head_len.to_be_bytes());
119        out
120    }
121
122    pub fn flags(&self) -> u8 {
123        encode_flags(self.is_push, self.is_compressed)
124    }
125
126    pub fn body_len(&self) -> Result<usize, FutuError> {
127        let body_len = self
128            .ex_head_body_len
129            .checked_sub(u32::from(self.ex_head_len))
130            .ok_or_else(|| {
131                FutuError::Codec(format!(
132                    "FTLogin header ex_head_len > ex_head_body_len: ex_head_len={} ex_head_body_len={}",
133                    self.ex_head_len, self.ex_head_body_len
134                ))
135            })?;
136        usize::try_from(body_len)
137            .map_err(|_| FutuError::Codec("FTLogin body length too large".into()))
138    }
139
140    pub fn frame_len(&self) -> Result<usize, FutuError> {
141        HEADER_LEN
142            .checked_add(self.ex_head_body_len as usize)
143            .ok_or_else(|| FutuError::Codec("FTLogin frame length overflow".into()))
144    }
145}
146
147pub fn decode_inbound_body(is_compressed: bool, raw_body: &[u8]) -> Result<Vec<u8>, FutuError> {
148    if !is_compressed {
149        return Ok(raw_body.to_vec());
150    }
151
152    let original_len = compressed_original_len(raw_body)?;
153    decompress_lz4_block(&raw_body[BODY_FRONT_RESERVED_BYTES..], original_len)
154}
155
156pub fn compressed_original_len(body: &[u8]) -> Result<usize, FutuError> {
157    if body.len() <= BODY_FRONT_RESERVED_BYTES {
158        return Err(FutuError::Codec(format!(
159            "FTLogin compressed body too short: actual={} min={}",
160            body.len(),
161            BODY_FRONT_RESERVED_BYTES + 1
162        )));
163    }
164
165    let len = read_be_u32(body, 0, "compressed_original_len")? as usize;
166    if len > DECOMPRESSED_DATA_LEN_LIMIT {
167        return Err(FutuError::Codec(format!(
168            "FTLogin compressed body declares oversized decompressed len: actual={len} max={DECOMPRESSED_DATA_LEN_LIMIT}"
169        )));
170    }
171    Ok(len)
172}
173
174/// LZ4 raw block decompression compatible with C++ `LZ4_decompress_safe`.
175pub fn decompress_lz4_block(compressed: &[u8], original_len: usize) -> Result<Vec<u8>, FutuError> {
176    let mut input_pos = 0usize;
177    let mut output = Vec::with_capacity(original_len);
178
179    while input_pos < compressed.len() {
180        let token = compressed[input_pos];
181        input_pos += 1;
182
183        let literal_len = read_extended_len(compressed, &mut input_pos, usize::from(token >> 4))?;
184        let literal_end = input_pos
185            .checked_add(literal_len)
186            .ok_or_else(|| FutuError::Codec("LZ4 literal length overflow".into()))?;
187        if literal_end > compressed.len() {
188            return Err(FutuError::Codec("LZ4 literal run exceeds input".into()));
189        }
190        if output.len().saturating_add(literal_len) > original_len {
191            return Err(FutuError::Codec(
192                "LZ4 literal run exceeds output limit".into(),
193            ));
194        }
195        output.extend_from_slice(&compressed[input_pos..literal_end]);
196        input_pos = literal_end;
197
198        if input_pos == compressed.len() {
199            break;
200        }
201
202        if input_pos + 2 > compressed.len() {
203            return Err(FutuError::Codec("LZ4 missing match offset".into()));
204        }
205        let offset = usize::from(u16::from_le_bytes([
206            compressed[input_pos],
207            compressed[input_pos + 1],
208        ]));
209        input_pos += 2;
210        if offset == 0 || offset > output.len() {
211            return Err(FutuError::Codec(format!(
212                "LZ4 invalid match offset: {offset}"
213            )));
214        }
215
216        let match_len = read_extended_len(compressed, &mut input_pos, usize::from(token & 0x0f))?
217            .checked_add(4)
218            .ok_or_else(|| FutuError::Codec("LZ4 match length overflow".into()))?;
219        if output.len().saturating_add(match_len) > original_len {
220            return Err(FutuError::Codec(
221                "LZ4 match run exceeds output limit".into(),
222            ));
223        }
224
225        let mut read_pos = output.len() - offset;
226        let mut copied = 0;
227        // LZ4 match run requires byte-by-byte append from a moving read_pos
228        // (allowing overlap with the just-appended bytes). Keep this as a
229        // while loop; iterator rewrites can accidentally read from the
230        // pre-append buffer only and break overlapping matches.
231        while copied < match_len {
232            let byte = output[read_pos];
233            output.push(byte);
234            read_pos += 1;
235            copied += 1;
236        }
237    }
238
239    if output.len() != original_len {
240        return Err(FutuError::Codec(format!(
241            "LZ4 decompressed length mismatch: expected={original_len} actual={}",
242            output.len()
243        )));
244    }
245    Ok(output)
246}
247
248fn read_extended_len(
249    input: &[u8],
250    input_pos: &mut usize,
251    base_len: usize,
252) -> Result<usize, FutuError> {
253    let mut len = base_len;
254    if base_len != 15 {
255        return Ok(len);
256    }
257
258    loop {
259        if *input_pos >= input.len() {
260            return Err(FutuError::Codec("LZ4 truncated extended length".into()));
261        }
262        let value = input[*input_pos];
263        *input_pos += 1;
264        len = len
265            .checked_add(usize::from(value))
266            .ok_or_else(|| FutuError::Codec("LZ4 extended length overflow".into()))?;
267        if value != 255 {
268            return Ok(len);
269        }
270    }
271}
272
273fn encode_flags(is_push: bool, is_compressed: bool) -> u8 {
274    u8::from(is_push) | (u8::from(is_compressed) << 1)
275}
276
277fn read_be_u16(bytes: &[u8], start: usize, field: &str) -> Result<u16, FutuError> {
278    Ok(u16::from_be_bytes(read_exact::<2>(bytes, start, field)?))
279}
280
281fn read_be_u32(bytes: &[u8], start: usize, field: &str) -> Result<u32, FutuError> {
282    Ok(u32::from_be_bytes(read_exact::<4>(bytes, start, field)?))
283}
284
285fn read_exact<const N: usize>(
286    bytes: &[u8],
287    start: usize,
288    field: &str,
289) -> Result<[u8; N], FutuError> {
290    let end = start
291        .checked_add(N)
292        .ok_or_else(|| FutuError::Codec(format!("FTLogin field offset overflow: {field}")))?;
293    let slice = bytes.get(start..end).ok_or_else(|| {
294        FutuError::Codec(format!(
295            "FTLogin field too short: {field} start={start} len={N} actual={}",
296            bytes.len()
297        ))
298    })?;
299    let mut out = [0u8; N];
300    out.copy_from_slice(slice);
301    Ok(out)
302}
303
304#[cfg(test)]
305mod tests;