Skip to main content

futu_backend/
suspend_data.rs

1//! 停牌数据 HTTP 下载 + 解析
2//!
3//! C++ 对应: NNBiz_Qot_Suspend / NNData_Qot_Suspend
4//!
5//! 从腾讯云 CDN 下载 zip 文件,解压后得到 .dat 二进制文件,格式:
6//! - 2 字节 version (big-endian u16)
7//! - 4 字节 group_count (big-endian u32)
8//! - group_count 个组,每个:
9//!   - 4 字节 group_len (big-endian u32)
10//!   - group_len 字节 protobuf (StockSuspendRecordGroup)
11//!
12//! 解析后缓存在 DashMap<stock_id, Vec<timestamp>> 中,供 GetSuspend handler 查询。
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
/// Shared suspension cache: maps a `stock_id` to that stock's suspension timestamps.
///
/// Each vector is sorted ascending and de-duplicated by `parse_suspend_dat`
/// before it is stored in the map.
pub type SuspendCache = Arc<dashmap::DashMap<u64, Vec<u64>>>;
19
/// Download sources, one entry per market (HK, US, CN in that order).
///
/// Each tuple is `(zip URL, name of the entry to extract from that zip)`.
///
/// NOTE(review): these URLs use plain `http://`; confirm whether the bucket
/// also serves HTTPS before relying on the integrity of the downloaded data.
const SUSPEND_SOURCES: [(&str, &str); 3] = [
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/hk_stock_suspend_record.zip",
        "hk_stock_suspend_record",
    ),
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/us_stock_suspend_record.zip",
        "us_stock_suspend_record",
    ),
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/cn_stock_suspend_record.zip",
        "cn_stock_suspend_record",
    ),
];
35
36/// 下载并解析所有市场的停牌数据
37pub async fn load_suspend_data_with_client(client: &reqwest::Client) -> SuspendCache {
38    let cache: SuspendCache = Arc::new(dashmap::DashMap::new());
39
40    for (url, name) in &SUSPEND_SOURCES {
41        match download_and_parse(client, url, name).await {
42            Ok(data) => {
43                let count = data.len();
44                for (stock_id, timestamps) in data {
45                    cache.insert(stock_id, timestamps);
46                }
47                tracing::info!(market = *name, stocks = count, "loaded suspend data");
48            }
49            Err(e) => {
50                // v1.4.27:从 WARN 降 DEBUG。CDN 偶尔返 404 / HTML 错误页对终端
51                // 用户无 actionable 价值,也不影响交易 / 行情主功能;有兴趣的
52                // 运维可以用 `--log-level debug` 看到完整错误。
53                tracing::debug!(market = *name, error = %e, "failed to load suspend data");
54            }
55        }
56    }
57
58    cache
59}
60
61/// Download and parse all suspend data using a default timeout-bound client.
62pub async fn load_suspend_data() -> SuspendCache {
63    match crate::reference_http::build_client() {
64        Ok(client) => load_suspend_data_with_client(&client).await,
65        Err(error) => {
66            tracing::error!(
67                %error,
68                "reference-data HTTP client config failed; skipping suspend data preload"
69            );
70            Arc::new(dashmap::DashMap::new())
71        }
72    }
73}
74
75/// 下载单个市场的 zip 文件并解析
76///
77/// v1.4.27 修(BUG-3,加拿大同事 v1.4.26 回归测试发现):
78/// - HTTP 非 200 → 直接降级,不尝试解 ZIP
79/// - 内容不是 ZIP(magic number 不是 `PK\x03\x04`)→ 降级 + 用 DEBUG 打
80///   出前 N 字节 + Content-Type(帮助判断是否拿到了 HTML 错误页)
81/// - 只有真正是 ZIP 才进 `ZipArchive` 解析,避免 "Could not find EOCD" 这种
82///   对终端用户无 actionable 价值的 WARN
83async fn download_and_parse(
84    client: &reqwest::Client,
85    url: &str,
86    name: &str,
87) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
88    tracing::debug!(url, "downloading suspend data");
89    let resp = client.get(url).send().await?;
90    let status = resp.status();
91    let content_type = resp
92        .headers()
93        .get(reqwest::header::CONTENT_TYPE)
94        .and_then(|v| v.to_str().ok())
95        .unwrap_or("<none>")
96        .to_string();
97
98    if !status.is_success() {
99        return Err(format!("HTTP {status} from {url} (content-type={content_type})").into());
100    }
101
102    let zip_bytes = resp.bytes().await?;
103    tracing::debug!(url, bytes = zip_bytes.len(), "downloaded suspend response");
104
105    // ZIP magic number: `PK\x03\x04` (or `PK\x05\x06` for empty archive, `PK\x07\x08` for spanned)
106    let is_zip = zip_bytes.len() >= 4
107        && zip_bytes[0] == b'P'
108        && zip_bytes[1] == b'K'
109        && (zip_bytes[2] == 0x03 || zip_bytes[2] == 0x05 || zip_bytes[2] == 0x07);
110
111    if !is_zip {
112        let preview: String = zip_bytes
113            .iter()
114            .take(120)
115            .map(|&b| {
116                if b.is_ascii_graphic() || b == b' ' {
117                    b as char
118                } else {
119                    '·'
120                }
121            })
122            .collect();
123        tracing::debug!(
124            url,
125            content_type,
126            bytes = zip_bytes.len(),
127            preview,
128            "suspend data response is not a ZIP (magic number mismatch); CDN may have returned HTML error page"
129        );
130        return Err(
131            format!("not a ZIP archive (magic mismatch, content-type={content_type})").into(),
132        );
133    }
134
135    // 真正是 ZIP 才进解压(至此若报 "Could not find EOCD" 就是 ZIP 实际损坏,
136    // 值得 WARN;否则 magic-check 已经把 HTML 错误页 / 空响应挡在外面)
137    let cursor = std::io::Cursor::new(zip_bytes);
138    let mut archive = zip::ZipArchive::new(cursor)?;
139    let mut file = archive.by_name(name)?;
140    let mut dat_bytes = Vec::new();
141    std::io::Read::read_to_end(&mut file, &mut dat_bytes)?;
142
143    parse_suspend_dat(&dat_bytes)
144}
145
146/// 解析 .dat 二进制格式
147///
148/// C++ 对应: NNBiz_Qot_Suspend::LoadFile
149fn parse_suspend_dat(
150    data: &[u8],
151) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
152    use prost::Message;
153
154    if data.len() < 6 {
155        return Err("suspend dat too short".into());
156    }
157
158    let version = u16::from_be_bytes([data[0], data[1]]);
159    let group_count = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
160
161    // C++ 校验: nVersion <= 0 || nGroupCnt <= 0
162    if version == 0 || group_count == 0 {
163        return Err(
164            format!("invalid suspend dat: version={version}, group_count={group_count}").into(),
165        );
166    }
167
168    let mut result = HashMap::new();
169    let mut offset = 6_usize;
170    let mut read_groups = 0_u32;
171
172    while read_groups < group_count {
173        if offset + 4 > data.len() {
174            tracing::warn!(
175                offset,
176                remaining = data.len() - offset,
177                "suspend dat truncated at group_len"
178            );
179            break;
180        }
181
182        let group_len = u32::from_be_bytes([
183            data[offset],
184            data[offset + 1],
185            data[offset + 2],
186            data[offset + 3],
187        ]) as usize;
188        offset += 4;
189
190        // C++ 校验: nGroupLen <= 0
191        if group_len == 0 {
192            tracing::warn!(offset, "suspend dat: zero group_len");
193            break;
194        }
195
196        if offset + group_len > data.len() {
197            tracing::warn!(
198                offset,
199                group_len,
200                data_len = data.len(),
201                "suspend dat truncated at group data"
202            );
203            break;
204        }
205
206        let group: super::proto_internal::stock_suspend::StockSuspendRecordGroup =
207            match Message::decode(&data[offset..offset + group_len]) {
208                Ok(g) => g,
209                Err(e) => {
210                    tracing::warn!(error = %e, "suspend dat: protobuf decode failed");
211                    break;
212                }
213            };
214
215        for record in &group.stock_suspend_record_list {
216            if let Some(stock_id) = record.stock_id {
217                let mut timestamps: Vec<u64> = record
218                    .stock_sus_seq_list
219                    .iter()
220                    .filter_map(|seq| seq.time)
221                    .collect();
222                if !timestamps.is_empty() {
223                    // C++ 中 SetSuspendData 会 sort,GetSuspendData 用 lower_bound/upper_bound
224                    timestamps.sort_unstable();
225                    timestamps.dedup();
226                    result.insert(stock_id, timestamps);
227                }
228            }
229        }
230
231        offset += group_len;
232        read_groups += 1;
233    }
234
235    Ok(result)
236}
237
// Unit tests live in the separate `tests` module file and are compiled only
// for test builds.
#[cfg(test)]
mod tests;