Skip to main content

futu_cache/trd_cache/
order_list.rs

1use std::collections::HashSet;
2
3use super::*;
4
5impl TrdCache {
6    /// Mark an existing cached order as local Deleted(23) after a successful
7    /// DeleteFailOrder operation.
8    ///
9    /// C++ keeps local orders (`bIsLocalOrder=true`) across `UpdateOrderList`
10    /// backend refreshes. Rust must do the same for delete success, otherwise a
11    /// backend refresh that omits the just-deleted failed order makes
12    /// `order_list_query()` return empty while official C++ still shows
13    /// Deleted(23).
14    #[must_use]
15    pub fn mark_order_deleted_local(
16        &self,
17        acc_id: u64,
18        order_id: u64,
19        order_id_ex: Option<&str>,
20    ) -> bool {
21        let trimmed_ex = order_id_ex.map(str::trim).filter(|s| !s.is_empty());
22        let Some(mut entry) = self.orders.get_mut(&acc_id) else {
23            return false;
24        };
25        let Some(order) = entry.iter_mut().find(|order| {
26            if order_id != 0 && order.order_id == order_id {
27                return true;
28            }
29            if let Some(ex) = trimmed_ex {
30                order.backend_order_id.trim() == ex || order.order_id_ex.trim() == ex
31            } else {
32                false
33            }
34        }) else {
35            return false;
36        };
37
38        order.order_status = 23;
39        order.is_stub = false;
40        order.is_local_order = true;
41        order.stub_inserted_at_ms = 0;
42        order.is_pending_broker_confirm = false;
43        true
44    }
45
46    pub fn record_order_broker(
47        &self,
48        acc_id: AccKey,
49        order_id_ex: impl AsRef<str>,
50        broker_id: u32,
51    ) {
52        let order_id_ex = order_id_ex.as_ref();
53        if order_id_ex.trim().is_empty() {
54            return;
55        }
56        let order_id_ex = order_id_ex.to_string();
57        let previous_acc = self
58            .order_broker_accounts
59            .insert(order_id_ex.clone(), acc_id);
60        if let Some(old_acc_id) = previous_acc.filter(|old_acc_id| *old_acc_id != acc_id) {
61            self.remove_order_broker_id_from_acc_index(old_acc_id, &order_id_ex);
62        }
63        self.order_broker_ids_by_acc
64            .entry(acc_id)
65            .or_default()
66            .insert(order_id_ex.clone());
67        self.order_brokers.insert(order_id_ex, broker_id);
68    }
69
70    fn remove_order_broker_id_from_acc_index(&self, acc_id: AccKey, order_id_ex: &str) {
71        let mut remove_bucket = false;
72        if let Some(mut ids) = self.order_broker_ids_by_acc.get_mut(&acc_id) {
73            ids.remove(order_id_ex);
74            remove_bucket = ids.is_empty();
75        }
76        if remove_bucket {
77            self.order_broker_ids_by_acc
78                .remove_if(&acc_id, |_, ids| ids.is_empty());
79        }
80    }
81
82    pub(super) fn prune_order_brokers_for_acc(&self, acc_id: AccKey) {
83        if self.order_brokers.is_empty() {
84            return;
85        }
86
87        let stale_order_ids = {
88            let orders = self.orders.get(&acc_id);
89            let mut active_order_ids = HashSet::new();
90            if let Some(ref entry) = orders {
91                for order in entry.value() {
92                    let order_id_ex = order.order_id_ex.trim();
93                    if !order_id_ex.is_empty() {
94                        active_order_ids.insert(order_id_ex);
95                    }
96                    let backend_order_id = order.backend_order_id.trim();
97                    if !backend_order_id.is_empty() {
98                        active_order_ids.insert(backend_order_id);
99                    }
100                }
101            }
102
103            let Some(order_ids) = self.order_broker_ids_by_acc.get(&acc_id) else {
104                return;
105            };
106            order_ids
107                .iter()
108                .filter(|order_id_ex| !active_order_ids.contains(order_id_ex.as_str()))
109                .cloned()
110                .collect::<Vec<_>>()
111        };
112
113        for order_id_ex in stale_order_ids {
114            let still_belongs_to_acc = self
115                .order_broker_accounts
116                .get(&order_id_ex)
117                .is_some_and(|entry| *entry.value() == acc_id);
118            if still_belongs_to_acc {
119                self.order_broker_accounts.remove(&order_id_ex);
120                self.order_brokers.remove(&order_id_ex);
121            }
122            self.remove_order_broker_id_from_acc_index(acc_id, &order_id_ex);
123        }
124    }
125
126    /// v1.4.105 BUG-v1.4.104-001 (P0): broker async confirm 到达后清 pending 标志.
127    ///
128    /// 当 push notice_type=4/5/8/100 (ORDER_UPDATE / ORDER_LIST_UPDATE /
129    /// TRADE_STATISTIC / ORDER_NTF) 到达对应 acc_id 时, 调本 fn 把所有
130    /// `is_pending_broker_confirm=true` 的 order 翻成 `false`.
131    ///
132    /// 设计选择: 不按 order_id 精确匹配清 — push notice 通常不带具体 order_id,
133    /// 只表 "本 acc 有 order 状态变化". 简化处理: acc 内任何 ORDER 类 push 到
134    /// 即视为 broker 已开始处理本 acc 的 stub orders.
135    /// 后续 query_orders refresh 会通过 `merge_preserving_stubs` 把 enriched
136    /// 版本写入, 替换 stub.
137    ///
138    /// 返被清的 order 数 (caller 用于 audit log).
139    pub fn clear_pending_confirm_for_acc(&self, acc_id: u64) -> usize {
140        let mut cleared = 0;
141        if let Some(mut entry) = self.orders.get_mut(&acc_id) {
142            for o in entry.iter_mut() {
143                if o.is_pending_broker_confirm {
144                    o.is_pending_broker_confirm = false;
145                    cleared += 1;
146                }
147            }
148        }
149        cleared
150    }
151
152    /// v1.4.106 codex 0226 F4 (P2): selective clear pending confirm by order_ids.
153    ///
154    /// `clear_pending_confirm_for_acc` 是 acc-level 全清, 但 ORDER push notify
155    /// 在 backend 实际带具体 `order_ids` 时(notice_type=4 ORDER_UPDATE 通常
156    /// 带), daemon 应**只**清对应订单的 pending flag, 而不是把同账户其他还没
157    /// confirm 的 stub 一并误清.
158    ///
159    /// 触发场景 (`bridge/dispatcher.rs:251-268`):
160    /// - notice_type=4/5/9 + 非空 `order_ids` (backend 真带 → 按订单清)
161    /// - notice_type=4/5/9 + 空 `order_ids` → fall back to `clear_pending_confirm_for_acc`
162    ///
163    /// match 逻辑: `o.backend_order_id` / `o.order_id_ex` (alphanumeric backend
164    /// szOrderID) 与 `order_ids` 任一相等. 不 match `o.order_id` (FTAPI u64
165    /// hash) 因为 backend push 带的 `order_ids` 是 backend 原生 string id.
166    ///
167    /// 返被清的 order 数 (caller 用于 audit log).
168    pub fn clear_pending_confirm_for_orders(&self, acc_id: u64, order_ids: &[String]) -> usize {
169        if order_ids.is_empty() {
170            return 0;
171        }
172        let mut cleared = 0;
173        if let Some(mut entry) = self.orders.get_mut(&acc_id) {
174            for o in entry.iter_mut() {
175                let matches_backend_id = Self::order_backend_ids(o)
176                    .any(|backend_id| order_ids.iter().any(|id| id.trim() == backend_id));
177                if o.is_pending_broker_confirm && matches_backend_id {
178                    o.is_pending_broker_confirm = false;
179                    cleared += 1;
180                }
181            }
182        }
183        cleared
184    }
185
186    /// v1.4.105 BUG-v1.4.104-001 (P0): cleanup task 删超时未 confirm 的 pending stub.
187    ///
188    /// 触发: PlaceOrder spawn 一个 30s 延迟 task, 到点检查 (acc_id, order_id_ex)
189    /// 对应的 stub 是否仍 `is_stub=true && is_pending_broker_confirm=true`.
190    /// 若是 → 删 stub + warn (push channel 断 / broker 拒单未 push 的兜底).
191    ///
192    /// **不**简单调 STUB_TTL_MS evict — 那个是 query_orders merge 时的逻辑,
193    /// 这里是主动 GC pending stub. 两者互补.
194    ///
195    /// 返 (purged: bool, reason: 描述), caller 写 audit log.
196    pub fn purge_pending_stub_if_still_pending(
197        &self,
198        acc_id: u64,
199        order_id: u64,
200    ) -> Option<String> {
201        if let Some(mut entry) = self.orders.get_mut(&acc_id) {
202            let before = entry.len();
203            let mut purged_code = None;
204            entry.retain(|o| {
205                let should_purge =
206                    o.order_id == order_id && o.is_stub && o.is_pending_broker_confirm;
207                if should_purge {
208                    purged_code = Some(o.code.clone());
209                }
210                !should_purge
211            });
212            let after = entry.len();
213            if before != after {
214                // v1.4.111 P2-1 Tier 3 audit comment: purged_code 是 audit log
215                // 字段, `Some(empty_string)` 跟 `Some("CODE")` 都表示 "purge 成功",
216                // caller (e.g. post_ack.rs:50-80 stub cleanup) 不基于 code 内容
217                // mutate state. 非 silent-success risk (audit verified).
218                return Some(purged_code.unwrap_or_default());
219            }
220        }
221        None
222    }
223
224    /// v1.4.83 §9 F6: 扫全 cache 查 orphan orders.
225    ///
226    /// **Orphan 定义**: `order_status ∈ {0, 1, 2, 4}` (未达到 Submitted=5
227    /// 之前的 in-flight stub) **且** `create_timestamp.is_some()` **且**
228    /// `now_secs - create_timestamp > threshold_secs`.
229    ///
230    /// 含义对应 C++ proto OrderStatus enum (Trd_Common.proto:108):
231    /// - 0 = Unsubmitted (未提交) — 极端情况, daemon stub 修后不应该出现 (v1.4.103 P0 hotfix)
232    /// - 1 = WaitingSubmit (等待提交) — 条件单 stub 初值, 等触发
233    /// - 2 = Submitting (提交中) — 普通单 stub 初值 (v1.4.103 起)
234    /// - 4 = TimeOut (处理超时) — 后端回 timeout, 状态未知
235    ///
236    /// **为什么需要**: v1.4.82 A2 PlaceOrder 成功后直接 upsert stub order
237    /// 让 `/api/orders` 立刻可见 (BUG-60b0-002 fix). 后续 push notice_type=
238    /// 4/5/8 / re-fetch 把 status 推到 5 (Submitted) / 10/11 (Filled).
239    /// 若 push 通道断流 (§9 CMD3020 chain broken), stub 卡住 5min+ = orphan.
240    ///
241    /// **v1.4.103 P0 (BUG-WUZONG-001)**: stub status 从 0 (proto 定义为
242    /// Unsubmitted "未提交", 触发客户端 retry 多下单) 改成 1/2 (WaitingSubmit/
243    /// Submitting, 对齐 C++ NNProto_Trd_OrderOp.cpp:483-510). orphan 检测同步
244    /// 扩展到 {0, 1, 2, 4} 全 in-flight 状态 — 老 daemon 留下来 status=0 的
245    /// 卡死 stub 也能被检测到.
246    ///
247    /// 返 `Vec<OrphanOrder>`; caller 决定 log 级别 + metric bump.
248    #[must_use]
249    pub fn scan_orphan_orders(&self, now_secs: f64, threshold_secs: f64) -> Vec<OrphanOrder> {
250        let mut orphans = Vec::new();
251        for entry in self.orders.iter() {
252            let acc_id = *entry.key();
253            for order in entry.value().iter() {
254                // v1.4.103 codex F2.4 (P2) round 2: 仅扫**真 stub** orders
255                // (is_stub == true). 非 stub 的 backend 权威 orders 即使 status
256                // 是 0/1/2/4 (in-flight) 也不算 orphan — 它们是 backend 主动
257                // 持续推送的真实状态, daemon 不应认为是卡住. 之前 v1.4.103
258                // P0 hotfix 加 status filter {0,1,2,4} 但漏 is_stub guard,
259                // 导致 backend 长时 Submitting/TimeOut 真单 (5min+) 也被 log
260                // 当 orphan / push 通道断流候选 → metrics 噪音.
261                if !order.is_stub {
262                    continue;
263                }
264                // v1.4.103 P0: in-flight 状态 (未到 Submitted=5 之前的过渡值)
265                // 都视为 stub 卡住候选 — 0 (Unsubmitted) / 1 (WaitingSubmit) /
266                // 2 (Submitting) / 4 (TimeOut).
267                if !matches!(order.order_status, 0 | 1 | 2 | 4) {
268                    continue;
269                }
270                // **v1.4.106 codex 0920 F7 (P2)**: PlaceOrder stub `create_timestamp=None`
271                // (backend 还没 push 真 timestamp). 只用 `create_timestamp.is_some()`
272                // 时所有 stub 都被 scanner skip → orphan 检测对最关键场景 (stub
273                // 卡死) 失效.
274                //
275                // 修法: stub (`is_stub=true`) 优先用 `stub_inserted_at_ms`
276                // 推算 age (转 secs); 非 stub 用 `create_timestamp` (与原行为
277                // 一致, 但本 scanner 已加 `is_stub` guard, 不会 reach 此分支).
278                //
279                // `stub_inserted_at_ms` 在 PlaceOrder ack 后 upsert 时写, 所以
280                // 真 stub 必有非零值. = 0 视为没初始化 (老 cache entry 兼容)
281                // → skip.
282                let age_secs = if order.is_stub {
283                    if order.stub_inserted_at_ms == 0 {
284                        // 老 cache entry 未携带 stub_inserted_at_ms (e.g. 升级前
285                        // 持久化数据反序列化), skip 而非误报.
286                        continue;
287                    }
288                    let stub_inserted_secs = (order.stub_inserted_at_ms as f64) / 1000.0;
289                    let now_unix_secs = now_secs;
290                    now_unix_secs - stub_inserted_secs
291                } else {
292                    // 非 stub: 走 create_timestamp 老路径 (本 scanner 已加
293                    // is_stub guard, 此分支不应 reach 但保留兼容).
294                    let Some(create_ts) = order.create_timestamp else {
295                        continue;
296                    };
297                    now_secs - create_ts
298                };
299                if age_secs > threshold_secs {
300                    orphans.push(OrphanOrder {
301                        acc_id,
302                        order_id: order.order_id,
303                        order_id_ex: order.order_id_ex.clone(),
304                        code: order.code.clone(),
305                        age_secs,
306                    });
307                }
308            }
309        }
310        orphans
311    }
312}
313
/// v1.4.83 §9 F6: structured report for one orphan order, as produced by
/// `TrdCache::scan_orphan_orders`.
#[derive(Debug, Clone)]
pub struct OrphanOrder {
    /// Account whose order cache contained the order.
    pub acc_id: u64,
    /// Numeric order id copied from the cached order.
    pub order_id: u64,
    /// String order id (`order_id_ex`) copied from the cached order.
    pub order_id_ex: String,
    /// `code` copied from the cached order.
    pub code: String,
    /// Age of the order in seconds at scan time. For stub orders the scanner
    /// computes this as `now_secs - stub_inserted_at_ms / 1000`, not from
    /// `create_timestamp`.
    pub age_secs: f64,
}