futu_cache/trd_cache/order_state.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use super::*;
5
6impl TrdCache {
7 pub fn get_cipher(&self, acc_id: u64) -> Option<Vec<u8>> {
8 self.ciphers.get(&acc_id).map(|v| v.clone())
9 }
10
11 pub fn set_cipher(&self, acc_id: u64, cipher: Vec<u8>) {
12 self.ciphers.insert(acc_id, cipher);
13 }
14
15 /// v1.4.73 A2 BUG-008 fix: 读当前账户的 cipher state version(用于 unlock idem_key)。
16 ///
17 /// 首次访问 acc_id 会初始化为 0。后续每次 lock 清 cipher 会 `fetch_add(1)`。
18 /// `idem_key` 构造时把这个 version 纳入 hash → cipher 清后 version 递增 →
19 /// 同 body 的 idem_key 不同 → cache miss → 真执行 unlock(或 backend 真校验)。
20 pub fn get_cipher_state_version(&self, acc_id: u64) -> u64 {
21 let entry = self
22 .cipher_state_versions
23 .entry(acc_id)
24 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
25 entry.load(Ordering::SeqCst)
26 }
27
28 /// v1.4.73 A2 BUG-008 fix: lock 清 cipher 时调,递增 version → 让下次 unlock
29 /// 同 body 得 cache miss。
30 ///
31 /// 返回 new version(递增后值),便于调用方 log。
32 #[must_use]
33 pub fn bump_cipher_state_version(&self, acc_id: u64) -> u64 {
34 let entry = self
35 .cipher_state_versions
36 .entry(acc_id)
37 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
38 entry.fetch_add(1, Ordering::SeqCst) + 1
39 }
40
41 /// v1.4.106 codex 0226 F1+F2: PlaceOrder 解析到 `OrderNewRsp.action.order_confirm`
42 /// 时调用, 保存上下文用于后续 `Trd_ReconfirmOrder` 构造 backend `OrderConfirmReq`.
43 ///
44 /// `now_ms` 由 caller 传入 (便于单测注入固定时钟); 真实路径用
45 /// `SystemTime::now()`.
46 pub fn store_pending_order_confirm(
47 &self,
48 acc_id: u64,
49 ftapi_order_id: u64,
50 mut ctx: OrderConfirmContext,
51 now_ms: u64,
52 ) {
53 ctx.inserted_at_ms = now_ms;
54 let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
55 self.pending_order_confirms.insert(key, ctx);
56 }
57
58 /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder handler 入口 lookup, 取出
59 /// `(acc_id, ftapi_order_id)` 对应 OrderConfirmContext.
60 ///
61 /// 返 `None`: cache miss (PlaceOrder 没存 / TTL 过期 / 已被消费). caller 必须
62 /// 早 reject loud, **不**允许 silent fallback (避免反模式 D / silent-success).
63 ///
64 /// `now_ms` 检查 TTL: `now - ctx.inserted_at_ms > ORDER_CONFIRM_CONTEXT_TTL_MS`
65 /// 视为 stale → return None + remove (proactive GC).
66 pub fn get_pending_order_confirm(
67 &self,
68 acc_id: u64,
69 ftapi_order_id: u64,
70 now_ms: u64,
71 ) -> Option<OrderConfirmContext> {
72 let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
73 let ctx = self.pending_order_confirms.get(&key)?.value().clone();
74 if now_ms.saturating_sub(ctx.inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
75 // Stale → proactive GC
76 self.pending_order_confirms.remove(&key);
77 return None;
78 }
79 Some(ctx)
80 }
81
82 /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder backend 成功 (`OrderConfirmRsp.result==0`)
83 /// 后调用, 从 cache 删除 (一次性消费, 防重复 confirm).
84 ///
85 /// 返 `true` 表示真有删除发生; `false` = 已被其他路径消费 / 过期 GC.
86 pub fn remove_pending_order_confirm(&self, acc_id: u64, ftapi_order_id: u64) -> bool {
87 let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
88 self.pending_order_confirms.remove(&key).is_some()
89 }
90
91 /// v1.4.106 codex 0226 F1+F2: GC stale OrderConfirmContext entries.
92 ///
93 /// 用于定时清理 (push dispatcher 收到 ORDER 类 push 时顺便扫一次), 防止
94 /// stale ctx 累积. 返回清理掉的条目数.
95 pub fn purge_stale_order_confirms(&self, now_ms: u64) -> usize {
96 let mut purged = Vec::new();
97 for entry in self.pending_order_confirms.iter() {
98 if now_ms.saturating_sub(entry.value().inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
99 purged.push(*entry.key());
100 }
101 }
102 let n = purged.len();
103 for key in purged {
104 self.pending_order_confirms.remove(&key);
105 }
106 n
107 }
108
109 /// Store backend write `req_id -> order_id_ex` for C++ order op-result
110 /// push parity.
111 ///
112 /// Ref:
113 /// - `NNProto_Trd_OnPush.cpp:22-27`
114 /// - `NNProto_Trd_OrderOpReal.cpp:217,352`
115 ///
116 /// Empty values are ignored because C++ only maps meaningful backend
117 /// request/order pairs. Return value mirrors best-effort insert success.
118 pub fn store_order_op_req_order(&self, acc_id: u64, req_id: &str, order_id_ex: &str) -> bool {
119 let req_id = req_id.trim();
120 let order_id_ex = order_id_ex.trim();
121 if req_id.is_empty() || order_id_ex.is_empty() {
122 return false;
123 }
124
125 self.order_op_req_orders
126 .insert(
127 OrderOpReqKey::new(acc_id, req_id.to_string()),
128 order_id_ex.to_string(),
129 )
130 .is_none()
131 }
132
133 /// Consume backend order id by operation `req_id`.
134 ///
135 /// C++ `FindOrderIDByReqID` erases the pair after lookup and clears the
136 /// helper map once it reaches 512 entries. Keep the same bounded,
137 /// best-effort behavior; a miss only means the push may have come from
138 /// another terminal/client and the dispatcher may fall back to full list.
139 pub fn take_order_id_by_op_req_id(&self, acc_id: u64, req_id: &str) -> Option<String> {
140 let req_id = req_id.trim();
141 if req_id.is_empty() {
142 return None;
143 }
144
145 let key = OrderOpReqKey::new(acc_id, req_id.to_string());
146 let value = self
147 .order_op_req_orders
148 .remove(&key)
149 .map(|(_, value)| value);
150 if self.order_op_req_orders.len() >= ORDER_OP_REQ_ORDER_MAP_CLEAR_LIMIT {
151 self.order_op_req_orders.clear();
152 }
153 value
154 }
155
156 /// v1.4.106 codex 0554 F1 [P1]: 原子性清空所有 cipher + 同步 bump 各账户的
157 /// `cipher_state_version`。
158 ///
159 /// 起源:`/api/admin/reload` 之前的实现是
160 /// `bridge.caches.trd_cache.ciphers.clear()` 直接动 `DashMap`,但 **没** bump
161 /// `cipher_state_version`。这与 v1.4.73 A2 BUG-008 修复的语义不一致:
162 /// lock-trade 路径里 `ciphers.remove()` 之后必跟 `bump_cipher_state_version()`,
163 /// 防止旧 idempotency cache entry(unlock idem_key 含 cipher_state_version
164 /// hash)在 cipher 被清后仍命中返 stale "cached success",导致
165 /// step 4 / step 5 silent regression。
166 ///
167 /// admin/reload 漏 bump 的具体后果:
168 /// - reload 清光 ciphers
169 /// - 客户端再调 unlock-trade 同 body → idem_key 命中(cipher_state_version
170 /// 未变)→ 返 stale 成功 → cipher cache 仍空 → place-order `-401` 解锁失败
171 ///
172 /// 本 helper 把两步打包,**禁止外部直接 `cache.ciphers.clear()`**(那条
173 /// 路径 silent skip bump,复活 BUG-008)。所有清 cipher 的 control-plane
174 /// 路径(reload / admin / 未来若加更多)必须走本 helper。
175 ///
176 /// 返回 `(cleared_count, bumped_versions)`:
177 /// - `cleared_count`:清掉的 cipher 数(即 reload 前已解锁账户数)
178 /// - `bumped_versions`:每个被清 acc_id 的 (acc_id, new_version) 列表,
179 /// 便于 log + 客户端调试 idem_key 失效原因
180 ///
181 /// 与 lock-trade 路径的 bump 行为一致:仅对**实际清掉 cipher** 的 acc_id
182 /// 递增 version;从未解锁的账户 cipher_state_version 保持不变。
183 ///
184 /// 并发:`DashMap::iter()` 期间其他线程的 `ciphers.remove()` /
185 /// `ciphers.insert()` 可能 race,但本 helper 用 `remove(&key)` 逐个清,
186 /// 拿到 `Some(_)` 才 bump,保证 `version` 单调递增 + 与 `ciphers` 实际
187 /// 状态一致。`SeqCst` 保证 bump 对所有后续 `get_cipher_state_version()`
188 /// 立即可见。
189 #[must_use]
190 pub fn clear_all_ciphers_and_bump_versions(&self) -> (usize, Vec<(u64, u64)>) {
191 // 先收集 acc_ids(避免持 DashMap iter guard 时 mutate map → deadlock)
192 let acc_ids: Vec<u64> = self.ciphers.iter().map(|e| *e.key()).collect();
193 let mut cleared_count = 0usize;
194 let mut bumped_versions: Vec<(u64, u64)> = Vec::with_capacity(acc_ids.len());
195 for acc_id in acc_ids {
196 if self.ciphers.remove(&acc_id).is_some() {
197 cleared_count += 1;
198 let new_ver = self.bump_cipher_state_version(acc_id);
199 bumped_versions.push((acc_id, new_ver));
200 }
201 }
202 (cleared_count, bumped_versions)
203 }
204}