1use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Instant;
8
9use parking_lot::RwLock;
10
11pub struct GatewayMetrics {
13 pub start_time: Instant,
15
16 pub total_connections: AtomicU64,
19 pub total_disconnections: AtomicU64,
21 pub rejected_connections: AtomicU64,
23
24 pub total_requests: AtomicU64,
27 pub total_request_errors: AtomicU64,
29 pub total_response_bytes: AtomicU64,
31
32 pub backend_reconnects: AtomicU64,
35 pub backend_reconnect_failures: AtomicU64,
37 pub last_reconnect_ms: AtomicU64,
39 pub backend_online: AtomicU64,
41
42 pub backend_pushes_received: AtomicU64,
45 pub client_pushes_sent: AtomicU64,
47
48 pub qot_subscribe_ops: AtomicU64,
51 pub qot_unsubscribe_ops: AtomicU64,
53 pub resubscribe_ops: AtomicU64,
55
56 pub keepalive_timeouts: AtomicU64,
59
60 latency_ring: RwLock<LatencyRing>,
63}
64
65struct LatencyRing {
67 buf: Vec<u64>,
68 pos: usize,
69 count: u64,
70 total_ns: u64,
71}
72
73const LATENCY_RING_SIZE: usize = 1000;
74
75impl LatencyRing {
76 fn new() -> Self {
77 Self {
78 buf: vec![0u64; LATENCY_RING_SIZE],
79 pos: 0,
80 count: 0,
81 total_ns: 0,
82 }
83 }
84
85 fn push(&mut self, ns: u64) {
86 if self.count >= LATENCY_RING_SIZE as u64 {
88 self.total_ns = self.total_ns.saturating_sub(self.buf[self.pos]);
89 }
90 self.buf[self.pos] = ns;
91 self.total_ns += ns;
92 self.pos = (self.pos + 1) % LATENCY_RING_SIZE;
93 self.count += 1;
94 }
95
96 fn stats(&self) -> LatencyStats {
97 let n = self.count.min(LATENCY_RING_SIZE as u64) as usize;
98 if n == 0 {
99 return LatencyStats::default();
100 }
101
102 let mut samples: Vec<u64> = if self.count >= LATENCY_RING_SIZE as u64 {
103 self.buf.clone()
104 } else {
105 self.buf[..n].to_vec()
106 };
107 samples.sort_unstable();
108
109 LatencyStats {
110 count: self.count,
111 avg_us: (self.total_ns / n as u64) / 1000,
112 p50_us: samples[n / 2] / 1000,
113 p95_us: samples[(n as f64 * 0.95) as usize] / 1000,
114 p99_us: samples[(n as f64 * 0.99).min((n - 1) as f64) as usize] / 1000,
115 max_us: samples[n - 1] / 1000,
116 }
117 }
118}
119
/// Summary of the recent-latency window, as returned by
/// `GatewayMetrics::latency_stats`.
///
/// All durations are whole microseconds (nanosecond samples divided by 1000,
/// truncating). Every field is zero when no sample has been recorded.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct LatencyStats {
    /// Total number of samples recorded since creation. This is not capped at the
    /// window size, so it can exceed the number of samples summarised below.
    pub count: u64,
    /// Arithmetic mean of the windowed samples.
    pub avg_us: u64,
    /// Median (50th percentile) of the windowed samples.
    pub p50_us: u64,
    /// 95th percentile of the windowed samples.
    pub p95_us: u64,
    /// 99th percentile of the windowed samples.
    pub p99_us: u64,
    /// Largest windowed sample.
    pub max_us: u64,
}
136
137impl GatewayMetrics {
138 pub fn new() -> Self {
139 Self {
140 start_time: Instant::now(),
141 total_connections: AtomicU64::new(0),
142 total_disconnections: AtomicU64::new(0),
143 rejected_connections: AtomicU64::new(0),
144 total_requests: AtomicU64::new(0),
145 total_request_errors: AtomicU64::new(0),
146 total_response_bytes: AtomicU64::new(0),
147 backend_reconnects: AtomicU64::new(0),
148 backend_reconnect_failures: AtomicU64::new(0),
149 last_reconnect_ms: AtomicU64::new(0),
150 backend_online: AtomicU64::new(1),
151 backend_pushes_received: AtomicU64::new(0),
152 client_pushes_sent: AtomicU64::new(0),
153 qot_subscribe_ops: AtomicU64::new(0),
154 qot_unsubscribe_ops: AtomicU64::new(0),
155 resubscribe_ops: AtomicU64::new(0),
156 keepalive_timeouts: AtomicU64::new(0),
157 latency_ring: RwLock::new(LatencyRing::new()),
158 }
159 }
160
161 pub fn record_latency_ns(&self, ns: u64) {
163 self.latency_ring.write().push(ns);
164 }
165
166 pub fn latency_stats(&self) -> LatencyStats {
168 self.latency_ring.read().stats()
169 }
170
171 pub fn uptime_str(&self) -> String {
173 let elapsed = self.start_time.elapsed();
174 let secs = elapsed.as_secs();
175 let days = secs / 86400;
176 let hours = (secs % 86400) / 3600;
177 let mins = (secs % 3600) / 60;
178 let s = secs % 60;
179 if days > 0 {
180 format!("{days}d {hours}h {mins}m {s}s")
181 } else if hours > 0 {
182 format!("{hours}h {mins}m {s}s")
183 } else {
184 format!("{mins}m {s}s")
185 }
186 }
187
188 pub fn report(&self) -> String {
190 let lat = self.latency_stats();
191 let backend_status = if self.backend_online.load(Ordering::Relaxed) == 1 {
192 "ONLINE"
193 } else {
194 "OFFLINE"
195 };
196
197 let total_req = self.total_requests.load(Ordering::Relaxed);
198 let uptime_secs = self.start_time.elapsed().as_secs_f64();
199 let avg_rps = if uptime_secs > 0.0 {
200 total_req as f64 / uptime_secs
201 } else {
202 0.0
203 };
204
205 format!(
206 "=== Gateway Metrics ===\r\n\
207 Uptime: {uptime}\r\n\
208 \r\n\
209 [Connections]\r\n\
210 total_accepted: {total_conn}\r\n\
211 total_disconnected: {total_disconn}\r\n\
212 rejected (limit): {rejected}\r\n\
213 keepalive_timeouts: {ka_timeout}\r\n\
214 \r\n\
215 [Requests]\r\n\
216 total_requests: {total_req}\r\n\
217 total_errors: {total_err}\r\n\
218 avg_rps: {avg_rps:.1}\r\n\
219 response_bytes: {resp_bytes}\r\n\
220 \r\n\
221 [Latency (recent {lat_count} samples)]\r\n\
222 avg: {lat_avg}us p50: {lat_p50}us p95: {lat_p95}us p99: {lat_p99}us max: {lat_max}us\r\n\
223 \r\n\
224 [Backend]\r\n\
225 status: {backend_status}\r\n\
226 reconnects: {reconnects}\r\n\
227 reconnect_failures: {reconnect_fail}\r\n\
228 pushes_received: {push_recv}\r\n\
229 pushes_sent_to_clients: {push_sent}\r\n\
230 \r\n\
231 [Subscriptions]\r\n\
232 subscribe_ops: {sub_ops}\r\n\
233 unsubscribe_ops: {unsub_ops}\r\n\
234 resubscribe_ops: {resub_ops}\r\n",
235 uptime = self.uptime_str(),
236 total_conn = self.total_connections.load(Ordering::Relaxed),
237 total_disconn = self.total_disconnections.load(Ordering::Relaxed),
238 rejected = self.rejected_connections.load(Ordering::Relaxed),
239 ka_timeout = self.keepalive_timeouts.load(Ordering::Relaxed),
240 total_req = total_req,
241 total_err = self.total_request_errors.load(Ordering::Relaxed),
242 resp_bytes = self.total_response_bytes.load(Ordering::Relaxed),
243 lat_count = lat.count.min(LATENCY_RING_SIZE as u64),
244 lat_avg = lat.avg_us,
245 lat_p50 = lat.p50_us,
246 lat_p95 = lat.p95_us,
247 lat_p99 = lat.p99_us,
248 lat_max = lat.max_us,
249 reconnects = self.backend_reconnects.load(Ordering::Relaxed),
250 reconnect_fail = self.backend_reconnect_failures.load(Ordering::Relaxed),
251 push_recv = self.backend_pushes_received.load(Ordering::Relaxed),
252 push_sent = self.client_pushes_sent.load(Ordering::Relaxed),
253 sub_ops = self.qot_subscribe_ops.load(Ordering::Relaxed),
254 unsub_ops = self.qot_unsubscribe_ops.load(Ordering::Relaxed),
255 resub_ops = self.resubscribe_ops.load(Ordering::Relaxed),
256 )
257 }
258}
259
260impl Default for GatewayMetrics {
261 fn default() -> Self {
262 Self::new()
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_latency_ring() {
        let m = GatewayMetrics::new();
        let s = m.latency_stats();
        assert_eq!(s.count, 0);
        assert_eq!(s.max_us, 0);

        m.record_latency_ns(1_000_000);
        m.record_latency_ns(2_000_000);
        m.record_latency_ns(3_000_000);
        let s = m.latency_stats();
        assert_eq!(s.count, 3);
        assert_eq!(s.avg_us, 2000);
        assert_eq!(s.p50_us, 2000);
        assert_eq!(s.max_us, 3000);
    }

    #[test]
    fn test_latency_window_evicts_oldest() {
        let m = GatewayMetrics::new();
        // Five slow samples, then a full window of fast ones: the slow ones must
        // be evicted from the statistics while `count` keeps the lifetime total.
        for _ in 0..5 {
            m.record_latency_ns(1_000_000_000);
        }
        for _ in 0..LATENCY_RING_SIZE {
            m.record_latency_ns(2_000);
        }
        let s = m.latency_stats();
        assert_eq!(s.count, (LATENCY_RING_SIZE + 5) as u64);
        assert_eq!(s.avg_us, 2);
        assert_eq!(s.max_us, 2);
    }

    #[test]
    fn test_uptime_format() {
        let m = GatewayMetrics::new();
        let s = m.uptime_str();
        // A freshly created instance has been up for well under a minute.
        assert!(s.starts_with("0m "), "unexpected uptime: {s}");
        assert!(s.ends_with('s'));
    }

    #[test]
    fn test_report_format() {
        let m = GatewayMetrics::new();
        m.total_requests.store(100, Ordering::Relaxed);
        m.backend_online.store(1, Ordering::Relaxed);
        let report = m.report();
        assert!(report.contains("total_requests: 100"));
        assert!(report.contains("status: ONLINE"));
    }

    #[test]
    fn test_report_offline() {
        let m = GatewayMetrics::new();
        m.backend_online.store(0, Ordering::Relaxed);
        assert!(m.report().contains("status: OFFLINE"));
    }
}