1use std::sync::Arc;
4
5use axum::Extension;
6use axum::extract::{Json, Query, State};
7use axum::http::StatusCode;
8use bytes::Bytes;
9use futu_codec::header::ProtoFmtType;
10use futu_server::conn::IncomingRequest;
11use prost::Message;
12use serde::Deserialize;
13use serde_json::Value;
14
15use futu_core::proto_id;
16use futu_proto::get_delay_statistics;
17use futu_proto::get_global_state;
18use futu_proto::get_user_info;
19use futu_proto::test_cmd;
20use futu_surface_spec::endpoints::get_delay_statistics::default_request_body_json;
21use futu_backend::proto_internal::futu_token_state;
23use futu_qot::quote_rights::SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE;
24
25use crate::adapter::{self, RestState};
26
27type ApiResult = Result<Json<Value>, (StatusCode, Json<Value>)>;
28type RawApiResult = Result<adapter::RawJson, (StatusCode, Json<Value>)>;
29
30pub async fn get_global_state(State(state): State<RestState>) -> RawApiResult {
35 adapter::proto_request_raw::<get_global_state::Request, get_global_state::Response>(
36 &state,
37 proto_id::GET_GLOBAL_STATE,
38 None,
39 )
40 .await
41}
42
43pub async fn get_user_info(State(state): State<RestState>) -> RawApiResult {
45 adapter::proto_request_raw::<get_user_info::Request, get_user_info::Response>(
46 &state,
47 proto_id::GET_USER_INFO,
48 None,
49 )
50 .await
51}
52
53#[derive(Debug, Deserialize)]
54#[serde(deny_unknown_fields)]
55pub struct QuoteRightsQuery {
56 refresh: Option<bool>,
57}
58
59pub async fn get_quote_rights(
61 State(state): State<RestState>,
62 Query(query): Query<QuoteRightsQuery>,
63) -> ApiResult {
64 if query.refresh.unwrap_or(false) {
65 let req = test_cmd::Request {
66 c2s: test_cmd::C2s {
67 cmd: "request_highest_quote_right".to_string(),
68 param_str: None,
69 param_bytes: None,
70 },
71 };
72 let resp: test_cmd::Response = dispatch_proto(
73 &state,
74 proto_id::TEST_CMD,
75 req,
76 "request_highest_quote_right",
77 )
78 .await?;
79 if resp.ret_type != 0 {
80 return Err(api_error(
81 StatusCode::BAD_GATEWAY,
82 format_sys_command_error_message(
83 "request_highest_quote_right",
84 resp.ret_type,
85 resp.ret_msg.as_deref(),
86 ),
87 ));
88 }
89 }
90
91 let req = test_cmd::Request {
92 c2s: test_cmd::C2s {
93 cmd: SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE.to_string(),
94 param_str: None,
95 param_bytes: None,
96 },
97 };
98 let resp: test_cmd::Response = dispatch_proto(
99 &state,
100 proto_id::TEST_CMD,
101 req,
102 SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE,
103 )
104 .await?;
105 if resp.ret_type != 0 {
106 return Err(api_error(
107 StatusCode::BAD_GATEWAY,
108 format_sys_command_error_message(
109 SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE,
110 resp.ret_type,
111 resp.ret_msg.as_deref(),
112 ),
113 ));
114 }
115 let json = resp.s2c.and_then(|s| s.result_str).ok_or_else(|| {
116 api_error(
117 StatusCode::BAD_GATEWAY,
118 format!("{SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE}: missing result_str"),
119 )
120 })?;
121 serde_json::from_str::<Value>(&json).map(Json).map_err(|e| {
122 api_error(
123 StatusCode::INTERNAL_SERVER_ERROR,
124 format!("parse {SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE}: {e}"),
125 )
126 })
127}
128
129fn format_sys_command_error_message(label: &str, ret_type: i32, ret_msg: Option<&str>) -> String {
130 let ret_msg = ret_msg
131 .filter(|msg| !msg.is_empty())
132 .unwrap_or("<missing ret_msg>");
133 format!("{label} ret_type={ret_type} msg={ret_msg}")
134}
135
136async fn dispatch_proto<Req, Rsp>(
137 state: &RestState,
138 proto_id: u32,
139 req: Req,
140 label: &str,
141) -> Result<Rsp, (StatusCode, Json<Value>)>
142where
143 Req: Message,
144 Rsp: Message + Default,
145{
146 let incoming = IncomingRequest::builder(
147 state.next_conn_id(),
148 proto_id,
149 state.next_serial(),
150 ProtoFmtType::Protobuf,
151 Bytes::from(req.encode_to_vec()),
152 )
153 .build();
154 let resp_bytes = state
155 .router
156 .dispatch(incoming.conn_id, &incoming)
157 .await
158 .ok_or_else(|| {
159 api_error(
160 StatusCode::INTERNAL_SERVER_ERROR,
161 format!("{label}: handler returned no response"),
162 )
163 })?;
164 Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
165 api_error(
166 StatusCode::INTERNAL_SERVER_ERROR,
167 format!("decode {label}: {e}"),
168 )
169 })
170}
171
172fn api_error(status: StatusCode, message: String) -> (StatusCode, Json<Value>) {
173 (
174 status,
175 Json(serde_json::json!({
176 "ret_type": -1,
177 "ret_msg": message,
178 })),
179 )
180}
181
182pub async fn get_delay_statistics(State(state): State<RestState>) -> RawApiResult {
184 let body = default_request_body_json();
185 adapter::proto_request_raw::<get_delay_statistics::Request, get_delay_statistics::Response>(
186 &state,
187 proto_id::GET_DELAY_STATISTICS,
188 Some(body),
189 )
190 .await
191}
192
193pub async fn get_delay_statistics_post(
199 State(state): State<RestState>,
200 Json(body): Json<Value>,
201) -> RawApiResult {
202 adapter::proto_request_raw::<get_delay_statistics::Request, get_delay_statistics::Response>(
203 &state,
204 proto_id::GET_DELAY_STATISTICS,
205 Some(body),
206 )
207 .await
208}
209
210pub async fn ping(State(state): State<RestState>) -> ApiResult {
218 let ok = adapter::proto_request::<get_global_state::Request, get_global_state::Response>(
221 &state,
222 proto_id::GET_GLOBAL_STATE,
223 None,
224 )
225 .await
226 .is_ok();
227
228 Ok(Json(serde_json::json!({
229 "ok": ok,
230 "version": env!("CARGO_PKG_VERSION"),
231 "gateway": "futu-opend-rs",
232 })))
233}
234
235pub async fn push_subscriber_info(
258 State(state): State<RestState>,
259) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
260 if let Some(ref provider) = state.push_health_snapshot_provider {
262 let health = provider();
263 return Ok(Json(serde_json::json!({
264 "ret_type": 0,
265 "ret_msg": "success",
266 "push_health": health,
267 "recommendations": [
268 {
269 "purpose": "查订阅列表 + 全局 quota",
270 "endpoint": "POST /api/query-subscription -d '{}'",
271 "note": "v1.4.83 起默认 all-conn 视图"
272 },
273 {
274 "purpose": "接收 push 数据(quote / tick / orderbook 等)",
275 "endpoint": "WebSocket /ws (支持 Bearer Token 握手)"
276 }
277 ],
278 })));
279 }
280 Err((
284 StatusCode::SERVICE_UNAVAILABLE,
285 Json(serde_json::json!({
286 "ret_type": -1,
287 "ret_msg": "push health snapshot provider not wired (internal setup bug)",
288 "recommendations": [
289 {
290 "purpose": "查订阅列表 + 全局 quota",
291 "endpoint": "POST /api/query-subscription -d '{}'"
292 },
293 {
294 "purpose": "接收 push 数据",
295 "endpoint": "WebSocket /ws"
296 }
297 ],
298 })),
299 ))
300}
301
302pub async fn unsub_acc_push(
318 State(state): State<RestState>,
319 rec: Option<Extension<Arc<futu_auth::KeyRecord>>>,
320 Json(mut body): Json<Value>,
321) -> ApiResult {
322 if rec.is_none() {
326 futu_auth::audit::reject(
327 "rest",
328 "/api/unsub-acc-push",
329 "<legacy>",
330 "unsub-acc-push not supported in legacy mode (no keys.json)",
331 );
332 return Err((
333 axum::http::StatusCode::FORBIDDEN,
334 Json(serde_json::json!({
335 "error": "/api/unsub-acc-push: legacy mode (no keys.json) does not support per-key sub state. \
336 Configure keys.json and pass Bearer token to enable.",
337 "ret_type": -1,
338 "hint": "v1.4.103 B9: legacy mode previously returned silent success without revoking. Now loud-reject to surface the limitation."
339 })),
340 ));
341 }
342 crate::adapter::normalize_json_keys_snake_case(&mut body);
344
345 let acc_ids = match crate::routes::trd::extract_acc_id_list(&body) {
347 Ok(acc_ids) => acc_ids,
348 Err(reason) => {
349 let key_id = rec
350 .as_deref()
351 .map(|r| r.as_ref().id.clone())
352 .unwrap_or_else(|| "<legacy>".to_string());
353 futu_auth::audit::reject("rest", "/api/unsub-acc-push", &key_id, &reason);
354 return Err((
355 axum::http::StatusCode::BAD_REQUEST,
356 Json(serde_json::json!({
357 "error": format!("/api/unsub-acc-push: {reason}")
358 })),
359 ));
360 }
361 };
362 if let Err(reason) = crate::routes::trd::validate_sub_acc_push_acc_ids(&acc_ids) {
363 let key_id = rec
364 .as_deref()
365 .map(|r| r.as_ref().id.clone())
366 .unwrap_or_else(|| "<legacy>".to_string());
367 futu_auth::audit::reject("rest", "/api/unsub-acc-push", &key_id, reason);
368 return Err((
369 axum::http::StatusCode::BAD_REQUEST,
370 Json(serde_json::json!({
371 "error": format!("/api/unsub-acc-push: {reason}")
372 })),
373 ));
374 }
375
376 crate::routes::trd::check_per_acc_rate_for_caller(
381 &state.counters,
382 rec.as_deref().map(|r| r.as_ref()),
383 &acc_ids,
384 "/api/unsub-acc-push",
385 )?;
386
387 let daemon_resp: Json<serde_json::Value> = Json(serde_json::json!({
392 "ret_type": 0,
393 "ret_msg": serde_json::Value::Null,
394 "err_code": serde_json::Value::Null,
395 "s2c": {}
396 }));
397
398 if let Some(rec_ref) = rec.as_deref() {
404 let key_id = rec_ref.as_ref().id.clone();
405 crate::adapter::with_rest_acc_subscriptions_write(&state.rest_acc_subscriptions, |subs| {
406 let entry = subs.entry(key_id).or_default();
409 for &acc_id in &acc_ids {
410 entry.remove(&acc_id);
411 }
412 });
414 }
415
416 Ok(daemon_resp)
417}
418
419#[derive(Debug, Deserialize, Default)]
443#[serde(deny_unknown_fields)]
444pub struct TokenStateQuery {
445 pub app_id: Option<String>,
447}
448
449pub async fn get_token_state(
450 State(state): State<RestState>,
451 Query(q): Query<TokenStateQuery>,
452 body: Option<Json<Value>>,
453) -> RawApiResult {
454 let body_val = match body {
456 Some(Json(mut v)) => {
457 if let Some(qs_app) = q.app_id.as_ref()
459 && let Some(map) = v.as_object_mut()
460 {
461 let c2s_has = map
462 .get("c2s")
463 .and_then(|c| c.as_object())
464 .is_some_and(|c| c.contains_key("app_id") || c.contains_key("appId"));
465 let top_has = map.contains_key("app_id") || map.contains_key("appId");
466 if !c2s_has && !top_has {
467 map.entry("c2s".to_string())
468 .or_insert_with(|| serde_json::json!({}))
469 .as_object_mut()
470 .map(|c| {
471 c.insert(
472 "app_id".to_string(),
473 serde_json::Value::String(qs_app.clone()),
474 )
475 });
476 }
477 }
478 Some(v)
479 }
480 None => {
481 q.app_id
483 .as_ref()
484 .map(|app_id| serde_json::json!({"c2s": {"app_id": app_id}}))
485 }
486 };
487 adapter::proto_request_raw::<
488 futu_token_state::DaemonGetTokenStateReq,
489 futu_token_state::DaemonGetTokenStateRsp,
490 >(&state, proto_id::GET_TOKEN_STATE, body_val)
491 .await
492}
493
494#[cfg(test)]
495mod tests;