1use bytes::Bytes;
2use futures::stream::SplitSink;
3use futures::{SinkExt, StreamExt};
4use tokio::net::TcpStream;
5use tokio_util::codec::Framed;
6
7use futu_codec::frame::FutuFrame;
8use futu_codec::FutuCodec;
9use futu_core::error::FutuError;
10
11pub struct Connection {
16 sink: SplitSink<Framed<TcpStream, FutuCodec>, FutuFrame>,
17 stream: futures::stream::SplitStream<Framed<TcpStream, FutuCodec>>,
18}
19
20impl Connection {
21 pub async fn connect(addr: &str) -> Result<Self, FutuError> {
23 let stream = TcpStream::connect(addr).await?;
24 stream.set_nodelay(true)?;
25 tracing::info!(addr = addr, "TCP connected");
26
27 let framed = Framed::new(stream, FutuCodec);
28 let (sink, stream) = framed.split();
29
30 Ok(Self { sink, stream })
31 }
32
33 pub async fn send(&mut self, frame: FutuFrame) -> Result<(), FutuError> {
35 self.sink.send(frame).await
36 }
37
38 pub async fn recv(&mut self) -> Result<Option<FutuFrame>, FutuError> {
42 match self.stream.next().await {
43 Some(Ok(frame)) => Ok(Some(frame)),
44 Some(Err(e)) => Err(e),
45 None => Ok(None),
46 }
47 }
48
49 pub fn build_frame(proto_id: u32, serial_no: u32, body: Vec<u8>) -> FutuFrame {
51 FutuFrame::new(proto_id, serial_no, Bytes::from(body))
52 }
53}