1use crate::conn::{BackendConn, PushCallback};
8use futu_core::error::{FutuError, Result};
9use rustls_pki_types::ServerName;
10use std::net::{IpAddr, SocketAddr};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::net::TcpSocket;
14use tokio_rustls::TlsConnector;
15use tokio_rustls::rustls::{ClientConfig, RootCertStore};
16
/// Command id passed to `BackendConn::request` for every web request frame.
const WEB_REQUEST_CMD: u16 = 65507;

/// Upper bound on establishing the TCP connection to a webtcp endpoint.
const WEBTCP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound on the TLS handshake that follows the TCP connect.
const WEBTCP_TLS_TIMEOUT: Duration = Duration::from_secs(10);

/// Socket send-buffer size requested before connecting (64 KiB).
const WEBTCP_SEND_BUFFER_SIZE: u32 = 64 << 10;

/// Socket receive-buffer size requested before connecting (512 KiB).
const WEBTCP_RECV_BUFFER_SIZE: u32 = 512 << 10;

/// One-shot guard so the rustls crypto provider install is attempted only once.
static RUSTLS_PROVIDER_INIT: std::sync::Once = std::sync::Once::new();
30
31pub fn install_default_rustls_crypto_provider() {
35 RUSTLS_PROVIDER_INIT.call_once(|| {
36 if tokio_rustls::rustls::crypto::ring::default_provider()
37 .install_default()
38 .is_err()
39 {
40 tracing::debug!(
41 "rustls crypto provider already installed; keeping existing process-level provider"
42 );
43 }
44 });
45}
46
47#[derive(Debug)]
48pub(super) struct ProtoError(String);
49
50impl std::fmt::Display for ProtoError {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 self.0.fmt(f)
53 }
54}
55
56impl std::error::Error for ProtoError {}
57
58pub(super) type ProtoResult<T> = std::result::Result<T, ProtoError>;
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub(super) enum WireType {
62 Varint = 0,
63 Fixed64 = 1,
64 LengthDelimited = 2,
65 Fixed32 = 5,
66}
67
68impl WireType {
69 fn from_u64(value: u64) -> ProtoResult<Self> {
70 match value {
71 0 => Ok(Self::Varint),
72 1 => Ok(Self::Fixed64),
73 2 => Ok(Self::LengthDelimited),
74 5 => Ok(Self::Fixed32),
75 other => Err(ProtoError(format!("unsupported wire type {other}"))),
76 }
77 }
78}
79
80pub(super) struct ProtoWriter {
81 buf: Vec<u8>,
82}
83
84impl ProtoWriter {
85 pub(super) fn new() -> Self {
86 Self { buf: Vec::new() }
87 }
88
89 pub(super) fn finish(self) -> Vec<u8> {
90 self.buf
91 }
92
93 pub(super) fn write_key(&mut self, field: u32, wire_type: WireType) {
94 self.write_varint(((field as u64) << 3) | wire_type as u64);
95 }
96
97 pub(super) fn write_varint(&mut self, mut value: u64) {
98 while value >= 0x80 {
99 self.buf.push((value as u8) | 0x80);
100 value >>= 7;
101 }
102 self.buf.push(value as u8);
103 }
104
105 pub(super) fn write_bytes(&mut self, field: u32, value: &[u8]) {
106 self.write_key(field, WireType::LengthDelimited);
107 self.write_varint(value.len() as u64);
108 self.buf.extend_from_slice(value);
109 }
110
111 pub(super) fn write_string(&mut self, field: u32, value: &str) {
112 self.write_bytes(field, value.as_bytes());
113 }
114
115 pub(super) fn write_message(&mut self, field: u32, value: Vec<u8>) {
116 self.write_bytes(field, &value);
117 }
118}
119
120pub(super) struct ProtoReader<'a> {
121 data: &'a [u8],
122 pos: usize,
123}
124
125impl<'a> ProtoReader<'a> {
126 pub(super) fn new(data: &'a [u8]) -> Self {
127 Self { data, pos: 0 }
128 }
129
130 fn eof(&self) -> bool {
131 self.pos >= self.data.len()
132 }
133
134 fn read_byte(&mut self) -> ProtoResult<u8> {
135 let byte = *self
136 .data
137 .get(self.pos)
138 .ok_or_else(|| ProtoError("unexpected eof".into()))?;
139 self.pos += 1;
140 Ok(byte)
141 }
142
143 pub(super) fn read_varint(&mut self) -> ProtoResult<u64> {
144 let mut value = 0u64;
145 for shift in (0..64).step_by(7) {
146 let byte = self.read_byte()? as u64;
147 value |= (byte & 0x7f) << shift;
148 if byte & 0x80 == 0 {
149 return Ok(value);
150 }
151 }
152 Err(ProtoError("varint too long".into()))
153 }
154
155 pub(super) fn next_key(&mut self) -> ProtoResult<Option<(u32, WireType)>> {
156 if self.eof() {
157 return Ok(None);
158 }
159 let key = self.read_varint()?;
160 Ok(Some(((key >> 3) as u32, WireType::from_u64(key & 0x07)?)))
161 }
162
163 pub(super) fn read_bytes(&mut self) -> ProtoResult<&'a [u8]> {
164 let len = self.read_varint()? as usize;
165 let end = self
166 .pos
167 .checked_add(len)
168 .ok_or_else(|| ProtoError("length overflow".into()))?;
169 if end > self.data.len() {
170 return Err(ProtoError(format!(
171 "length-delimited out of bounds len={len} remaining={}",
172 self.data.len().saturating_sub(self.pos)
173 )));
174 }
175 let out = &self.data[self.pos..end];
176 self.pos = end;
177 Ok(out)
178 }
179
180 pub(super) fn read_string(&mut self) -> ProtoResult<String> {
181 let bytes = self.read_bytes()?;
182 std::str::from_utf8(bytes)
183 .map(|s| s.to_string())
184 .map_err(|e| ProtoError(format!("utf8: {e}")))
185 }
186
187 pub(super) fn read_i32(&mut self) -> ProtoResult<i32> {
188 Ok(self.read_varint()? as i32)
189 }
190
191 fn read_bool(&mut self) -> ProtoResult<bool> {
192 Ok(self.read_varint()? != 0)
193 }
194
195 pub(super) fn skip_field(&mut self, wire_type: WireType) -> ProtoResult<()> {
196 match wire_type {
197 WireType::Varint => {
198 self.read_varint()?;
199 }
200 WireType::Fixed64 => {
201 self.skip_bytes(8)?;
202 }
203 WireType::LengthDelimited => {
204 let len = self.read_varint()? as usize;
205 self.skip_bytes(len)?;
206 }
207 WireType::Fixed32 => {
208 self.skip_bytes(4)?;
209 }
210 }
211 Ok(())
212 }
213
214 fn skip_bytes(&mut self, len: usize) -> ProtoResult<()> {
215 let end = self
216 .pos
217 .checked_add(len)
218 .ok_or_else(|| ProtoError("skip overflow".into()))?;
219 if end > self.data.len() {
220 return Err(ProtoError("skip out of bounds".into()));
221 }
222 self.pos = end;
223 Ok(())
224 }
225}
226
227#[derive(Debug, Clone)]
228struct TcpHttpHeader {
229 key: String,
230 value: String,
231}
232
233impl TcpHttpHeader {
234 fn encode(&self) -> Vec<u8> {
235 let mut writer = ProtoWriter::new();
236 writer.write_string(1, &self.key);
237 writer.write_string(2, &self.value);
238 writer.finish()
239 }
240}
241
242#[derive(Debug, Clone)]
243struct TcpHttpRequest {
244 method: String,
245 url: String,
246 headers: Vec<TcpHttpHeader>,
247 body: Vec<u8>,
248}
249
250impl TcpHttpRequest {
251 fn encode(&self) -> Vec<u8> {
252 let mut writer = ProtoWriter::new();
253 writer.write_string(1, &self.method);
254 writer.write_string(2, &self.url);
255 for header in &self.headers {
256 writer.write_message(3, header.encode());
257 }
258 writer.write_bytes(4, &self.body);
259 writer.finish()
260 }
261}
262
263#[derive(Debug, Clone)]
264struct TcpHttpResponseBody {
265 status_code: Option<i32>,
266 message: Option<String>,
267 data: Option<Vec<u8>>,
268}
269
270impl TcpHttpResponseBody {
271 fn decode(bytes: &[u8]) -> ProtoResult<Self> {
272 let mut reader = ProtoReader::new(bytes);
273 let mut status_code = None;
274 let mut message = None;
275 let mut data = None;
276 while let Some((field, wire_type)) = reader.next_key()? {
277 match (field, wire_type) {
278 (1, WireType::Varint) => status_code = Some(reader.read_i32()?),
279 (2, WireType::LengthDelimited) => message = Some(reader.read_string()?),
280 (5, WireType::LengthDelimited) => data = Some(reader.read_bytes()?.to_vec()),
281 _ => reader.skip_field(wire_type)?,
282 }
283 }
284 Ok(Self {
285 status_code,
286 message,
287 data,
288 })
289 }
290}
291
292#[derive(Debug, Clone)]
293struct TcpHttpResponse {
294 code: Option<i32>,
295 message: Option<String>,
296 response_body: Option<TcpHttpResponseBody>,
297 current_need_fallback: Option<bool>,
298}
299
300impl TcpHttpResponse {
301 fn decode(bytes: &[u8]) -> ProtoResult<Self> {
302 let mut reader = ProtoReader::new(bytes);
303 let mut code = None;
304 let mut message = None;
305 let mut response_body = None;
306 let mut current_need_fallback = None;
307 while let Some((field, wire_type)) = reader.next_key()? {
308 match (field, wire_type) {
309 (1, WireType::Varint) => code = Some(reader.read_i32()?),
310 (2, WireType::LengthDelimited) => message = Some(reader.read_string()?),
311 (3, WireType::LengthDelimited) => {
312 response_body = Some(TcpHttpResponseBody::decode(reader.read_bytes()?)?)
313 }
314 (4, WireType::Varint) => current_need_fallback = Some(reader.read_bool()?),
315 _ => reader.skip_field(wire_type)?,
316 }
317 }
318 Ok(Self {
319 code,
320 message,
321 response_body,
322 current_need_fallback,
323 })
324 }
325}
326
327pub(super) fn tls_server_name_for_web_identity(identity: u32) -> &'static str {
330 match identity {
331 crate::auth::commconfig::CONN_WEB_CN | crate::auth::commconfig::CONN_WEB_HK => {
332 "www.futunn.com"
333 }
334 _ => "www.moomoo.com",
335 }
336}
337
338fn tls_connector() -> TlsConnector {
339 install_default_rustls_crypto_provider();
340
341 let roots = RootCertStore {
342 roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
343 };
344 let mut config = ClientConfig::builder()
345 .with_root_certificates(roots)
346 .with_no_client_auth();
347 config.enable_sni = false;
351 TlsConnector::from(Arc::new(config))
352}
353
354fn proto_err(context: &str, err: ProtoError) -> FutuError {
355 FutuError::Codec(format!("webtcp {context}: {err}"))
356}
357
358#[derive(Debug)]
359pub(crate) struct WebTcpRequestError {
360 error: FutuError,
361 http_fallback_allowed: bool,
362}
363
364impl WebTcpRequestError {
365 fn fallback_allowed(error: FutuError) -> Self {
366 Self {
367 error,
368 http_fallback_allowed: true,
369 }
370 }
371
372 fn fallback_forbidden(error: FutuError) -> Self {
373 Self {
374 error,
375 http_fallback_allowed: false,
376 }
377 }
378
379 pub(crate) fn allows_http_fallback(&self) -> bool {
380 self.http_fallback_allowed
381 }
382
383 pub(crate) fn into_futu_error(self) -> FutuError {
384 self.error
385 }
386}
387
388impl std::fmt::Display for WebTcpRequestError {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 self.error.fmt(f)
391 }
392}
393
394impl std::error::Error for WebTcpRequestError {}
395
396fn decode_response_json(
397 frame_body: &[u8],
398) -> std::result::Result<serde_json::Value, WebTcpRequestError> {
399 let response = TcpHttpResponse::decode(frame_body)
400 .map_err(|e| WebTcpRequestError::fallback_forbidden(proto_err("decode", e)))?;
401 if response.code.unwrap_or(-1) != 0 {
402 let error = FutuError::Codec(format!(
403 "webtcp response code={} message={:?} fallback={:?}",
404 response.code.unwrap_or(-1),
405 response.message,
406 response.current_need_fallback
407 ));
408 return Err(if response.current_need_fallback.unwrap_or(false) {
409 WebTcpRequestError::fallback_allowed(error)
410 } else {
411 WebTcpRequestError::fallback_forbidden(error)
412 });
413 }
414 if response.current_need_fallback.unwrap_or(false) {
415 return Err(WebTcpRequestError::fallback_allowed(FutuError::Codec(
416 format!(
417 "webtcp response requested fallback message={:?}",
418 response.message
419 ),
420 )));
421 }
422 let response_body = response.response_body.ok_or_else(|| {
423 WebTcpRequestError::fallback_forbidden(FutuError::Codec(
424 "webtcp missing response_body".into(),
425 ))
426 })?;
427 let status_code = response_body.status_code.unwrap_or(0);
428 if !(200..300).contains(&status_code) {
429 return Err(WebTcpRequestError::fallback_forbidden(FutuError::Codec(
430 format!(
431 "webtcp http status={status_code} message={:?}",
432 response_body.message
433 ),
434 )));
435 }
436 let data = response_body.data.ok_or_else(|| {
437 WebTcpRequestError::fallback_forbidden(FutuError::Codec(
438 "webtcp missing http response data".into(),
439 ))
440 })?;
441 serde_json::from_slice(&data).map_err(|e| {
442 WebTcpRequestError::fallback_forbidden(FutuError::Codec(format!("webtcp json decode: {e}")))
443 })
444}
445
446fn cpp_auth_headers(client_type: u8, host: String, device_id: &str) -> Vec<TcpHttpHeader> {
456 let trace_headers = super::http_client::AuthTraceHeaders::new();
457 let mut headers = vec![
458 TcpHttpHeader {
459 key: "Host".to_string(),
460 value: host,
461 },
462 TcpHttpHeader {
463 key: "Content-Type".to_string(),
464 value: "application/json".to_string(),
465 },
466 TcpHttpHeader {
467 key: "User-Agent".to_string(),
468 value: super::http_client::opend_auth_user_agent(client_type),
469 },
470 TcpHttpHeader {
471 key: "Cookie".to_string(),
472 value: format!("device_id={device_id}"),
473 },
474 ];
475 headers.extend(
476 trace_headers
477 .entries()
478 .into_iter()
479 .map(|(key, value)| TcpHttpHeader {
480 key: key.to_string(),
481 value: value.to_string(),
482 }),
483 );
484 headers
485}
486
487fn webtcp_url_fingerprint(url: &str) -> (String, String, Vec<String>) {
488 match reqwest::Url::parse(url) {
489 Ok(parsed) => (
490 parsed.host_str().unwrap_or("").to_string(),
491 parsed.path().to_string(),
492 parsed
493 .query_pairs()
494 .map(|(key, _)| key.into_owned())
495 .collect(),
496 ),
497 Err(_) => (
498 "<parse-error>".to_string(),
499 "<parse-error>".to_string(),
500 Vec::new(),
501 ),
502 }
503}
504
505fn webtcp_json_value_shape(value: &serde_json::Value) -> &'static str {
506 match value {
507 serde_json::Value::Null => "null",
508 serde_json::Value::Bool(_) => "bool",
509 serde_json::Value::Number(_) => "number",
510 serde_json::Value::String(_) => "string",
511 serde_json::Value::Array(_) => "array",
512 serde_json::Value::Object(_) => "object",
513 }
514}
515
516fn webtcp_body_field_shapes(body: Option<&[u8]>) -> Vec<String> {
517 body.and_then(|body| serde_json::from_slice::<serde_json::Value>(body).ok())
518 .and_then(|value| {
519 value.as_object().map(|object| {
520 object
521 .iter()
522 .map(|(key, value)| {
523 let suffix = value
524 .as_str()
525 .map(|value| format!(":string_len={}", value.len()))
526 .unwrap_or_else(|| format!(":{}", webtcp_json_value_shape(value)));
527 format!("{key}{suffix}")
528 })
529 .collect()
530 })
531 })
532 .unwrap_or_default()
533}
534
535fn log_webtcp_request_fingerprint(
536 method: &str,
537 url: &str,
538 headers: &[TcpHttpHeader],
539 body: Option<&[u8]>,
540) {
541 let (url_host, url_path, query_keys) = webtcp_url_fingerprint(url);
542 let header_keys: Vec<&str> = headers.iter().map(|header| header.key.as_str()).collect();
543 let x_futu_header_keys: Vec<&str> = header_keys
544 .iter()
545 .copied()
546 .filter(|name| name.to_ascii_lowercase().starts_with("x-futu-"))
547 .collect();
548 let user_agent = headers
549 .iter()
550 .find(|header| header.key.eq_ignore_ascii_case("User-Agent"))
551 .map(|header| header.value.as_str())
552 .unwrap_or("");
553 let cookie_len = headers
554 .iter()
555 .find(|header| header.key.eq_ignore_ascii_case("Cookie"))
556 .map(|header| header.value.len())
557 .unwrap_or(0);
558 let body_fields = webtcp_body_field_shapes(body);
559
560 tracing::info!(
561 transport = "webtcp-short",
562 method,
563 url_host,
564 url_path,
565 query_keys = ?query_keys,
566 header_keys = ?header_keys,
567 x_futu_header_keys = ?x_futu_header_keys,
568 has_explicit_host_header = headers.iter().any(|header| header.key.eq_ignore_ascii_case("Host")),
569 user_agent,
570 cookie_len,
571 body_fields = ?body_fields,
572 "primary auth request fingerprint"
573 );
574}
575
576fn encode_tcp_http_request(
577 client_type: u8,
578 method: &str,
579 url: &str,
580 body: Option<&[u8]>,
581 device_id: &str,
582) -> Result<Vec<u8>> {
583 let parsed = reqwest::Url::parse(url)
584 .map_err(|e| FutuError::Codec(format!("webtcp invalid url {url}: {e}")))?;
585 let host = parsed
586 .host_str()
587 .ok_or_else(|| FutuError::Codec(format!("webtcp url missing host: {url}")))?
588 .to_string();
589 let headers = cpp_auth_headers(client_type, host, device_id);
590 log_webtcp_request_fingerprint(method, url, &headers, body);
591 Ok(TcpHttpRequest {
592 method: method.to_string(),
593 url: url.to_string(),
594 headers,
595 body: body.map_or_else(Vec::new, ToOwned::to_owned),
596 }
597 .encode())
598}
599
600pub(super) async fn connect_webtcp(
601 ip: &str,
602 port: u16,
603 tls_server_name: &'static str,
604) -> Result<BackendConn> {
605 let ip_addr = ip
606 .parse::<IpAddr>()
607 .map_err(|e| FutuError::Codec(format!("webtcp invalid ip {ip}: {e}")))?;
608 let addr = SocketAddr::new(ip_addr, port);
609 let socket = match addr {
610 SocketAddr::V4(_) => TcpSocket::new_v4(),
611 SocketAddr::V6(_) => TcpSocket::new_v6(),
612 }
613 .map_err(FutuError::Network)?;
614 socket.set_nodelay(true)?;
620 socket.set_send_buffer_size(WEBTCP_SEND_BUFFER_SIZE)?;
621 socket.set_recv_buffer_size(WEBTCP_RECV_BUFFER_SIZE)?;
622
623 let stream = tokio::time::timeout(WEBTCP_CONNECT_TIMEOUT, socket.connect(addr))
624 .await
625 .map_err(|elapsed| {
626 FutuError::Network(std::io::Error::new(
627 std::io::ErrorKind::TimedOut,
628 format!("webtcp connect {addr} timed out: {elapsed}"),
629 ))
630 })?
631 .map_err(FutuError::Network)?;
632
633 let server_name = ServerName::try_from(tls_server_name.to_string()).map_err(|e| {
634 FutuError::Codec(format!(
635 "webtcp invalid tls server name {tls_server_name}: {e}"
636 ))
637 })?;
638 let tls = tokio::time::timeout(
639 WEBTCP_TLS_TIMEOUT,
640 tls_connector().connect(server_name, stream),
641 )
642 .await
643 .map_err(|elapsed| {
644 FutuError::Network(std::io::Error::new(
645 std::io::ErrorKind::TimedOut,
646 format!("webtcp tls connect {addr} timed out: {elapsed}"),
647 ))
648 })?
649 .map_err(|e| FutuError::Codec(format!("webtcp tls connect {addr}: {e}")))?;
650
651 let noop: PushCallback = Arc::new(|_, _, _| {});
652 Ok(BackendConn::from_stream_inner(tls, noop))
653}
654
655pub(crate) async fn request_json_via_webtcp(
657 client_type: u8,
658 web_identity: u32,
659 addrs: &[(String, u16)],
660 method: &str,
661 url: &str,
662 body: Option<&[u8]>,
663 device_id: &str,
664) -> std::result::Result<serde_json::Value, WebTcpRequestError> {
665 if addrs.is_empty() {
666 return Err(WebTcpRequestError::fallback_allowed(FutuError::Codec(
667 format!("webtcp identity {web_identity} has empty addr pool"),
668 )));
669 }
670 let request_body = encode_tcp_http_request(client_type, method, url, body, device_id)
671 .map_err(WebTcpRequestError::fallback_forbidden)?;
672
673 let tls_server_name = tls_server_name_for_web_identity(web_identity);
674 let mut last_error: Option<WebTcpRequestError> = None;
675 for (ip, port) in addrs {
676 tracing::debug!(
677 web_identity,
678 ip,
679 port,
680 tls_server_name,
681 sni_enabled = false,
682 "auth webtcp-short connecting"
683 );
684 let conn = match connect_webtcp(ip, *port, tls_server_name).await {
685 Ok(conn) => conn,
686 Err(e) => {
687 tracing::warn!(
688 web_identity,
689 ip,
690 port,
691 tls_server_name,
692 sni_enabled = false,
693 error = %e,
694 "auth webtcp-short connect failed; trying next IP"
695 );
696 last_error = Some(WebTcpRequestError::fallback_allowed(e));
697 continue;
698 }
699 };
700 let frame = match conn.request(WEB_REQUEST_CMD, request_body.clone()).await {
701 Ok(frame) => frame,
702 Err(e) => {
703 tracing::warn!(
704 web_identity,
705 ip,
706 port,
707 tls_server_name,
708 sni_enabled = false,
709 error = %e,
710 "auth webtcp-short request failed; trying next IP"
711 );
712 last_error = Some(WebTcpRequestError::fallback_allowed(e));
713 continue;
714 }
715 };
716 return decode_response_json(&frame.body);
717 }
718 Err(last_error.unwrap_or_else(|| {
719 WebTcpRequestError::fallback_allowed(FutuError::Network(std::io::Error::other(format!(
720 "webtcp identity {web_identity}: all IPs failed"
721 ))))
722 }))
723}
724
725pub(crate) async fn post_json_via_webtcp(
727 client_type: u8,
728 web_identity: u32,
729 addrs: &[(String, u16)],
730 url: &str,
731 body: &serde_json::Value,
732 device_id: &str,
733) -> std::result::Result<serde_json::Value, WebTcpRequestError> {
734 let payload = serde_json::to_vec(body).map_err(|e| {
735 WebTcpRequestError::fallback_forbidden(FutuError::Codec(format!("webtcp json encode: {e}")))
736 })?;
737 request_json_via_webtcp(
738 client_type,
739 web_identity,
740 addrs,
741 "POST",
742 url,
743 Some(&payload),
744 device_id,
745 )
746 .await
747}
748
749#[cfg(test)]
750mod tests;