futu_codec/
codec.rs

1use bytes::{Buf, BytesMut};
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::frame::FutuFrame;
5use crate::header::{FutuHeader, HEADER_SIZE};
6
7/// FutuOpenD 协议帧编解码器
8///
9/// 实现 tokio_util 的 Decoder/Encoder trait,
10/// 用于在 TCP 流上进行帧边界识别和编解码。
11pub struct FutuCodec;
12
13impl Decoder for FutuCodec {
14    type Item = FutuFrame;
15    type Error = futu_core::error::FutuError;
16
17    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
18        // 尝试解析帧头(不消费)
19        let header = match FutuHeader::peek(src)? {
20            Some(h) => h,
21            None => return Ok(None),
22        };
23
24        let total_len = HEADER_SIZE + header.body_len as usize;
25
26        // 检查是否有足够的数据
27        if src.len() < total_len {
28            src.reserve(total_len - src.len());
29            return Ok(None);
30        }
31
32        // 消费帧头
33        src.advance(HEADER_SIZE);
34
35        // 提取 body
36        let body = src.split_to(header.body_len as usize).freeze();
37
38        let frame = FutuFrame { header, body };
39
40        Ok(Some(frame))
41    }
42}
43
44impl Encoder<FutuFrame> for FutuCodec {
45    type Error = futu_core::error::FutuError;
46
47    fn encode(&mut self, item: FutuFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
48        item.header.encode(dst);
49        dst.extend_from_slice(&item.body);
50        Ok(())
51    }
52}
53
54#[cfg(test)]
55mod tests {
56    use super::*;
57    use bytes::Bytes;
58
59    #[test]
60    fn test_codec_roundtrip() {
61        let mut codec = FutuCodec;
62        let body = Bytes::from_static(b"test payload");
63        let frame = FutuFrame::new(1001, 1, body.clone());
64
65        // Encode
66        let mut buf = BytesMut::new();
67        codec.encode(frame, &mut buf).unwrap();
68
69        // Decode
70        let decoded = codec.decode(&mut buf).unwrap().unwrap();
71        assert_eq!(decoded.header.proto_id, 1001);
72        assert_eq!(decoded.header.serial_no, 1);
73        assert_eq!(decoded.body, body);
74        assert!(decoded.verify_sha1());
75    }
76
77    #[test]
78    fn test_codec_partial_data() {
79        let mut codec = FutuCodec;
80        let body = Bytes::from_static(b"test payload");
81        let frame = FutuFrame::new(1001, 1, body);
82
83        let mut buf = BytesMut::new();
84        codec.encode(frame, &mut buf).unwrap();
85
86        // 只提供部分数据
87        let mut partial = buf.split_to(HEADER_SIZE + 2);
88        assert!(codec.decode(&mut partial).unwrap().is_none());
89    }
90
91    #[test]
92    fn test_codec_multiple_frames() {
93        let mut codec = FutuCodec;
94
95        let frame1 = FutuFrame::new(1001, 1, Bytes::from_static(b"frame1"));
96        let frame2 = FutuFrame::new(3001, 2, Bytes::from_static(b"frame2"));
97
98        let mut buf = BytesMut::new();
99        codec.encode(frame1, &mut buf).unwrap();
100        codec.encode(frame2, &mut buf).unwrap();
101
102        let decoded1 = codec.decode(&mut buf).unwrap().unwrap();
103        assert_eq!(decoded1.header.proto_id, 1001);
104        assert_eq!(decoded1.body.as_ref(), b"frame1");
105
106        let decoded2 = codec.decode(&mut buf).unwrap().unwrap();
107        assert_eq!(decoded2.header.proto_id, 3001);
108        assert_eq!(decoded2.body.as_ref(), b"frame2");
109
110        // 没有更多数据
111        assert!(codec.decode(&mut buf).unwrap().is_none());
112    }
113}