Skip to main content

futu_rest/adapter/
proto_request.rs

1//! Split from adapter.rs: proto_request.
2//!
3//! pub items: proto_request,proto_request_with_filter,proto_request_with_idempotency,proto_request_with_idempotency_and_caller,proto_request_with_ctx.
4
5use axum::Json;
6use axum::http::StatusCode;
7use axum::http::header::CONTENT_TYPE;
8use axum::response::{IntoResponse, Response};
9use serde_json::Value;
10
11use super::*;
12
13#[derive(Debug, Clone)]
14pub struct RawJson {
15    bytes: Bytes,
16}
17
18impl RawJson {
19    pub fn new(bytes: impl Into<Bytes>) -> Self {
20        Self {
21            bytes: bytes.into(),
22        }
23    }
24}
25
26impl IntoResponse for RawJson {
27    fn into_response(self) -> Response {
28        ([(CONTENT_TYPE, "application/json")], self.bytes).into_response()
29    }
30}
31
32///
33/// 泛型参数:
34/// - `Req`: protobuf 请求类型 (prost::Message + serde::Deserialize)
35/// - `Rsp`: protobuf 响应类型 (prost::Message + serde::Serialize)
36///
37/// 流程: JSON → Req → encode → dispatch(proto_id) → decode → Rsp → JSON
38pub async fn proto_request<Req, Rsp>(
39    state: &RestState,
40    proto_id: u32,
41    json_body: Option<Value>,
42) -> Result<Json<Value>, (StatusCode, Json<Value>)>
43where
44    Req: Message + Default + serde::de::DeserializeOwned,
45    Rsp: Message + Default + serde::Serialize,
46{
47    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, None, None).await
48}
49
50pub async fn proto_request_raw<Req, Rsp>(
51    state: &RestState,
52    proto_id: u32,
53    json_body: Option<Value>,
54) -> Result<RawJson, (StatusCode, Json<Value>)>
55where
56    Req: Message + Default + serde::de::DeserializeOwned,
57    Rsp: Message + Default + serde::Serialize,
58{
59    proto_request_raw_internal::<Req, Rsp>(state, proto_id, json_body, None, None).await
60}
61
62/// 把 `DispatchError` 翻译成 REST 400 response (HTTP + JSON body).
63///
64/// pitfall #45 loud error: 不返 silent empty ret_type=0, 返清晰 400 + ret_msg.
65pub(super) fn map_dispatch_error(
66    spec: &'static EndpointSpec,
67    err: DispatchError,
68) -> (StatusCode, Json<Value>) {
69    let proto_id = spec
70        .proto_id()
71        .map(|id| id.to_string())
72        .unwrap_or_else(|| "daemon-local".to_string());
73    let ret_msg = format!(
74        "{} (endpoint: {}, proto_id: {})",
75        err, spec.canonical_name, proto_id
76    );
77    let status = StatusCode::from_u16(spec.runtime.error.validation_http_status)
78        .unwrap_or(StatusCode::BAD_REQUEST);
79    let machine_error_field = spec.runtime.error.machine_error_field;
80    let mut body = serde_json::json!({
81        "ret_type": -1,
82        "ret_msg": ret_msg,
83    });
84    if let Some(obj) = body.as_object_mut() {
85        obj.insert(
86            machine_error_field.to_string(),
87            serde_json::json!({
88                "kind": "validation_error",
89                "message": err.to_string(),
90                "endpoint": spec.canonical_name,
91                "proto_id": proto_id,
92            }),
93        );
94    }
95    (status, Json(body))
96}
97
98pub(super) fn validation_error_body(message: impl Into<String>) -> Value {
99    let message = message.into();
100    serde_json::json!({
101        "ret_type": -1,
102        "ret_msg": message,
103        "error": message,
104    })
105}
106
/// Selects which JSON pre-processing steps `decode_json_request` applies.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum JsonRequestMode {
    /// Generic REST adapter path. Includes trade-header expansion, because
    /// TRD read/write REST endpoints accept flat `acc_id` / `trd_env` aliases.
    GenericRest,
    /// QOT shared-connection path. Deliberately skips TRD header expansion but
    /// keeps the same JSON normalization, symbol shorthand expansion, and
    /// EndpointSpec validation.
    QotSharedConn,
    /// REST path for non-FTAPI wrapper protos whose JSON shape is exposed
    /// directly rather than through the usual `{c2s: ...}` request envelope.
    RawSpecBody,
}
120
121pub(crate) fn decode_json_request<Req>(
122    proto_id: u32,
123    json_body: Option<Value>,
124    mode: JsonRequestMode,
125) -> Result<Req, (StatusCode, Json<Value>)>
126where
127    Req: Default + serde::de::DeserializeOwned,
128{
129    let mut body = json_body.unwrap_or_else(|| Value::Object(serde_json::Map::new()));
130    normalize_json_keys_snake_case(&mut body);
131    apply_known_field_aliases_for_proto_id(&mut body, Some(proto_id));
132    if mode != JsonRequestMode::RawSpecBody {
133        maybe_wrap_flat_body_as_c2s(&mut body);
134    }
135    if mode == JsonRequestMode::GenericRest {
136        maybe_expand_flat_trd_header(&mut body);
137    }
138    expand_symbol_shorthand(&mut body)
139        .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
140    normalize_endpoint_local_market_strings(proto_id, &mut body)
141        .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
142
143    if let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) {
144        let validation_result = if let Some(c2s) = body.get_mut("c2s") {
145            futu_surface_spec::validate_and_normalize(spec, c2s)
146        } else {
147            futu_surface_spec::validate_and_normalize(spec, &mut body)
148        };
149        if let Err(err) = validation_result {
150            return Err(map_dispatch_error(spec, err));
151        }
152    }
153    if mode != JsonRequestMode::RawSpecBody {
154        // EndpointSpec validation may inject C2S-level defaults into an originally
155        // empty flat body. Keep the typed proto request shape intact before serde.
156        maybe_wrap_flat_body_as_c2s(&mut body);
157    }
158
159    serde_json::from_value(body).map_err(|e| {
160        (
161            StatusCode::BAD_REQUEST,
162            Json(validation_error_body(format!("invalid request body: {e}"))),
163        )
164    })
165}
166
167fn normalize_endpoint_local_market_strings(proto_id: u32, body: &mut Value) -> Result<(), String> {
168    let Some(c2s_or_body) = c2s_or_body_object_mut(body) else {
169        return Ok(());
170    };
171    let valid = match proto_id {
172        futu_core::proto_id::QOT_STOCK_FILTER => STOCK_FILTER_MARKETS,
173        futu_core::proto_id::QOT_GET_IPO_LIST => IPO_LIST_MARKETS,
174        futu_core::proto_id::QOT_GET_PRICE_REMINDER => PRICE_REMINDER_MARKETS,
175        _ => return Ok(()),
176    };
177    normalize_market_field(c2s_or_body, valid)
178}
179
/// `market` names accepted by the stock-filter endpoint, with their numeric codes.
///
/// Several names may share one code (aliases). Order matters: the first name
/// listed for a code is the one shown in "unknown market" error messages.
const STOCK_FILTER_MARKETS: &[(&str, i32)] = &[
    ("HK", 1),
    ("HK_FUTURE", 2),
    ("HKFUTURE", 2),
    ("US", 11),
    ("CN", 21),
    ("SH", 21),
    ("SZ", 22),
    ("SG", 31),
    ("JP", 41),
    ("MY", 61),
];

/// `market` names accepted by the IPO-list endpoint.
///
/// Currently the same entries as [`STOCK_FILTER_MARKETS`], kept as a separate
/// table.
const IPO_LIST_MARKETS: &[(&str, i32)] = &[
    ("HK", 1),
    ("HK_FUTURE", 2),
    ("HKFUTURE", 2),
    ("US", 11),
    ("CN", 21),
    ("SH", 21),
    ("SZ", 22),
    ("SG", 31),
    ("JP", 41),
    ("MY", 61),
];

/// `market` names accepted by the price-reminder endpoint.
///
/// NOTE(review): `HK_FUTURE` / `HKFUTURE` map to 6 here but to 2 in the other
/// two tables, and SG / JP / MY are absent — confirm this is intentional.
const PRICE_REMINDER_MARKETS: &[(&str, i32)] = &[
    ("HK", 1),
    ("HK_FUTURE", 6),
    ("HKFUTURE", 6),
    ("US", 11),
    ("CN", 21),
    ("SH", 21),
    ("SZ", 22),
];
215
216fn c2s_or_body_object_mut(body: &mut Value) -> Option<&mut serde_json::Map<String, Value>> {
217    if let Value::Object(top) = body {
218        if matches!(top.get("c2s"), Some(Value::Object(_))) {
219            return top.get_mut("c2s").and_then(Value::as_object_mut);
220        }
221        return Some(top);
222    }
223    None
224}
225
226fn normalize_market_field(
227    body: &mut serde_json::Map<String, Value>,
228    valid: &[(&str, i32)],
229) -> Result<(), String> {
230    let Some(value) = body.get_mut("market") else {
231        return Ok(());
232    };
233    let Value::String(raw) = value else {
234        return Ok(());
235    };
236    let trimmed = raw.trim();
237    if let Ok(number) = trimmed.parse::<i32>() {
238        if valid.iter().any(|(_, v)| *v == number) {
239            *value = Value::Number(number.into());
240            return Ok(());
241        }
242        return Err(format!(
243            "unknown market {trimmed:?}: valid = {}",
244            market_valid_values(valid)
245        ));
246    }
247    let normalized = trimmed
248        .trim_start_matches("QOTMARKET_")
249        .replace(['-', ' '], "_")
250        .to_ascii_uppercase();
251    if let Some((_, market)) = valid.iter().find(|(name, _)| *name == normalized) {
252        *value = Value::Number((*market).into());
253        return Ok(());
254    }
255    Err(format!(
256        "unknown market {raw:?}: valid = {}",
257        market_valid_values(valid)
258    ))
259}
260
/// Renders the accepted markets as `NAME=code, NAME=code, …` for error messages.
///
/// When several names share a code, only the first name listed in `valid` is
/// shown for that code; the original order is kept.
fn market_valid_values(valid: &[(&str, i32)]) -> String {
    valid
        .iter()
        .enumerate()
        // Keep an entry only if no earlier entry already uses the same code.
        .filter(|&(idx, &(_, code))| !valid[..idx].iter().any(|&(_, earlier)| earlier == code))
        .map(|(_, &(label, code))| format!("{label}={code}"))
        .collect::<Vec<_>>()
        .join(", ")
}
273
274/// v1.4.104 阶段 7-1: 走 FilterRegistry 的 proto_request 变体.
275///
276/// 跟 [`proto_request`] / [`proto_request_with_idempotency`] 流程一致, 但在
277/// `decode protobuf 响应` 之前**插入 FilterRegistry::apply** 一步, 按
278/// `allowed_acc_ids` filter 受限 key 的响应 acc_list (proto 2001 TRD_GET_ACC_LIST
279/// 等). 与 gRPC server.rs / WS ws_listener.rs 同源 (单一 registry).
280///
281/// `allowed_acc_ids = None` 时 filter no-op (legacy / 无限制 key).
282///
283/// **codex 0522 F2 v1.4.106**: 推荐改用 [`proto_request_with_ctx`], 这个
284/// helper 改为内部包装, 丢失 `caller_key_id`. 保留作 backward-compat 给
285/// 已有的 acc_list filter call site 不破坏 (route 层迁移到 ctx 后此函数
286/// 全部 caller 应该归零, 保留一段过渡期再删).
287pub async fn proto_request_with_filter<Req, Rsp>(
288    state: &RestState,
289    proto_id: u32,
290    json_body: Option<Value>,
291    allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
292) -> Result<Json<Value>, (StatusCode, Json<Value>)>
293where
294    Req: Message + Default + serde::de::DeserializeOwned,
295    Rsp: Message + Default + serde::Serialize,
296{
297    // codex 0522 F1 v1.4.106: 构 minimal CallerContext 把 allowed_acc_ids 同时
298    // 接进 IncomingRequest.caller_allowed_acc_ids + FilterRegistry. caller_key_id
299    // 仍 None (老 call site 没传). 推荐 caller 迁移到 proto_request_with_ctx.
300    let ctx = if let Some(allowed) = allowed_acc_ids {
301        crate::caller_context::CallerContext {
302            key_id: None,
303            allowed_acc_ids: Some(std::sync::Arc::new(allowed.clone())),
304        }
305    } else {
306        crate::caller_context::CallerContext::legacy()
307    };
308    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, None, Some(&ctx)).await
309}
310
311/// v1.4.38 Phase 4: 支持 `Idempotency-Key` header 的 proto_request。
312/// 老 call site 继续用 `proto_request`(header=None), 新写 trade endpoint
313/// 用 `proto_request_with_idempotency` 从 axum HeaderMap 提取 header 后传入。
314pub async fn proto_request_with_idempotency<Req, Rsp>(
315    state: &RestState,
316    proto_id: u32,
317    json_body: Option<Value>,
318    idempotency_key: Option<String>,
319) -> Result<Json<Value>, (StatusCode, Json<Value>)>
320where
321    Req: Message + Default + serde::de::DeserializeOwned,
322    Rsp: Message + Default + serde::Serialize,
323{
324    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, None).await
325}
326
327/// v1.4.106 codex 0920 F1 (P1): 支持 caller key id 的 idempotency 变体.
328/// 让 cache namespace 跨 caller 隔离 — 不同 caller 用同 Idempotency-Key
329/// **不能** 跨 caller 命中老 response (避免跨账户数据泄漏 + 重复下单).
330///
331/// **call site**: REST trade endpoint (place / modify / cancel / reconfirm)
332/// 在 `rec: Option<Extension<Arc<KeyRecord>>>` 抽 caller_key_id 后传入.
333pub async fn proto_request_with_idempotency_and_caller<Req, Rsp>(
334    state: &RestState,
335    proto_id: u32,
336    json_body: Option<Value>,
337    idempotency_key: Option<String>,
338    caller_key_id: Option<String>,
339) -> Result<Json<Value>, (StatusCode, Json<Value>)>
340where
341    Req: Message + Default + serde::de::DeserializeOwned,
342    Rsp: Message + Default + serde::Serialize,
343{
344    let ctx = caller_key_id.map(|k| crate::caller_context::CallerContext {
345        key_id: Some(k),
346        allowed_acc_ids: None,
347    });
348    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, ctx.as_ref())
349        .await
350}
351
352/// codex 0522 F1 v1.4.106 (推荐 API): 带 `CallerContext` 的 proto_request.
353///
354/// 与 `proto_request_with_idempotency` / `proto_request_with_filter` 的关系:
355/// 后两者只接 `idempotency_key` 或 `allowed_acc_ids` 单一维度, 本函数接完整
356/// `CallerContext` (含 `key_id` + `allowed_acc_ids`), 把 caller scope **同时**
357/// 接到三个下游消费点:
358///
359/// 1. `IncomingRequest.caller_allowed_acc_ids` (dispatch handler 的 per-acc
360///    enforce, defense-in-depth)
361/// 2. `IncomingRequest.caller_key_id` (per-key 配额 / cleanup / 审计入口)
362/// 3. `FilterRegistry::apply` (响应 acc_list 过滤)
363///
364/// 任一接错 → `dispatch handler` 看到 `None` 就 silent bypass (codex F1 audit
365/// 实锤的 v1.4.105 之前 REST regression).
366///
367/// `ctx = None` 等价 `legacy mode` (无 caller key, 无 acc 限制) — 通常仅
368/// 测试 / 显式 unauthenticated route 用. 真 route handler 应该总是构 ctx
369/// 从 `Extension<Arc<KeyRecord>>` 抽出来 (`CallerContext::from_key_record`).
370pub async fn proto_request_with_ctx<Req, Rsp>(
371    state: &RestState,
372    proto_id: u32,
373    json_body: Option<Value>,
374    idempotency_key: Option<String>,
375    ctx: Option<&crate::caller_context::CallerContext>,
376) -> Result<Json<Value>, (StatusCode, Json<Value>)>
377where
378    Req: Message + Default + serde::de::DeserializeOwned,
379    Rsp: Message + Default + serde::Serialize,
380{
381    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, ctx).await
382}
383
384async fn proto_request_raw_internal<Req, Rsp>(
385    state: &RestState,
386    proto_id: u32,
387    json_body: Option<Value>,
388    idempotency_key: Option<String>,
389    ctx: Option<&crate::caller_context::CallerContext>,
390) -> Result<RawJson, (StatusCode, Json<Value>)>
391where
392    Req: Message + Default + serde::de::DeserializeOwned,
393    Rsp: Message + Default + serde::Serialize,
394{
395    let req_msg: Req = decode_json_request(proto_id, json_body, JsonRequestMode::GenericRest)?;
396    let conn_id = state.next_conn_id();
397    let serial_no = state.next_serial();
398    let body = futu_server::trade_packet_id::fill_omitted_trade_packet_id_bytes(
399        proto_id,
400        req_msg.encode_to_vec(),
401        conn_id,
402        serial_no,
403    )
404    .map(Bytes::from)
405    .map_err(|e| {
406        (
407            StatusCode::INTERNAL_SERVER_ERROR,
408            Json(serde_json::json!({
409                "error": format!("failed to prepare trade PacketID: {e}")
410            })),
411        )
412    })?;
413
414    let incoming =
415        IncomingRequest::builder(conn_id, proto_id, serial_no, ProtoFmtType::Protobuf, body)
416            .with_idempotency_key(idempotency_key)
417            .with_caller_scope(
418                ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
419                ctx.and_then(|c| c.caller_key_id()),
420            )
421            .build();
422
423    let resp_bytes = state
424        .router
425        .dispatch(incoming.conn_id, &incoming)
426        .await
427        .ok_or_else(|| {
428            (
429                StatusCode::INTERNAL_SERVER_ERROR,
430                Json(serde_json::json!({
431                    "error": "handler returned no response"
432                })),
433            )
434        })?;
435
436    let resp_bytes = state.filter_registry.apply(
437        proto_id,
438        resp_bytes,
439        ctx.and_then(|c| c.allowed_acc_ids_borrow()),
440    );
441
442    let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
443        (
444            StatusCode::INTERNAL_SERVER_ERROR,
445            Json(serde_json::json!({
446                "error": format!("failed to decode response: {e}")
447            })),
448        )
449    })?;
450
451    raw_json_from_proto_response(&rsp_msg)
452}
453
454pub(crate) fn raw_json_from_proto_response<Rsp>(
455    rsp_msg: &Rsp,
456) -> Result<RawJson, (StatusCode, Json<Value>)>
457where
458    Rsp: serde::Serialize,
459{
460    let encoded = encode_proto_response_raw_or_error_value(rsp_msg).map_err(|e| {
461        (
462            StatusCode::INTERNAL_SERVER_ERROR,
463            Json(serde_json::json!({
464                "error": e
465            })),
466        )
467    })?;
468    match encoded {
469        ProtoJsonBody::Raw(raw) => Ok(RawJson::new(raw)),
470        ProtoJsonBody::Value(value) => {
471            let raw = serde_json::to_vec(&value).map_err(|e| {
472                (
473                    StatusCode::INTERNAL_SERVER_ERROR,
474                    Json(serde_json::json!({
475                        "error": format!("failed to serialize response: {e}")
476                    })),
477                )
478            })?;
479            Ok(RawJson::new(raw))
480        }
481    }
482}
483
484/// v1.4.104 阶段 7-1 + codex 0522 F1 v1.4.106: 内部统一实现, 接 `CallerContext`
485/// 替代单 `allowed_acc_ids` 入参. 调度时同时填 `IncomingRequest.caller_key_id`
486/// 与 `caller_allowed_acc_ids`, 响应过滤共用同一个 `allowed_acc_ids` 借引用 ——
487/// 单一来源, 防 routes 层手写多套 caller scope check 漂移 (codex F2).
488async fn proto_request_internal<Req, Rsp>(
489    state: &RestState,
490    proto_id: u32,
491    json_body: Option<Value>,
492    idempotency_key: Option<String>,
493    ctx: Option<&crate::caller_context::CallerContext>,
494) -> Result<Json<Value>, (StatusCode, Json<Value>)>
495where
496    Req: Message + Default + serde::de::DeserializeOwned,
497    Rsp: Message + Default + serde::Serialize,
498{
499    // 1. JSON → protobuf 请求. Empty body still flows through EndpointSpec
500    // validation; otherwise required fields could silently become
501    // `Req::default()` and bypass REST contract checks.
502    let req_msg: Req = decode_json_request(proto_id, json_body, JsonRequestMode::GenericRest)?;
503
504    // 2. encode 为 protobuf bytes
505    let conn_id = state.next_conn_id();
506    let serial_no = state.next_serial();
507    let body = futu_server::trade_packet_id::fill_omitted_trade_packet_id_bytes(
508        proto_id,
509        req_msg.encode_to_vec(),
510        conn_id,
511        serial_no,
512    )
513    .map(Bytes::from)
514    .map_err(|e| {
515        (
516            StatusCode::INTERNAL_SERVER_ERROR,
517            Json(serde_json::json!({
518                "error": format!("failed to prepare trade PacketID: {e}")
519            })),
520        )
521    })?;
522
523    // 3. 构造 IncomingRequest 调用现有 handler
524    // codex 0522 F1 v1.4.106: caller_allowed_acc_ids + caller_key_id 同时从
525    // CallerContext 拿, 单一来源, 不再分别写 None.
526    let incoming =
527        IncomingRequest::builder(conn_id, proto_id, serial_no, ProtoFmtType::Protobuf, body)
528            .with_idempotency_key(idempotency_key)
529            .with_caller_scope(
530                ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
531                ctx.and_then(|c| c.caller_key_id()),
532            )
533            .build();
534
535    let resp_bytes = state
536        .router
537        .dispatch(incoming.conn_id, &incoming)
538        .await
539        .ok_or_else(|| {
540            (
541                StatusCode::INTERNAL_SERVER_ERROR,
542                Json(serde_json::json!({
543                    "error": "handler returned no response"
544                })),
545            )
546        })?;
547
548    // 3.5. v1.4.104 阶段 7-1: response filter (cross-surface 共享 FilterRegistry).
549    //      proto 2001 TRD_GET_ACC_LIST 默认注册, allowed_acc_ids 非空时 filter
550    //      响应 acc_list, 受限 key 不能跨账户 enumerate. legacy / 无限制 key /
551    //      非注册 proto_id → no-op 返原 bytes 不动.
552    // codex 0522 F2 v1.4.106: 同一 ctx.allowed_acc_ids 借引用 (与
553    // IncomingRequest.caller_allowed_acc_ids 是同一份 Arc), 单一来源.
554    let resp_bytes = state.filter_registry.apply(
555        proto_id,
556        resp_bytes,
557        ctx.and_then(|c| c.allowed_acc_ids_borrow()),
558    );
559
560    // 4. decode protobuf 响应
561    let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
562        (
563            StatusCode::INTERNAL_SERVER_ERROR,
564            Json(serde_json::json!({
565                "error": format!("failed to decode response: {e}")
566            })),
567        )
568    })?;
569
570    // 5. 序列化为 JSON
571    let mut json_rsp = serde_json::to_value(&rsp_msg).map_err(|e| {
572        (
573            StatusCode::INTERNAL_SERVER_ERROR,
574            Json(serde_json::json!({
575                "error": format!("failed to serialize response: {e}")
576            })),
577        )
578    })?;
579
580    // 6. v1.4.34 BUG-2b 修:给所有错误响应的 ret_msg 加 `[err_code=X]` 前缀
581    //    历史:v1.4.27 server_err() helper 在 futu-trd 客户端 lib 里包了 CLI/gRPC/MCP
582    //    三个 surface,漏了 REST 这个 surface。external reviewer 4 次独立复现;v1.4.30 / 31 /
583    //    32 / 33 都没修。根因:REST 走 proto_request 直接序列化响应,没经过 server_err
584    //    helper 层。v1.4.34 直接在 JSON 层包一下,不动 daemon response(daemon 要给
585    //    CLI 客户端留原始 err_code 字段,CLI 自己会包)。
586    maybe_wrap_err_code_prefix(&mut json_rsp);
587
588    Ok(Json(json_rsp))
589}
590
591pub(super) enum ProtoJsonBody {
592    Raw(Bytes),
593    Value(Value),
594}
595
596pub(super) fn encode_proto_response_raw_or_error_value<Rsp>(
597    rsp_msg: &Rsp,
598) -> Result<ProtoJsonBody, String>
599where
600    Rsp: serde::Serialize,
601{
602    let raw =
603        serde_json::to_vec(rsp_msg).map_err(|e| format!("failed to serialize response: {e}"))?;
604    if serialized_ret_type(&raw) == Some(0) {
605        return Ok(ProtoJsonBody::Raw(Bytes::from(raw)));
606    }
607
608    let mut json_rsp =
609        serde_json::to_value(rsp_msg).map_err(|e| format!("failed to serialize response: {e}"))?;
610    maybe_wrap_err_code_prefix(&mut json_rsp);
611    Ok(ProtoJsonBody::Value(json_rsp))
612}
613
614fn serialized_ret_type(raw: &[u8]) -> Option<i64> {
615    #[derive(serde::Deserialize)]
616    struct RetOnly {
617        ret_type: Option<i64>,
618    }
619
620    serde_json::from_slice::<RetOnly>(raw)
621        .ok()
622        .and_then(|ret| ret.ret_type)
623}
624
625/// v1.4.34 BUG-2b:REST 响应的 ret_msg 包 `[err_code=X]` 前缀。
626///
627/// 与 `futu_trd::server_err()` 同语义,但作用在已序列化的 JSON 上:
628///
629/// - `ret_type == 0`:成功,不动
630/// - `ret_type != 0` 且 `err_code` 非 null:`[err_code=<code>] <原 ret_msg>`
631/// - `ret_type != 0` 且 `err_code` 缺省:`[err_code=none] <原 ret_msg>`
632/// - `ret_type != 0` 且 ret_msg 空:只留 `[err_code=<X>]` 带方括号的标签
633///
634/// 幂等(已经带 `[err_code=` 前缀的 ret_msg 不重复包)。方括号位置精确,既
635/// 便于客户端 grep,也不误伤"错误描述里恰好有方括号"的正常 msg。
636///
637/// 只对**顶层**的 `ret_type / ret_msg / err_code` 生效;嵌套在 s2c 里的状态字段
638/// 不动。
639pub(crate) fn maybe_wrap_err_code_prefix(v: &mut Value) {
640    let obj = match v.as_object_mut() {
641        Some(o) => o,
642        None => return,
643    };
644    // 只处理失败响应
645    let is_err = obj
646        .get("ret_type")
647        .and_then(|t| t.as_i64())
648        .map(|t| t != 0)
649        .unwrap_or(false);
650    if !is_err {
651        return;
652    }
653    // 读当前 msg(可能是 null)
654    let raw_msg = obj
655        .get("ret_msg")
656        .and_then(|m| m.as_str())
657        .unwrap_or("")
658        .to_string();
659    // 幂等:已带前缀就不动(多轮 middleware 不应该双包)
660    if raw_msg.starts_with("[err_code=") {
661        return;
662    }
663    // 读 err_code(可能是 null / 整数)
664    let err_code_label = match obj.get("err_code") {
665        Some(Value::Number(n)) => n
666            .as_i64()
667            .map(|i| i.to_string())
668            .unwrap_or("none".to_string()),
669        _ => "none".to_string(),
670    };
671    let new_msg = if raw_msg.is_empty() {
672        format!("[err_code={err_code_label}]")
673    } else {
674        format!("[err_code={err_code_label}] {raw_msg}")
675    };
676    obj.insert("ret_msg".to_string(), Value::String(new_msg));
677}