Skip to main content

futu_net/
connection.rs

1use std::future::Future;
2use std::io::{Error as IoError, ErrorKind};
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures::stream::SplitSink;
7use futures::{SinkExt, StreamExt};
8use tokio::net::TcpStream;
9use tokio_util::codec::Framed;
10
11use futu_codec::FutuCodec;
12use futu_codec::frame::FutuFrame;
13use futu_core::error::FutuError;
14
15// Rust client-side guard, not a C++ protocol constant: this caps futucli/SDK
16// waits when the local daemon endpoint never completes TCP connect. Ref:
17// crates/futu-backend/src/conn.rs:139-148 uses the same 10s cap after Linux
18// `tcp_syn_retries=6` evidence showed the OS default can hang for ~127s.
19// Revisit if client connect timeout becomes user-configurable.
20const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
21
22/// 底层 TCP 连接封装
23///
24/// 基于 tokio TcpStream + FutuCodec 的 Framed 连接,
25/// 提供异步发送/接收 FutuFrame 的能力。
26pub struct Connection {
27    sink: SplitSink<Framed<TcpStream, FutuCodec>, FutuFrame>,
28    stream: futures::stream::SplitStream<Framed<TcpStream, FutuCodec>>,
29}
30
31impl Connection {
32    /// 建立 TCP 连接
33    pub async fn connect(addr: &str) -> Result<Self, FutuError> {
34        let stream = connect_with_timeout(addr, CONNECT_TIMEOUT, TcpStream::connect(addr)).await?;
35        configure_connected_stream(&stream)?;
36        tracing::info!(addr = addr, "TCP connected");
37
38        let framed = Framed::new(stream, FutuCodec);
39        let (sink, stream) = framed.split();
40
41        Ok(Self { sink, stream })
42    }
43
44    /// 发送一个协议帧
45    pub async fn send(&mut self, frame: FutuFrame) -> Result<(), FutuError> {
46        self.sink.send(frame).await
47    }
48
49    /// 接收下一个协议帧
50    ///
51    /// 返回 None 表示连接已关闭。
52    pub async fn recv(&mut self) -> Result<Option<FutuFrame>, FutuError> {
53        match self.stream.next().await {
54            Some(Ok(frame)) => Ok(Some(frame)),
55            Some(Err(e)) => Err(e),
56            None => Ok(None),
57        }
58    }
59
60    /// 发送原始 protobuf 数据,自动构建 FutuFrame
61    pub fn build_frame(proto_id: u32, serial_no: u32, body: Vec<u8>) -> FutuFrame {
62        FutuFrame::new(proto_id, serial_no, Bytes::from(body))
63    }
64}
65
66fn configure_connected_stream(stream: &TcpStream) -> Result<(), FutuError> {
67    stream.set_nodelay(true)?;
68    socket2::SockRef::from(stream).set_keepalive(true)?;
69    Ok(())
70}
71
72async fn connect_with_timeout<T, F>(
73    addr: &str,
74    timeout: Duration,
75    connect: F,
76) -> Result<T, FutuError>
77where
78    F: Future<Output = std::io::Result<T>>,
79{
80    match tokio::time::timeout(timeout, connect).await {
81        Ok(Ok(stream)) => Ok(stream),
82        Ok(Err(err)) => Err(FutuError::Network(err)),
83        Err(_) => Err(FutuError::Network(IoError::new(
84            ErrorKind::TimedOut,
85            format!("connect to {addr} timed out after {}s", timeout.as_secs()),
86        ))),
87    }
88}
89
90#[cfg(test)]
91mod tests;