futu_backend/
suspend_data.rs1use std::collections::HashMap;
15use std::sync::Arc;
16
/// Shared, concurrently accessible map from stock id to that stock's suspend
/// timestamps.
///
/// The lists stored by this module come from `parse_suspend_dat`, which sorts
/// them in ascending order and removes duplicates before they are inserted.
pub type SuspendCache = Arc<dashmap::DashMap<u64, Vec<u64>>>;
19
/// Remote suspend-record archives to load, as `(url, name)` pairs.
///
/// `url` is the ZIP file to download. `name` is the entry looked up inside
/// that archive and is also the value logged in the `market` field when the
/// source is loaded or fails to load.
const SUSPEND_SOURCES: [(&str, &str); 3] = [
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/hk_stock_suspend_record.zip",
        "hk_stock_suspend_record",
    ),
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/us_stock_suspend_record.zip",
        "us_stock_suspend_record",
    ),
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/cn_stock_suspend_record.zip",
        "cn_stock_suspend_record",
    ),
];
35
36pub async fn load_suspend_data_with_client(client: &reqwest::Client) -> SuspendCache {
38 let cache: SuspendCache = Arc::new(dashmap::DashMap::new());
39
40 for (url, name) in &SUSPEND_SOURCES {
41 match download_and_parse(client, url, name).await {
42 Ok(data) => {
43 let count = data.len();
44 for (stock_id, timestamps) in data {
45 cache.insert(stock_id, timestamps);
46 }
47 tracing::info!(market = *name, stocks = count, "loaded suspend data");
48 }
49 Err(e) => {
50 tracing::debug!(market = *name, error = %e, "failed to load suspend data");
54 }
55 }
56 }
57
58 cache
59}
60
61pub async fn load_suspend_data() -> SuspendCache {
63 match crate::reference_http::build_client() {
64 Ok(client) => load_suspend_data_with_client(&client).await,
65 Err(error) => {
66 tracing::error!(
67 %error,
68 "reference-data HTTP client config failed; skipping suspend data preload"
69 );
70 Arc::new(dashmap::DashMap::new())
71 }
72 }
73}
74
75async fn download_and_parse(
84 client: &reqwest::Client,
85 url: &str,
86 name: &str,
87) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
88 tracing::debug!(url, "downloading suspend data");
89 let resp = client.get(url).send().await?;
90 let status = resp.status();
91 let content_type = resp
92 .headers()
93 .get(reqwest::header::CONTENT_TYPE)
94 .and_then(|v| v.to_str().ok())
95 .unwrap_or("<none>")
96 .to_string();
97
98 if !status.is_success() {
99 return Err(format!("HTTP {status} from {url} (content-type={content_type})").into());
100 }
101
102 let zip_bytes = resp.bytes().await?;
103 tracing::debug!(url, bytes = zip_bytes.len(), "downloaded suspend response");
104
105 let is_zip = zip_bytes.len() >= 4
107 && zip_bytes[0] == b'P'
108 && zip_bytes[1] == b'K'
109 && (zip_bytes[2] == 0x03 || zip_bytes[2] == 0x05 || zip_bytes[2] == 0x07);
110
111 if !is_zip {
112 let preview: String = zip_bytes
113 .iter()
114 .take(120)
115 .map(|&b| {
116 if b.is_ascii_graphic() || b == b' ' {
117 b as char
118 } else {
119 '·'
120 }
121 })
122 .collect();
123 tracing::debug!(
124 url,
125 content_type,
126 bytes = zip_bytes.len(),
127 preview,
128 "suspend data response is not a ZIP (magic number mismatch); CDN may have returned HTML error page"
129 );
130 return Err(
131 format!("not a ZIP archive (magic mismatch, content-type={content_type})").into(),
132 );
133 }
134
135 let cursor = std::io::Cursor::new(zip_bytes);
138 let mut archive = zip::ZipArchive::new(cursor)?;
139 let mut file = archive.by_name(name)?;
140 let mut dat_bytes = Vec::new();
141 std::io::Read::read_to_end(&mut file, &mut dat_bytes)?;
142
143 parse_suspend_dat(&dat_bytes)
144}
145
146fn parse_suspend_dat(
150 data: &[u8],
151) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
152 use prost::Message;
153
154 if data.len() < 6 {
155 return Err("suspend dat too short".into());
156 }
157
158 let version = u16::from_be_bytes([data[0], data[1]]);
159 let group_count = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
160
161 if version == 0 || group_count == 0 {
163 return Err(
164 format!("invalid suspend dat: version={version}, group_count={group_count}").into(),
165 );
166 }
167
168 let mut result = HashMap::new();
169 let mut offset = 6_usize;
170 let mut read_groups = 0_u32;
171
172 while read_groups < group_count {
173 if offset + 4 > data.len() {
174 tracing::warn!(
175 offset,
176 remaining = data.len() - offset,
177 "suspend dat truncated at group_len"
178 );
179 break;
180 }
181
182 let group_len = u32::from_be_bytes([
183 data[offset],
184 data[offset + 1],
185 data[offset + 2],
186 data[offset + 3],
187 ]) as usize;
188 offset += 4;
189
190 if group_len == 0 {
192 tracing::warn!(offset, "suspend dat: zero group_len");
193 break;
194 }
195
196 if offset + group_len > data.len() {
197 tracing::warn!(
198 offset,
199 group_len,
200 data_len = data.len(),
201 "suspend dat truncated at group data"
202 );
203 break;
204 }
205
206 let group: super::proto_internal::stock_suspend::StockSuspendRecordGroup =
207 match Message::decode(&data[offset..offset + group_len]) {
208 Ok(g) => g,
209 Err(e) => {
210 tracing::warn!(error = %e, "suspend dat: protobuf decode failed");
211 break;
212 }
213 };
214
215 for record in &group.stock_suspend_record_list {
216 if let Some(stock_id) = record.stock_id {
217 let mut timestamps: Vec<u64> = record
218 .stock_sus_seq_list
219 .iter()
220 .filter_map(|seq| seq.time)
221 .collect();
222 if !timestamps.is_empty() {
223 timestamps.sort_unstable();
225 timestamps.dedup();
226 result.insert(stock_id, timestamps);
227 }
228 }
229 }
230
231 offset += group_len;
232 read_groups += 1;
233 }
234
235 Ok(result)
236}
237
238#[cfg(test)]
239mod tests;