1use bytes::{Buf, BytesMut};
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::frame::FutuFrame;
5use crate::header::{FutuHeader, HEADER_SIZE};
6
/// Stateless codec that converts between a raw byte stream and [`FutuFrame`] values.
///
/// On the wire a frame is `HEADER_SIZE` header bytes followed by
/// `header.body_len` body bytes. The codec keeps no state between calls, so
/// it is a unit struct and is cheap to copy.
#[derive(Debug, Clone, Copy, Default)]
pub struct FutuCodec;
12
13impl Decoder for FutuCodec {
14 type Item = FutuFrame;
15 type Error = futu_core::error::FutuError;
16
17 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
18 let header = match FutuHeader::peek(src)? {
20 Some(h) => h,
21 None => return Ok(None),
22 };
23
24 let total_len = HEADER_SIZE + header.body_len as usize;
25
26 if src.len() < total_len {
28 src.reserve(total_len - src.len());
29 return Ok(None);
30 }
31
32 src.advance(HEADER_SIZE);
34
35 let body = src.split_to(header.body_len as usize).freeze();
37
38 let frame = FutuFrame { header, body };
39
40 Ok(Some(frame))
41 }
42}
43
44impl Encoder<FutuFrame> for FutuCodec {
45 type Error = futu_core::error::FutuError;
46
47 fn encode(&mut self, item: FutuFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
48 item.header.encode(dst);
49 dst.extend_from_slice(&item.body);
50 Ok(())
51 }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// A frame that is encoded and then decoded keeps its header fields and body.
    #[test]
    fn test_codec_roundtrip() {
        let payload = Bytes::from_static(b"test payload");
        let mut codec = FutuCodec;
        let mut wire = BytesMut::new();

        codec
            .encode(FutuFrame::new(1001, 1, payload.clone()), &mut wire)
            .unwrap();

        let frame = codec.decode(&mut wire).unwrap().unwrap();
        assert_eq!(frame.header.proto_id, 1001);
        assert_eq!(frame.header.serial_no, 1);
        assert_eq!(frame.body, payload);
        assert!(frame.verify_sha1());
    }

    /// A buffer holding the header plus only two body bytes decodes to `None`.
    #[test]
    fn test_codec_partial_data() {
        let mut codec = FutuCodec;
        let mut wire = BytesMut::new();

        codec
            .encode(
                FutuFrame::new(1001, 1, Bytes::from_static(b"test payload")),
                &mut wire,
            )
            .unwrap();

        // Keep the full header and just the first two bytes of the body.
        let mut truncated = wire.split_to(HEADER_SIZE + 2);
        let outcome = codec.decode(&mut truncated).unwrap();
        assert!(outcome.is_none());
    }

    /// Two frames written back to back come out in order, then the buffer is drained.
    #[test]
    fn test_codec_multiple_frames() {
        let mut codec = FutuCodec;
        let mut wire = BytesMut::new();

        let first = FutuFrame::new(1001, 1, Bytes::from_static(b"frame1"));
        codec.encode(first, &mut wire).unwrap();
        let second = FutuFrame::new(3001, 2, Bytes::from_static(b"frame2"));
        codec.encode(second, &mut wire).unwrap();

        let got_first = codec.decode(&mut wire).unwrap().unwrap();
        assert_eq!(got_first.header.proto_id, 1001);
        assert_eq!(got_first.body.as_ref(), b"frame1");

        let got_second = codec.decode(&mut wire).unwrap().unwrap();
        assert_eq!(got_second.header.proto_id, 3001);
        assert_eq!(got_second.body.as_ref(), b"frame2");

        // Nothing is left once both frames have been consumed.
        let leftover = codec.decode(&mut wire).unwrap();
        assert!(leftover.is_none());
    }
}