Skip to main content

futu_server/
metrics.rs

1// 网关运行时监控指标
2//
3// 使用原子计数器,零开销采集,无需外部依赖。
4// 通过 telnet `show_metrics` 命令查看。
5// v1.4.90 P1-B: 也通过 [`GatewayMetrics::render_prometheus`] 暴露到
6// `/metrics` HTTP 端点 (经 [`futu_auth::metrics::Registry`] extension renderer
7// 注册). 之前 v1.4.83/84 声称这些 counter 在 Prometheus 但仅在 telnet 输出.
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::Instant;
12
13use chrono::{Timelike, Utc};
14use parking_lot::RwLock;
15
16/// v1.4.84 §14: per-cmd_id per-UTC-hour breakdown for monitoring.
17///
18/// 让 tester CI 长窗口 job 能做时段异常检测 (CMD14716 UTC 15-18 window).
19/// 单独结构, 各 cmd counter 独立持有一组 24 bucket.
20#[derive(Debug)]
21pub struct HourBreakdown {
22    counters: [AtomicU64; 24],
23}
24
25impl HourBreakdown {
26    pub const fn new() -> Self {
27        Self {
28            counters: [const { AtomicU64::new(0) }; 24],
29        }
30    }
31
32    /// Bump the counter for current UTC hour (0..23).
33    pub fn bump_now(&self) {
34        let hour = Utc::now().hour() as usize;
35        if hour < 24 {
36            self.counters[hour].fetch_add(1, Ordering::Relaxed);
37        }
38    }
39
40    /// Read counter for specific hour (0..23). Out-of-range returns 0.
41    pub fn get(&self, hour: usize) -> u64 {
42        self.counters
43            .get(hour)
44            .map(|a| a.load(Ordering::Relaxed))
45            .unwrap_or(0)
46    }
47
48    /// Return snapshot of all 24 hours as array.
49    pub fn snapshot(&self) -> [u64; 24] {
50        let mut out = [0u64; 24];
51        for (i, c) in self.counters.iter().enumerate() {
52            out[i] = c.load(Ordering::Relaxed);
53        }
54        out
55    }
56}
57
58impl Default for HourBreakdown {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64/// 网关运行时监控指标
65pub struct GatewayMetrics {
66    /// 网关启动时间
67    pub start_time: Instant,
68
69    // ===== 连接指标 =====
70    /// 累计接受的客户端连接数
71    pub total_connections: AtomicU64,
72    /// 累计客户端断开数
73    pub total_disconnections: AtomicU64,
74    /// 被拒绝的连接数(超过上限)
75    pub rejected_connections: AtomicU64,
76
77    // ===== 请求指标 =====
78    /// 累计处理的请求数
79    pub total_requests: AtomicU64,
80    /// 累计请求错误数(handler 返回 None 或解密失败)
81    pub total_request_errors: AtomicU64,
82    /// 累计响应字节数
83    pub total_response_bytes: AtomicU64,
84
85    // ===== 后端指标 =====
86    /// 后端重连次数
87    pub backend_reconnects: AtomicU64,
88    /// 后端重连失败次数
89    pub backend_reconnect_failures: AtomicU64,
90    /// 最近一次重连时间 (Unix 毫秒, 0=未重连过)
91    pub last_reconnect_ms: AtomicU64,
92    /// 后端是否在线 (1=online, 0=offline)
93    pub backend_online: AtomicU64,
94
95    // ===== 推送指标 =====
96    /// 后端收到的推送数 (CMD 6212 / 4716 / 5300 等)
97    pub backend_pushes_received: AtomicU64,
98    /// 向客户端发送的推送数
99    pub client_pushes_sent: AtomicU64,
100    /// 向客户端推送时 client channel 已关闭导致发送失败的次数
101    pub client_push_send_failures: AtomicU64,
102    /// 行情推送 fanout 时 client channel 已满、仅丢该慢客户端本帧的次数。
103    ///
104    /// 注意: `qot_push_dropped_total` 统计 backend → dispatcher 队列 drop;
105    /// 本 counter 统计 dispatcher → client channel backpressure,语义不同。
106    pub qot_client_push_backpressure_drops: AtomicU64,
107    /// 按 SubType 拆分的 dispatcher → client channel backpressure drop 计数。
108    /// 桶 0 = 未知 / 首推未携带 SubType;桶 1..=17 = 对应 Qot_Common.SubType。
109    pub qot_client_push_backpressure_drops_by_sub_type: [AtomicU64; 18],
110    // v1.4.83 §14 Phase 4: per-cmd_id push 细分计数 (monitoring)
111    /// CMD 6212 quote push 计数
112    pub backend_pushes_cmd_quote: AtomicU64,
113    /// CMD 4716 trade notify (legacy channel) 计数
114    pub backend_pushes_cmd_trade_legacy: AtomicU64,
115    /// CMD 14716 trade notify (v1.4.41 new channel) 计数 — tester §14 追踪
116    pub backend_pushes_cmd_trade_new: AtomicU64,
117    /// CMD 5300 msg-center push 计数
118    pub backend_pushes_cmd_msg_center: AtomicU64,
119    /// 其他未路由 CMD push 计数
120    pub backend_pushes_cmd_other: AtomicU64,
121
122    // v1.4.84 §14: per-cmd_id × UTC-hour 分桶 (监控深化 — tester CI 长窗口
123    // job 用于 "CMD14716 UTC 15-18 异常时段" 检测). cmd_other 不分时段.
124    /// CMD 6212 quote push per-UTC-hour 计数
125    pub backend_pushes_cmd_quote_by_hour: HourBreakdown,
126    /// CMD 4716 trade notify (legacy) per-UTC-hour 计数
127    pub backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown,
128    /// CMD 14716 trade notify (new) per-UTC-hour 计数 — tester §14 主角
129    pub backend_pushes_cmd_trade_new_by_hour: HourBreakdown,
130    /// CMD 5300 msg-center push per-UTC-hour 计数
131    pub backend_pushes_cmd_msg_center_by_hour: HourBreakdown,
132
133    // ===== 订阅指标 =====
134    /// 行情订阅操作次数
135    pub qot_subscribe_ops: AtomicU64,
136    /// 行情退订操作次数
137    pub qot_unsubscribe_ops: AtomicU64,
138
139    // ===== v1.4.110 codex audit Round2 P3 #19: cold-cache wait 监控 =====
140    //
141    // GetBasicQot / GetOrderBook cache miss + 已订阅 → cold-cache wait 路径
142    // (Pull_SubData 主动拉 + 最多 3s 等 push). ops 用 hit/total 比看 backend
143    // push 延迟健康度, timeout/total 比看 cold-cache wait 是否常超时.
144    /// cold-cache wait 进入次数 (cache miss + IsSub, 触发 wait)
145    pub cold_cache_wait_total: AtomicU64,
146    /// cold-cache wait 命中次数 (3s 内 push 写 cache → re-read 拿到值)
147    pub cold_cache_wait_hit: AtomicU64,
148    /// cold-cache wait 超时次数 (3s timeout 仍 cache miss)
149    pub cold_cache_wait_timeout: AtomicU64,
150    /// 重连后重新订阅次数 (legacy, == applied_keys 累加; v1.4.106 codex 0631
151    /// F5 起仍 bump 向后兼容旧 dashboard, 等价新 `resubscribe_applied_keys`).
152    pub resubscribe_ops: AtomicU64,
153
154    // ===== v1.4.106 codex 0631 F5 [P3]: dual resubscribe counter =====
155    //
156    // 老 `resubscribe_ops_total` 把"触发数"和"真生效 keys 数"混一桶, 看不出
157    // partial / cache miss 的 silent loss. 拆 dual:
158    //   - resubscribe_attempts_total: 触发次数 (本 reconnect / staleness 触
159    //     发了 N 次 resubscribe, 不论结果).
160    //   - resubscribe_applied_keys_total: 真生效 keys 数 (cache resolve OK +
161    //     backend ack OK 的 (sec_key, sub_type) 对). 部分失败时 < attempts.
162    //
163    // ratio applied/attempts < 1.0 显著 → ghost subs / cache miss / backend
164    // partial reject 信号. legacy resubscribe_ops_total 仍 bump (向后兼容).
165    /// **v1.4.106 codex 0631 F5 [P3]**: resubscribe 触发次数 (每次 reconnect /
166    /// staleness loop 触发 += 1). 与 applied_keys 对比 ratio 看 partial 程度.
167    pub resubscribe_attempts: AtomicU64,
168    /// **v1.4.106 codex 0631 F5 [P3]**: resubscribe 真生效 keys 数 (累积).
169    /// applied < attempts × global_keys → 部分 partial.
170    pub resubscribe_applied_keys: AtomicU64,
171
172    // ===== v1.4.106 codex 1140 F8: 行情 push 投递失败计数 =====
173    //
174    // 之前 `bridge::push_parser` 用 `let _ = push_tx.try_send(event)` 静默吞错,
175    // cache 已更新但 subscriber 收不到 push (audit Finding 8). 加 metric +
176    // warn log 让 channel full / closed 立即可观测.
177    /// CMD 6212 行情 push (BasicQot/OrderBook/Ticker/RT/KL/Broker/...) 因
178    /// `push_tx` 队列满或关闭被 drop 的总次数 (累积).
179    pub qot_push_dropped_total: AtomicU64,
180    /// 按 SubType 拆分的 drop 计数 (proto Qot_Common.SubType: 0..=17, 共 18 桶).
181    /// 桶 0 = "未知 / 不属于任何已知 SubType" (兜底, e.g. 拼错的 sub_type).
182    /// 桶 1..=17 = 对应 SubType. v1.4.106 codex 1140 F8 加.
183    pub qot_push_dropped_by_sub_type: [AtomicU64; 18],
184
185    // ===== KeepAlive 指标 =====
186    /// KeepAlive 超时断开数
187    pub keepalive_timeouts: AtomicU64,
188
189    // ===== 延迟采样 =====
190    /// 最近 N 个请求延迟的环形缓冲 (纳秒)
191    latency_ring: RwLock<LatencyRing>,
192}
193
194/// 延迟环形缓冲 — 保留最近 1000 个采样
195struct LatencyRing {
196    buf: Vec<u64>,
197    pos: usize,
198    count: u64,
199    total_ns: u64,
200}
201
202const LATENCY_RING_SIZE: usize = 1000;
203
204impl LatencyRing {
205    fn new() -> Self {
206        Self {
207            buf: vec![0u64; LATENCY_RING_SIZE],
208            pos: 0,
209            count: 0,
210            total_ns: 0,
211        }
212    }
213
214    fn push(&mut self, ns: u64) {
215        // 减去被覆盖的旧值
216        if self.count >= LATENCY_RING_SIZE as u64 {
217            self.total_ns = self.total_ns.saturating_sub(self.buf[self.pos]);
218        }
219        self.buf[self.pos] = ns;
220        self.total_ns += ns;
221        self.pos = (self.pos + 1) % LATENCY_RING_SIZE;
222        self.count += 1;
223    }
224
225    fn stats(&self) -> LatencyStats {
226        let n = self.count.min(LATENCY_RING_SIZE as u64) as usize;
227        if n == 0 {
228            return LatencyStats::default();
229        }
230
231        let mut samples: Vec<u64> = if self.count >= LATENCY_RING_SIZE as u64 {
232            self.buf.clone()
233        } else {
234            self.buf[..n].to_vec()
235        };
236        samples.sort_unstable();
237
238        LatencyStats {
239            count: self.count,
240            avg_us: (self.total_ns / n as u64) / 1000,
241            p50_us: samples[n / 2] / 1000,
242            p95_us: samples[(n as f64 * 0.95) as usize] / 1000,
243            p99_us: samples[(n as f64 * 0.99).min((n - 1) as f64) as usize] / 1000,
244            max_us: samples[n - 1] / 1000,
245        }
246    }
247}
248
249/// 延迟统计摘要 (微秒)
250#[derive(Default)]
251pub struct LatencyStats {
252    /// 总采样数
253    pub count: u64,
254    /// 平均延迟 (微秒)
255    pub avg_us: u64,
256    /// P50 延迟
257    pub p50_us: u64,
258    /// P95 延迟
259    pub p95_us: u64,
260    /// P99 延迟
261    pub p99_us: u64,
262    /// 最大延迟
263    pub max_us: u64,
264}
265
266/// v1.4.84 §14: 把 24 小时 counters snapshot 格式化为空格分隔的单行.
267///
268/// 输出格式: `"h00=N h01=N ... h23=N"` — 便于人眼 scan 时段异常,
269/// 同时保持 parseable (awk / grep / Prometheus textfile).
270fn format_hour_row(hb: &HourBreakdown) -> String {
271    let snap = hb.snapshot();
272    let mut out = String::with_capacity(24 * 10);
273    for (i, v) in snap.iter().enumerate() {
274        if i > 0 {
275            out.push(' ');
276        }
277        out.push_str(&format!("h{:02}={}", i, v));
278    }
279    out
280}
281
282fn qot_sub_type_bucket(sub_type: i32) -> usize {
283    if (0..18).contains(&sub_type) {
284        sub_type as usize
285    } else {
286        0
287    }
288}
289
290impl GatewayMetrics {
291    pub fn new() -> Self {
292        Self {
293            start_time: Instant::now(),
294            total_connections: AtomicU64::new(0),
295            total_disconnections: AtomicU64::new(0),
296            rejected_connections: AtomicU64::new(0),
297            total_requests: AtomicU64::new(0),
298            total_request_errors: AtomicU64::new(0),
299            total_response_bytes: AtomicU64::new(0),
300            backend_reconnects: AtomicU64::new(0),
301            backend_reconnect_failures: AtomicU64::new(0),
302            last_reconnect_ms: AtomicU64::new(0),
303            backend_online: AtomicU64::new(1),
304            backend_pushes_received: AtomicU64::new(0),
305            client_pushes_sent: AtomicU64::new(0),
306            client_push_send_failures: AtomicU64::new(0),
307            qot_client_push_backpressure_drops: AtomicU64::new(0),
308            qot_client_push_backpressure_drops_by_sub_type: [const { AtomicU64::new(0) }; 18],
309            backend_pushes_cmd_quote: AtomicU64::new(0),
310            backend_pushes_cmd_trade_legacy: AtomicU64::new(0),
311            backend_pushes_cmd_trade_new: AtomicU64::new(0),
312            backend_pushes_cmd_msg_center: AtomicU64::new(0),
313            backend_pushes_cmd_other: AtomicU64::new(0),
314            backend_pushes_cmd_quote_by_hour: HourBreakdown::new(),
315            backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown::new(),
316            backend_pushes_cmd_trade_new_by_hour: HourBreakdown::new(),
317            backend_pushes_cmd_msg_center_by_hour: HourBreakdown::new(),
318            qot_subscribe_ops: AtomicU64::new(0),
319            qot_unsubscribe_ops: AtomicU64::new(0),
320            cold_cache_wait_total: AtomicU64::new(0),
321            cold_cache_wait_hit: AtomicU64::new(0),
322            cold_cache_wait_timeout: AtomicU64::new(0),
323            resubscribe_ops: AtomicU64::new(0),
324            resubscribe_attempts: AtomicU64::new(0),
325            resubscribe_applied_keys: AtomicU64::new(0),
326            // v1.4.106 codex 1140 F8: qot push drop counter init.
327            qot_push_dropped_total: AtomicU64::new(0),
328            qot_push_dropped_by_sub_type: [const { AtomicU64::new(0) }; 18],
329            keepalive_timeouts: AtomicU64::new(0),
330            latency_ring: RwLock::new(LatencyRing::new()),
331        }
332    }
333
334    /// 记录一次请求延迟 (纳秒)
335    pub fn record_latency_ns(&self, ns: u64) {
336        self.latency_ring.write().push(ns);
337    }
338
339    /// v1.4.106 codex 1140 F8: 记录一次 qot push 被 drop (channel full / closed).
340    ///
341    /// `sub_type` 范围 0..=17 (proto Qot_Common.SubType). 越界值归桶 0
342    /// (未知). 同时 bump 总计数 + per-sub-type 桶, 保证 dashboard 可分维度.
343    pub fn record_qot_push_dropped(&self, sub_type: i32) {
344        self.qot_push_dropped_total.fetch_add(1, Ordering::Relaxed);
345        let bucket = qot_sub_type_bucket(sub_type);
346        self.qot_push_dropped_by_sub_type[bucket].fetch_add(1, Ordering::Relaxed);
347    }
348
349    /// 记录一次 QOT fanout 因单个客户端 channel 满而丢给该客户端的帧。
350    pub fn record_qot_client_push_backpressure_drop(&self, sub_type: i32) {
351        self.qot_client_push_backpressure_drops
352            .fetch_add(1, Ordering::Relaxed);
353        let bucket = qot_sub_type_bucket(sub_type);
354        self.qot_client_push_backpressure_drops_by_sub_type[bucket].fetch_add(1, Ordering::Relaxed);
355    }
356
357    /// v1.4.106 codex 1140 F8: 读取每个 sub_type 桶的 drop 计数 (snapshot).
358    /// 用于 metrics endpoint render.
359    pub fn qot_push_dropped_per_sub_type(&self) -> [u64; 18] {
360        let mut out = [0u64; 18];
361        for (i, slot) in self.qot_push_dropped_by_sub_type.iter().enumerate() {
362            out[i] = slot.load(Ordering::Relaxed);
363        }
364        out
365    }
366
367    pub fn qot_client_push_backpressure_drops_per_sub_type(&self) -> [u64; 18] {
368        let mut out = [0u64; 18];
369        for (i, slot) in self
370            .qot_client_push_backpressure_drops_by_sub_type
371            .iter()
372            .enumerate()
373        {
374            out[i] = slot.load(Ordering::Relaxed);
375        }
376        out
377    }
378
379    /// 获取延迟统计
380    pub fn latency_stats(&self) -> LatencyStats {
381        self.latency_ring.read().stats()
382    }
383
384    /// 格式化运行时间
385    pub fn uptime_str(&self) -> String {
386        let elapsed = self.start_time.elapsed();
387        let secs = elapsed.as_secs();
388        let days = secs / 86400;
389        let hours = (secs % 86400) / 3600;
390        let mins = (secs % 3600) / 60;
391        let s = secs % 60;
392        if days > 0 {
393            format!("{days}d {hours}h {mins}m {s}s")
394        } else if hours > 0 {
395            format!("{hours}h {mins}m {s}s")
396        } else {
397            format!("{mins}m {s}s")
398        }
399    }
400
401    /// 生成 telnet 可展示的指标报告
402    pub fn report(&self) -> String {
403        let lat = self.latency_stats();
404        let backend_status = if self.backend_online.load(Ordering::Relaxed) == 1 {
405            "ONLINE"
406        } else {
407            "OFFLINE"
408        };
409
410        let total_req = self.total_requests.load(Ordering::Relaxed);
411        let uptime_secs = self.start_time.elapsed().as_secs_f64();
412        let avg_rps = if uptime_secs > 0.0 {
413            total_req as f64 / uptime_secs
414        } else {
415            0.0
416        };
417
418        format!(
419            "=== Gateway Metrics ===\r\n\
420             Uptime: {uptime}\r\n\
421             \r\n\
422             [Connections]\r\n\
423             total_accepted: {total_conn}\r\n\
424             total_disconnected: {total_disconn}\r\n\
425             rejected (limit): {rejected}\r\n\
426             keepalive_timeouts: {ka_timeout}\r\n\
427             \r\n\
428             [Requests]\r\n\
429             total_requests: {total_req}\r\n\
430             total_errors: {total_err}\r\n\
431             avg_rps: {avg_rps:.1}\r\n\
432             response_bytes: {resp_bytes}\r\n\
433             \r\n\
434             [Latency (recent {lat_count} samples)]\r\n\
435             avg: {lat_avg}us  p50: {lat_p50}us  p95: {lat_p95}us  p99: {lat_p99}us  max: {lat_max}us\r\n\
436             \r\n\
437             [Backend]\r\n\
438             status: {backend_status}\r\n\
439             reconnects: {reconnects}\r\n\
440             reconnect_failures: {reconnect_fail}\r\n\
441             pushes_received: {push_recv}\r\n\
442             pushes_sent_to_clients: {push_sent}\r\n\
443             push_send_failures_to_clients: {push_send_failures}\r\n\
444             qot_client_push_backpressure_drops: {qot_client_backpressure_drops}\r\n\
445             \r\n\
446             [Pushes by CMD (v1.4.83 §14)]\r\n\
447             cmd_6212_quote: {push_cmd_quote}\r\n\
448             cmd_4716_trade_legacy: {push_cmd_trade_legacy}\r\n\
449             cmd_14716_trade_new: {push_cmd_trade_new}\r\n\
450             cmd_5300_msg_center: {push_cmd_msg_center}\r\n\
451             cmd_other: {push_cmd_other}\r\n\
452             \r\n\
453             [Pushes by CMD × UTC hour (v1.4.84 §14)]\r\n\
454             cmd_14716_trade_new_hour_0..23: {hour_trade_new}\r\n\
455             cmd_6212_quote_hour_0..23: {hour_quote}\r\n\
456             cmd_4716_trade_legacy_hour_0..23: {hour_trade_legacy}\r\n\
457             cmd_5300_msg_center_hour_0..23: {hour_msg_center}\r\n\
458             \r\n\
459             [Subscriptions]\r\n\
460             subscribe_ops: {sub_ops}\r\n\
461             unsubscribe_ops: {unsub_ops}\r\n\
462             resubscribe_ops: {resub_ops}\r\n\
463             \r\n\
464             [Cold-cache wait (v1.4.110 §P3 #19)]\r\n\
465             total: {cc_total}  hit: {cc_hit}  timeout: {cc_timeout}\r\n",
466            uptime = self.uptime_str(),
467            total_conn = self.total_connections.load(Ordering::Relaxed),
468            total_disconn = self.total_disconnections.load(Ordering::Relaxed),
469            rejected = self.rejected_connections.load(Ordering::Relaxed),
470            ka_timeout = self.keepalive_timeouts.load(Ordering::Relaxed),
471            total_req = total_req,
472            total_err = self.total_request_errors.load(Ordering::Relaxed),
473            resp_bytes = self.total_response_bytes.load(Ordering::Relaxed),
474            lat_count = lat.count.min(LATENCY_RING_SIZE as u64),
475            lat_avg = lat.avg_us,
476            lat_p50 = lat.p50_us,
477            lat_p95 = lat.p95_us,
478            lat_p99 = lat.p99_us,
479            lat_max = lat.max_us,
480            reconnects = self.backend_reconnects.load(Ordering::Relaxed),
481            reconnect_fail = self.backend_reconnect_failures.load(Ordering::Relaxed),
482            push_recv = self.backend_pushes_received.load(Ordering::Relaxed),
483            push_sent = self.client_pushes_sent.load(Ordering::Relaxed),
484            push_send_failures = self.client_push_send_failures.load(Ordering::Relaxed),
485            qot_client_backpressure_drops = self
486                .qot_client_push_backpressure_drops
487                .load(Ordering::Relaxed),
488            push_cmd_quote = self.backend_pushes_cmd_quote.load(Ordering::Relaxed),
489            push_cmd_trade_legacy = self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed),
490            push_cmd_trade_new = self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed),
491            push_cmd_msg_center = self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed),
492            push_cmd_other = self.backend_pushes_cmd_other.load(Ordering::Relaxed),
493            hour_trade_new = format_hour_row(&self.backend_pushes_cmd_trade_new_by_hour),
494            hour_quote = format_hour_row(&self.backend_pushes_cmd_quote_by_hour),
495            hour_trade_legacy = format_hour_row(&self.backend_pushes_cmd_trade_legacy_by_hour),
496            hour_msg_center = format_hour_row(&self.backend_pushes_cmd_msg_center_by_hour),
497            sub_ops = self.qot_subscribe_ops.load(Ordering::Relaxed),
498            unsub_ops = self.qot_unsubscribe_ops.load(Ordering::Relaxed),
499            resub_ops = self.resubscribe_ops.load(Ordering::Relaxed),
500            cc_total = self.cold_cache_wait_total.load(Ordering::Relaxed),
501            cc_hit = self.cold_cache_wait_hit.load(Ordering::Relaxed),
502            cc_timeout = self.cold_cache_wait_timeout.load(Ordering::Relaxed),
503        )
504    }
505}
506
507impl Default for GatewayMetrics {
508    fn default() -> Self {
509        Self::new()
510    }
511}
512
513/// v1.4.90 P1-B: 把 24 小时 hour breakdown 渲染成 Prometheus 行 (with `hour` label).
514///
515/// 输出形如 `<metric_name>{hour="00"} 7\n<metric_name>{hour="01"} 0\n...`
516/// 每个 metric 24 行 (UTC hour 0..23).
517fn render_hour_breakdown_prom(metric_name: &str, hb: &HourBreakdown) -> String {
518    let snap = hb.snapshot();
519    let mut out = String::with_capacity(24 * 60);
520    for (h, v) in snap.iter().enumerate() {
521        out.push_str(&format!("{}{{hour=\"{:02}\"}} {}\n", metric_name, h, v));
522    }
523    out
524}
525
526impl GatewayMetrics {
527    /// v1.4.90 P1-B: 输出 Prometheus text exposition 格式, 涵盖:
528    ///
529    /// - 连接 / 请求 / 响应 / 后端连接基础 counter
530    /// - per-cmd_id push counter (cmd_6212_quote / cmd_4716 / cmd_14716 /
531    ///   cmd_5300 / cmd_other) — v1.4.83/84 telnet 已暴露, 此处补 Prometheus
532    /// - per-cmd_id × UTC-hour 24-bucket breakdown (v1.4.84 §14)
533    /// - 订阅 / KeepAlive 计数
534    /// - 延迟 p50/p95/p99 gauge (取最近 1000 样本)
535    ///
536    /// 注册到 [`futu_auth::metrics::Registry`] extension renderer 后, 在
537    /// `/metrics` HTTP 输出末尾自动追加. 见
538    /// [`install_prometheus_extension`].
539    #[must_use]
540    pub fn render_prometheus(&self) -> String {
541        let mut s = String::with_capacity(8192);
542
543        // ===== 连接 =====
544        s.push_str("# HELP futu_gateway_connections_total Total accepted client connections\n");
545        s.push_str("# TYPE futu_gateway_connections_total counter\n");
546        s.push_str(&format!(
547            "futu_gateway_connections_total {}\n",
548            self.total_connections.load(Ordering::Relaxed)
549        ));
550        s.push_str(
551            "# HELP futu_gateway_disconnections_total Total client disconnections\n# TYPE futu_gateway_disconnections_total counter\n",
552        );
553        s.push_str(&format!(
554            "futu_gateway_disconnections_total {}\n",
555            self.total_disconnections.load(Ordering::Relaxed)
556        ));
557        s.push_str(
558            "# HELP futu_gateway_rejected_connections_total Connections rejected (limit hit)\n# TYPE futu_gateway_rejected_connections_total counter\n",
559        );
560        s.push_str(&format!(
561            "futu_gateway_rejected_connections_total {}\n",
562            self.rejected_connections.load(Ordering::Relaxed)
563        ));
564        s.push_str(
565            "# HELP futu_gateway_keepalive_timeouts_total KeepAlive timeout disconnects\n# TYPE futu_gateway_keepalive_timeouts_total counter\n",
566        );
567        s.push_str(&format!(
568            "futu_gateway_keepalive_timeouts_total {}\n",
569            self.keepalive_timeouts.load(Ordering::Relaxed)
570        ));
571
572        // ===== 请求 =====
573        s.push_str(
574            "# HELP futu_gateway_requests_total Total handled client requests\n# TYPE futu_gateway_requests_total counter\n",
575        );
576        s.push_str(&format!(
577            "futu_gateway_requests_total {}\n",
578            self.total_requests.load(Ordering::Relaxed)
579        ));
580        s.push_str(
581            "# HELP futu_gateway_request_errors_total Handler-returned-None or decryption errors\n# TYPE futu_gateway_request_errors_total counter\n",
582        );
583        s.push_str(&format!(
584            "futu_gateway_request_errors_total {}\n",
585            self.total_request_errors.load(Ordering::Relaxed)
586        ));
587        s.push_str(
588            "# HELP futu_gateway_response_bytes_total Cumulative response payload bytes\n# TYPE futu_gateway_response_bytes_total counter\n",
589        );
590        s.push_str(&format!(
591            "futu_gateway_response_bytes_total {}\n",
592            self.total_response_bytes.load(Ordering::Relaxed)
593        ));
594
595        // ===== 后端 =====
596        s.push_str(
597            "# HELP futu_gateway_backend_online Backend connection state (1=online,0=offline)\n# TYPE futu_gateway_backend_online gauge\n",
598        );
599        s.push_str(&format!(
600            "futu_gateway_backend_online {}\n",
601            self.backend_online.load(Ordering::Relaxed)
602        ));
603        s.push_str(
604            "# HELP futu_gateway_backend_reconnects_total Backend reconnect attempts\n# TYPE futu_gateway_backend_reconnects_total counter\n",
605        );
606        s.push_str(&format!(
607            "futu_gateway_backend_reconnects_total {}\n",
608            self.backend_reconnects.load(Ordering::Relaxed)
609        ));
610        s.push_str(
611            "# HELP futu_gateway_backend_reconnect_failures_total Backend reconnect failures\n# TYPE futu_gateway_backend_reconnect_failures_total counter\n",
612        );
613        s.push_str(&format!(
614            "futu_gateway_backend_reconnect_failures_total {}\n",
615            self.backend_reconnect_failures.load(Ordering::Relaxed)
616        ));
617
618        // ===== 推送总数 =====
619        s.push_str(
620            "# HELP futu_gateway_backend_pushes_received_total Pushes received from backend\n# TYPE futu_gateway_backend_pushes_received_total counter\n",
621        );
622        s.push_str(&format!(
623            "futu_gateway_backend_pushes_received_total {}\n",
624            self.backend_pushes_received.load(Ordering::Relaxed)
625        ));
626        s.push_str(
627            "# HELP futu_gateway_client_pushes_sent_total Pushes forwarded to clients\n# TYPE futu_gateway_client_pushes_sent_total counter\n",
628        );
629        s.push_str(&format!(
630            "futu_gateway_client_pushes_sent_total {}\n",
631            self.client_pushes_sent.load(Ordering::Relaxed)
632        ));
633        s.push_str(
634            "# HELP futu_gateway_client_push_send_failures_total Client push send failures because the downstream channel was closed\n# TYPE futu_gateway_client_push_send_failures_total counter\n",
635        );
636        s.push_str(&format!(
637            "futu_gateway_client_push_send_failures_total {}\n",
638            self.client_push_send_failures.load(Ordering::Relaxed)
639        ));
640        s.push_str(
641            "# HELP futu_gateway_qot_client_push_backpressure_drops_total Quote push frames dropped for clients whose downstream channel is full\n# TYPE futu_gateway_qot_client_push_backpressure_drops_total counter\n",
642        );
643        s.push_str(&format!(
644            "futu_gateway_qot_client_push_backpressure_drops_total {}\n",
645            self.qot_client_push_backpressure_drops
646                .load(Ordering::Relaxed)
647        ));
648        s.push_str(
649            "# HELP futu_gateway_qot_client_push_backpressure_drops_by_sub_type_total Quote push frames dropped by full client channel, grouped by Qot_Common.SubType\n# TYPE futu_gateway_qot_client_push_backpressure_drops_by_sub_type_total counter\n",
650        );
651        for (sub_type, count) in self
652            .qot_client_push_backpressure_drops_per_sub_type()
653            .iter()
654            .enumerate()
655        {
656            s.push_str(&format!(
657                "futu_gateway_qot_client_push_backpressure_drops_by_sub_type_total{{sub_type=\"{}\"}} {}\n",
658                sub_type, count
659            ));
660        }
661
662        // ===== per-cmd push counters (v1.4.83 §14) =====
663        // 这是 P1-B 修复的核心: telnet 已有但 /metrics 之前没暴露
664        s.push_str(
665            "# HELP futu_gateway_backend_pushes_cmd_total Backend pushes by cmd_id (v1.4.83 §14)\n# TYPE futu_gateway_backend_pushes_cmd_total counter\n",
666        );
667        s.push_str(&format!(
668            "futu_gateway_backend_pushes_cmd_total{{cmd=\"6212_quote\"}} {}\n",
669            self.backend_pushes_cmd_quote.load(Ordering::Relaxed)
670        ));
671        s.push_str(&format!(
672            "futu_gateway_backend_pushes_cmd_total{{cmd=\"4716_trade_legacy\"}} {}\n",
673            self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed)
674        ));
675        s.push_str(&format!(
676            "futu_gateway_backend_pushes_cmd_total{{cmd=\"14716_trade_new\"}} {}\n",
677            self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed)
678        ));
679        s.push_str(&format!(
680            "futu_gateway_backend_pushes_cmd_total{{cmd=\"5300_msg_center\"}} {}\n",
681            self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed)
682        ));
683        s.push_str(&format!(
684            "futu_gateway_backend_pushes_cmd_total{{cmd=\"other\"}} {}\n",
685            self.backend_pushes_cmd_other.load(Ordering::Relaxed)
686        ));
687
688        // ===== per-cmd × UTC hour breakdown (v1.4.84 §14) =====
689        s.push_str(
690            "# HELP futu_gateway_backend_pushes_cmd_quote_by_hour Cmd 6212 quote pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_quote_by_hour counter\n",
691        );
692        s.push_str(&render_hour_breakdown_prom(
693            "futu_gateway_backend_pushes_cmd_quote_by_hour",
694            &self.backend_pushes_cmd_quote_by_hour,
695        ));
696        s.push_str(
697            "# HELP futu_gateway_backend_pushes_cmd_trade_legacy_by_hour Cmd 4716 trade-legacy pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_trade_legacy_by_hour counter\n",
698        );
699        s.push_str(&render_hour_breakdown_prom(
700            "futu_gateway_backend_pushes_cmd_trade_legacy_by_hour",
701            &self.backend_pushes_cmd_trade_legacy_by_hour,
702        ));
703        s.push_str(
704            "# HELP futu_gateway_backend_pushes_cmd_trade_new_by_hour Cmd 14716 trade-new pushes per UTC hour (v1.4.84 §14 tester subject)\n# TYPE futu_gateway_backend_pushes_cmd_trade_new_by_hour counter\n",
705        );
706        s.push_str(&render_hour_breakdown_prom(
707            "futu_gateway_backend_pushes_cmd_trade_new_by_hour",
708            &self.backend_pushes_cmd_trade_new_by_hour,
709        ));
710        s.push_str(
711            "# HELP futu_gateway_backend_pushes_cmd_msg_center_by_hour Cmd 5300 msg-center pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_msg_center_by_hour counter\n",
712        );
713        s.push_str(&render_hour_breakdown_prom(
714            "futu_gateway_backend_pushes_cmd_msg_center_by_hour",
715            &self.backend_pushes_cmd_msg_center_by_hour,
716        ));
717
718        // ===== 订阅 =====
719        s.push_str(
720            "# HELP futu_gateway_qot_subscribe_ops_total Quote subscribe operations\n# TYPE futu_gateway_qot_subscribe_ops_total counter\n",
721        );
722        s.push_str(&format!(
723            "futu_gateway_qot_subscribe_ops_total {}\n",
724            self.qot_subscribe_ops.load(Ordering::Relaxed)
725        ));
726        s.push_str(
727            "# HELP futu_gateway_qot_unsubscribe_ops_total Quote unsubscribe operations\n# TYPE futu_gateway_qot_unsubscribe_ops_total counter\n",
728        );
729        s.push_str(&format!(
730            "futu_gateway_qot_unsubscribe_ops_total {}\n",
731            self.qot_unsubscribe_ops.load(Ordering::Relaxed)
732        ));
733        s.push_str(
734            "# HELP futu_gateway_resubscribe_ops_total Re-subscribe ops after reconnect (legacy, == resubscribe_applied_keys)\n# TYPE futu_gateway_resubscribe_ops_total counter\n",
735        );
736        s.push_str(&format!(
737            "futu_gateway_resubscribe_ops_total {}\n",
738            self.resubscribe_ops.load(Ordering::Relaxed)
739        ));
740        // v1.4.106 codex 0631 F5: dual counter — attempts 触发 vs applied_keys 真生效.
741        s.push_str(
742            "# HELP futu_gateway_resubscribe_attempts_total Re-subscribe trigger count (each reconnect/staleness loop +=1)\n# TYPE futu_gateway_resubscribe_attempts_total counter\n",
743        );
744        s.push_str(&format!(
745            "futu_gateway_resubscribe_attempts_total {}\n",
746            self.resubscribe_attempts.load(Ordering::Relaxed)
747        ));
748        s.push_str(
749            "# HELP futu_gateway_resubscribe_applied_keys_total Re-subscribe applied keys total (cache resolve OK + backend ack OK)\n# TYPE futu_gateway_resubscribe_applied_keys_total counter\n",
750        );
751        s.push_str(&format!(
752            "futu_gateway_resubscribe_applied_keys_total {}\n",
753            self.resubscribe_applied_keys.load(Ordering::Relaxed)
754        ));
755
756        // v1.4.110 codex audit Round2 P3 #19: cold-cache wait 监控.
757        // hit/total 比看 backend push 延迟健康度, timeout/total 比看常超时.
758        s.push_str(
759            "# HELP futu_gateway_cold_cache_wait_total Cold-cache wait entries (cache miss + IsSub)\n# TYPE futu_gateway_cold_cache_wait_total counter\n",
760        );
761        s.push_str(&format!(
762            "futu_gateway_cold_cache_wait_total {}\n",
763            self.cold_cache_wait_total.load(Ordering::Relaxed)
764        ));
765        s.push_str(
766            "# HELP futu_gateway_cold_cache_wait_hit_total Cold-cache wait hits (push filled cache within timeout)\n# TYPE futu_gateway_cold_cache_wait_hit_total counter\n",
767        );
768        s.push_str(&format!(
769            "futu_gateway_cold_cache_wait_hit_total {}\n",
770            self.cold_cache_wait_hit.load(Ordering::Relaxed)
771        ));
772        s.push_str(
773            "# HELP futu_gateway_cold_cache_wait_timeout_total Cold-cache wait timeouts (3s elapsed, cache still miss)\n# TYPE futu_gateway_cold_cache_wait_timeout_total counter\n",
774        );
775        s.push_str(&format!(
776            "futu_gateway_cold_cache_wait_timeout_total {}\n",
777            self.cold_cache_wait_timeout.load(Ordering::Relaxed)
778        ));
779
780        // ===== 延迟 (gauge, 取 ring 当前 stats) =====
781        let lat = self.latency_stats();
782        s.push_str(
783            "# HELP futu_gateway_request_latency_us Request latency percentiles (microseconds, recent ring)\n# TYPE futu_gateway_request_latency_us gauge\n",
784        );
785        s.push_str(&format!(
786            "futu_gateway_request_latency_us{{quantile=\"p50\"}} {}\n",
787            lat.p50_us
788        ));
789        s.push_str(&format!(
790            "futu_gateway_request_latency_us{{quantile=\"p95\"}} {}\n",
791            lat.p95_us
792        ));
793        s.push_str(&format!(
794            "futu_gateway_request_latency_us{{quantile=\"p99\"}} {}\n",
795            lat.p99_us
796        ));
797        s.push_str(&format!(
798            "futu_gateway_request_latency_us{{quantile=\"max\"}} {}\n",
799            lat.max_us
800        ));
801
802        s
803    }
804}
805
806/// v1.4.90 P1-B: 把 [`GatewayMetrics`] 注册为 [`futu_auth::metrics::Registry`]
807/// 的 extension renderer, 让 `/metrics` HTTP 端点自动包含 per-cmd / per-hour
808/// counter.
809///
810/// 调用方 (futu-opend `main.rs`) 在创建 `GatewayMetrics` Arc 之后调一次:
811///
812/// ```ignore
813/// futu_auth::metrics::install(Arc::new(MetricsRegistry::default()));
814/// // ... bridge / server 初始化, 都共享同一份 Arc<GatewayMetrics> ...
815/// futu_server::metrics::install_prometheus_extension(server.metrics().clone());
816/// ```
817///
818/// 多次调用会注册多个 renderer (无害但重复输出); 实际上只该调用一次.
819pub fn install_prometheus_extension(metrics: Arc<GatewayMetrics>) {
820    futu_auth::metrics::register_global_renderer(move || metrics.render_prometheus());
821}
822
823#[cfg(test)]
824mod tests;