1use axum::Json;
6use axum::http::StatusCode;
7use axum::http::header::CONTENT_TYPE;
8use axum::response::{IntoResponse, Response};
9use serde_json::Value;
10
11use super::*;
12
13#[derive(Debug, Clone)]
14pub struct RawJson {
15 bytes: Bytes,
16}
17
18impl RawJson {
19 pub fn new(bytes: impl Into<Bytes>) -> Self {
20 Self {
21 bytes: bytes.into(),
22 }
23 }
24}
25
26impl IntoResponse for RawJson {
27 fn into_response(self) -> Response {
28 ([(CONTENT_TYPE, "application/json")], self.bytes).into_response()
29 }
30}
31
32pub async fn proto_request<Req, Rsp>(
39 state: &RestState,
40 proto_id: u32,
41 json_body: Option<Value>,
42) -> Result<Json<Value>, (StatusCode, Json<Value>)>
43where
44 Req: Message + Default + serde::de::DeserializeOwned,
45 Rsp: Message + Default + serde::Serialize,
46{
47 proto_request_internal::<Req, Rsp>(state, proto_id, json_body, None, None).await
48}
49
50pub async fn proto_request_raw<Req, Rsp>(
51 state: &RestState,
52 proto_id: u32,
53 json_body: Option<Value>,
54) -> Result<RawJson, (StatusCode, Json<Value>)>
55where
56 Req: Message + Default + serde::de::DeserializeOwned,
57 Rsp: Message + Default + serde::Serialize,
58{
59 proto_request_raw_internal::<Req, Rsp>(state, proto_id, json_body, None, None).await
60}
61
62pub(super) fn map_dispatch_error(
66 spec: &'static EndpointSpec,
67 err: DispatchError,
68) -> (StatusCode, Json<Value>) {
69 let proto_id = spec
70 .proto_id()
71 .map(|id| id.to_string())
72 .unwrap_or_else(|| "daemon-local".to_string());
73 let ret_msg = format!(
74 "{} (endpoint: {}, proto_id: {})",
75 err, spec.canonical_name, proto_id
76 );
77 let status = StatusCode::from_u16(spec.runtime.error.validation_http_status)
78 .unwrap_or(StatusCode::BAD_REQUEST);
79 let machine_error_field = spec.runtime.error.machine_error_field;
80 let mut body = serde_json::json!({
81 "ret_type": -1,
82 "ret_msg": ret_msg,
83 });
84 if let Some(obj) = body.as_object_mut() {
85 obj.insert(
86 machine_error_field.to_string(),
87 serde_json::json!({
88 "kind": "validation_error",
89 "message": err.to_string(),
90 "endpoint": spec.canonical_name,
91 "proto_id": proto_id,
92 }),
93 );
94 }
95 (status, Json(body))
96}
97
98pub(super) fn validation_error_body(message: impl Into<String>) -> Value {
99 let message = message.into();
100 serde_json::json!({
101 "ret_type": -1,
102 "ret_msg": message,
103 "error": message,
104 })
105}
106
/// Selects which optional body-normalisation steps `decode_json_request`
/// runs before deserialising the request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum JsonRequestMode {
    /// Runs both the `maybe_wrap_flat_body_as_c2s` step and the
    /// `maybe_expand_flat_trd_header` step.
    GenericRest,
    /// Runs the `maybe_wrap_flat_body_as_c2s` step but not the
    /// `maybe_expand_flat_trd_header` step.
    QotSharedConn,
    /// Runs neither of the two steps above.
    RawSpecBody,
}
120
121pub(crate) fn decode_json_request<Req>(
122 proto_id: u32,
123 json_body: Option<Value>,
124 mode: JsonRequestMode,
125) -> Result<Req, (StatusCode, Json<Value>)>
126where
127 Req: Default + serde::de::DeserializeOwned,
128{
129 let mut body = json_body.unwrap_or_else(|| Value::Object(serde_json::Map::new()));
130 normalize_json_keys_snake_case(&mut body);
131 apply_known_field_aliases_for_proto_id(&mut body, Some(proto_id));
132 if mode != JsonRequestMode::RawSpecBody {
133 maybe_wrap_flat_body_as_c2s(&mut body);
134 }
135 if mode == JsonRequestMode::GenericRest {
136 maybe_expand_flat_trd_header(&mut body);
137 }
138 expand_symbol_shorthand(&mut body)
139 .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
140 normalize_endpoint_local_market_strings(proto_id, &mut body)
141 .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
142
143 if let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) {
144 let validation_result = if let Some(c2s) = body.get_mut("c2s") {
145 futu_surface_spec::validate_and_normalize(spec, c2s)
146 } else {
147 futu_surface_spec::validate_and_normalize(spec, &mut body)
148 };
149 if let Err(err) = validation_result {
150 return Err(map_dispatch_error(spec, err));
151 }
152 }
153 if mode != JsonRequestMode::RawSpecBody {
154 maybe_wrap_flat_body_as_c2s(&mut body);
157 }
158
159 serde_json::from_value(body).map_err(|e| {
160 (
161 StatusCode::BAD_REQUEST,
162 Json(validation_error_body(format!("invalid request body: {e}"))),
163 )
164 })
165}
166
167fn normalize_endpoint_local_market_strings(proto_id: u32, body: &mut Value) -> Result<(), String> {
168 let Some(c2s_or_body) = c2s_or_body_object_mut(body) else {
169 return Ok(());
170 };
171 let valid = match proto_id {
172 futu_core::proto_id::QOT_STOCK_FILTER => STOCK_FILTER_MARKETS,
173 futu_core::proto_id::QOT_GET_IPO_LIST => IPO_LIST_MARKETS,
174 futu_core::proto_id::QOT_GET_PRICE_REMINDER => PRICE_REMINDER_MARKETS,
175 _ => return Ok(()),
176 };
177 normalize_market_field(c2s_or_body, valid)
178}
179
/// Market names accepted for `QOT_STOCK_FILTER` requests, paired with the
/// numeric code each name is rewritten to.
///
/// Several names may share one code (for example `HK_FUTURE` / `HKFUTURE`);
/// the first name listed for a code is the one shown in error messages.
const STOCK_FILTER_MARKETS: &[(&str, i32)] = &[
    ("HK", 1),
    ("HK_FUTURE", 2),
    ("HKFUTURE", 2),
    ("US", 11),
    ("CN", 21),
    ("SH", 21),
    ("SZ", 22),
    ("SG", 31),
    ("JP", 41),
    ("MY", 61),
];
192
/// Market names accepted for `QOT_GET_IPO_LIST` requests, paired with the
/// numeric code each name is rewritten to.
///
/// Several names may share one code (for example `CN` / `SH`); the first name
/// listed for a code is the one shown in error messages.
const IPO_LIST_MARKETS: &[(&str, i32)] = &[
    ("HK", 1),
    ("HK_FUTURE", 2),
    ("HKFUTURE", 2),
    ("US", 11),
    ("CN", 21),
    ("SH", 21),
    ("SZ", 22),
    ("SG", 31),
    ("JP", 41),
    ("MY", 61),
];
205
/// Market names accepted for `QOT_GET_PRICE_REMINDER` requests, paired with
/// the numeric code each name is rewritten to.
///
/// NOTE(review): `HK_FUTURE` / `HKFUTURE` map to 6 here, whereas the
/// stock-filter and IPO-list tables map the same names to 2 — confirm that
/// this difference is intended.
const PRICE_REMINDER_MARKETS: &[(&str, i32)] = &[
    ("HK", 1),
    ("HK_FUTURE", 6),
    ("HKFUTURE", 6),
    ("US", 11),
    ("CN", 21),
    ("SH", 21),
    ("SZ", 22),
];
215
216fn c2s_or_body_object_mut(body: &mut Value) -> Option<&mut serde_json::Map<String, Value>> {
217 if let Value::Object(top) = body {
218 if matches!(top.get("c2s"), Some(Value::Object(_))) {
219 return top.get_mut("c2s").and_then(Value::as_object_mut);
220 }
221 return Some(top);
222 }
223 None
224}
225
226fn normalize_market_field(
227 body: &mut serde_json::Map<String, Value>,
228 valid: &[(&str, i32)],
229) -> Result<(), String> {
230 let Some(value) = body.get_mut("market") else {
231 return Ok(());
232 };
233 let Value::String(raw) = value else {
234 return Ok(());
235 };
236 let trimmed = raw.trim();
237 if let Ok(number) = trimmed.parse::<i32>() {
238 if valid.iter().any(|(_, v)| *v == number) {
239 *value = Value::Number(number.into());
240 return Ok(());
241 }
242 return Err(format!(
243 "unknown market {trimmed:?}: valid = {}",
244 market_valid_values(valid)
245 ));
246 }
247 let normalized = trimmed
248 .trim_start_matches("QOTMARKET_")
249 .replace(['-', ' '], "_")
250 .to_ascii_uppercase();
251 if let Some((_, market)) = valid.iter().find(|(name, _)| *name == normalized) {
252 *value = Value::Number((*market).into());
253 return Ok(());
254 }
255 Err(format!(
256 "unknown market {raw:?}: valid = {}",
257 market_valid_values(valid)
258 ))
259}
260
/// Formats a market table as `NAME=code, NAME=code, …` for error messages.
///
/// When several names share a code, only the first name in table order is
/// listed. An empty table gives an empty string.
fn market_valid_values(valid: &[(&str, i32)]) -> String {
    let mut parts: Vec<String> = Vec::new();
    for (idx, (label, code)) in valid.iter().enumerate() {
        // An entry is an alias when an earlier entry already has its code.
        let is_alias = valid[..idx].iter().any(|(_, earlier)| earlier == code);
        if !is_alias {
            parts.push(format!("{label}={code}"));
        }
    }
    parts.join(", ")
}
273
274pub async fn proto_request_with_filter<Req, Rsp>(
288 state: &RestState,
289 proto_id: u32,
290 json_body: Option<Value>,
291 allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
292) -> Result<Json<Value>, (StatusCode, Json<Value>)>
293where
294 Req: Message + Default + serde::de::DeserializeOwned,
295 Rsp: Message + Default + serde::Serialize,
296{
297 let ctx = if let Some(allowed) = allowed_acc_ids {
301 crate::caller_context::CallerContext {
302 key_id: None,
303 allowed_acc_ids: Some(std::sync::Arc::new(allowed.clone())),
304 }
305 } else {
306 crate::caller_context::CallerContext::legacy()
307 };
308 proto_request_internal::<Req, Rsp>(state, proto_id, json_body, None, Some(&ctx)).await
309}
310
311pub async fn proto_request_with_idempotency<Req, Rsp>(
315 state: &RestState,
316 proto_id: u32,
317 json_body: Option<Value>,
318 idempotency_key: Option<String>,
319) -> Result<Json<Value>, (StatusCode, Json<Value>)>
320where
321 Req: Message + Default + serde::de::DeserializeOwned,
322 Rsp: Message + Default + serde::Serialize,
323{
324 proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, None).await
325}
326
327pub async fn proto_request_with_idempotency_and_caller<Req, Rsp>(
334 state: &RestState,
335 proto_id: u32,
336 json_body: Option<Value>,
337 idempotency_key: Option<String>,
338 caller_key_id: Option<String>,
339) -> Result<Json<Value>, (StatusCode, Json<Value>)>
340where
341 Req: Message + Default + serde::de::DeserializeOwned,
342 Rsp: Message + Default + serde::Serialize,
343{
344 let ctx = caller_key_id.map(|k| crate::caller_context::CallerContext {
345 key_id: Some(k),
346 allowed_acc_ids: None,
347 });
348 proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, ctx.as_ref())
349 .await
350}
351
352pub async fn proto_request_with_ctx<Req, Rsp>(
371 state: &RestState,
372 proto_id: u32,
373 json_body: Option<Value>,
374 idempotency_key: Option<String>,
375 ctx: Option<&crate::caller_context::CallerContext>,
376) -> Result<Json<Value>, (StatusCode, Json<Value>)>
377where
378 Req: Message + Default + serde::de::DeserializeOwned,
379 Rsp: Message + Default + serde::Serialize,
380{
381 proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, ctx).await
382}
383
384async fn proto_request_raw_internal<Req, Rsp>(
385 state: &RestState,
386 proto_id: u32,
387 json_body: Option<Value>,
388 idempotency_key: Option<String>,
389 ctx: Option<&crate::caller_context::CallerContext>,
390) -> Result<RawJson, (StatusCode, Json<Value>)>
391where
392 Req: Message + Default + serde::de::DeserializeOwned,
393 Rsp: Message + Default + serde::Serialize,
394{
395 let req_msg: Req = decode_json_request(proto_id, json_body, JsonRequestMode::GenericRest)?;
396 let conn_id = state.next_conn_id();
397 let serial_no = state.next_serial();
398 let body = futu_server::trade_packet_id::fill_omitted_trade_packet_id_bytes(
399 proto_id,
400 req_msg.encode_to_vec(),
401 conn_id,
402 serial_no,
403 )
404 .map(Bytes::from)
405 .map_err(|e| {
406 (
407 StatusCode::INTERNAL_SERVER_ERROR,
408 Json(serde_json::json!({
409 "error": format!("failed to prepare trade PacketID: {e}")
410 })),
411 )
412 })?;
413
414 let incoming =
415 IncomingRequest::builder(conn_id, proto_id, serial_no, ProtoFmtType::Protobuf, body)
416 .with_idempotency_key(idempotency_key)
417 .with_caller_scope(
418 ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
419 ctx.and_then(|c| c.caller_key_id()),
420 )
421 .build();
422
423 let resp_bytes = state
424 .router
425 .dispatch(incoming.conn_id, &incoming)
426 .await
427 .ok_or_else(|| {
428 (
429 StatusCode::INTERNAL_SERVER_ERROR,
430 Json(serde_json::json!({
431 "error": "handler returned no response"
432 })),
433 )
434 })?;
435
436 let resp_bytes = state.filter_registry.apply(
437 proto_id,
438 resp_bytes,
439 ctx.and_then(|c| c.allowed_acc_ids_borrow()),
440 );
441
442 let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
443 (
444 StatusCode::INTERNAL_SERVER_ERROR,
445 Json(serde_json::json!({
446 "error": format!("failed to decode response: {e}")
447 })),
448 )
449 })?;
450
451 raw_json_from_proto_response(&rsp_msg)
452}
453
454pub(crate) fn raw_json_from_proto_response<Rsp>(
455 rsp_msg: &Rsp,
456) -> Result<RawJson, (StatusCode, Json<Value>)>
457where
458 Rsp: serde::Serialize,
459{
460 let encoded = encode_proto_response_raw_or_error_value(rsp_msg).map_err(|e| {
461 (
462 StatusCode::INTERNAL_SERVER_ERROR,
463 Json(serde_json::json!({
464 "error": e
465 })),
466 )
467 })?;
468 match encoded {
469 ProtoJsonBody::Raw(raw) => Ok(RawJson::new(raw)),
470 ProtoJsonBody::Value(value) => {
471 let raw = serde_json::to_vec(&value).map_err(|e| {
472 (
473 StatusCode::INTERNAL_SERVER_ERROR,
474 Json(serde_json::json!({
475 "error": format!("failed to serialize response: {e}")
476 })),
477 )
478 })?;
479 Ok(RawJson::new(raw))
480 }
481 }
482}
483
484async fn proto_request_internal<Req, Rsp>(
489 state: &RestState,
490 proto_id: u32,
491 json_body: Option<Value>,
492 idempotency_key: Option<String>,
493 ctx: Option<&crate::caller_context::CallerContext>,
494) -> Result<Json<Value>, (StatusCode, Json<Value>)>
495where
496 Req: Message + Default + serde::de::DeserializeOwned,
497 Rsp: Message + Default + serde::Serialize,
498{
499 let req_msg: Req = decode_json_request(proto_id, json_body, JsonRequestMode::GenericRest)?;
503
504 let conn_id = state.next_conn_id();
506 let serial_no = state.next_serial();
507 let body = futu_server::trade_packet_id::fill_omitted_trade_packet_id_bytes(
508 proto_id,
509 req_msg.encode_to_vec(),
510 conn_id,
511 serial_no,
512 )
513 .map(Bytes::from)
514 .map_err(|e| {
515 (
516 StatusCode::INTERNAL_SERVER_ERROR,
517 Json(serde_json::json!({
518 "error": format!("failed to prepare trade PacketID: {e}")
519 })),
520 )
521 })?;
522
523 let incoming =
527 IncomingRequest::builder(conn_id, proto_id, serial_no, ProtoFmtType::Protobuf, body)
528 .with_idempotency_key(idempotency_key)
529 .with_caller_scope(
530 ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
531 ctx.and_then(|c| c.caller_key_id()),
532 )
533 .build();
534
535 let resp_bytes = state
536 .router
537 .dispatch(incoming.conn_id, &incoming)
538 .await
539 .ok_or_else(|| {
540 (
541 StatusCode::INTERNAL_SERVER_ERROR,
542 Json(serde_json::json!({
543 "error": "handler returned no response"
544 })),
545 )
546 })?;
547
548 let resp_bytes = state.filter_registry.apply(
555 proto_id,
556 resp_bytes,
557 ctx.and_then(|c| c.allowed_acc_ids_borrow()),
558 );
559
560 let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
562 (
563 StatusCode::INTERNAL_SERVER_ERROR,
564 Json(serde_json::json!({
565 "error": format!("failed to decode response: {e}")
566 })),
567 )
568 })?;
569
570 let mut json_rsp = serde_json::to_value(&rsp_msg).map_err(|e| {
572 (
573 StatusCode::INTERNAL_SERVER_ERROR,
574 Json(serde_json::json!({
575 "error": format!("failed to serialize response: {e}")
576 })),
577 )
578 })?;
579
580 maybe_wrap_err_code_prefix(&mut json_rsp);
587
588 Ok(Json(json_rsp))
589}
590
/// Result of `encode_proto_response_raw_or_error_value`: a response encoded
/// as JSON in one of two forms.
pub(super) enum ProtoJsonBody {
    /// Serialised JSON bytes, produced when the response's `ret_type` is `0`.
    Raw(Bytes),
    /// A JSON value that has been passed through
    /// `maybe_wrap_err_code_prefix`, produced in every other case.
    Value(Value),
}
595
596pub(super) fn encode_proto_response_raw_or_error_value<Rsp>(
597 rsp_msg: &Rsp,
598) -> Result<ProtoJsonBody, String>
599where
600 Rsp: serde::Serialize,
601{
602 let raw =
603 serde_json::to_vec(rsp_msg).map_err(|e| format!("failed to serialize response: {e}"))?;
604 if serialized_ret_type(&raw) == Some(0) {
605 return Ok(ProtoJsonBody::Raw(Bytes::from(raw)));
606 }
607
608 let mut json_rsp =
609 serde_json::to_value(rsp_msg).map_err(|e| format!("failed to serialize response: {e}"))?;
610 maybe_wrap_err_code_prefix(&mut json_rsp);
611 Ok(ProtoJsonBody::Value(json_rsp))
612}
613
614fn serialized_ret_type(raw: &[u8]) -> Option<i64> {
615 #[derive(serde::Deserialize)]
616 struct RetOnly {
617 ret_type: Option<i64>,
618 }
619
620 serde_json::from_slice::<RetOnly>(raw)
621 .ok()
622 .and_then(|ret| ret.ret_type)
623}
624
625pub(crate) fn maybe_wrap_err_code_prefix(v: &mut Value) {
640 let obj = match v.as_object_mut() {
641 Some(o) => o,
642 None => return,
643 };
644 let is_err = obj
646 .get("ret_type")
647 .and_then(|t| t.as_i64())
648 .map(|t| t != 0)
649 .unwrap_or(false);
650 if !is_err {
651 return;
652 }
653 let raw_msg = obj
655 .get("ret_msg")
656 .and_then(|m| m.as_str())
657 .unwrap_or("")
658 .to_string();
659 if raw_msg.starts_with("[err_code=") {
661 return;
662 }
663 let err_code_label = match obj.get("err_code") {
665 Some(Value::Number(n)) => n
666 .as_i64()
667 .map(|i| i.to_string())
668 .unwrap_or("none".to_string()),
669 _ => "none".to_string(),
670 };
671 let new_msg = if raw_msg.is_empty() {
672 format!("[err_code={err_code_label}]")
673 } else {
674 format!("[err_code={err_code_label}] {raw_msg}")
675 };
676 obj.insert("ret_msg".to_string(), Value::String(new_msg));
677}