Skip to main content

futu_cache/trd_cache/
order_state.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use super::*;
5
6impl TrdCache {
7    pub fn get_cipher(&self, acc_id: u64) -> Option<Vec<u8>> {
8        self.ciphers.get(&acc_id).map(|v| v.clone())
9    }
10
11    pub fn set_cipher(&self, acc_id: u64, cipher: Vec<u8>) {
12        self.ciphers.insert(acc_id, cipher);
13    }
14
15    /// v1.4.73 A2 BUG-008 fix: 读当前账户的 cipher state version(用于 unlock idem_key)。
16    ///
17    /// 首次访问 acc_id 会初始化为 0。后续每次 lock 清 cipher 会 `fetch_add(1)`。
18    /// `idem_key` 构造时把这个 version 纳入 hash → cipher 清后 version 递增 →
19    /// 同 body 的 idem_key 不同 → cache miss → 真执行 unlock(或 backend 真校验)。
20    pub fn get_cipher_state_version(&self, acc_id: u64) -> u64 {
21        let entry = self
22            .cipher_state_versions
23            .entry(acc_id)
24            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
25        entry.load(Ordering::SeqCst)
26    }
27
28    /// v1.4.73 A2 BUG-008 fix: lock 清 cipher 时调,递增 version → 让下次 unlock
29    /// 同 body 得 cache miss。
30    ///
31    /// 返回 new version(递增后值),便于调用方 log。
32    #[must_use]
33    pub fn bump_cipher_state_version(&self, acc_id: u64) -> u64 {
34        let entry = self
35            .cipher_state_versions
36            .entry(acc_id)
37            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
38        entry.fetch_add(1, Ordering::SeqCst) + 1
39    }
40
41    /// v1.4.106 codex 0226 F1+F2: PlaceOrder 解析到 `OrderNewRsp.action.order_confirm`
42    /// 时调用, 保存上下文用于后续 `Trd_ReconfirmOrder` 构造 backend `OrderConfirmReq`.
43    ///
44    /// `now_ms` 由 caller 传入 (便于单测注入固定时钟); 真实路径用
45    /// `SystemTime::now()`.
46    pub fn store_pending_order_confirm(
47        &self,
48        acc_id: u64,
49        ftapi_order_id: u64,
50        mut ctx: OrderConfirmContext,
51        now_ms: u64,
52    ) {
53        ctx.inserted_at_ms = now_ms;
54        let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
55        self.pending_order_confirms.insert(key, ctx);
56    }
57
58    /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder handler 入口 lookup, 取出
59    /// `(acc_id, ftapi_order_id)` 对应 OrderConfirmContext.
60    ///
61    /// 返 `None`: cache miss (PlaceOrder 没存 / TTL 过期 / 已被消费). caller 必须
62    /// 早 reject loud, **不**允许 silent fallback (避免反模式 D / silent-success).
63    ///
64    /// `now_ms` 检查 TTL: `now - ctx.inserted_at_ms > ORDER_CONFIRM_CONTEXT_TTL_MS`
65    /// 视为 stale → return None + remove (proactive GC).
66    pub fn get_pending_order_confirm(
67        &self,
68        acc_id: u64,
69        ftapi_order_id: u64,
70        now_ms: u64,
71    ) -> Option<OrderConfirmContext> {
72        let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
73        let ctx = self.pending_order_confirms.get(&key)?.value().clone();
74        if now_ms.saturating_sub(ctx.inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
75            // Stale → proactive GC
76            self.pending_order_confirms.remove(&key);
77            return None;
78        }
79        Some(ctx)
80    }
81
82    /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder backend 成功 (`OrderConfirmRsp.result==0`)
83    /// 后调用, 从 cache 删除 (一次性消费, 防重复 confirm).
84    ///
85    /// 返 `true` 表示真有删除发生; `false` = 已被其他路径消费 / 过期 GC.
86    pub fn remove_pending_order_confirm(&self, acc_id: u64, ftapi_order_id: u64) -> bool {
87        let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
88        self.pending_order_confirms.remove(&key).is_some()
89    }
90
91    /// v1.4.106 codex 0226 F1+F2: GC stale OrderConfirmContext entries.
92    ///
93    /// 用于定时清理 (push dispatcher 收到 ORDER 类 push 时顺便扫一次), 防止
94    /// stale ctx 累积. 返回清理掉的条目数.
95    pub fn purge_stale_order_confirms(&self, now_ms: u64) -> usize {
96        let mut purged = Vec::new();
97        for entry in self.pending_order_confirms.iter() {
98            if now_ms.saturating_sub(entry.value().inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
99                purged.push(*entry.key());
100            }
101        }
102        let n = purged.len();
103        for key in purged {
104            self.pending_order_confirms.remove(&key);
105        }
106        n
107    }
108
109    /// Store backend write `req_id -> order_id_ex` for C++ order op-result
110    /// push parity.
111    ///
112    /// Ref:
113    /// - `NNProto_Trd_OnPush.cpp:22-27`
114    /// - `NNProto_Trd_OrderOpReal.cpp:217,352`
115    ///
116    /// Empty values are ignored because C++ only maps meaningful backend
117    /// request/order pairs. Return value mirrors best-effort insert success.
118    pub fn store_order_op_req_order(&self, acc_id: u64, req_id: &str, order_id_ex: &str) -> bool {
119        let req_id = req_id.trim();
120        let order_id_ex = order_id_ex.trim();
121        if req_id.is_empty() || order_id_ex.is_empty() {
122            return false;
123        }
124
125        self.order_op_req_orders
126            .insert(
127                OrderOpReqKey::new(acc_id, req_id.to_string()),
128                order_id_ex.to_string(),
129            )
130            .is_none()
131    }
132
133    /// Consume backend order id by operation `req_id`.
134    ///
135    /// C++ `FindOrderIDByReqID` erases the pair after lookup and clears the
136    /// helper map once it reaches 512 entries. Keep the same bounded,
137    /// best-effort behavior; a miss only means the push may have come from
138    /// another terminal/client and the dispatcher may fall back to full list.
139    pub fn take_order_id_by_op_req_id(&self, acc_id: u64, req_id: &str) -> Option<String> {
140        let req_id = req_id.trim();
141        if req_id.is_empty() {
142            return None;
143        }
144
145        let key = OrderOpReqKey::new(acc_id, req_id.to_string());
146        let value = self
147            .order_op_req_orders
148            .remove(&key)
149            .map(|(_, value)| value);
150        if self.order_op_req_orders.len() >= ORDER_OP_REQ_ORDER_MAP_CLEAR_LIMIT {
151            self.order_op_req_orders.clear();
152        }
153        value
154    }
155
156    /// v1.4.106 codex 0554 F1 [P1]: 原子性清空所有 cipher + 同步 bump 各账户的
157    /// `cipher_state_version`。
158    ///
159    /// 起源:`/api/admin/reload` 之前的实现是
160    /// `bridge.caches.trd_cache.ciphers.clear()` 直接动 `DashMap`,但 **没** bump
161    /// `cipher_state_version`。这与 v1.4.73 A2 BUG-008 修复的语义不一致:
162    /// lock-trade 路径里 `ciphers.remove()` 之后必跟 `bump_cipher_state_version()`,
163    /// 防止旧 idempotency cache entry(unlock idem_key 含 cipher_state_version
164    /// hash)在 cipher 被清后仍命中返 stale "cached success",导致
165    /// step 4 / step 5 silent regression。
166    ///
167    /// admin/reload 漏 bump 的具体后果:
168    /// - reload 清光 ciphers
169    /// - 客户端再调 unlock-trade 同 body → idem_key 命中(cipher_state_version
170    ///   未变)→ 返 stale 成功 → cipher cache 仍空 → place-order `-401` 解锁失败
171    ///
172    /// 本 helper 把两步打包,**禁止外部直接 `cache.ciphers.clear()`**(那条
173    /// 路径 silent skip bump,复活 BUG-008)。所有清 cipher 的 control-plane
174    /// 路径(reload / admin / 未来若加更多)必须走本 helper。
175    ///
176    /// 返回 `(cleared_count, bumped_versions)`:
177    /// - `cleared_count`:清掉的 cipher 数(即 reload 前已解锁账户数)
178    /// - `bumped_versions`:每个被清 acc_id 的 (acc_id, new_version) 列表,
179    ///   便于 log + 客户端调试 idem_key 失效原因
180    ///
181    /// 与 lock-trade 路径的 bump 行为一致:仅对**实际清掉 cipher** 的 acc_id
182    /// 递增 version;从未解锁的账户 cipher_state_version 保持不变。
183    ///
184    /// 并发:`DashMap::iter()` 期间其他线程的 `ciphers.remove()` /
185    /// `ciphers.insert()` 可能 race,但本 helper 用 `remove(&key)` 逐个清,
186    /// 拿到 `Some(_)` 才 bump,保证 `version` 单调递增 + 与 `ciphers` 实际
187    /// 状态一致。`SeqCst` 保证 bump 对所有后续 `get_cipher_state_version()`
188    /// 立即可见。
189    #[must_use]
190    pub fn clear_all_ciphers_and_bump_versions(&self) -> (usize, Vec<(u64, u64)>) {
191        // 先收集 acc_ids(避免持 DashMap iter guard 时 mutate map → deadlock)
192        let acc_ids: Vec<u64> = self.ciphers.iter().map(|e| *e.key()).collect();
193        let mut cleared_count = 0usize;
194        let mut bumped_versions: Vec<(u64, u64)> = Vec::with_capacity(acc_ids.len());
195        for acc_id in acc_ids {
196            if self.ciphers.remove(&acc_id).is_some() {
197                cleared_count += 1;
198                let new_ver = self.bump_cipher_state_version(acc_id);
199                bumped_versions.push((acc_id, new_ver));
200            }
201        }
202        (cleared_count, bumped_versions)
203    }
204}