futu_net/
connection.rs

1use bytes::Bytes;
2use futures::stream::SplitSink;
3use futures::{SinkExt, StreamExt};
4use tokio::net::TcpStream;
5use tokio_util::codec::Framed;
6
7use futu_codec::frame::FutuFrame;
8use futu_codec::FutuCodec;
9use futu_core::error::FutuError;
10
11/// 底层 TCP 连接封装
12///
13/// 基于 tokio TcpStream + FutuCodec 的 Framed 连接,
14/// 提供异步发送/接收 FutuFrame 的能力。
15pub struct Connection {
16    sink: SplitSink<Framed<TcpStream, FutuCodec>, FutuFrame>,
17    stream: futures::stream::SplitStream<Framed<TcpStream, FutuCodec>>,
18}
19
20impl Connection {
21    /// 建立 TCP 连接
22    pub async fn connect(addr: &str) -> Result<Self, FutuError> {
23        let stream = TcpStream::connect(addr).await?;
24        stream.set_nodelay(true)?;
25        tracing::info!(addr = addr, "TCP connected");
26
27        let framed = Framed::new(stream, FutuCodec);
28        let (sink, stream) = framed.split();
29
30        Ok(Self { sink, stream })
31    }
32
33    /// 发送一个协议帧
34    pub async fn send(&mut self, frame: FutuFrame) -> Result<(), FutuError> {
35        self.sink.send(frame).await
36    }
37
38    /// 接收下一个协议帧
39    ///
40    /// 返回 None 表示连接已关闭。
41    pub async fn recv(&mut self) -> Result<Option<FutuFrame>, FutuError> {
42        match self.stream.next().await {
43            Some(Ok(frame)) => Ok(Some(frame)),
44            Some(Err(e)) => Err(e),
45            None => Ok(None),
46        }
47    }
48
49    /// 发送原始 protobuf 数据,自动构建 FutuFrame
50    pub fn build_frame(proto_id: u32, serial_no: u32, body: Vec<u8>) -> FutuFrame {
51        FutuFrame::new(proto_id, serial_no, Bytes::from(body))
52    }
53}