futu_rest/routes/trd/sub_acc_push.rs
1//! REST trade account push subscription route and helpers.
2
3use std::sync::Arc;
4
5use axum::extract::{Extension, Json, State};
6use axum::http::StatusCode;
7use serde_json::Value;
8
9use futu_auth::KeyRecord;
10
11use crate::adapter::RestState;
12
13use super::ApiResult;
14
15/// v1.4.90 P1-C 修:POST /api/sub-acc-push — 订阅交易账户 push(订单 / 成交变动)
16///
17/// **历史背景**(外部 tester v1.4.69-89 报告 6 版 silent stub):
18/// - 老版 handler 是单 `proto_request` 透传 wrapper,无 input validation、
19/// 无 audit log、无 session_id token —— 用户 POST 后 daemon log 看不到任何
20/// "本 endpoint 被调用过" 的痕迹(仅 gateway 侧 SubAccPushHandler 默默更
21/// 新 `SubscriptionManager.subscribe_trd_acc(conn_id, acc_id)`),
22/// "ret=0 + s2c={}" 看似成功但下游全无可观测信号。
23///
24/// **本次 (v1.4.90 P1-C) 对齐 MCP `futu_sub_acc_push`**:
25/// 1. **Loud input validation**(acc_id_list 必须非空 + 每个 id 非零;C++
26/// `IAPIServer_Trd_SubAccPush::OnClientReq_SubAccPush` 等价 check)
27/// 2. **Audit log** `tracing::info!` 留 endpoint + count + 头几个 acc_id 痕迹,
28/// 给 push_health 看不到的部分提供可观测性
29/// 3. **Backend CMD forward 经 `proto_request`**(即 dispatch 到 gateway
30/// `SubAccPushHandler` 完成 `SubscriptionManager` 注册 + return ret=0)
31/// 4. **Response augment**:在标准 proto Response (`ret_type/s2c{}`) 上补
32/// `subscribed_acc_ids` + `session_id` + `unsub_hint`,对齐 MCP
33/// `{ok, subscribed_acc_ids, session_id, unsub_hint}` 输出形状。
34/// REST stateless,session_id 用 sorted acc_ids hash 派生(确定性 token,
35/// 可拿来 POST `/api/unsub-acc-push`)
36///
37/// **架构说明**:本 daemon 内 gateway local `SubAccPushHandler` 即对齐
38/// C++ APIServer 的 `Trd_SubAccPush` 处理点 —— 没有"再下一层 backend"
39/// 的 CMD 需要 forward。MCP 通过 TCP `client.request(TRD_SUB_ACC_PUSH)`
40/// 也是落到同一个 gateway handler。所以 "CMD forward" 的语义是 REST →
41/// `proto_request` → `RequestRouter::dispatch` → `SubAccPushHandler`。
42pub async fn sub_acc_push(
43 State(state): State<RestState>,
44 rec: Option<Extension<Arc<KeyRecord>>>,
45 Json(mut body): Json<Value>,
46) -> ApiResult {
47 // v1.4.103 (codex 56 F1 / 58 F5 — B9): legacy 模式 (无 keys.json / 无 Bearer)
48 // 直接 reject. 之前 legacy 模式返 ret_type=0 + session_id 给客户端 "看似成功",
49 // 但**不写** rest_acc_subscriptions 状态 (legacy WS 连接也无 key_id, sub
50 // state filter 跳过) — 用户实际看到 revoke 成功仍按 legacy 全量 trade push
51 // 行为继续收, silent inconsistency. 改为 loud reject 让用户知道 sub state
52 // 需要 keys.json + Bearer 才工作.
53 if rec.is_none() {
54 futu_auth::audit::reject(
55 "rest",
56 "/api/sub-acc-push",
57 "<legacy>",
58 "sub-acc-push not supported in legacy mode (no keys.json)",
59 );
60 return Err((
61 StatusCode::FORBIDDEN,
62 Json(serde_json::json!({
63 "error": "/api/sub-acc-push: legacy mode (no keys.json) does not support per-key sub state. \
64 Configure keys.json and pass Bearer token to enable.",
65 "ret_type": -1,
66 "hint": "v1.4.103 BUG-009: legacy mode previously returned silent success without persisting sub state. Now loud-reject to surface the limitation."
67 })),
68 ));
69 }
70 // v1.4.102 codex 43 F1 (P2): normalize 在 extract_acc_id_list 之前.
71 // 之前 extractor 只看 snake_case `acc_id_list`, 但官方 proto 是 camelCase
72 // `accIDList`. adapter 后续会 normalize, 但本 extractor 先跑会误 reject 空
73 // → 400 silent. 现在 normalize 进去, extractor 看到 acc_id_list.
74 crate::adapter::normalize_json_keys_snake_case(&mut body);
75 // 1. Loud input validation(解析 acc_id_list;畸形 / 空 / 全零 → 400)
76 let acc_ids = match extract_acc_id_list(&body) {
77 Ok(acc_ids) => acc_ids,
78 Err(reason) => {
79 let key_id = rec
80 .as_deref()
81 .map(|r| r.as_ref().id.clone())
82 .unwrap_or_else(|| "<legacy>".to_string());
83 futu_auth::audit::reject("rest", "/api/sub-acc-push", &key_id, &reason);
84 return Err((
85 StatusCode::BAD_REQUEST,
86 Json(serde_json::json!({
87 "error": format!("/api/sub-acc-push: {reason}")
88 })),
89 ));
90 }
91 };
92 if let Err(reason) = validate_sub_acc_push_acc_ids(&acc_ids) {
93 let key_id = rec
94 .as_deref()
95 .map(|r| r.as_ref().id.clone())
96 .unwrap_or_else(|| "<legacy>".to_string());
97 futu_auth::audit::reject("rest", "/api/sub-acc-push", &key_id, reason);
98 return Err((
99 StatusCode::BAD_REQUEST,
100 Json(serde_json::json!({
101 "error": format!("/api/sub-acc-push: {reason}")
102 })),
103 ));
104 }
105 // v1.4.102 codex 46 F4 (P1): sub-acc-push 必须对 acc_id_list 每个 id 跑
106 // allowed_acc_ids 限额. 之前只 validate 非空, 但 key 限到 acc A 的用户传
107 // [A, B, C] 仍能 subscribe 别人账户的 trade push stream — 越权.
108 //
109 // codex 0522 F2 v1.4.106: 走共享 helper `check_per_acc_rate_for_caller`,
110 // 与 `/api/unsub-acc-push` 同源 (单一逻辑, 防漂移).
111 check_per_acc_rate_for_caller(
112 &state.counters,
113 rec.as_deref().map(|r| r.as_ref()),
114 &acc_ids,
115 "/api/sub-acc-push",
116 )?;
117
118 // 2. Audit log:留 endpoint + count + head acc_ids 痕迹(push_health 不
119 // 跟踪 subscribe,本 log 是 silent-stub 的可观测性补丁)
120 let head: Vec<u64> = acc_ids.iter().take(3).copied().collect();
121 tracing::info!(
122 target: futu_auth::audit::TARGET,
123 iface = "rest",
124 endpoint = "/api/sub-acc-push",
125 count = acc_ids.len(),
126 head_acc_ids = ?head,
127 "v1.4.90 P1-C: sub-acc-push CMD forward → SubAccPushHandler"
128 );
129
130 // 3. v1.4.102 codex 51 F2 (P2): REST 不再 dispatch 到 gateway
131 // SubAccPushHandler — 那条路径 register ephemeral REST conn_id 到
132 // SubscriptionManager (per-conn_id), 而 unsub 用新 conn_id 删不到, 留
133 // stale entries 让 push_trd_acc 一直扫. REST sub state map (per-key) 已
134 // 是 REST 层 truly canonical 真相源, 直接 return success + 让 augment
135 // 段 (后) 写 state map.
136 let mut resp = Json(serde_json::json!({
137 "ret_type": 0,
138 "ret_msg": serde_json::Value::Null,
139 "err_code": serde_json::Value::Null,
140 "s2c": {}
141 }));
142 // 4. 仅在 backend 返成功 (ret_type=0) 时 augment session_id / unsub_hint
143 // + v1.4.102 codex 46 F2/F3 / 47 F3 / 48 F3 (P1): 写入 REST sub state
144 // map 让 WS push delivery 过滤生效. **mutate-after-daemon-success** —
145 // proto_request 已 await 完, ret_type=0 才 register, 失败时不留 local
146 // state (避免 daemon 失败但 REST WS state 残留导致行为不一致).
147 if let Some(obj) = resp.0.as_object_mut() {
148 let ret_type_ok = obj.get("ret_type").and_then(|v| v.as_i64()).unwrap_or(-1) == 0;
149 if ret_type_ok {
150 // codex 47 F3 / 48 F3: 仅 daemon-success 后 register local state.
151 if let Some(rec_ref) = rec.as_deref() {
152 let key_id = rec_ref.as_ref().id.clone();
153 crate::adapter::with_rest_acc_subscriptions_write(
154 &state.rest_acc_subscriptions,
155 |subs| {
156 let entry = subs.entry(key_id).or_default();
157 for &acc_id in &acc_ids {
158 entry.insert(acc_id);
159 }
160 },
161 );
162 }
163 let session_id = derive_sub_acc_push_session_id(&acc_ids);
164 obj.insert("subscribed_acc_ids".to_string(), serde_json::json!(acc_ids));
165 obj.insert("session_id".to_string(), Value::String(session_id.clone()));
166 obj.insert(
167 "unsub_hint".to_string(),
168 Value::String(format!(
169 "POST /api/unsub-acc-push with same acc_id_list to revoke (REST is stateless, session_id=\"{session_id}\" is a deterministic token derived from sorted acc_ids)"
170 )),
171 );
172 }
173 }
174 Ok(resp)
175}
176
177/// v1.4.90 P1-C helper:从 JSON body 抽 `c2s.acc_id_list`(兼容 flat body)。
178///
179/// 容忍点:
180/// - `body.c2s.acc_id_list` 嵌套(标准 proto 结构)
181/// - `body.acc_id_list` 扁平(adapter 之后会 wrap 成 c2s,本 helper 提前看一眼)
182/// - 数组元素必须是 JSON 正整数;畸形元素 fail-closed,不静默过滤
183pub(crate) fn extract_acc_id_list(body: &Value) -> Result<Vec<u64>, String> {
184 let raw = body
185 .pointer("/c2s/acc_id_list")
186 .or_else(|| body.pointer("/acc_id_list"));
187 let Some(raw) = raw else {
188 return Ok(Vec::new());
189 };
190 let Some(arr) = raw.as_array() else {
191 return Err("c2s.acc_id_list must be an array of positive integer acc_id values".into());
192 };
193
194 let mut acc_ids = Vec::with_capacity(arr.len());
195 for (idx, value) in arr.iter().enumerate() {
196 let Some(acc_id) = value.as_u64() else {
197 return Err(format!(
198 "c2s.acc_id_list[{idx}] must be a positive integer acc_id"
199 ));
200 };
201 acc_ids.push(acc_id);
202 }
203 Ok(acc_ids)
204}
205
206/// v1.4.90 P1-C helper:sub-acc-push 入参校验。
207///
208/// C++ `IAPIServer_Trd_SubAccPush::OnClientReq_SubAccPush` 没有显式校验
209/// 空 list / 0 acc_id(默默 no-op),但 silent no-op 是用户视角的 bug
210/// (tester 报告 "看似 ret=0 但什么都没订上")。Rust REST 侧 fail-fast。
211pub(crate) fn validate_sub_acc_push_acc_ids(acc_ids: &[u64]) -> Result<(), &'static str> {
212 if acc_ids.is_empty() {
213 return Err("c2s.acc_id_list is required and must not be empty");
214 }
215 if acc_ids.contains(&0) {
216 return Err(
217 "c2s.acc_id_list contains zero — call /api/list-accounts to discover real acc_id values",
218 );
219 }
220 Ok(())
221}
222
223/// codex 0522 F2 v1.4.106: 共享的 per-acc rate / scope check helper.
224///
225/// `/api/sub-acc-push` 与 `/api/unsub-acc-push` 之前在两个文件 (trd.rs +
226/// sys.rs) 各写一份相同的 for-loop 跑 `check_full_skip_rate` per acc_id.
227/// codex F2 audit 指出: REST caller scope 没统一入口, 多套手写 = 漂移风险.
228/// 本 helper 是单一来源 — 两处 callers 都调它, 任一改逻辑都同步生效.
229///
230/// 接 `CallerContext` 而不是 `&KeyRecord`: 让 caller side 总是先 build ctx
231/// (per-call snapshot), 与 `IncomingRequest.caller_*` 同源数据不漂移.
232///
233/// `endpoint` 用作 audit reject label (e.g. "/api/sub-acc-push").
234///
235/// **行为**:
236/// - `ctx.key_id = None` (legacy mode): no-op 返 Ok (route handler 已在 sub
237/// 入口 reject legacy, 但本 helper 防御性兼容)
238/// - `acc_ids` 任一不通过 → audit reject + 返 Err 含 status code (从 outcome
239/// 的 http_status_code 派生, 默认 403)
240/// - 全通过 → Ok
241pub(crate) fn check_per_acc_rate_for_caller(
242 counters: &futu_auth::RuntimeCounters,
243 rec: Option<&futu_auth::KeyRecord>,
244 acc_ids: &[u64],
245 endpoint: &'static str,
246) -> Result<(), (StatusCode, Json<Value>)> {
247 let Some(key_rec) = rec else {
248 // legacy mode — caller handler 已 reject 过, 但防御性兼容. no-op.
249 return Ok(());
250 };
251 for &acc_id in acc_ids {
252 let ctx = futu_auth::CheckCtx {
253 market: String::new(),
254 symbol: String::new(),
255 order_value: None,
256 trd_side: None,
257 acc_id: Some(acc_id),
258 mutation_no_exposure: false,
259 currency: None,
260 };
261 let now = chrono::Utc::now();
262 let outcome = counters.check_full_skip_rate(&key_rec.id, &key_rec.limits(), &ctx, now);
263 if let Some(reason) = outcome.reason() {
264 futu_auth::audit::reject(
265 "rest",
266 endpoint,
267 &key_rec.id,
268 &format!("limit: {reason} (acc_id={acc_id})"),
269 );
270 let status =
271 StatusCode::from_u16(outcome.http_status_code()).unwrap_or(StatusCode::FORBIDDEN);
272 return Err((
273 status,
274 Json(serde_json::json!({
275 "error": format!("{endpoint}: limit check failed for acc_id={acc_id}: {reason}")
276 })),
277 ));
278 }
279 }
280 Ok(())
281}
282
283/// v1.4.90 P1-C helper:从 sorted acc_ids 派生确定性 session_id。
284///
285/// REST stateless 没有 in-process push 注册表(MCP 有 `Arc<Mutex<HashMap<>>>`),
286/// session_id 由 sorted acc_ids hash 派生,**输入相同 → 输出相同**。
287/// 用户存下 session_id 之后通过 `/api/unsub-acc-push` 用相同 acc_ids
288/// 派生出相同 session_id 即可撤销。
289///
290/// 注:这不是为了"防 hallucination"或"鉴权",仅作为输出 token 让用户
291/// 体感对齐 MCP `futu_sub_acc_push` 的 `{session_id, unsub_hint}` 形状。
292fn derive_sub_acc_push_session_id(acc_ids: &[u64]) -> String {
293 use std::collections::hash_map::DefaultHasher;
294 use std::hash::{Hash, Hasher};
295 let mut sorted = acc_ids.to_vec();
296 sorted.sort_unstable();
297 sorted.dedup();
298 let mut hasher = DefaultHasher::new();
299 sorted.hash(&mut hasher);
300 let hash = hasher.finish();
301 format!("rest-sub-{hash:016x}")
302}
303
304/// codex 0522 F2 v1.4.106: caller scope invariant tests for sub/unsub helpers.
305///
306/// 验证 `check_per_acc_rate_for_caller` 给定同一 `KeyRecord` 的两个不同
307/// endpoint label (sub / unsub) 看到同一份 allowed set。即使 helper
308/// 被加新的 endpoint 调用,`KeyRecord.allowed_acc_ids` 仍是 single source
309/// of truth,helper 不能引入 per-endpoint 漂移。
310#[cfg(test)]
311mod tests_v1_4_106_caller_ctx_invariant;
312
313#[cfg(test)]
314mod acc_id_list_tests;
315
316#[cfg(test)]
317mod tests_v1_4_90_p1_c_sub_acc_push;