1use std::sync::Arc;
4
5use async_trait::async_trait;
6
7use futu_cache::static_data::StaticDataCache;
8use futu_cache::trd_cache::TrdCache;
9use futu_core::proto_id;
10use futu_server::conn::IncomingRequest;
11use futu_server::router::{RequestHandler, RequestRouter};
12use futu_server::subscription::SubscriptionManager;
13
14use crate::bridge::GatewayBridge;
15
16pub fn register_handlers(router: &Arc<RequestRouter>, bridge: &GatewayBridge) {
18 let cache = Arc::clone(&bridge.trd_cache);
19 let subs = Arc::clone(&bridge.subscriptions);
20
21 router.register(
23 proto_id::TRD_GET_ACC_LIST,
24 Arc::new(GetAccListHandler {
25 cache: cache.clone(),
26 }),
27 );
28 router.register(
29 proto_id::TRD_GET_FUNDS,
30 Arc::new(GetFundsHandler {
31 cache: cache.clone(),
32 }),
33 );
34 router.register(
35 proto_id::TRD_GET_POSITION_LIST,
36 Arc::new(GetPositionListHandler {
37 cache: cache.clone(),
38 }),
39 );
40 router.register(
41 proto_id::TRD_GET_ORDER_LIST,
42 Arc::new(GetOrderListHandler {
43 cache: cache.clone(),
44 }),
45 );
46 let backend = bridge.backend.clone();
48
49 router.register(
50 proto_id::TRD_GET_ORDER_FILL_LIST,
51 Arc::new(GetOrderFillListHandler {
52 cache: cache.clone(),
53 backend: backend.clone(),
54 }),
55 );
56
57 router.register(
59 proto_id::TRD_SUB_ACC_PUSH,
60 Arc::new(SubAccPushHandler {
61 subscriptions: subs,
62 }),
63 );
64
65 let fwd = |name: &'static str, cmd: Option<u16>| {
70 Arc::new(BackendTrdForwardHandler {
71 name,
72 backend: backend.clone(),
73 backend_cmd_id: cmd,
74 })
75 };
76
77 router.register(
78 proto_id::TRD_UNLOCK_TRADE,
79 Arc::new(UnlockTradeHandler {
80 backend: backend.clone(),
81 cache: cache.clone(),
82 }),
83 );
84 router.register(
85 proto_id::TRD_PLACE_ORDER,
86 Arc::new(PlaceOrderHandler {
87 backend: backend.clone(),
88 cache: cache.clone(),
89 }),
90 );
91 router.register(
92 proto_id::TRD_MODIFY_ORDER,
93 Arc::new(ModifyOrderHandler {
94 backend: backend.clone(),
95 cache: cache.clone(),
96 }),
97 );
98 router.register(
99 proto_id::TRD_GET_MAX_TRD_QTYS,
100 fwd("GetMaxTrdQtys", Some(4713)),
101 );
102 router.register(proto_id::TRD_RECONFIRM_ORDER, fwd("ReconfirmOrder", None));
103 router.register(
104 proto_id::TRD_GET_HISTORY_ORDER_LIST,
105 fwd("GetHistoryOrderList", Some(4711)),
106 );
107 router.register(
108 proto_id::TRD_GET_HISTORY_ORDER_FILL_LIST,
109 fwd("GetHistoryOrderFillList", Some(4712)),
110 );
111
112 router.register(
114 proto_id::TRD_GET_MARGIN_RATIO,
115 Arc::new(GetMarginRatioHandler {
116 backend: backend.clone(),
117 cache: cache.clone(),
118 static_cache: Arc::clone(&bridge.static_cache),
119 }),
120 );
121 router.register(
122 proto_id::TRD_GET_ORDER_FEE,
123 Arc::new(GetOrderFeeHandler {
124 backend: backend.clone(),
125 cache: cache.clone(),
126 }),
127 );
128 router.register(
129 proto_id::TRD_FLOW_SUMMARY,
130 Arc::new(FlowSummaryHandler {
131 backend: backend.clone(),
132 cache: cache.clone(),
133 }),
134 );
135
136 tracing::debug!("trade handlers registered (all implemented)");
137}
138
139struct GetAccListHandler {
141 cache: Arc<TrdCache>,
142}
143
144#[async_trait]
145impl RequestHandler for GetAccListHandler {
146 async fn handle(&self, _conn_id: u64, _request: &IncomingRequest) -> Option<Vec<u8>> {
147 let mut accounts = self.cache.get_accounts();
148 accounts.sort_by(|a, b| {
151 let order = |acc: &futu_cache::trd_cache::CachedTrdAcc| -> (i32, u64) {
152 let is_real = acc.trd_env == 1;
153 let is_active = acc.acc_status == Some(0);
154 let group = if is_real && is_active {
155 0 } else if !is_real {
157 1 } else {
159 2 };
161 (group, acc.sort_key)
162 };
163 order(a).cmp(&order(b))
164 });
165 let acc_list = accounts
166 .iter()
167 .map(|a| futu_proto::trd_common::TrdAcc {
168 trd_env: a.trd_env,
169 acc_id: a.acc_id,
170 trd_market_auth_list: a.trd_market_auth_list.clone(),
171 acc_type: a.acc_type,
172 card_num: a.card_num.clone(),
173 security_firm: a.security_firm,
174 sim_acc_type: a.sim_acc_type,
175 uni_card_num: a.uni_card_num.clone(),
176 acc_status: a.acc_status,
177 acc_role: a.acc_role,
178 jp_acc_type: a.jp_acc_type.clone(),
179 })
180 .collect();
181 let resp = futu_proto::trd_get_acc_list::Response {
182 ret_type: 0,
183 ret_msg: None,
184 err_code: None,
185 s2c: Some(futu_proto::trd_get_acc_list::S2c { acc_list }),
186 };
187 Some(prost::Message::encode_to_vec(&resp))
188 }
189}
190
191struct GetFundsHandler {
193 cache: Arc<TrdCache>,
194}
195
196#[async_trait]
197impl RequestHandler for GetFundsHandler {
198 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
199 let req: futu_proto::trd_get_funds::Request =
200 prost::Message::decode(request.body.as_ref()).ok()?;
201 let header = req.c2s.header;
202 let cached_fund_keys: Vec<u64> = self.cache.funds.iter().map(|e| *e.key()).collect();
203 tracing::info!(
204 requested_acc_id = header.acc_id,
205 cached_fund_keys = ?cached_fund_keys,
206 "GetFunds lookup"
207 );
208 let funds = self
209 .cache
210 .funds
211 .get(&header.acc_id)
212 .map(|f| futu_proto::trd_common::Funds {
213 power: f.power,
214 total_assets: f.total_assets,
215 cash: f.cash,
216 market_val: f.market_val,
217 frozen_cash: f.frozen_cash,
218 debt_cash: f.debt_cash,
219 avl_withdrawal_cash: f.avl_withdrawal_cash,
220 currency: f.currency,
221 available_funds: f.available_funds,
222 unrealized_pl: f.unrealized_pl,
223 realized_pl: f.realized_pl,
224 risk_level: f.risk_level,
225 initial_margin: f.initial_margin,
226 maintenance_margin: f.maintenance_margin,
227 cash_info_list: f
228 .cash_info_list
229 .iter()
230 .map(|c| futu_proto::trd_common::AccCashInfo {
231 currency: Some(c.currency),
232 cash: Some(c.cash),
233 available_balance: Some(c.available_balance),
234 net_cash_power: Some(c.net_cash_power),
235 })
236 .collect(),
237 max_power_short: f.max_power_short,
238 net_cash_power: f.net_cash_power,
239 long_mv: f.long_mv,
240 short_mv: f.short_mv,
241 pending_asset: f.pending_asset,
242 max_withdrawal: f.max_withdrawal,
243 risk_status: f.risk_status,
244 margin_call_margin: f.margin_call_margin,
245 is_pdt: None,
246 pdt_seq: None,
247 beginning_dtbp: None,
248 remaining_dtbp: None,
249 dt_call_amount: None,
250 dt_status: None,
251 securities_assets: f.securities_assets,
252 fund_assets: f.fund_assets,
253 bond_assets: f.bond_assets,
254 market_info_list: f
255 .market_info_list
256 .iter()
257 .map(|m| futu_proto::trd_common::AccMarketInfo {
258 trd_market: Some(m.trd_market),
259 assets: Some(m.assets),
260 })
261 .collect(),
262 });
263 let resp = futu_proto::trd_get_funds::Response {
264 ret_type: 0,
265 ret_msg: None,
266 err_code: None,
267 s2c: Some(futu_proto::trd_get_funds::S2c { header, funds }),
268 };
269 Some(prost::Message::encode_to_vec(&resp))
270 }
271}
272
273struct GetPositionListHandler {
275 cache: Arc<TrdCache>,
276}
277
278#[async_trait]
279impl RequestHandler for GetPositionListHandler {
280 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
281 let req: futu_proto::trd_get_position_list::Request =
282 prost::Message::decode(request.body.as_ref()).ok()?;
283 let header = req.c2s.header;
284 let cached_keys: Vec<u64> = self.cache.positions.iter().map(|e| *e.key()).collect();
285 tracing::info!(
286 requested_acc_id = header.acc_id,
287 cached_position_keys = ?cached_keys,
288 "GetPositionList lookup"
289 );
290 let positions = self
291 .cache
292 .positions
293 .get(&header.acc_id)
294 .map(|p| p.value().clone())
295 .unwrap_or_default();
296 let position_list = positions
297 .iter()
298 .map(|p| futu_proto::trd_common::Position {
299 position_id: p.position_id,
300 position_side: p.position_side,
301 code: p.code.clone(),
302 name: p.name.clone(),
303 qty: p.qty,
304 can_sell_qty: p.can_sell_qty,
305 price: p.price,
306 cost_price: Some(p.cost_price),
307 val: p.val,
308 pl_val: p.pl_val,
309 pl_ratio: p.pl_ratio,
310 sec_market: p.sec_market,
311 td_pl_val: p.td_pl_val,
312 td_trd_val: p.td_trd_val,
313 td_buy_val: p.td_buy_val,
314 td_buy_qty: p.td_buy_qty,
315 td_sell_val: p.td_sell_val,
316 td_sell_qty: p.td_sell_qty,
317 unrealized_pl: p.unrealized_pl,
318 realized_pl: p.realized_pl,
319 currency: p.currency,
320 trd_market: p.trd_market,
321 diluted_cost_price: p.diluted_cost_price,
322 average_cost_price: p.average_cost_price,
323 average_pl_ratio: p.average_pl_ratio,
324 })
325 .collect();
326 let resp = futu_proto::trd_get_position_list::Response {
327 ret_type: 0,
328 ret_msg: None,
329 err_code: None,
330 s2c: Some(futu_proto::trd_get_position_list::S2c {
331 header,
332 position_list,
333 }),
334 };
335 Some(prost::Message::encode_to_vec(&resp))
336 }
337}
338
339struct GetOrderListHandler {
341 cache: Arc<TrdCache>,
342}
343
344#[async_trait]
345impl RequestHandler for GetOrderListHandler {
346 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
347 let req: futu_proto::trd_get_order_list::Request =
348 prost::Message::decode(request.body.as_ref()).ok()?;
349 let header = req.c2s.header;
350 let orders = self
351 .cache
352 .orders
353 .get(&header.acc_id)
354 .map(|o| o.value().clone())
355 .unwrap_or_default();
356 let order_list = orders
357 .iter()
358 .map(|o| futu_proto::trd_common::Order {
359 trd_side: o.trd_side,
360 order_type: o.order_type,
361 order_status: o.order_status,
362 order_id: o.order_id,
363 order_id_ex: o.order_id_ex.clone(),
364 code: o.code.clone(),
365 name: o.name.clone(),
366 qty: o.qty,
367 price: Some(o.price),
368 create_time: o.create_time.clone(),
369 update_time: o.update_time.clone(),
370 fill_qty: Some(o.fill_qty),
371 fill_avg_price: Some(o.fill_avg_price),
372 last_err_msg: o.last_err_msg.clone(),
373 sec_market: o.sec_market,
374 create_timestamp: o.create_timestamp,
375 update_timestamp: o.update_timestamp,
376 remark: o.remark.clone(),
377 time_in_force: o.time_in_force,
378 fill_outside_rth: o.fill_outside_rth,
379 aux_price: o.aux_price,
380 trail_type: o.trail_type,
381 trail_value: o.trail_value,
382 trail_spread: o.trail_spread,
383 currency: o.currency,
384 trd_market: o.trd_market,
385 session: None,
386 jp_acc_type: None,
387 })
388 .collect();
389 let resp = futu_proto::trd_get_order_list::Response {
390 ret_type: 0,
391 ret_msg: None,
392 err_code: None,
393 s2c: Some(futu_proto::trd_get_order_list::S2c { header, order_list }),
394 };
395 Some(prost::Message::encode_to_vec(&resp))
396 }
397}
398
399struct GetOrderFillListHandler {
401 #[expect(dead_code)]
402 cache: Arc<TrdCache>,
403 backend: crate::bridge::SharedBackend,
404}
405
406const CMD_ORDER_FILL_LIST: u16 = 4710;
408
409fn map_backend_market_to_trd_market(market: u32) -> i32 {
412 match market {
413 1 => 1, 2 => 2, 3 => 3, 10 => 5, 15 => 6, v => v as i32,
419 }
420}
421
422#[async_trait]
423impl RequestHandler for GetOrderFillListHandler {
424 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
425 use futu_backend::proto_internal::{odr_sys_cmn, order_sys_interface};
426
427 let req: futu_proto::trd_get_order_fill_list::Request =
428 prost::Message::decode(request.body.as_ref()).ok()?;
429 let header = req.c2s.header;
430 let acc_id = header.acc_id;
431
432 let backend = match super::load_backend(&self.backend) {
433 Some(b) => b,
434 None => {
435 return Some(super::make_error_response(
436 -1,
437 "GetOrderFillList: 后端未连接",
438 ));
439 }
440 };
441
442 let backend_req = order_sys_interface::OrderFillListReq {
444 msg_header: Some(odr_sys_cmn::MsgHeader {
445 req_id: Some(format!("{acc_id}_{}", request.serial_no)),
446 account_id: Some(acc_id),
447 cipher: Some(vec![]), security_type: None,
449 exchange_code: None,
450 input_source: Some(6), sub_account_id: None,
452 }),
453 page_flag: None,
454 page_size: None,
455 security_type: vec![0], market: vec![],
457 };
458
459 tracing::info!(acc_id, "querying order fill list from backend");
460
461 let mut all_fills: Vec<odr_sys_cmn::OrderFill> = Vec::new();
463 let mut page_flag: Option<String> = None;
464
465 loop {
466 let mut paged_req = backend_req.clone();
467 paged_req.page_flag = page_flag.clone();
468
469 let resp = match backend
470 .request(
471 CMD_ORDER_FILL_LIST,
472 prost::Message::encode_to_vec(&paged_req),
473 )
474 .await
475 {
476 Ok(r) => r,
477 Err(e) => {
478 tracing::warn!(error = %e, "GetOrderFillList backend request failed");
479 return Some(super::make_error_response(
480 -1,
481 &format!("GetOrderFillList: {e}"),
482 ));
483 }
484 };
485
486 let backend_rsp: order_sys_interface::OrderFillListRsp =
487 match super::decode_backend_proto(resp.body.as_ref()) {
488 Ok(r) => r,
489 Err(e) => {
490 tracing::warn!(error = %e, body_len = resp.body.len(),
491 "GetOrderFillList response decode failed");
492 return Some(super::make_error_response(
493 -1,
494 "GetOrderFillList: 响应解析失败",
495 ));
496 }
497 };
498
499 if backend_rsp.result.unwrap_or(-1) != 0 {
500 let err_msg = backend_rsp
501 .err_msg
502 .unwrap_or_else(|| "unknown error".to_string());
503 tracing::warn!(err_msg = %err_msg, "GetOrderFillList backend returned error");
504 return Some(super::make_error_response(
505 -1,
506 &format!("GetOrderFillList: {err_msg}"),
507 ));
508 }
509
510 all_fills.extend(backend_rsp.order_fills);
511
512 if backend_rsp.completed.unwrap_or(true) {
513 break;
514 }
515 match backend_rsp.page_flag {
516 Some(ref pf) if !pf.is_empty() => {
517 page_flag = Some(pf.clone());
518 }
519 _ => break,
520 }
521 }
522
523 tracing::info!(acc_id, count = all_fills.len(), "order fill list retrieved");
524
525 let order_fill_list: Vec<futu_proto::trd_common::OrderFill> = all_fills
527 .iter()
528 .map(|f| {
529 let pf = |s: &Option<String>| -> f64 {
530 s.as_ref()
531 .and_then(|v| v.parse::<f64>().ok())
532 .unwrap_or(0.0)
533 };
534
535 let fill_id_ex = f.id.clone().unwrap_or_default();
537 let fill_id: u64 = fill_id_ex.parse().unwrap_or(0);
538
539 let order_id_ex = f.order_id.clone().unwrap_or_default();
541 let order_id: u64 = order_id_ex.parse().unwrap_or(0);
542
543 let create_timestamp = f.create_time.map(|t| t as f64 / 1_000_000.0);
545 let update_timestamp = f.update_time.map(|t| t as f64 / 1_000_000.0);
546
547 let counter_broker_id = f
549 .counter_broker_id
550 .as_ref()
551 .and_then(|s| s.parse::<i32>().ok());
552
553 let trd_market = f
555 .market
556 .as_ref()
557 .and_then(|s| s.parse::<u32>().ok())
558 .map(map_backend_market_to_trd_market);
559
560 let status = if f.is_cancelled.unwrap_or(false) {
562 Some(2) } else if f.is_corrected.unwrap_or(false) {
564 Some(3) } else {
566 Some(1) };
568
569 futu_proto::trd_common::OrderFill {
570 trd_side: f.side.unwrap_or(0) as i32,
571 fill_id,
572 fill_id_ex,
573 order_id: Some(order_id),
574 order_id_ex: Some(order_id_ex),
575 code: f
576 .symbol
577 .as_ref()
578 .map(|b| String::from_utf8_lossy(b).into_owned())
579 .unwrap_or_default(),
580 name: f
581 .stock_name
582 .as_ref()
583 .map(|b| String::from_utf8_lossy(b).into_owned())
584 .unwrap_or_default(),
585 qty: pf(&f.qty),
586 price: pf(&f.price),
587 create_time: String::new(), counter_broker_id,
589 counter_broker_name: None,
590 sec_market: None,
591 create_timestamp,
592 update_timestamp,
593 status,
594 trd_market,
595 jp_acc_type: None,
596 }
597 })
598 .collect();
599
600 let resp = futu_proto::trd_get_order_fill_list::Response {
601 ret_type: 0,
602 ret_msg: None,
603 err_code: None,
604 s2c: Some(futu_proto::trd_get_order_fill_list::S2c {
605 header,
606 order_fill_list,
607 }),
608 };
609 Some(prost::Message::encode_to_vec(&resp))
610 }
611}
612
613struct SubAccPushHandler {
615 subscriptions: Arc<SubscriptionManager>,
616}
617
618#[async_trait]
619impl RequestHandler for SubAccPushHandler {
620 async fn handle(&self, conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
621 let req: futu_proto::trd_sub_acc_push::Request =
622 prost::Message::decode(request.body.as_ref()).ok()?;
623 for &acc_id in &req.c2s.acc_id_list {
624 self.subscriptions.subscribe_trd_acc(conn_id, acc_id);
625 }
626 let resp = futu_proto::trd_sub_acc_push::Response {
627 ret_type: 0,
628 ret_msg: None,
629 err_code: None,
630 s2c: Some(futu_proto::trd_sub_acc_push::S2c {}),
631 };
632 Some(prost::Message::encode_to_vec(&resp))
633 }
634}
635
636struct UnlockTradeHandler {
640 backend: crate::bridge::SharedBackend,
641 cache: Arc<TrdCache>,
642}
643
644const CMD_TRD_PWD_AUTH: u16 = 2900;
646
647#[async_trait]
648impl RequestHandler for UnlockTradeHandler {
649 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
650 use futu_backend::proto_internal::ft_cmd_trade_auth;
651
652 let req: futu_proto::trd_unlock_trade::Request =
653 prost::Message::decode(request.body.as_ref()).ok()?;
654 let is_unlock = req.c2s.unlock;
655 let pwd_md5 = req.c2s.pwd_md5.as_deref().unwrap_or("");
656
657 tracing::info!(
658 unlock = is_unlock,
659 has_pwd = !pwd_md5.is_empty(),
660 "trade unlock request"
661 );
662
663 if !is_unlock || pwd_md5.is_empty() {
665 let resp = futu_proto::trd_unlock_trade::Response {
666 ret_type: 0,
667 ret_msg: None,
668 err_code: None,
669 s2c: Some(futu_proto::trd_unlock_trade::S2c {}),
670 };
671 return Some(prost::Message::encode_to_vec(&resp));
672 }
673
674 let backend = match super::load_backend(&self.backend) {
675 Some(b) => b,
676 None => {
677 return Some(super::make_error_response(-1, "UnlockTrade: 后端未连接"));
678 }
679 };
680
681 let accounts = self.cache.get_accounts();
683 let user_id = backend.user_id.load(std::sync::atomic::Ordering::Relaxed) as u64;
684 let mut pswds = Vec::new();
685 for acc in &accounts {
686 if acc.trd_env == 1 && acc.acc_status == Some(0) {
687 pswds.push(ft_cmd_trade_auth::AccountPswd {
689 account_id: acc.acc_id,
690 trade_pswd_hash: Some(pwd_md5.to_string()),
691 });
692 }
693 }
694
695 if pswds.is_empty() {
696 let resp = futu_proto::trd_unlock_trade::Response {
697 ret_type: 0,
698 ret_msg: None,
699 err_code: None,
700 s2c: Some(futu_proto::trd_unlock_trade::S2c {}),
701 };
702 return Some(prost::Message::encode_to_vec(&resp));
703 }
704
705 let auth_req = ft_cmd_trade_auth::TradePswdAuthReq {
706 uid: user_id, flag: Some(1), pswds,
709 bio_id_hash: None,
710 bio_trade_sig: None,
711 };
712
713 tracing::info!(
714 account_count = auth_req.pswds.len(),
715 "sending CMD2900 TradePswdAuth to backend"
716 );
717
718 match backend
720 .request(CMD_TRD_PWD_AUTH, prost::Message::encode_to_vec(&auth_req))
721 .await
722 {
723 Ok(resp_frame) => {
724 let auth_rsp: ft_cmd_trade_auth::TradePswdAuthRsp =
725 match prost::Message::decode(resp_frame.body.as_ref()) {
726 Ok(r) => r,
727 Err(e) => {
728 tracing::warn!(error = %e, "TradePswdAuth response decode failed");
729 return Some(super::make_error_response(-1, "解锁响应解析失败"));
730 }
731 };
732
733 let result = auth_rsp.result_code;
734 if result == 0 {
735 for cipher_item in &auth_rsp.ciphers {
737 if let Some(ref trade_cipher) = cipher_item.trade_cipher {
738 self.cache
739 .set_cipher(cipher_item.account_id, trade_cipher.clone());
740 }
741 }
742 let cipher_count = auth_rsp.ciphers.len();
743 tracing::info!(cipher_count, "trade unlock succeeded, ciphers stored");
744 let resp = futu_proto::trd_unlock_trade::Response {
745 ret_type: 0,
746 ret_msg: None,
747 err_code: None,
748 s2c: Some(futu_proto::trd_unlock_trade::S2c {}),
749 };
750 Some(prost::Message::encode_to_vec(&resp))
751 } else {
752 let err_msg = auth_rsp.err_msg.unwrap_or_else(|| "密码错误".to_string());
753 tracing::warn!(result, err = %err_msg, "trade unlock failed");
754 Some(super::make_error_response(-1, &err_msg))
755 }
756 }
757 Err(e) => {
758 tracing::warn!(error = %e, "TradePswdAuth request failed");
759 Some(super::make_error_response(-1, &format!("UnlockTrade: {e}")))
760 }
761 }
762 }
763}
764
765struct PlaceOrderHandler {
767 backend: crate::bridge::SharedBackend,
768 cache: Arc<TrdCache>,
769}
770
771const CMD_PLACE_ORDER: u16 = 4701;
773
774fn map_order_type(ftapi_order_type: i32) -> u32 {
779 match ftapi_order_type {
780 1 | 5 => 1, 2 => 3, v => v as u32,
783 }
784}
785
786fn format_qty(v: f64) -> String {
788 if v.fract() == 0.0 {
790 format!("{}", v as i64)
791 } else {
792 let s = format!("{}", v);
794 s.trim_end_matches('0').trim_end_matches('.').to_string()
795 }
796}
797
798fn format_price(v: f64) -> String {
799 format_qty(v)
800}
801
802#[async_trait]
803impl RequestHandler for PlaceOrderHandler {
804 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
805 use futu_backend::proto_internal::{odr_sys_cmn, order_sys_interface};
806
807 let backend = match super::load_backend(&self.backend) {
808 Some(b) => b,
809 None => {
810 return Some(super::make_error_response(-1, "PlaceOrder: 后端未连接"));
811 }
812 };
813
814 let req: futu_proto::trd_place_order::Request =
816 prost::Message::decode(request.body.as_ref()).ok()?;
817 let c2s = &req.c2s;
818 let header = &c2s.header;
819 let acc_id = header.acc_id;
820
821 let cipher = self.cache.get_cipher(acc_id);
823 let backend_req = order_sys_interface::OrderNewReq {
824 msg_header: Some(odr_sys_cmn::MsgHeader {
825 req_id: Some(format!("{acc_id}_{}", request.serial_no)),
826 account_id: Some(acc_id),
827 cipher,
828 security_type: Some(0), exchange_code: None,
830 input_source: Some(6), sub_account_id: None,
832 }),
833 order_type: Some(map_order_type(c2s.order_type)),
834 order_side: Some(c2s.trd_side as u32),
835 symbol: Some(c2s.code.clone()),
836 qty: Some(format_qty(c2s.qty)),
837 price: c2s.price.map(format_price),
838 expire_time: None,
839 text: c2s.remark.clone(),
840 lot_type: None,
841 trigger_info: None,
842 algo_info: None,
843 };
844
845 tracing::info!(
846 acc_id,
847 symbol = %c2s.code,
848 side = c2s.trd_side,
849 order_type = c2s.order_type,
850 qty = c2s.qty,
851 "placing order via backend"
852 );
853
854 let resp = match backend
856 .request(CMD_PLACE_ORDER, prost::Message::encode_to_vec(&backend_req))
857 .await
858 {
859 Ok(r) => r,
860 Err(e) => {
861 tracing::warn!(error = %e, "PlaceOrder backend request failed");
862 return Some(super::make_error_response(-1, &format!("PlaceOrder: {e}")));
863 }
864 };
865
866 let backend_rsp: order_sys_interface::OrderNewRsp =
868 match prost::Message::decode(resp.body.as_ref()) {
869 Ok(r) => r,
870 Err(e) => {
871 tracing::warn!(error = %e, "PlaceOrder response decode failed");
872 return Some(super::make_error_response(-1, "PlaceOrder: 响应解析失败"));
873 }
874 };
875
876 if backend_rsp.result.unwrap_or(-1) != 0 {
878 let err_msg = backend_rsp
879 .err_msg
880 .unwrap_or_else(|| "unknown error".to_string());
881 tracing::warn!(err_msg = %err_msg, "PlaceOrder backend returned error");
882 return Some(super::make_error_response(
883 -1,
884 &format!("PlaceOrder: {err_msg}"),
885 ));
886 }
887
888 let order_id: u64 = backend_rsp
890 .order_id
891 .as_ref()
892 .and_then(|s| s.parse().ok())
893 .unwrap_or(0);
894
895 let ftapi_resp = futu_proto::trd_place_order::Response {
896 ret_type: 0,
897 ret_msg: None,
898 err_code: None,
899 s2c: Some(futu_proto::trd_place_order::S2c {
900 header: *header,
901 order_id: Some(order_id),
902 order_id_ex: backend_rsp.order_id,
903 }),
904 };
905
906 tracing::info!(order_id, "PlaceOrder success");
907 Some(prost::Message::encode_to_vec(&ftapi_resp))
908 }
909}
910
911struct ModifyOrderHandler {
913 backend: crate::bridge::SharedBackend,
914 cache: Arc<TrdCache>,
915}
916
917const CMD_MODIFY_ORDER: u16 = 4702;
919const CMD_CANCEL_ORDER: u16 = 4703;
921
922#[async_trait]
923impl RequestHandler for ModifyOrderHandler {
924 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
925 use futu_backend::proto_internal::{odr_sys_cmn, order_sys_interface};
926
927 let backend = match super::load_backend(&self.backend) {
928 Some(b) => b,
929 None => {
930 return Some(super::make_error_response(-1, "ModifyOrder: 后端未连接"));
931 }
932 };
933
934 let req: futu_proto::trd_modify_order::Request =
936 prost::Message::decode(request.body.as_ref()).ok()?;
937 let c2s = &req.c2s;
938 let header = &c2s.header;
939 let acc_id = header.acc_id;
940 let order_id = c2s.order_id;
941
942 let modify_op = c2s.modify_order_op;
944
945 let cipher = self.cache.get_cipher(acc_id);
946 let msg_header = odr_sys_cmn::MsgHeader {
947 req_id: Some(format!("{acc_id}_{}", request.serial_no)),
948 account_id: Some(acc_id),
949 cipher,
950 security_type: Some(0),
951 exchange_code: None,
952 input_source: Some(6),
953 sub_account_id: None,
954 };
955
956 let order_id_str = if let Some(ref ex) = c2s.order_id_ex {
957 ex.clone()
958 } else {
959 order_id.to_string()
960 };
961
962 tracing::info!(acc_id, order_id, modify_op, "modifying order via backend");
963
964 if modify_op == 2 {
965 let cancel_all = c2s.for_all.unwrap_or(false);
967 let cancel_req = order_sys_interface::OrderCancelReq {
968 msg_header: Some(msg_header),
969 order_id: Some(order_id_str),
970 cancel_all: Some(cancel_all),
971 };
972
973 let resp = match backend
974 .request(CMD_CANCEL_ORDER, prost::Message::encode_to_vec(&cancel_req))
975 .await
976 {
977 Ok(r) => r,
978 Err(e) => {
979 tracing::warn!(error = %e, "CancelOrder backend request failed");
980 return Some(super::make_error_response(
981 -1,
982 &format!("ModifyOrder(Cancel): {e}"),
983 ));
984 }
985 };
986
987 let backend_rsp: order_sys_interface::OrderCancelRsp =
988 match prost::Message::decode(resp.body.as_ref()) {
989 Ok(r) => r,
990 Err(e) => {
991 tracing::warn!(error = %e, "CancelOrder response decode failed");
992 return Some(super::make_error_response(
993 -1,
994 "ModifyOrder(Cancel): 响应解析失败",
995 ));
996 }
997 };
998
999 if backend_rsp.result.unwrap_or(-1) != 0 {
1000 let err_msg = backend_rsp
1001 .err_msg
1002 .unwrap_or_else(|| "unknown error".to_string());
1003 return Some(super::make_error_response(
1004 -1,
1005 &format!("ModifyOrder(Cancel): {err_msg}"),
1006 ));
1007 }
1008
1009 let ftapi_resp = futu_proto::trd_modify_order::Response {
1010 ret_type: 0,
1011 ret_msg: None,
1012 err_code: None,
1013 s2c: Some(futu_proto::trd_modify_order::S2c {
1014 header: *header,
1015 order_id,
1016 order_id_ex: c2s.order_id_ex.clone(),
1017 }),
1018 };
1019
1020 tracing::info!(order_id, "CancelOrder success");
1021 return Some(prost::Message::encode_to_vec(&ftapi_resp));
1022 }
1023
1024 let replace_req = order_sys_interface::OrderReplaceReq {
1026 msg_header: Some(msg_header),
1027 order_id: Some(order_id_str),
1028 order_version: None,
1029 new_qty: c2s.qty.map(format_qty),
1030 new_price: c2s.price.map(format_price),
1031 trigger_info: None,
1032 };
1033
1034 let resp = match backend
1035 .request(
1036 CMD_MODIFY_ORDER,
1037 prost::Message::encode_to_vec(&replace_req),
1038 )
1039 .await
1040 {
1041 Ok(r) => r,
1042 Err(e) => {
1043 tracing::warn!(error = %e, "ReplaceOrder backend request failed");
1044 return Some(super::make_error_response(-1, &format!("ModifyOrder: {e}")));
1045 }
1046 };
1047
1048 let backend_rsp: order_sys_interface::OrderReplaceRsp =
1049 match prost::Message::decode(resp.body.as_ref()) {
1050 Ok(r) => r,
1051 Err(e) => {
1052 tracing::warn!(error = %e, "ReplaceOrder response decode failed");
1053 return Some(super::make_error_response(-1, "ModifyOrder: 响应解析失败"));
1054 }
1055 };
1056
1057 if backend_rsp.result.unwrap_or(-1) != 0 {
1058 let err_msg = backend_rsp
1059 .err_msg
1060 .unwrap_or_else(|| "unknown error".to_string());
1061 return Some(super::make_error_response(
1062 -1,
1063 &format!("ModifyOrder: {err_msg}"),
1064 ));
1065 }
1066
1067 let ftapi_resp = futu_proto::trd_modify_order::Response {
1068 ret_type: 0,
1069 ret_msg: None,
1070 err_code: None,
1071 s2c: Some(futu_proto::trd_modify_order::S2c {
1072 header: *header,
1073 order_id,
1074 order_id_ex: c2s.order_id_ex.clone(),
1075 }),
1076 };
1077
1078 tracing::info!(order_id, "ModifyOrder success");
1079 Some(prost::Message::encode_to_vec(&ftapi_resp))
1080 }
1081}
1082
1083struct GetMarginRatioHandler {
1085 backend: crate::bridge::SharedBackend,
1086 #[expect(dead_code)]
1087 cache: Arc<TrdCache>,
1088 #[expect(dead_code)]
1089 static_cache: Arc<StaticDataCache>,
1090}
1091
1092const CMD_MARGIN_RATIO: u16 = 2309;
1094
1095fn qot_market_to_exchange(qot_market: i32) -> &'static str {
1097 match qot_market {
1098 1 => "SEHK", 2 => "US", 3 => "SSE", 4 => "SZSE", 6 => "SGX", _ => "SEHK", }
1105}
1106
1107fn exchange_to_qot_market(exchange: &str) -> i32 {
1109 match exchange {
1110 "SEHK" | "HKFE" => 1,
1111 "US" | "NYMEX" | "COMEX" | "CBOT" | "CME" | "CBOE" => 2,
1112 "SSE" => 3,
1113 "SZSE" => 4,
1114 "SGX" => 6,
1115 "OSE" => 23,
1116 "CN" => 3,
1117 _ => 0,
1118 }
1119}
1120
1121fn parse_short_pool_remain(s: &str) -> f64 {
1123 let mut number_str = String::new();
1124 for ch in s.chars() {
1125 if ch.is_ascii_digit() || ch == '.' {
1126 number_str.push(ch);
1127 }
1128 }
1129 let base: f64 = number_str.parse().unwrap_or(0.0);
1130 if s.contains('M') {
1131 base * 1_000_000.0
1132 } else if s.contains('K') {
1133 base * 1_000.0
1134 } else if s.contains('\u{4e07}') {
1135 base * 10_000.0
1137 } else {
1138 base
1139 }
1140}
1141
1142#[async_trait]
1143impl RequestHandler for GetMarginRatioHandler {
1144 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
1145 use futu_backend::proto_internal::stock_ratio_info;
1146
1147 let backend = match super::load_backend(&self.backend) {
1148 Some(b) => b,
1149 None => {
1150 return Some(super::make_error_response(-1, "GetMarginRatio: 后端未连接"));
1151 }
1152 };
1153
1154 let req: futu_proto::trd_get_margin_ratio::Request =
1155 prost::Message::decode(request.body.as_ref()).ok()?;
1156 let header = req.c2s.header;
1157 let user_id = backend.user_id.load(std::sync::atomic::Ordering::Relaxed) as u64;
1158
1159 let mut stocks = Vec::new();
1161 for sec in &req.c2s.security_list {
1162 let exchange = qot_market_to_exchange(sec.market);
1163
1164 stocks.push(stock_ratio_info::StockInfo {
1165 stock_id: sec.code.clone(),
1166 exchange_code: exchange.to_string(),
1167 uid: user_id,
1168 });
1169 }
1170
1171 let backend_req = stock_ratio_info::QueryMarginPropertyReq { stocks, period: 0 };
1172
1173 tracing::info!(
1174 acc_id = header.acc_id,
1175 stock_count = req.c2s.security_list.len(),
1176 "querying margin ratio from backend"
1177 );
1178
1179 let resp = match backend
1180 .request(
1181 CMD_MARGIN_RATIO,
1182 prost::Message::encode_to_vec(&backend_req),
1183 )
1184 .await
1185 {
1186 Ok(r) => r,
1187 Err(e) => {
1188 tracing::warn!(error = %e, "GetMarginRatio backend request failed");
1189 return Some(super::make_error_response(
1190 -1,
1191 &format!("GetMarginRatio: {e}"),
1192 ));
1193 }
1194 };
1195
1196 let backend_rsp: stock_ratio_info::QueryMarginPropertyRsp =
1197 match prost::Message::decode(resp.body.as_ref()) {
1198 Ok(r) => r,
1199 Err(e) => {
1200 tracing::warn!(error = %e, "GetMarginRatio response decode failed");
1201 return Some(super::make_error_response(
1202 -1,
1203 "GetMarginRatio: 响应解析失败",
1204 ));
1205 }
1206 };
1207
1208 if backend_rsp.err_code != 0 {
1209 let err_msg = if backend_rsp.err_msg.is_empty() {
1210 "unknown error".to_string()
1211 } else {
1212 backend_rsp.err_msg.clone()
1213 };
1214 tracing::warn!(err_code = backend_rsp.err_code, err_msg = %err_msg, "GetMarginRatio backend error");
1215 return Some(super::make_error_response(
1216 -1,
1217 &format!("GetMarginRatio: {err_msg}"),
1218 ));
1219 }
1220
1221 let margin_ratio_info_list: Vec<futu_proto::trd_get_margin_ratio::MarginRatioInfo> =
1224 backend_rsp
1225 .data
1226 .iter()
1227 .map(|p| {
1228 let market = exchange_to_qot_market(&p.exchange_code);
1229 futu_proto::trd_get_margin_ratio::MarginRatioInfo {
1230 security: futu_proto::qot_common::Security {
1231 market,
1232 code: p.stock_id.clone(),
1233 },
1234 is_long_permit: Some(p.is_long_permit != 0),
1235 is_short_permit: Some(p.is_short_permit != 0),
1236 short_pool_remain: Some(parse_short_pool_remain(&p.short_pool_remain)),
1237 short_fee_rate: {
1238 let rate: f64 = p.short_fee_rate.parse().unwrap_or(0.0);
1239 Some(rate)
1240 },
1241 alert_long_ratio: Some(p.alert_long_ratio * 100.0),
1242 alert_short_ratio: Some(p.alert_short_ratio * 100.0),
1243 im_long_ratio: Some(p.im_long_ratio * 100.0),
1244 im_short_ratio: Some(p.im_short_ratio * 100.0),
1245 mcm_long_ratio: Some(p.mcm_long_ratio * 100.0),
1246 mcm_short_ratio: Some(p.mcm_short_ratio * 100.0),
1247 mm_long_ratio: Some(p.mm_long_ratio * 100.0),
1248 mm_short_ratio: Some(p.mm_short_ratio * 100.0),
1249 }
1250 })
1251 .collect();
1252
1253 tracing::info!(
1254 count = margin_ratio_info_list.len(),
1255 "GetMarginRatio success"
1256 );
1257
1258 let ftapi_resp = futu_proto::trd_get_margin_ratio::Response {
1259 ret_type: 0,
1260 ret_msg: None,
1261 err_code: None,
1262 s2c: Some(futu_proto::trd_get_margin_ratio::S2c {
1263 header,
1264 margin_ratio_info_list,
1265 }),
1266 };
1267
1268 Some(prost::Message::encode_to_vec(&ftapi_resp))
1269 }
1270}
1271
1272struct GetOrderFeeHandler {
1274 backend: crate::bridge::SharedBackend,
1275 #[expect(dead_code)]
1276 cache: Arc<TrdCache>,
1277}
1278
1279const CMD_BATCH_QUERY_ORDER_FEE: u16 = 2273;
1281const ORDER_FEE_DEFAULT_PAGE_SIZE: u32 = 400;
1283const ORDER_FEE_COUNT_LIMIT: usize = 400;
1285const ORDER_FEE_BEGIN_DATE: u32 = 20180101;
1287
1288fn get_tomorrow_date() -> u32 {
1290 use std::time::{Duration, SystemTime, UNIX_EPOCH};
1291 let tomorrow = SystemTime::now() + Duration::from_secs(86400);
1292 let secs = tomorrow
1293 .duration_since(UNIX_EPOCH)
1294 .unwrap_or_default()
1295 .as_secs();
1296 let days = secs / 86400;
1297 let (y, m, d) = days_to_ymd(days);
1298 (y * 10000 + m * 100 + d) as u32
1299}
1300
1301fn days_to_ymd(days: u64) -> (u64, u64, u64) {
1303 let z = days + 719468;
1304 let era = z / 146097;
1305 let doe = z - era * 146097;
1306 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
1307 let y = yoe + era * 400;
1308 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
1309 let mp = (5 * doy + 2) / 153;
1310 let d = doy - (153 * mp + 2) / 5 + 1;
1311 let m = if mp < 10 { mp + 3 } else { mp - 9 };
1312 let y = if m <= 2 { y + 1 } else { y };
1313 (y, m, d)
1314}
1315
1316#[async_trait]
1317impl RequestHandler for GetOrderFeeHandler {
1318 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
1319 use futu_backend::proto_internal::order_fee_query;
1320
1321 let backend = match super::load_backend(&self.backend) {
1322 Some(b) => b,
1323 None => {
1324 return Some(super::make_error_response(-1, "GetOrderFee: 后端未连接"));
1325 }
1326 };
1327
1328 let req: futu_proto::trd_get_order_fee::Request =
1329 prost::Message::decode(request.body.as_ref()).ok()?;
1330 let header = req.c2s.header;
1331 let acc_id = header.acc_id;
1332
1333 if req.c2s.order_id_ex_list.is_empty() {
1335 let ftapi_resp = futu_proto::trd_get_order_fee::Response {
1336 ret_type: 0,
1337 ret_msg: None,
1338 err_code: None,
1339 s2c: Some(futu_proto::trd_get_order_fee::S2c {
1340 header,
1341 order_fee_list: vec![],
1342 }),
1343 };
1344 return Some(prost::Message::encode_to_vec(&ftapi_resp));
1345 }
1346
1347 let end_date = get_tomorrow_date();
1348
1349 tracing::info!(
1350 acc_id,
1351 order_count = req.c2s.order_id_ex_list.len(),
1352 "querying order fee from backend"
1353 );
1354
1355 let mut all_order_fees: Vec<order_fee_query::OrderFee> = Vec::new();
1357 let mut page_index: u32 = 0;
1358
1359 loop {
1360 let backend_req = order_fee_query::BatchQueryOrderFeeReq {
1361 uniq_id: acc_id,
1362 begin_date: ORDER_FEE_BEGIN_DATE,
1363 end_date,
1364 exchange: vec![],
1365 order_id: req.c2s.order_id_ex_list.clone(),
1366 page_index,
1367 page_size: ORDER_FEE_DEFAULT_PAGE_SIZE,
1368 };
1369
1370 let resp = match backend
1371 .request(
1372 CMD_BATCH_QUERY_ORDER_FEE,
1373 prost::Message::encode_to_vec(&backend_req),
1374 )
1375 .await
1376 {
1377 Ok(r) => r,
1378 Err(e) => {
1379 tracing::warn!(error = %e, "GetOrderFee backend request failed");
1380 return Some(super::make_error_response(-1, &format!("GetOrderFee: {e}")));
1381 }
1382 };
1383
1384 let backend_rsp: order_fee_query::BatchQueryOrderFeeRsp =
1385 match prost::Message::decode(resp.body.as_ref()) {
1386 Ok(r) => r,
1387 Err(e) => {
1388 tracing::warn!(error = %e, "GetOrderFee response decode failed");
1389 return Some(super::make_error_response(-1, "GetOrderFee: 响应解析失败"));
1390 }
1391 };
1392
1393 if backend_rsp.err_code != 0 {
1394 let err_msg = if backend_rsp.err_msg.is_empty() {
1395 "unknown error".to_string()
1396 } else {
1397 backend_rsp.err_msg.clone()
1398 };
1399 tracing::warn!(err_msg = %err_msg, "GetOrderFee backend error");
1400 return Some(super::make_error_response(
1401 -1,
1402 &format!("GetOrderFee: {err_msg}"),
1403 ));
1404 }
1405
1406 all_order_fees.extend(backend_rsp.order_fee_list);
1407
1408 let total = backend_rsp.total;
1409 let page_size = backend_rsp.page_size.max(ORDER_FEE_DEFAULT_PAGE_SIZE);
1410 let completed = (page_index + 1) * page_size >= total
1411 || all_order_fees.len() >= ORDER_FEE_COUNT_LIMIT;
1412
1413 if completed {
1414 break;
1415 }
1416 page_index += 1;
1417 }
1418
1419 all_order_fees.truncate(ORDER_FEE_COUNT_LIMIT);
1420
1421 let order_fee_list: Vec<futu_proto::trd_common::OrderFee> = all_order_fees
1423 .iter()
1424 .map(|f| {
1425 let mut total: f64 = 0.0;
1426 let mut fee_items: Vec<futu_proto::trd_common::OrderFeeItem> = Vec::new();
1427
1428 for group in &f.fee_group_list {
1429 for item in &group.fee_list {
1430 let value: f64 = item.val.parse().unwrap_or(0.0);
1431 if !item.title.is_empty() {
1432 fee_items.push(futu_proto::trd_common::OrderFeeItem {
1433 title: Some(item.title.clone()),
1434 value: Some(value),
1435 });
1436 total += value;
1437 }
1438 }
1439 }
1440
1441 futu_proto::trd_common::OrderFee {
1442 order_id_ex: f.order_id.clone(),
1443 fee_amount: Some(total),
1444 fee_list: fee_items,
1445 }
1446 })
1447 .collect();
1448
1449 tracing::info!(count = order_fee_list.len(), "GetOrderFee success");
1450
1451 let ftapi_resp = futu_proto::trd_get_order_fee::Response {
1452 ret_type: 0,
1453 ret_msg: None,
1454 err_code: None,
1455 s2c: Some(futu_proto::trd_get_order_fee::S2c {
1456 header,
1457 order_fee_list,
1458 }),
1459 };
1460
1461 Some(prost::Message::encode_to_vec(&ftapi_resp))
1462 }
1463}
1464
1465struct FlowSummaryHandler {
1467 backend: crate::bridge::SharedBackend,
1468 cache: Arc<TrdCache>,
1469}
1470
1471const CMD_FLOW_SUMMARY: u16 = 20963;
1473const FLOW_SUMMARY_PAGE_SIZE: i32 = 2000;
1475
1476fn format_date_str(date: i32) -> String {
1478 let y = date / 10000;
1479 let m = (date / 100) % 100;
1480 let d = date % 100;
1481 format!("{y:04}-{m:02}-{d:02}")
1482}
1483
1484fn parse_clearing_date(s: &str) -> Option<i32> {
1486 let parts: Vec<&str> = s.split('-').collect();
1487 if parts.len() == 3 {
1488 let y: i32 = parts[0].parse().ok()?;
1489 let m: i32 = parts[1].parse().ok()?;
1490 let d: i32 = parts[2].parse().ok()?;
1491 Some(y * 10000 + m * 100 + d)
1492 } else {
1493 None
1494 }
1495}
1496
1497fn currency_str_to_enum(ccy: &str) -> i32 {
1499 match ccy {
1500 "HKD" => 1,
1501 "USD" => 2,
1502 "CNH" | "CNY" | "RMB" => 3,
1503 "JPY" => 4,
1504 "SGD" => 5,
1505 "AUD" => 6,
1506 "CAD" => 7,
1507 "GBP" => 8,
1508 "MYR" => 9,
1509 _ => 0,
1510 }
1511}
1512
1513fn extract_intra_acc_id(acc_id: u64) -> i64 {
1515 (acc_id & 0xFFFF_FFFF) as i64
1516}
1517
1518fn extract_market_id(acc_id: u64) -> i32 {
1520 ((acc_id >> 32) & 0xFFFF) as i32
1521}
1522
1523#[async_trait]
1524impl RequestHandler for FlowSummaryHandler {
1525 async fn handle(&self, _conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
1526 use futu_backend::proto_internal::soa_std_data;
1527
1528 let backend = match super::load_backend(&self.backend) {
1529 Some(b) => b,
1530 None => {
1531 return Some(super::make_error_response(-1, "FlowSummary: 后端未连接"));
1532 }
1533 };
1534
1535 let req: futu_proto::trd_flow_summary::Request =
1536 prost::Message::decode(request.body.as_ref()).ok()?;
1537 let header = req.c2s.header;
1538 let acc_id = header.acc_id;
1539
1540 let clearing_date = match parse_clearing_date(&req.c2s.clearing_date) {
1541 Some(d) => d,
1542 None => {
1543 return Some(super::make_error_response(
1544 -1,
1545 "FlowSummary: 日期格式错误,需 YYYY-MM-DD",
1546 ));
1547 }
1548 };
1549
1550 let intra_acc_id = extract_intra_acc_id(acc_id);
1551 let market_id = extract_market_id(acc_id);
1552 let market = if market_id != 0 {
1553 market_id
1554 } else {
1555 self.cache
1556 .accounts
1557 .get(&acc_id)
1558 .and_then(|a| a.trd_market)
1559 .unwrap_or(1)
1560 };
1561
1562 tracing::info!(
1563 acc_id,
1564 clearing_date,
1565 intra_acc_id,
1566 market,
1567 "querying flow summary from backend"
1568 );
1569
1570 let mut all_items: Vec<soa_std_data::FlowSummaryItem> = Vec::new();
1572 let mut start_id: u64 = 0;
1573
1574 loop {
1575 let backend_req = soa_std_data::FlowSummaryReq {
1576 query_comm: Some(soa_std_data::QueryComm {
1577 date: clearing_date,
1578 market,
1579 identifier: "std".to_string(),
1580 start_id,
1581 end_id: 0,
1582 page_size: FLOW_SUMMARY_PAGE_SIZE,
1583 cond_fields: vec![soa_std_data::CondField {
1584 op: soa_std_data::Op::WhereIn as i32,
1585 field_name: "account_id".to_string(),
1586 string_list: vec![],
1587 int32_list: vec![],
1588 int64_list: vec![intra_acc_id],
1589 }],
1590 ret_fields: vec![],
1591 persist: String::new(),
1592 }),
1593 };
1594
1595 let resp = match backend
1596 .request(
1597 CMD_FLOW_SUMMARY,
1598 prost::Message::encode_to_vec(&backend_req),
1599 )
1600 .await
1601 {
1602 Ok(r) => r,
1603 Err(e) => {
1604 tracing::warn!(error = %e, "FlowSummary backend request failed");
1605 if !all_items.is_empty() {
1606 break;
1607 }
1608 return Some(super::make_error_response(-1, &format!("FlowSummary: {e}")));
1609 }
1610 };
1611
1612 let backend_rsp: soa_std_data::FlowSummaryRsp =
1613 match prost::Message::decode(resp.body.as_ref()) {
1614 Ok(r) => r,
1615 Err(_) => {
1616 break;
1618 }
1619 };
1620
1621 let item_count = backend_rsp.items.len();
1622
1623 let max_item_id = backend_rsp
1624 .items
1625 .iter()
1626 .filter_map(|item| if item.id > 0 { Some(item.id) } else { None })
1627 .max()
1628 .unwrap_or(0);
1629
1630 all_items.extend(backend_rsp.items);
1631
1632 let mut completed = true;
1633 if let Some(ref info) = backend_rsp.instance_info {
1634 if item_count as i32 >= FLOW_SUMMARY_PAGE_SIZE
1635 && info.max_id > 0
1636 && max_item_id != info.max_id
1637 {
1638 completed = false;
1639 start_id = max_item_id + 1;
1640 }
1641 }
1642
1643 if completed {
1644 break;
1645 }
1646 }
1647
1648 tracing::info!(
1649 total_items = all_items.len(),
1650 "FlowSummary raw items retrieved"
1651 );
1652
1653 let cash_flow_direction_filter = req.c2s.cash_flow_direction.unwrap_or(0);
1656
1657 let flow_summary_info_list: Vec<futu_proto::trd_flow_summary::FlowSummaryInfo> = all_items
1658 .iter()
1659 .filter_map(|item| {
1660 let amount: f64 = item.net_amount.parse().unwrap_or(0.0);
1661 let total_fee: f64 = item.total_fee.parse().unwrap_or(0.0);
1662 let trade_flag = item.trade_flag;
1663
1664 if trade_flag == 1 || (amount == 0.0 && total_fee == 0.0) {
1665 return None;
1666 }
1667
1668 let direction = if amount > 0.0 {
1669 1 } else if amount < 0.0 {
1671 2 } else {
1673 0
1674 };
1675
1676 if cash_flow_direction_filter != 0 && direction != cash_flow_direction_filter {
1677 return None;
1678 }
1679
1680 let clearing_date_str = format_date_str(item.clearing_date);
1681 let settlement_date_str = format_date_str(item.settlement_date);
1682 let currency = currency_str_to_enum(&item.ccy);
1683
1684 Some(futu_proto::trd_flow_summary::FlowSummaryInfo {
1685 clearing_date: Some(clearing_date_str),
1686 settlement_date: Some(settlement_date_str),
1687 currency: Some(currency),
1688 cash_flow_type: Some(item.comment.clone()),
1689 cash_flow_direction: Some(direction),
1690 cash_flow_amount: Some(amount),
1691 cash_flow_remark: Some(item.comment.clone()),
1692 cash_flow_id: Some(item.id),
1693 })
1694 })
1695 .collect();
1696
1697 tracing::info!(count = flow_summary_info_list.len(), "FlowSummary success");
1698
1699 let ftapi_resp = futu_proto::trd_flow_summary::Response {
1700 ret_type: 0,
1701 ret_msg: None,
1702 err_code: None,
1703 s2c: Some(futu_proto::trd_flow_summary::S2c {
1704 header,
1705 flow_summary_info_list,
1706 }),
1707 };
1708
1709 Some(prost::Message::encode_to_vec(&ftapi_resp))
1710 }
1711}
1712
1713struct BackendTrdForwardHandler {
1715 name: &'static str,
1716 backend: crate::bridge::SharedBackend,
1717 backend_cmd_id: Option<u16>,
1719}
1720
1721#[async_trait]
1722impl RequestHandler for BackendTrdForwardHandler {
1723 async fn handle(&self, conn_id: u64, request: &IncomingRequest) -> Option<Vec<u8>> {
1724 let backend = match super::load_backend(&self.backend) {
1725 Some(b) => b,
1726 None => {
1727 return Some(super::make_error_response(
1728 -1,
1729 &format!("{}: 后端未连接", self.name),
1730 ));
1731 }
1732 };
1733
1734 let backend_cmd = match self.backend_cmd_id {
1735 Some(cmd) => cmd,
1736 None => {
1737 return Some(super::make_error_response(
1739 -1,
1740 &format!("{}: 暂不支持", self.name),
1741 ));
1742 }
1743 };
1744
1745 tracing::info!(
1746 proto_id = request.proto_id,
1747 conn_id,
1748 handler = self.name,
1749 backend_cmd,
1750 "forwarding trade request to backend"
1751 );
1752
1753 match backend.request(backend_cmd, request.body.to_vec()).await {
1756 Ok(resp) => {
1757 tracing::debug!(
1758 handler = self.name,
1759 body_len = resp.body.len(),
1760 "backend trade response"
1761 );
1762 Some(resp.body.to_vec())
1766 }
1767 Err(e) => {
1768 tracing::warn!(
1769 handler = self.name,
1770 error = %e,
1771 "backend trade request failed"
1772 );
1773 Some(super::make_error_response(
1774 -1,
1775 &format!("{}: {e}", self.name),
1776 ))
1777 }
1778 }
1779 }
1780}