futu_server/
metrics.rs

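// Runtime monitoring metrics for the gateway.
//
// Counters are plain atomics, so collection is cheap and needs no external
// metrics library; latency samples go through a small lock-protected ring buffer.
// View the metrics with the telnet `show_metrics` command.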
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Instant;
8
9use parking_lot::RwLock;
10
11/// 网关运行时监控指标
12pub struct GatewayMetrics {
13    /// 网关启动时间
14    pub start_time: Instant,
15
16    // ===== 连接指标 =====
17    /// 累计接受的客户端连接数
18    pub total_connections: AtomicU64,
19    /// 累计客户端断开数
20    pub total_disconnections: AtomicU64,
21    /// 被拒绝的连接数(超过上限)
22    pub rejected_connections: AtomicU64,
23
24    // ===== 请求指标 =====
25    /// 累计处理的请求数
26    pub total_requests: AtomicU64,
27    /// 累计请求错误数(handler 返回 None 或解密失败)
28    pub total_request_errors: AtomicU64,
29    /// 累计响应字节数
30    pub total_response_bytes: AtomicU64,
31
32    // ===== 后端指标 =====
33    /// 后端重连次数
34    pub backend_reconnects: AtomicU64,
35    /// 后端重连失败次数
36    pub backend_reconnect_failures: AtomicU64,
37    /// 最近一次重连时间 (Unix 毫秒, 0=未重连过)
38    pub last_reconnect_ms: AtomicU64,
39    /// 后端是否在线 (1=online, 0=offline)
40    pub backend_online: AtomicU64,
41
42    // ===== 推送指标 =====
43    /// 后端收到的推送数 (CMD 6212 / 4716 / 5300 等)
44    pub backend_pushes_received: AtomicU64,
45    /// 向客户端发送的推送数
46    pub client_pushes_sent: AtomicU64,
47
48    // ===== 订阅指标 =====
49    /// 行情订阅操作次数
50    pub qot_subscribe_ops: AtomicU64,
51    /// 行情退订操作次数
52    pub qot_unsubscribe_ops: AtomicU64,
53    /// 重连后重新订阅次数
54    pub resubscribe_ops: AtomicU64,
55
56    // ===== KeepAlive 指标 =====
57    /// KeepAlive 超时断开数
58    pub keepalive_timeouts: AtomicU64,
59
60    // ===== 延迟采样 =====
61    /// 最近 N 个请求延迟的环形缓冲 (纳秒)
62    latency_ring: RwLock<LatencyRing>,
63}
64
/// Latency ring buffer that keeps the most recent [`LATENCY_RING_SIZE`]
/// samples, in nanoseconds.
struct LatencyRing {
    /// Sample storage; slots that were never written hold 0.
    buf: Vec<u64>,
    /// Index of the slot the next sample is written to.
    pos: usize,
    /// Number of samples ever pushed (not capped at the ring size).
    count: u64,
    /// Sum of the samples currently stored in `buf`.
    ///
    /// Held as `u128` so that a full window of arbitrary `u64` samples can
    /// never overflow the running sum.
    total_ns: u128,
}

/// Number of latency samples retained for statistics.
const LATENCY_RING_SIZE: usize = 1000;

impl LatencyRing {
    /// Creates an empty ring with all slots zeroed.
    fn new() -> Self {
        Self {
            buf: vec![0u64; LATENCY_RING_SIZE],
            pos: 0,
            count: 0,
            total_ns: 0,
        }
    }

    /// Records one latency sample (nanoseconds), overwriting the oldest sample
    /// once the ring is full.
    fn push(&mut self, ns: u64) {
        // Slots that were never written still hold 0, so subtracting the
        // previous occupant is correct both before and after the ring wraps.
        let evicted = std::mem::replace(&mut self.buf[self.pos], ns);
        self.total_ns = self.total_ns - u128::from(evicted) + u128::from(ns);
        self.pos = (self.pos + 1) % LATENCY_RING_SIZE;
        self.count = self.count.saturating_add(1);
    }

    /// Computes summary statistics (in microseconds) over the samples
    /// currently held in the ring.
    ///
    /// Returns the all-zero default when no sample has been recorded yet.
    fn stats(&self) -> LatencyStats {
        let n = self.count.min(LATENCY_RING_SIZE as u64) as usize;
        if n == 0 {
            return LatencyStats::default();
        }

        // The ring fills from slot 0 upwards, so the first `n` slots are
        // exactly the live samples (all of `buf` once the ring has wrapped).
        let mut samples = self.buf[..n].to_vec();
        samples.sort_unstable();

        // Sample at quantile `q`, clamped to the last valid index.
        let quantile_us = |q: f64| {
            let idx = ((n as f64 * q) as usize).min(n - 1);
            samples[idx] / 1000
        };

        // The mean of `u64` values always fits in `u64`, so the cast is lossless.
        let avg_ns = (self.total_ns / n as u128) as u64;

        LatencyStats {
            count: self.count,
            avg_us: avg_ns / 1000,
            p50_us: samples[n / 2] / 1000,
            p95_us: quantile_us(0.95),
            p99_us: quantile_us(0.99),
            max_us: samples[n - 1] / 1000,
        }
    }
}

/// Latency statistics summary (microseconds).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LatencyStats {
    /// Total number of samples ever recorded (may exceed the ring size).
    pub count: u64,
    /// Average latency over the retained samples (microseconds).
    pub avg_us: u64,
    /// P50 latency (microseconds).
    pub p50_us: u64,
    /// P95 latency (microseconds).
    pub p95_us: u64,
    /// P99 latency (microseconds).
    pub p99_us: u64,
    /// Maximum latency among the retained samples (microseconds).
    pub max_us: u64,
}
136
137impl GatewayMetrics {
138    pub fn new() -> Self {
139        Self {
140            start_time: Instant::now(),
141            total_connections: AtomicU64::new(0),
142            total_disconnections: AtomicU64::new(0),
143            rejected_connections: AtomicU64::new(0),
144            total_requests: AtomicU64::new(0),
145            total_request_errors: AtomicU64::new(0),
146            total_response_bytes: AtomicU64::new(0),
147            backend_reconnects: AtomicU64::new(0),
148            backend_reconnect_failures: AtomicU64::new(0),
149            last_reconnect_ms: AtomicU64::new(0),
150            backend_online: AtomicU64::new(1),
151            backend_pushes_received: AtomicU64::new(0),
152            client_pushes_sent: AtomicU64::new(0),
153            qot_subscribe_ops: AtomicU64::new(0),
154            qot_unsubscribe_ops: AtomicU64::new(0),
155            resubscribe_ops: AtomicU64::new(0),
156            keepalive_timeouts: AtomicU64::new(0),
157            latency_ring: RwLock::new(LatencyRing::new()),
158        }
159    }
160
161    /// 记录一次请求延迟 (纳秒)
162    pub fn record_latency_ns(&self, ns: u64) {
163        self.latency_ring.write().push(ns);
164    }
165
166    /// 获取延迟统计
167    pub fn latency_stats(&self) -> LatencyStats {
168        self.latency_ring.read().stats()
169    }
170
171    /// 格式化运行时间
172    pub fn uptime_str(&self) -> String {
173        let elapsed = self.start_time.elapsed();
174        let secs = elapsed.as_secs();
175        let days = secs / 86400;
176        let hours = (secs % 86400) / 3600;
177        let mins = (secs % 3600) / 60;
178        let s = secs % 60;
179        if days > 0 {
180            format!("{days}d {hours}h {mins}m {s}s")
181        } else if hours > 0 {
182            format!("{hours}h {mins}m {s}s")
183        } else {
184            format!("{mins}m {s}s")
185        }
186    }
187
188    /// 生成 telnet 可展示的指标报告
189    pub fn report(&self) -> String {
190        let lat = self.latency_stats();
191        let backend_status = if self.backend_online.load(Ordering::Relaxed) == 1 {
192            "ONLINE"
193        } else {
194            "OFFLINE"
195        };
196
197        let total_req = self.total_requests.load(Ordering::Relaxed);
198        let uptime_secs = self.start_time.elapsed().as_secs_f64();
199        let avg_rps = if uptime_secs > 0.0 {
200            total_req as f64 / uptime_secs
201        } else {
202            0.0
203        };
204
205        format!(
206            "=== Gateway Metrics ===\r\n\
207             Uptime: {uptime}\r\n\
208             \r\n\
209             [Connections]\r\n\
210             total_accepted: {total_conn}\r\n\
211             total_disconnected: {total_disconn}\r\n\
212             rejected (limit): {rejected}\r\n\
213             keepalive_timeouts: {ka_timeout}\r\n\
214             \r\n\
215             [Requests]\r\n\
216             total_requests: {total_req}\r\n\
217             total_errors: {total_err}\r\n\
218             avg_rps: {avg_rps:.1}\r\n\
219             response_bytes: {resp_bytes}\r\n\
220             \r\n\
221             [Latency (recent {lat_count} samples)]\r\n\
222             avg: {lat_avg}us  p50: {lat_p50}us  p95: {lat_p95}us  p99: {lat_p99}us  max: {lat_max}us\r\n\
223             \r\n\
224             [Backend]\r\n\
225             status: {backend_status}\r\n\
226             reconnects: {reconnects}\r\n\
227             reconnect_failures: {reconnect_fail}\r\n\
228             pushes_received: {push_recv}\r\n\
229             pushes_sent_to_clients: {push_sent}\r\n\
230             \r\n\
231             [Subscriptions]\r\n\
232             subscribe_ops: {sub_ops}\r\n\
233             unsubscribe_ops: {unsub_ops}\r\n\
234             resubscribe_ops: {resub_ops}\r\n",
235            uptime = self.uptime_str(),
236            total_conn = self.total_connections.load(Ordering::Relaxed),
237            total_disconn = self.total_disconnections.load(Ordering::Relaxed),
238            rejected = self.rejected_connections.load(Ordering::Relaxed),
239            ka_timeout = self.keepalive_timeouts.load(Ordering::Relaxed),
240            total_req = total_req,
241            total_err = self.total_request_errors.load(Ordering::Relaxed),
242            resp_bytes = self.total_response_bytes.load(Ordering::Relaxed),
243            lat_count = lat.count.min(LATENCY_RING_SIZE as u64),
244            lat_avg = lat.avg_us,
245            lat_p50 = lat.p50_us,
246            lat_p95 = lat.p95_us,
247            lat_p99 = lat.p99_us,
248            lat_max = lat.max_us,
249            reconnects = self.backend_reconnects.load(Ordering::Relaxed),
250            reconnect_fail = self.backend_reconnect_failures.load(Ordering::Relaxed),
251            push_recv = self.backend_pushes_received.load(Ordering::Relaxed),
252            push_sent = self.client_pushes_sent.load(Ordering::Relaxed),
253            sub_ops = self.qot_subscribe_ops.load(Ordering::Relaxed),
254            unsub_ops = self.qot_unsubscribe_ops.load(Ordering::Relaxed),
255            resub_ops = self.resubscribe_ops.load(Ordering::Relaxed),
256        )
257    }
258}
259
260impl Default for GatewayMetrics {
261    fn default() -> Self {
262        Self::new()
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty ring reports zero samples; pushed samples are counted and averaged.
    #[test]
    fn test_latency_ring() {
        let metrics = GatewayMetrics::new();

        // Nothing recorded yet.
        assert_eq!(metrics.latency_stats().count, 0);

        // Record 1 ms, 2 ms and 3 ms.
        for ms in 1..=3u64 {
            metrics.record_latency_ns(ms * 1_000_000);
        }
        let stats = metrics.latency_stats();
        assert_eq!(stats.count, 3);
        assert_eq!(stats.avg_us, 2000); // 2 ms average
    }

    /// The uptime string always contains the minutes unit.
    #[test]
    fn test_uptime_format() {
        let uptime = GatewayMetrics::new().uptime_str();
        assert!(uptime.contains('m'));
    }

    /// The report reflects counter values and the backend status.
    #[test]
    fn test_report_format() {
        let metrics = GatewayMetrics::new();
        metrics.total_requests.store(100, Ordering::Relaxed);
        metrics.backend_online.store(1, Ordering::Relaxed);

        let text = metrics.report();
        assert!(text.contains("total_requests: 100"));
        assert!(text.contains("ONLINE"));
    }
}