Skip to main content

futu_backend/
delay_stats.rs

1//! C++ `INNData_ProtoDelay`-style local delay statistics.
2//!
3//! This module intentionally lives near `BackendConn`: the request/reply
4//! statistic needs to observe both public API dispatch and backend request
5//! spans.  Public API dispatch is wired from `futu-server`, while backend spans
6//! are wired in `BackendConn::request_with_reserved_timeout`.
7
8use std::collections::HashMap;
9use std::future::Future;
10use std::sync::Arc;
11use std::sync::LazyLock;
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14use parking_lot::Mutex;
15
16mod response;
17use response::response_ret_type_is_success;
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct ReqReplyStatisticsSnapshot {
21    pub proto_id: u32,
22    pub count: u32,
23    pub total_cost_avg_ms: f32,
24    pub open_d_cost_avg_ms: f32,
25    pub net_delay_avg_ms: f32,
26    pub is_local_reply: bool,
27}
28
29#[derive(Debug, Clone, PartialEq)]
30pub struct QotPushStatisticsSnapshot {
31    pub qot_push_type: i32,
32    pub item_list: Vec<DelayStatisticsItemSnapshot>,
33    pub delay_avg_ms: f32,
34    pub count: i32,
35}
36
37#[derive(Debug, Clone, PartialEq)]
38pub struct PlaceOrderStatisticsSnapshot {
39    pub order_id: String,
40    pub total_cost_ms: f32,
41    pub open_d_cost_ms: f32,
42    pub net_delay_ms: f32,
43    pub update_cost_ms: f32,
44}
45
46#[derive(Debug, Clone, PartialEq)]
47pub struct DelayStatisticsItemSnapshot {
48    pub begin: i32,
49    pub end: i32,
50    pub count: i32,
51    pub proportion: f32,
52    pub cumulative_ratio: f32,
53}
54
55pub const QOT_PUSH_TYPE_PRICE: i32 = 1;
56pub const QOT_PUSH_TYPE_TICKER: i32 = 2;
57pub const QOT_PUSH_TYPE_ORDER_BOOK: i32 = 3;
58pub const QOT_PUSH_TYPE_BROKER: i32 = 4;
59
60const QOT_PUSH_STAGE_SR2SS: i32 = 1;
61const QOT_PUSH_STAGE_SS2CR: i32 = 2;
62const QOT_PUSH_STAGE_CR2CS: i32 = 3;
63const QOT_PUSH_STAGE_SS2CS: i32 = 4;
64const QOT_PUSH_STAGE_SR2CS: i32 = 5;
65
66#[derive(Default)]
67struct ReqReplyCounter {
68    count: u32,
69    total_cost_avg_ms: f64,
70    open_d_cost_avg_ms: f64,
71    net_delay_avg_ms: f64,
72    is_local_reply: bool,
73}
74
75impl ReqReplyCounter {
76    fn add(&mut self, total: Duration, open_d: Duration, net_delay: Duration, local_reply: bool) {
77        let count = self.count as f64;
78        self.total_cost_avg_ms =
79            ((self.total_cost_avg_ms * count) + duration_ms(total)) / (count + 1.0);
80        self.open_d_cost_avg_ms =
81            ((self.open_d_cost_avg_ms * count) + duration_ms(open_d)) / (count + 1.0);
82        self.net_delay_avg_ms =
83            ((self.net_delay_avg_ms * count) + duration_ms(net_delay)) / (count + 1.0);
84        self.count += 1;
85        if local_reply {
86            self.is_local_reply = true;
87        }
88    }
89
90    fn snapshot(&self, proto_id: u32) -> ReqReplyStatisticsSnapshot {
91        ReqReplyStatisticsSnapshot {
92            proto_id,
93            count: self.count,
94            total_cost_avg_ms: self.total_cost_avg_ms as f32,
95            open_d_cost_avg_ms: self.open_d_cost_avg_ms as f32,
96            net_delay_avg_ms: self.net_delay_avg_ms as f32,
97            is_local_reply: self.is_local_reply,
98        }
99    }
100}
101
102#[derive(Default)]
103struct DelayStatisticsStore {
104    req_reply_counts: Mutex<HashMap<u32, ReqReplyCounter>>,
105    qot_push_counts: Mutex<HashMap<i32, QotPushCounter>>,
106    place_order_req_details: Mutex<Vec<PlaceOrderReqDetail>>,
107    place_order_push_once: Mutex<Vec<PlaceOrderPushOnce>>,
108    time_adjustment: Mutex<TimeAdjustment>,
109}
110
111impl DelayStatisticsStore {
112    fn set_time_adjustment(&self, s2c_time_diff_us: i64, net_delay_us: i64) {
113        *self.time_adjustment.lock() = TimeAdjustment {
114            s2c_time_diff_us,
115            net_delay_us,
116        };
117    }
118
119    fn time_adjustment(&self) -> TimeAdjustment {
120        *self.time_adjustment.lock()
121    }
122
123    fn record_req_reply(&self, ctx: &ApiRequestDelayContext) {
124        let total_cost = ctx.begin.elapsed();
125        let api_end_at = SystemTime::now();
126        let backend_spans = ctx.backend_spans.lock();
127        let is_local_reply = backend_spans.is_empty();
128        let backend_cost = backend_union_duration(&backend_spans);
129        let open_d_cost = total_cost.saturating_sub(backend_cost);
130
131        self.req_reply_counts
132            .lock()
133            .entry(ctx.proto_id)
134            .or_default()
135            .add(total_cost, open_d_cost, Duration::ZERO, is_local_reply);
136
137        self.record_place_order_req_detail(ctx, &backend_spans, api_end_at);
138    }
139
140    fn snapshot_req_reply(&self) -> Vec<ReqReplyStatisticsSnapshot> {
141        let mut items: Vec<_> = self
142            .req_reply_counts
143            .lock()
144            .iter()
145            .map(|(&proto_id, counter)| counter.snapshot(proto_id))
146            .collect();
147        items.sort_by_key(|item| item.proto_id);
148        items
149    }
150
151    fn record_qot_push_count(
152        &self,
153        qot_push_type: i32,
154        server_recv_from_exchange_time_ms: Option<i64>,
155        server_send_to_client_time_ms: Option<i64>,
156        f3c_recv_at: SystemTime,
157        api_response_at: SystemTime,
158    ) {
159        let Some(f3c_recv_us) = system_time_us(f3c_recv_at) else {
160            return;
161        };
162        let Some(api_response_us) = system_time_us(api_response_at) else {
163            return;
164        };
165
166        let TimeAdjustment {
167            s2c_time_diff_us,
168            net_delay_us,
169        } = *self.time_adjustment.lock();
170
171        let mut server_recv_us = server_recv_from_exchange_time_ms.unwrap_or(0) * 1000;
172        let mut server_send_us = server_send_to_client_time_ms.unwrap_or(0) * 1000;
173        if server_recv_us != 0 {
174            server_recv_us -= s2c_time_diff_us;
175        }
176        if server_send_us != 0 {
177            server_send_us -= s2c_time_diff_us;
178        }
179
180        // Ref: FutuOpenD/Src/NNDataCenter/Other/NNData_ProtoDelay.cpp:179-202.
181        // C++ corrects impossible clock skew (server send later than client
182        // receive) by preserving OpenD processing cost and shifting CR/CS by
183        // the measured one-way network delay.
184        let mut f3c_recv_us = f3c_recv_us;
185        let mut api_response_us = api_response_us;
186        if server_send_us > f3c_recv_us {
187            let open_d_cost_us = api_response_us.saturating_sub(f3c_recv_us);
188            f3c_recv_us = server_send_us + net_delay_us;
189            api_response_us = f3c_recv_us + open_d_cost_us;
190        }
191
192        let mut counts = self.qot_push_counts.lock();
193        let counter = counts.entry(qot_push_type).or_default();
194        counter.add(QOT_PUSH_STAGE_SR2SS, server_recv_us, server_send_us);
195        counter.add(QOT_PUSH_STAGE_SS2CR, server_send_us, f3c_recv_us);
196        counter.add(QOT_PUSH_STAGE_CR2CS, f3c_recv_us, api_response_us);
197        counter.add(QOT_PUSH_STAGE_SS2CS, server_send_us, api_response_us);
198        counter.add(QOT_PUSH_STAGE_SR2CS, server_recv_us, api_response_us);
199        counter.total_count = counter.total_count.saturating_add(1);
200    }
201
202    fn snapshot_qot_push(
203        &self,
204        qot_push_stage: i32,
205        segment_list: &[i32],
206    ) -> Vec<QotPushStatisticsSnapshot> {
207        if qot_push_stage == 0 || segment_list.len() < 2 {
208            return Vec::new();
209        }
210
211        let counts = self.qot_push_counts.lock();
212        let mut items = Vec::new();
213        for qot_push_type in [
214            QOT_PUSH_TYPE_PRICE,
215            QOT_PUSH_TYPE_TICKER,
216            QOT_PUSH_TYPE_ORDER_BOOK,
217            QOT_PUSH_TYPE_BROKER,
218        ] {
219            let Some(counter) = counts.get(&qot_push_type) else {
220                continue;
221            };
222            let Some(stage) = counter.stages.get(&qot_push_stage) else {
223                continue;
224            };
225            if stage.total == 0 {
226                continue;
227            }
228
229            let mut cumulative_count = 0i32;
230            let total = stage.total as f32;
231            let item_list = segment_list
232                .windows(2)
233                .map(|window| {
234                    let begin = window[0];
235                    let end = window[1];
236                    let count = stage.range_count(begin, end) as i32;
237                    cumulative_count += count;
238                    DelayStatisticsItemSnapshot {
239                        begin,
240                        end,
241                        count,
242                        proportion: count as f32 / total * 100.0,
243                        cumulative_ratio: cumulative_count as f32 / total * 100.0,
244                    }
245                })
246                .collect();
247
248            items.push(QotPushStatisticsSnapshot {
249                qot_push_type,
250                item_list,
251                delay_avg_ms: stage.cost_avg_ms,
252                count: stage.total as i32,
253            });
254        }
255        items
256    }
257
258    fn record_place_order_req_detail(
259        &self,
260        ctx: &ApiRequestDelayContext,
261        backend_spans: &[BackendSpan],
262        api_end_at: SystemTime,
263    ) {
264        if ctx.proto_id != futu_core::proto_id::TRD_PLACE_ORDER {
265            return;
266        }
267        let Some(user_data) = ctx.place_order_user_data.lock().clone() else {
268            return;
269        };
270        let Some(api_begin_us) = system_time_us(ctx.begin_at) else {
271            return;
272        };
273        let Some(api_end_us) = system_time_us(api_end_at) else {
274            return;
275        };
276        if api_end_us < api_begin_us {
277            return;
278        }
279        let first_span = backend_spans.first();
280        let first_backend_begin_us = first_span.and_then(|span| system_time_us(span.begin_at));
281        let first_backend_end_us = first_span.and_then(|span| system_time_us(span.end_at));
282
283        self.place_order_req_details
284            .lock()
285            .push(PlaceOrderReqDetail {
286                order_id: user_data.order_id,
287                trd_env: user_data.trd_env,
288                _market: user_data.market,
289                api_begin_us,
290                api_end_us,
291                first_backend_begin_us,
292                first_backend_end_us,
293                net_delay_us: ctx.time_adjustment.net_delay_us,
294            });
295    }
296
297    fn set_place_order_user_data(&self, user_data: PlaceOrderUserData) {
298        let _ = CURRENT_API_REQUEST.try_with(|ctx| {
299            *ctx.place_order_user_data.lock() = Some(user_data);
300        });
301    }
302
303    fn record_place_order_update_push(
304        &self,
305        order_id: String,
306        trd_env: i32,
307        market: i32,
308        order_status: i32,
309        api_response_at: SystemTime,
310    ) {
311        let Some(api_response_us) = system_time_us(api_response_at) else {
312            return;
313        };
314        self.place_order_push_once.lock().push(PlaceOrderPushOnce {
315            order_id,
316            _trd_env: trd_env,
317            _market: market,
318            order_status,
319            api_response_us,
320        });
321    }
322
323    fn snapshot_place_order(&self) -> Vec<PlaceOrderStatisticsSnapshot> {
324        let mut earliest_push_by_order: HashMap<String, i64> = HashMap::new();
325        for push in self.place_order_push_once.lock().iter() {
326            if !place_order_update_status_counts(push.order_status) {
327                continue;
328            }
329            earliest_push_by_order
330                .entry(push.order_id.clone())
331                .and_modify(|existing| *existing = (*existing).min(push.api_response_us))
332                .or_insert(push.api_response_us);
333        }
334
335        self.place_order_req_details
336            .lock()
337            .iter()
338            .filter(|detail| detail.trd_env != 0)
339            .filter_map(|detail| {
340                let total_cost_us = detail.api_end_us.checked_sub(detail.api_begin_us)?;
341                let open_d_cost_us =
342                    match (detail.first_backend_begin_us, detail.first_backend_end_us) {
343                        (Some(begin), Some(end)) => begin
344                            .saturating_sub(detail.api_begin_us)
345                            .saturating_add(detail.api_end_us.saturating_sub(end)),
346                        _ => 0,
347                    };
348                let update_cost_us = earliest_push_by_order
349                    .get(&detail.order_id)
350                    .map(|push_us| push_us.saturating_sub(detail.api_end_us))
351                    .unwrap_or(0);
352
353                Some(PlaceOrderStatisticsSnapshot {
354                    order_id: detail.order_id.clone(),
355                    total_cost_ms: us_to_ms(total_cost_us),
356                    open_d_cost_ms: us_to_ms(open_d_cost_us),
357                    net_delay_ms: us_to_ms(detail.net_delay_us.max(0)),
358                    update_cost_ms: us_to_ms(update_cost_us),
359                })
360            })
361            .collect()
362    }
363}
364
365#[derive(Debug, Clone, Copy, Default)]
366struct TimeAdjustment {
367    s2c_time_diff_us: i64,
368    net_delay_us: i64,
369}
370
371#[derive(Debug, Clone)]
372struct PlaceOrderUserData {
373    order_id: String,
374    trd_env: i32,
375    market: i32,
376}
377
378#[derive(Debug, Clone)]
379struct PlaceOrderReqDetail {
380    order_id: String,
381    trd_env: i32,
382    _market: i32,
383    api_begin_us: i64,
384    api_end_us: i64,
385    first_backend_begin_us: Option<i64>,
386    first_backend_end_us: Option<i64>,
387    net_delay_us: i64,
388}
389
390#[derive(Debug, Clone)]
391struct PlaceOrderPushOnce {
392    order_id: String,
393    _trd_env: i32,
394    _market: i32,
395    order_status: i32,
396    api_response_us: i64,
397}
398
399#[derive(Default)]
400struct QotPushCounter {
401    total_count: u32,
402    stages: HashMap<i32, PushStageCounter>,
403}
404
405impl QotPushCounter {
406    fn add(&mut self, stage: i32, begin_us: i64, end_us: i64) {
407        if begin_us == 0 || end_us == 0 {
408            return;
409        }
410
411        let cost_ms = ((end_us - begin_us) as f64 / 1000.0 + 0.5).max(0.0).floor() as i32;
412        self.stages.entry(stage).or_default().add(cost_ms);
413    }
414}
415
416#[derive(Default)]
417struct PushStageCounter {
418    cost_avg_ms: f32,
419    total: u32,
420    buckets: Vec<CountItem>,
421}
422
423impl PushStageCounter {
424    fn add(&mut self, cost_ms: i32) {
425        if self.buckets.is_empty() {
426            self.buckets = default_count_buckets();
427        }
428
429        for bucket in &mut self.buckets {
430            if cost_ms >= bucket.begin && (cost_ms < bucket.end || bucket.end == -1) {
431                bucket.add(cost_ms);
432                break;
433            }
434        }
435
436        let total = self.total as f32;
437        self.cost_avg_ms = (self.cost_avg_ms * total + cost_ms as f32) / (total + 1.0);
438        self.total = self.total.saturating_add(1);
439    }
440
441    fn range_count(&self, begin: i32, end: i32) -> u32 {
442        self.buckets
443            .iter()
444            .map(|bucket| bucket.range_count(begin, end))
445            .sum()
446    }
447}
448
449#[derive(Debug, Clone)]
450struct CountItem {
451    arr_count: usize,
452    begin: i32,
453    end: i32,
454    gap: i32,
455    counts: Vec<u32>,
456}
457
458impl CountItem {
459    fn new(arr_count: usize, begin: i32, end: i32) -> Self {
460        let range = end - begin;
461        let gap = if range > 0 && arr_count > 0 {
462            range / arr_count as i32
463        } else {
464            0
465        };
466        Self {
467            arr_count,
468            begin,
469            end,
470            gap,
471            counts: vec![0; arr_count],
472        }
473    }
474
475    fn add(&mut self, cost_ms: i32) {
476        let idx = if self.gap > 0 {
477            ((cost_ms - self.begin) / self.gap) as usize
478        } else {
479            0
480        };
481        if let Some(count) = self.counts.get_mut(idx) {
482            *count = count.saturating_add(1);
483        }
484    }
485
486    fn range_count(&self, begin: i32, end: i32) -> u32 {
487        if self.counts.is_empty()
488            || (end != -1 && end <= self.begin)
489            || (self.end != -1 && begin >= self.end)
490        {
491            return 0;
492        }
493
494        if self.end == -1 {
495            return if end == -1 || end > self.begin {
496                self.counts[0]
497            } else {
498                0
499            };
500        }
501
502        let overlap_begin = begin.max(self.begin);
503        let overlap_end = if end == -1 {
504            self.end
505        } else {
506            end.min(self.end)
507        };
508        if overlap_begin >= overlap_end {
509            return 0;
510        }
511
512        let start_idx = if self.gap > 0 {
513            ((overlap_begin - self.begin) / self.gap) as usize
514        } else {
515            0
516        };
517        if start_idx >= self.arr_count {
518            return 0;
519        }
520        let end_idx = if self.gap > 0 {
521            (((overlap_end - self.begin) / self.gap) as usize).saturating_sub(1)
522        } else {
523            0
524        }
525        .min(self.arr_count.saturating_sub(1));
526        self.counts[start_idx..=end_idx].iter().sum()
527    }
528}
529
530fn default_count_buckets() -> Vec<CountItem> {
531    vec![
532        CountItem::new(100, 0, 100),
533        CountItem::new(18, 100, 1000),
534        CountItem::new(8, 1000, 5000),
535        CountItem::new(1, 5000, -1),
536    ]
537}
538
539#[derive(Debug)]
540struct ApiRequestDelayContext {
541    proto_id: u32,
542    begin: Instant,
543    begin_at: SystemTime,
544    backend_spans: Mutex<Vec<BackendSpan>>,
545    place_order_user_data: Mutex<Option<PlaceOrderUserData>>,
546    time_adjustment: TimeAdjustment,
547}
548
549impl ApiRequestDelayContext {
550    fn new(proto_id: u32) -> Self {
551        Self {
552            proto_id,
553            begin: Instant::now(),
554            begin_at: SystemTime::now(),
555            backend_spans: Mutex::new(Vec::new()),
556            place_order_user_data: Mutex::new(None),
557            time_adjustment: DELAY_STATS.time_adjustment(),
558        }
559    }
560
561    fn record_backend_span(
562        &self,
563        begin: Instant,
564        end: Instant,
565        begin_at: SystemTime,
566        end_at: SystemTime,
567    ) {
568        if end >= begin {
569            self.backend_spans.lock().push(BackendSpan {
570                begin,
571                end,
572                begin_at,
573                end_at,
574            });
575        }
576    }
577}
578
579#[derive(Debug, Clone, Copy)]
580struct BackendSpan {
581    begin: Instant,
582    end: Instant,
583    begin_at: SystemTime,
584    end_at: SystemTime,
585}
586
587static DELAY_STATS: LazyLock<DelayStatisticsStore> = LazyLock::new(DelayStatisticsStore::default);
588
589tokio::task_local! {
590    static CURRENT_API_REQUEST: Arc<ApiRequestDelayContext>;
591}
592
593/// Run a public API handler inside the current request/reply delay context.
594///
595/// C++ starts recording at `APIServerCS_Conn.cpp:279`
596/// (`ReqReply_APIReqBegin`) and records successful responses in
597/// `APIServer_Inner_API.h` via `ReqReply_APIReqEnd(..., bSuc)`.
598pub async fn with_api_request<F, Fut>(
599    _conn_id: u64,
600    _serial_no: u32,
601    proto_id: u32,
602    future: F,
603) -> Option<Vec<u8>>
604where
605    F: FnOnce() -> Fut,
606    Fut: Future<Output = Option<Vec<u8>>>,
607{
608    let ctx = Arc::new(ApiRequestDelayContext::new(proto_id));
609    let response = CURRENT_API_REQUEST.scope(ctx.clone(), future()).await;
610
611    if response
612        .as_deref()
613        .is_some_and(response_ret_type_is_success)
614    {
615        DELAY_STATS.record_req_reply(&ctx);
616    }
617
618    response
619}
620
621/// Record a backend request span for the API request currently executing on
622/// this Tokio task.  Calls outside public API dispatch are intentionally no-op,
623/// matching C++'s explicit `ReqReply_RelatedAPISvrReq` linking model.
624pub async fn trace_backend_request<Fut, T>(_cmd_id: u16, future: Fut) -> T
625where
626    Fut: Future<Output = T>,
627{
628    let begin = Instant::now();
629    let begin_at = SystemTime::now();
630    let output = future.await;
631    let end_at = SystemTime::now();
632    let end = Instant::now();
633
634    let _ =
635        CURRENT_API_REQUEST.try_with(|ctx| ctx.record_backend_span(begin, end, begin_at, end_at));
636    output
637}
638
639pub fn snapshot_req_reply_statistics() -> Vec<ReqReplyStatisticsSnapshot> {
640    DELAY_STATS.snapshot_req_reply()
641}
642
643pub fn record_qot_push_count(
644    qot_push_type: i32,
645    server_recv_from_exchange_time_ms: Option<i64>,
646    server_send_to_client_time_ms: Option<i64>,
647    f3c_recv_at: SystemTime,
648    api_response_at: SystemTime,
649) {
650    DELAY_STATS.record_qot_push_count(
651        qot_push_type,
652        server_recv_from_exchange_time_ms,
653        server_send_to_client_time_ms,
654        f3c_recv_at,
655        api_response_at,
656    );
657}
658
659/// Attach successful `Trd_PlaceOrder` user data to the current API request.
660/// Calls outside `with_api_request` are intentionally no-op.
661///
662/// Ref: `APIServer_Trd_PlaceOrder.cpp:864-873` serializes `trdEnv`,
663/// `trdMarket`, and backend `order.szOrderID` into `ReqReply_SetUserData`.
664pub fn record_place_order_request(order_id: &str, trd_env: i32, market: i32) {
665    let order_id = order_id.trim();
666    if order_id.is_empty() {
667        return;
668    }
669    DELAY_STATS.set_place_order_user_data(PlaceOrderUserData {
670        order_id: order_id.to_string(),
671        trd_env,
672        market,
673    });
674}
675
676/// Record one `Trd_UpdateOrder` push candidate for PlaceOrder update-cost
677/// statistics.
678///
679/// Ref: `APIServer_Trd_UpdateOrder.cpp:36-38` records `Push_Once_Add` after
680/// sending the FTAPI push.  `api_response_at` should therefore be captured
681/// after the local push dispatch attempt.
682pub fn record_place_order_update_push(
683    order_id: &str,
684    trd_env: i32,
685    market: i32,
686    order_status: i32,
687    api_response_at: SystemTime,
688) {
689    let order_id = order_id.trim();
690    if order_id.is_empty() {
691        return;
692    }
693    DELAY_STATS.record_place_order_update_push(
694        order_id.to_string(),
695        trd_env,
696        market,
697        order_status,
698        api_response_at,
699    );
700}
701
702/// Update the server-to-client clock correction used by QOT push delay
703/// statistics.
704///
705/// Ref: `FutuOpenD/Src/NNProtoCenter/Other/NNBiz_SvrTime.cpp:41-78` calls
706/// `INNData_ProtoDelay::SetS2CTimeDiffAndNetDelay(nDiffTime_ms * 1000,
707/// nNetDelay_ms * 1000)`. Rust currently has the login-derived
708/// `AuthResult.svr_time_offset` data source but not the C++ StandardPing
709/// one-way network-delay source, so callers pass `net_delay_us = 0` until that
710/// ping path is implemented.
711pub fn set_time_adjustment(s2c_time_diff_us: i64, net_delay_us: i64) {
712    DELAY_STATS.set_time_adjustment(s2c_time_diff_us, net_delay_us);
713}
714
715pub fn snapshot_qot_push_statistics(
716    qot_push_stage: i32,
717    segment_list: &[i32],
718) -> Vec<QotPushStatisticsSnapshot> {
719    DELAY_STATS.snapshot_qot_push(qot_push_stage, segment_list)
720}
721
722pub fn snapshot_place_order_statistics() -> Vec<PlaceOrderStatisticsSnapshot> {
723    DELAY_STATS.snapshot_place_order()
724}
725
726fn backend_union_duration(spans: &[BackendSpan]) -> Duration {
727    if spans.is_empty() {
728        return Duration::ZERO;
729    }
730
731    let mut spans = spans.to_vec();
732    spans.sort_by_key(|span| span.begin);
733
734    let mut total = Duration::ZERO;
735    let mut cur_begin = spans[0].begin;
736    let mut cur_end = spans[0].end;
737
738    for span in spans.into_iter().skip(1) {
739        if span.begin <= cur_end {
740            if span.end > cur_end {
741                cur_end = span.end;
742            }
743        } else {
744            total += cur_end.duration_since(cur_begin);
745            cur_begin = span.begin;
746            cur_end = span.end;
747        }
748    }
749
750    total + cur_end.duration_since(cur_begin)
751}
752
753fn duration_ms(duration: Duration) -> f64 {
754    duration.as_secs_f64() * 1000.0
755}
756
757fn us_to_ms(us: i64) -> f32 {
758    us as f32 / 1000.0
759}
760
761fn system_time_us(time: SystemTime) -> Option<i64> {
762    let duration = time.duration_since(UNIX_EPOCH).ok()?;
763    i64::try_from(duration.as_micros()).ok()
764}
765
766fn place_order_update_status_counts(order_status: i32) -> bool {
767    // Ref: APIServer_GetDelayStatistics.cpp:323-324.
768    // NN_OrderStatus_Submitted / Filled_All / Cancelled_Part / Filled_Part
769    // map to FTAPI values 5 / 11 / 14 / 10.
770    matches!(order_status, 5 | 11 | 14 | 10)
771}
772
773#[cfg(test)]
774mod tests;