1use std::collections::HashMap;
9use std::future::Future;
10use std::sync::Arc;
11use std::sync::LazyLock;
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14use parking_lot::Mutex;
15
16mod response;
17use response::response_ret_type_is_success;
18
/// Read-only copy of the request/reply latency statistics kept for one protocol id.
///
/// Produced by `ReqReplyCounter::snapshot`, which narrows the internally kept
/// `f64` running averages to `f32`.
#[derive(Clone, Debug, PartialEq)]
pub struct ReqReplyStatisticsSnapshot {
    /// Protocol id the statistics were accumulated under.
    pub proto_id: u32,
    /// Number of replies folded into the averages.
    pub count: u32,
    /// Running average of the whole request handling time, in milliseconds.
    pub total_cost_avg_ms: f32,
    /// Running average of the handling time left after subtracting the time
    /// covered by traced backend calls, in milliseconds.
    pub open_d_cost_avg_ms: f32,
    /// Running average of the network delay, in milliseconds. The recorder in
    /// this file always feeds a zero delay into it.
    pub net_delay_avg_ms: f32,
    /// `true` once at least one recorded request finished without any traced
    /// backend call.
    pub is_local_reply: bool,
}
28
29#[derive(Debug, Clone, PartialEq)]
30pub struct QotPushStatisticsSnapshot {
31 pub qot_push_type: i32,
32 pub item_list: Vec<DelayStatisticsItemSnapshot>,
33 pub delay_avg_ms: f32,
34 pub count: i32,
35}
36
/// Read-only latency breakdown of a single recorded place-order request.
///
/// Assembled by `DelayStatisticsStore::snapshot_place_order`; all values are
/// microsecond measurements converted to milliseconds.
#[derive(Clone, Debug, PartialEq)]
pub struct PlaceOrderStatisticsSnapshot {
    /// Order id attached to the request.
    pub order_id: String,
    /// Wall-clock time between the start and the end of the API request.
    pub total_cost_ms: f32,
    /// Time spent before the first traced backend call started plus the time
    /// spent after it ended; zero when no backend call was traced.
    pub open_d_cost_ms: f32,
    /// Network delay that was in effect when the request started, never negative.
    pub net_delay_ms: f32,
    /// Time from the end of the API request to the earliest qualifying order
    /// update push; zero when no such push was recorded.
    pub update_cost_ms: f32,
}
45
/// One segment of a delay distribution, as reported inside
/// `QotPushStatisticsSnapshot::item_list`.
#[derive(Clone, Debug, PartialEq)]
pub struct DelayStatisticsItemSnapshot {
    /// Lower boundary of the segment, taken from the caller's segment list.
    pub begin: i32,
    /// Upper boundary of the segment, taken from the caller's segment list.
    pub end: i32,
    /// Number of samples attributed to this segment.
    pub count: i32,
    /// Share of this segment in the stage total, as a percentage.
    pub proportion: f32,
    /// Share of this and all preceding segments in the stage total, as a percentage.
    pub cumulative_ratio: f32,
}
54
// Quote push categories used as the `qot_push_type` key of the push statistics.
// `DelayStatisticsStore::snapshot_qot_push` reports them in exactly this order.
// NOTE(review): the numeric values presumably mirror a protocol enum — confirm.

/// Key for price pushes.
pub const QOT_PUSH_TYPE_PRICE: i32 = 1;
/// Key for ticker pushes.
pub const QOT_PUSH_TYPE_TICKER: i32 = 2;
/// Key for order-book pushes.
pub const QOT_PUSH_TYPE_ORDER_BOOK: i32 = 3;
/// Key for broker pushes.
pub const QOT_PUSH_TYPE_BROKER: i32 = 4;
59
// Measured legs of a quote push. The timestamp pairs below are the ones
// `DelayStatisticsStore::record_qot_push_count` feeds into each stage.

/// Server receive time -> server send time.
const QOT_PUSH_STAGE_SR2SS: i32 = 1;
/// Server send time -> local receive time (`f3c_recv_at`).
const QOT_PUSH_STAGE_SS2CR: i32 = 2;
/// Local receive time (`f3c_recv_at`) -> API response time.
const QOT_PUSH_STAGE_CR2CS: i32 = 3;
/// Server send time -> API response time.
const QOT_PUSH_STAGE_SS2CS: i32 = 4;
/// Server receive time -> API response time.
const QOT_PUSH_STAGE_SR2CS: i32 = 5;
65
/// Running request/reply latency averages for a single protocol id.
///
/// Averages are kept in `f64` milliseconds and only narrowed to `f32` when a
/// snapshot is taken.
#[derive(Default)]
struct ReqReplyCounter {
    /// Number of samples folded in so far.
    count: u32,
    /// Running average of the total request time, in milliseconds.
    total_cost_avg_ms: f64,
    /// Running average of the `open_d` duration passed to `add`, in milliseconds.
    open_d_cost_avg_ms: f64,
    /// Running average of the `net_delay` duration passed to `add`, in milliseconds.
    net_delay_avg_ms: f64,
    /// Sticky flag: set once any sample was marked as a local reply.
    is_local_reply: bool,
}
74
75impl ReqReplyCounter {
76 fn add(&mut self, total: Duration, open_d: Duration, net_delay: Duration, local_reply: bool) {
77 let count = self.count as f64;
78 self.total_cost_avg_ms =
79 ((self.total_cost_avg_ms * count) + duration_ms(total)) / (count + 1.0);
80 self.open_d_cost_avg_ms =
81 ((self.open_d_cost_avg_ms * count) + duration_ms(open_d)) / (count + 1.0);
82 self.net_delay_avg_ms =
83 ((self.net_delay_avg_ms * count) + duration_ms(net_delay)) / (count + 1.0);
84 self.count += 1;
85 if local_reply {
86 self.is_local_reply = true;
87 }
88 }
89
90 fn snapshot(&self, proto_id: u32) -> ReqReplyStatisticsSnapshot {
91 ReqReplyStatisticsSnapshot {
92 proto_id,
93 count: self.count,
94 total_cost_avg_ms: self.total_cost_avg_ms as f32,
95 open_d_cost_avg_ms: self.open_d_cost_avg_ms as f32,
96 net_delay_avg_ms: self.net_delay_avg_ms as f32,
97 is_local_reply: self.is_local_reply,
98 }
99 }
100}
101
/// Container for every statistic this module collects.
///
/// Each collection sits behind its own mutex, so the store can be updated
/// through a shared reference.
#[derive(Default)]
struct DelayStatisticsStore {
    /// Request/reply averages keyed by protocol id.
    req_reply_counts: Mutex<HashMap<u32, ReqReplyCounter>>,
    /// Quote push distributions keyed by `QOT_PUSH_TYPE_*`.
    qot_push_counts: Mutex<HashMap<i32, QotPushCounter>>,
    /// One entry per recorded place-order request; entries are only ever appended.
    place_order_req_details: Mutex<Vec<PlaceOrderReqDetail>>,
    /// One entry per recorded order update push; entries are only ever appended.
    place_order_push_once: Mutex<Vec<PlaceOrderPushOnce>>,
    /// Most recently supplied clock offset / network delay pair.
    time_adjustment: Mutex<TimeAdjustment>,
}
110
111impl DelayStatisticsStore {
112 fn set_time_adjustment(&self, s2c_time_diff_us: i64, net_delay_us: i64) {
113 *self.time_adjustment.lock() = TimeAdjustment {
114 s2c_time_diff_us,
115 net_delay_us,
116 };
117 }
118
119 fn time_adjustment(&self) -> TimeAdjustment {
120 *self.time_adjustment.lock()
121 }
122
123 fn record_req_reply(&self, ctx: &ApiRequestDelayContext) {
124 let total_cost = ctx.begin.elapsed();
125 let api_end_at = SystemTime::now();
126 let backend_spans = ctx.backend_spans.lock();
127 let is_local_reply = backend_spans.is_empty();
128 let backend_cost = backend_union_duration(&backend_spans);
129 let open_d_cost = total_cost.saturating_sub(backend_cost);
130
131 self.req_reply_counts
132 .lock()
133 .entry(ctx.proto_id)
134 .or_default()
135 .add(total_cost, open_d_cost, Duration::ZERO, is_local_reply);
136
137 self.record_place_order_req_detail(ctx, &backend_spans, api_end_at);
138 }
139
140 fn snapshot_req_reply(&self) -> Vec<ReqReplyStatisticsSnapshot> {
141 let mut items: Vec<_> = self
142 .req_reply_counts
143 .lock()
144 .iter()
145 .map(|(&proto_id, counter)| counter.snapshot(proto_id))
146 .collect();
147 items.sort_by_key(|item| item.proto_id);
148 items
149 }
150
151 fn record_qot_push_count(
152 &self,
153 qot_push_type: i32,
154 server_recv_from_exchange_time_ms: Option<i64>,
155 server_send_to_client_time_ms: Option<i64>,
156 f3c_recv_at: SystemTime,
157 api_response_at: SystemTime,
158 ) {
159 let Some(f3c_recv_us) = system_time_us(f3c_recv_at) else {
160 return;
161 };
162 let Some(api_response_us) = system_time_us(api_response_at) else {
163 return;
164 };
165
166 let TimeAdjustment {
167 s2c_time_diff_us,
168 net_delay_us,
169 } = *self.time_adjustment.lock();
170
171 let mut server_recv_us = server_recv_from_exchange_time_ms.unwrap_or(0) * 1000;
172 let mut server_send_us = server_send_to_client_time_ms.unwrap_or(0) * 1000;
173 if server_recv_us != 0 {
174 server_recv_us -= s2c_time_diff_us;
175 }
176 if server_send_us != 0 {
177 server_send_us -= s2c_time_diff_us;
178 }
179
180 let mut f3c_recv_us = f3c_recv_us;
185 let mut api_response_us = api_response_us;
186 if server_send_us > f3c_recv_us {
187 let open_d_cost_us = api_response_us.saturating_sub(f3c_recv_us);
188 f3c_recv_us = server_send_us + net_delay_us;
189 api_response_us = f3c_recv_us + open_d_cost_us;
190 }
191
192 let mut counts = self.qot_push_counts.lock();
193 let counter = counts.entry(qot_push_type).or_default();
194 counter.add(QOT_PUSH_STAGE_SR2SS, server_recv_us, server_send_us);
195 counter.add(QOT_PUSH_STAGE_SS2CR, server_send_us, f3c_recv_us);
196 counter.add(QOT_PUSH_STAGE_CR2CS, f3c_recv_us, api_response_us);
197 counter.add(QOT_PUSH_STAGE_SS2CS, server_send_us, api_response_us);
198 counter.add(QOT_PUSH_STAGE_SR2CS, server_recv_us, api_response_us);
199 counter.total_count = counter.total_count.saturating_add(1);
200 }
201
202 fn snapshot_qot_push(
203 &self,
204 qot_push_stage: i32,
205 segment_list: &[i32],
206 ) -> Vec<QotPushStatisticsSnapshot> {
207 if qot_push_stage == 0 || segment_list.len() < 2 {
208 return Vec::new();
209 }
210
211 let counts = self.qot_push_counts.lock();
212 let mut items = Vec::new();
213 for qot_push_type in [
214 QOT_PUSH_TYPE_PRICE,
215 QOT_PUSH_TYPE_TICKER,
216 QOT_PUSH_TYPE_ORDER_BOOK,
217 QOT_PUSH_TYPE_BROKER,
218 ] {
219 let Some(counter) = counts.get(&qot_push_type) else {
220 continue;
221 };
222 let Some(stage) = counter.stages.get(&qot_push_stage) else {
223 continue;
224 };
225 if stage.total == 0 {
226 continue;
227 }
228
229 let mut cumulative_count = 0i32;
230 let total = stage.total as f32;
231 let item_list = segment_list
232 .windows(2)
233 .map(|window| {
234 let begin = window[0];
235 let end = window[1];
236 let count = stage.range_count(begin, end) as i32;
237 cumulative_count += count;
238 DelayStatisticsItemSnapshot {
239 begin,
240 end,
241 count,
242 proportion: count as f32 / total * 100.0,
243 cumulative_ratio: cumulative_count as f32 / total * 100.0,
244 }
245 })
246 .collect();
247
248 items.push(QotPushStatisticsSnapshot {
249 qot_push_type,
250 item_list,
251 delay_avg_ms: stage.cost_avg_ms,
252 count: stage.total as i32,
253 });
254 }
255 items
256 }
257
258 fn record_place_order_req_detail(
259 &self,
260 ctx: &ApiRequestDelayContext,
261 backend_spans: &[BackendSpan],
262 api_end_at: SystemTime,
263 ) {
264 if ctx.proto_id != futu_core::proto_id::TRD_PLACE_ORDER {
265 return;
266 }
267 let Some(user_data) = ctx.place_order_user_data.lock().clone() else {
268 return;
269 };
270 let Some(api_begin_us) = system_time_us(ctx.begin_at) else {
271 return;
272 };
273 let Some(api_end_us) = system_time_us(api_end_at) else {
274 return;
275 };
276 if api_end_us < api_begin_us {
277 return;
278 }
279 let first_span = backend_spans.first();
280 let first_backend_begin_us = first_span.and_then(|span| system_time_us(span.begin_at));
281 let first_backend_end_us = first_span.and_then(|span| system_time_us(span.end_at));
282
283 self.place_order_req_details
284 .lock()
285 .push(PlaceOrderReqDetail {
286 order_id: user_data.order_id,
287 trd_env: user_data.trd_env,
288 _market: user_data.market,
289 api_begin_us,
290 api_end_us,
291 first_backend_begin_us,
292 first_backend_end_us,
293 net_delay_us: ctx.time_adjustment.net_delay_us,
294 });
295 }
296
297 fn set_place_order_user_data(&self, user_data: PlaceOrderUserData) {
298 let _ = CURRENT_API_REQUEST.try_with(|ctx| {
299 *ctx.place_order_user_data.lock() = Some(user_data);
300 });
301 }
302
303 fn record_place_order_update_push(
304 &self,
305 order_id: String,
306 trd_env: i32,
307 market: i32,
308 order_status: i32,
309 api_response_at: SystemTime,
310 ) {
311 let Some(api_response_us) = system_time_us(api_response_at) else {
312 return;
313 };
314 self.place_order_push_once.lock().push(PlaceOrderPushOnce {
315 order_id,
316 _trd_env: trd_env,
317 _market: market,
318 order_status,
319 api_response_us,
320 });
321 }
322
323 fn snapshot_place_order(&self) -> Vec<PlaceOrderStatisticsSnapshot> {
324 let mut earliest_push_by_order: HashMap<String, i64> = HashMap::new();
325 for push in self.place_order_push_once.lock().iter() {
326 if !place_order_update_status_counts(push.order_status) {
327 continue;
328 }
329 earliest_push_by_order
330 .entry(push.order_id.clone())
331 .and_modify(|existing| *existing = (*existing).min(push.api_response_us))
332 .or_insert(push.api_response_us);
333 }
334
335 self.place_order_req_details
336 .lock()
337 .iter()
338 .filter(|detail| detail.trd_env != 0)
339 .filter_map(|detail| {
340 let total_cost_us = detail.api_end_us.checked_sub(detail.api_begin_us)?;
341 let open_d_cost_us =
342 match (detail.first_backend_begin_us, detail.first_backend_end_us) {
343 (Some(begin), Some(end)) => begin
344 .saturating_sub(detail.api_begin_us)
345 .saturating_add(detail.api_end_us.saturating_sub(end)),
346 _ => 0,
347 };
348 let update_cost_us = earliest_push_by_order
349 .get(&detail.order_id)
350 .map(|push_us| push_us.saturating_sub(detail.api_end_us))
351 .unwrap_or(0);
352
353 Some(PlaceOrderStatisticsSnapshot {
354 order_id: detail.order_id.clone(),
355 total_cost_ms: us_to_ms(total_cost_us),
356 open_d_cost_ms: us_to_ms(open_d_cost_us),
357 net_delay_ms: us_to_ms(detail.net_delay_us.max(0)),
358 update_cost_ms: us_to_ms(update_cost_us),
359 })
360 })
361 .collect()
362 }
363}
364
/// Correction values applied to timestamps when recording statistics.
#[derive(Clone, Copy, Debug, Default)]
struct TimeAdjustment {
    /// Amount subtracted from server timestamps, in microseconds.
    /// NOTE(review): presumably the server-minus-client clock offset — confirm.
    s2c_time_diff_us: i64,
    /// Network delay, in microseconds.
    net_delay_us: i64,
}
370
/// Order information attached to the API request context of a place-order call.
#[derive(Clone, Debug)]
struct PlaceOrderUserData {
    /// Order id, already trimmed and non-empty when set through the public API.
    order_id: String,
    /// Trading environment value supplied by the caller.
    trd_env: i32,
    /// Market value supplied by the caller.
    market: i32,
}
377
/// Timing detail stored for one finished place-order request.
///
/// All `*_us` timestamps are microseconds since the Unix epoch.
#[derive(Clone, Debug)]
struct PlaceOrderReqDetail {
    /// Order id attached to the request.
    order_id: String,
    /// Trading environment value attached to the request.
    trd_env: i32,
    /// Market value attached to the request; stored but not read in this file.
    _market: i32,
    /// Wall-clock time at which the API request started.
    api_begin_us: i64,
    /// Wall-clock time at which the API request finished.
    api_end_us: i64,
    /// Start of the first recorded backend span, if any.
    first_backend_begin_us: Option<i64>,
    /// End of the first recorded backend span, if any.
    first_backend_end_us: Option<i64>,
    /// Network delay captured when the request context was created.
    net_delay_us: i64,
}
389
/// One recorded order update push.
#[derive(Clone, Debug)]
struct PlaceOrderPushOnce {
    /// Order id the push refers to.
    order_id: String,
    /// Trading environment value of the push; stored but not read in this file.
    _trd_env: i32,
    /// Market value of the push; stored but not read in this file.
    _market: i32,
    /// Order status carried by the push.
    order_status: i32,
    /// Time the push was answered, in microseconds since the Unix epoch.
    api_response_us: i64,
}
398
/// Delay distributions of one quote push type, split by measured stage.
#[derive(Default)]
struct QotPushCounter {
    /// Number of pushes recorded for this type, whether or not every stage
    /// produced a sample.
    total_count: u32,
    /// Per-stage distributions keyed by `QOT_PUSH_STAGE_*`.
    stages: HashMap<i32, PushStageCounter>,
}
404
405impl QotPushCounter {
406 fn add(&mut self, stage: i32, begin_us: i64, end_us: i64) {
407 if begin_us == 0 || end_us == 0 {
408 return;
409 }
410
411 let cost_ms = ((end_us - begin_us) as f64 / 1000.0 + 0.5).max(0.0).floor() as i32;
412 self.stages.entry(stage).or_default().add(cost_ms);
413 }
414}
415
/// Delay distribution of a single stage of a single push type.
#[derive(Default)]
struct PushStageCounter {
    /// Running average of all recorded costs, in milliseconds.
    cost_avg_ms: f32,
    /// Number of recorded samples, including samples no bucket accepted.
    total: u32,
    /// Histogram buckets; empty until the first sample is added.
    buckets: Vec<CountItem>,
}
422
423impl PushStageCounter {
424 fn add(&mut self, cost_ms: i32) {
425 if self.buckets.is_empty() {
426 self.buckets = default_count_buckets();
427 }
428
429 for bucket in &mut self.buckets {
430 if cost_ms >= bucket.begin && (cost_ms < bucket.end || bucket.end == -1) {
431 bucket.add(cost_ms);
432 break;
433 }
434 }
435
436 let total = self.total as f32;
437 self.cost_avg_ms = (self.cost_avg_ms * total + cost_ms as f32) / (total + 1.0);
438 self.total = self.total.saturating_add(1);
439 }
440
441 fn range_count(&self, begin: i32, end: i32) -> u32 {
442 self.buckets
443 .iter()
444 .map(|bucket| bucket.range_count(begin, end))
445 .sum()
446 }
447}
448
/// One histogram bucket covering `[begin, end)` milliseconds, divided into
/// `arr_count` equally wide slots.
///
/// An `end` of `-1` marks a bucket without upper limit; such a bucket has a
/// `gap` of zero and keeps everything in its first slot.
#[derive(Debug, Clone)]
struct CountItem {
    /// Number of slots requested at construction.
    arr_count: usize,
    /// Inclusive lower boundary.
    begin: i32,
    /// Exclusive upper boundary, or `-1` for "no upper limit".
    end: i32,
    /// Width of one slot; zero when the bucket is not subdivided.
    gap: i32,
    /// Sample counter of every slot.
    counts: Vec<u32>,
}

impl CountItem {
    /// Creates an empty bucket with `arr_count` slots.
    ///
    /// The slot width is `(end - begin) / arr_count` (integer division). It is
    /// zero when the range is not positive, when there are no slots, or when
    /// `arr_count` does not fit into `i32`.
    fn new(arr_count: usize, begin: i32, end: i32) -> Self {
        let range = end.saturating_sub(begin);
        let gap = match i32::try_from(arr_count) {
            Ok(slots) if slots > 0 && range > 0 => range / slots,
            _ => 0,
        };
        Self {
            arr_count,
            begin,
            end,
            gap,
            counts: vec![0; arr_count],
        }
    }

    /// Counts one sample of `cost_ms` milliseconds.
    ///
    /// When the bucket width is not a multiple of the slot count, integer
    /// division leaves a tail at the top of the bucket whose computed slot
    /// index is past the last slot. Such samples are stored in the last slot
    /// rather than dropped. Samples at or above `end` are still ignored.
    fn add(&mut self, cost_ms: i32) {
        let idx = if self.gap > 0 {
            let Ok(slot) = usize::try_from(cost_ms.saturating_sub(self.begin) / self.gap) else {
                // Far below `begin`: not part of this bucket.
                return;
            };
            if cost_ms < self.end {
                slot.min(self.counts.len().saturating_sub(1))
            } else {
                slot
            }
        } else {
            0
        };
        if let Some(count) = self.counts.get_mut(idx) {
            *count = count.saturating_add(1);
        }
    }

    /// Returns the number of samples attributed to the query range
    /// `begin..end`, where an `end` of `-1` means "no upper limit".
    ///
    /// The answer has slot resolution. An unbounded bucket reports its whole
    /// content as soon as the query overlaps it. A bounded bucket adds up the
    /// slots from the one containing the overlap start up to the last slot that
    /// ends at or before the overlap end.
    fn range_count(&self, begin: i32, end: i32) -> u32 {
        if self.counts.is_empty()
            || (end != -1 && end <= self.begin)
            || (self.end != -1 && begin >= self.end)
        {
            return 0;
        }

        if self.end == -1 {
            // The guard above already established that the query overlaps.
            return self.counts[0];
        }

        let overlap_begin = begin.max(self.begin);
        let overlap_end = if end == -1 {
            self.end
        } else {
            end.min(self.end)
        };
        if overlap_begin >= overlap_end {
            return 0;
        }

        let last = self.counts.len() - 1;
        let (start_idx, end_idx) = if self.gap > 0 {
            // Both offsets are non-negative here because the overlap starts at
            // or after `self.begin`.
            let slot_of = |value: i32| ((value - self.begin) / self.gap) as usize;
            (
                // The tail past the last full slot lives in the last slot.
                slot_of(overlap_begin).min(last),
                slot_of(overlap_end).saturating_sub(1).min(last),
            )
        } else {
            (0, 0)
        };
        self.counts[start_idx..=end_idx]
            .iter()
            .fold(0u32, |sum, &count| sum.saturating_add(count))
    }
}
529
530fn default_count_buckets() -> Vec<CountItem> {
531 vec![
532 CountItem::new(100, 0, 100),
533 CountItem::new(18, 100, 1000),
534 CountItem::new(8, 1000, 5000),
535 CountItem::new(1, 5000, -1),
536 ]
537}
538
/// Per-request measurement state, shared through the `CURRENT_API_REQUEST`
/// task-local while the request is being handled.
#[derive(Debug)]
struct ApiRequestDelayContext {
    /// Protocol id of the request.
    proto_id: u32,
    /// Monotonic start time, used for the elapsed-time measurement.
    begin: Instant,
    /// Wall-clock start time, used for the per-order timestamps.
    begin_at: SystemTime,
    /// Backend calls traced while the request was running.
    backend_spans: Mutex<Vec<BackendSpan>>,
    /// Order data attached by `record_place_order_request`, if any.
    place_order_user_data: Mutex<Option<PlaceOrderUserData>>,
    /// Time adjustment that was in effect when the context was created.
    time_adjustment: TimeAdjustment,
}
548
549impl ApiRequestDelayContext {
550 fn new(proto_id: u32) -> Self {
551 Self {
552 proto_id,
553 begin: Instant::now(),
554 begin_at: SystemTime::now(),
555 backend_spans: Mutex::new(Vec::new()),
556 place_order_user_data: Mutex::new(None),
557 time_adjustment: DELAY_STATS.time_adjustment(),
558 }
559 }
560
561 fn record_backend_span(
562 &self,
563 begin: Instant,
564 end: Instant,
565 begin_at: SystemTime,
566 end_at: SystemTime,
567 ) {
568 if end >= begin {
569 self.backend_spans.lock().push(BackendSpan {
570 begin,
571 end,
572 begin_at,
573 end_at,
574 });
575 }
576 }
577}
578
/// Start and end of one traced backend call, captured on both the monotonic
/// and the wall clock.
#[derive(Clone, Copy, Debug)]
struct BackendSpan {
    /// Monotonic start, used for duration arithmetic.
    begin: Instant,
    /// Monotonic end, used for duration arithmetic.
    end: Instant,
    /// Wall-clock start, used for the per-order timestamps.
    begin_at: SystemTime,
    /// Wall-clock end, used for the per-order timestamps.
    end_at: SystemTime,
}
586
/// Process-wide statistics store, created empty on first access.
static DELAY_STATS: LazyLock<DelayStatisticsStore> = LazyLock::new(DelayStatisticsStore::default);
588
// Measurement context of the API request handled by the current task.
// It is only set inside the scope opened by `with_api_request`.
tokio::task_local! {
    static CURRENT_API_REQUEST: Arc<ApiRequestDelayContext>;
}
592
593pub async fn with_api_request<F, Fut>(
599 _conn_id: u64,
600 _serial_no: u32,
601 proto_id: u32,
602 future: F,
603) -> Option<Vec<u8>>
604where
605 F: FnOnce() -> Fut,
606 Fut: Future<Output = Option<Vec<u8>>>,
607{
608 let ctx = Arc::new(ApiRequestDelayContext::new(proto_id));
609 let response = CURRENT_API_REQUEST.scope(ctx.clone(), future()).await;
610
611 if response
612 .as_deref()
613 .is_some_and(response_ret_type_is_success)
614 {
615 DELAY_STATS.record_req_reply(&ctx);
616 }
617
618 response
619}
620
621pub async fn trace_backend_request<Fut, T>(_cmd_id: u16, future: Fut) -> T
625where
626 Fut: Future<Output = T>,
627{
628 let begin = Instant::now();
629 let begin_at = SystemTime::now();
630 let output = future.await;
631 let end_at = SystemTime::now();
632 let end = Instant::now();
633
634 let _ =
635 CURRENT_API_REQUEST.try_with(|ctx| ctx.record_backend_span(begin, end, begin_at, end_at));
636 output
637}
638
639pub fn snapshot_req_reply_statistics() -> Vec<ReqReplyStatisticsSnapshot> {
640 DELAY_STATS.snapshot_req_reply()
641}
642
643pub fn record_qot_push_count(
644 qot_push_type: i32,
645 server_recv_from_exchange_time_ms: Option<i64>,
646 server_send_to_client_time_ms: Option<i64>,
647 f3c_recv_at: SystemTime,
648 api_response_at: SystemTime,
649) {
650 DELAY_STATS.record_qot_push_count(
651 qot_push_type,
652 server_recv_from_exchange_time_ms,
653 server_send_to_client_time_ms,
654 f3c_recv_at,
655 api_response_at,
656 );
657}
658
659pub fn record_place_order_request(order_id: &str, trd_env: i32, market: i32) {
665 let order_id = order_id.trim();
666 if order_id.is_empty() {
667 return;
668 }
669 DELAY_STATS.set_place_order_user_data(PlaceOrderUserData {
670 order_id: order_id.to_string(),
671 trd_env,
672 market,
673 });
674}
675
676pub fn record_place_order_update_push(
683 order_id: &str,
684 trd_env: i32,
685 market: i32,
686 order_status: i32,
687 api_response_at: SystemTime,
688) {
689 let order_id = order_id.trim();
690 if order_id.is_empty() {
691 return;
692 }
693 DELAY_STATS.record_place_order_update_push(
694 order_id.to_string(),
695 trd_env,
696 market,
697 order_status,
698 api_response_at,
699 );
700}
701
702pub fn set_time_adjustment(s2c_time_diff_us: i64, net_delay_us: i64) {
712 DELAY_STATS.set_time_adjustment(s2c_time_diff_us, net_delay_us);
713}
714
715pub fn snapshot_qot_push_statistics(
716 qot_push_stage: i32,
717 segment_list: &[i32],
718) -> Vec<QotPushStatisticsSnapshot> {
719 DELAY_STATS.snapshot_qot_push(qot_push_stage, segment_list)
720}
721
722pub fn snapshot_place_order_statistics() -> Vec<PlaceOrderStatisticsSnapshot> {
723 DELAY_STATS.snapshot_place_order()
724}
725
726fn backend_union_duration(spans: &[BackendSpan]) -> Duration {
727 if spans.is_empty() {
728 return Duration::ZERO;
729 }
730
731 let mut spans = spans.to_vec();
732 spans.sort_by_key(|span| span.begin);
733
734 let mut total = Duration::ZERO;
735 let mut cur_begin = spans[0].begin;
736 let mut cur_end = spans[0].end;
737
738 for span in spans.into_iter().skip(1) {
739 if span.begin <= cur_end {
740 if span.end > cur_end {
741 cur_end = span.end;
742 }
743 } else {
744 total += cur_end.duration_since(cur_begin);
745 cur_begin = span.begin;
746 cur_end = span.end;
747 }
748 }
749
750 total + cur_end.duration_since(cur_begin)
751}
752
/// Converts a duration to fractional milliseconds.
fn duration_ms(duration: Duration) -> f64 {
    let seconds = duration.as_secs_f64();
    seconds * 1000.0
}
756
/// Converts microseconds to fractional milliseconds using `f32` arithmetic.
fn us_to_ms(us: i64) -> f32 {
    let micros = us as f32;
    micros / 1000.0
}
760
/// Returns `time` as microseconds since the Unix epoch.
///
/// Yields `None` when `time` lies before the epoch or the value does not fit
/// into `i64`.
fn system_time_us(time: SystemTime) -> Option<i64> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros().try_into().ok(),
        Err(_) => None,
    }
}
765
/// Tells whether a push with this order status is considered when looking for
/// the earliest update push of an order.
///
/// NOTE(review): 5, 10, 11 and 14 are presumably values of the trade protocol's
/// order-status enum — confirm their meaning against the protocol definition.
fn place_order_update_status_counts(order_status: i32) -> bool {
    const COUNTED_STATUSES: [i32; 4] = [5, 10, 11, 14];
    COUNTED_STATUSES.contains(&order_status)
}
772
// Unit tests are kept in the separate `tests` submodule, compiled only for test builds.
#[cfg(test)]
mod tests;