1use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::Instant;
12
13use chrono::{Timelike, Utc};
14use parking_lot::RwLock;
15
/// A fixed table of 24 atomic counters, one slot per hour of the day.
///
/// Slots are addressed by hour number (`0..=23`). Everything is lock-free:
/// the counters are plain [`AtomicU64`]s, so the table can be shared by
/// reference between threads.
#[derive(Debug)]
pub struct HourBreakdown {
    /// Per-hour totals; the array index is the hour number.
    counters: [AtomicU64; 24],
}
24
25impl HourBreakdown {
26 pub const fn new() -> Self {
27 Self {
28 counters: [const { AtomicU64::new(0) }; 24],
29 }
30 }
31
32 pub fn bump_now(&self) {
34 let hour = Utc::now().hour() as usize;
35 if hour < 24 {
36 self.counters[hour].fetch_add(1, Ordering::Relaxed);
37 }
38 }
39
40 pub fn get(&self, hour: usize) -> u64 {
42 self.counters
43 .get(hour)
44 .map(|a| a.load(Ordering::Relaxed))
45 .unwrap_or(0)
46 }
47
48 pub fn snapshot(&self) -> [u64; 24] {
50 let mut out = [0u64; 24];
51 for (i, c) in self.counters.iter().enumerate() {
52 out[i] = c.load(Ordering::Relaxed);
53 }
54 out
55 }
56}
57
58impl Default for HourBreakdown {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64pub struct GatewayMetrics {
66 pub start_time: Instant,
68
69 pub total_connections: AtomicU64,
72 pub total_disconnections: AtomicU64,
74 pub rejected_connections: AtomicU64,
76
77 pub total_requests: AtomicU64,
80 pub total_request_errors: AtomicU64,
82 pub total_response_bytes: AtomicU64,
84
85 pub backend_reconnects: AtomicU64,
88 pub backend_reconnect_failures: AtomicU64,
90 pub last_reconnect_ms: AtomicU64,
92 pub backend_online: AtomicU64,
94
95 pub backend_pushes_received: AtomicU64,
98 pub client_pushes_sent: AtomicU64,
100 pub client_push_send_failures: AtomicU64,
102 pub qot_client_push_backpressure_drops: AtomicU64,
107 pub qot_client_push_backpressure_drops_by_sub_type: [AtomicU64; 18],
110 pub backend_pushes_cmd_quote: AtomicU64,
113 pub backend_pushes_cmd_trade_legacy: AtomicU64,
115 pub backend_pushes_cmd_trade_new: AtomicU64,
117 pub backend_pushes_cmd_msg_center: AtomicU64,
119 pub backend_pushes_cmd_other: AtomicU64,
121
122 pub backend_pushes_cmd_quote_by_hour: HourBreakdown,
126 pub backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown,
128 pub backend_pushes_cmd_trade_new_by_hour: HourBreakdown,
130 pub backend_pushes_cmd_msg_center_by_hour: HourBreakdown,
132
133 pub qot_subscribe_ops: AtomicU64,
136 pub qot_unsubscribe_ops: AtomicU64,
138
139 pub cold_cache_wait_total: AtomicU64,
146 pub cold_cache_wait_hit: AtomicU64,
148 pub cold_cache_wait_timeout: AtomicU64,
150 pub resubscribe_ops: AtomicU64,
153
154 pub resubscribe_attempts: AtomicU64,
168 pub resubscribe_applied_keys: AtomicU64,
171
172 pub qot_push_dropped_total: AtomicU64,
180 pub qot_push_dropped_by_sub_type: [AtomicU64; 18],
184
185 pub keepalive_timeouts: AtomicU64,
188
189 latency_ring: RwLock<LatencyRing>,
192}
193
/// Fixed-capacity circular buffer of latency samples in nanoseconds.
///
/// Keeps a running sum alongside the samples so the window average does not
/// require re-adding every slot.
struct LatencyRing {
    /// Sample storage; slots past `count` are zero until the ring first fills.
    buf: Vec<u64>,
    /// Index of the slot the next sample will be written to.
    pos: usize,
    /// Number of samples ever pushed (keeps growing after the ring is full).
    count: u64,
    /// Running sum of the samples currently held in the window.
    total_ns: u64,
}
201
/// Number of most-recent latency samples kept for percentile/average reporting.
const LATENCY_RING_SIZE: usize = 1_000;
203
204impl LatencyRing {
205 fn new() -> Self {
206 Self {
207 buf: vec![0u64; LATENCY_RING_SIZE],
208 pos: 0,
209 count: 0,
210 total_ns: 0,
211 }
212 }
213
214 fn push(&mut self, ns: u64) {
215 if self.count >= LATENCY_RING_SIZE as u64 {
217 self.total_ns = self.total_ns.saturating_sub(self.buf[self.pos]);
218 }
219 self.buf[self.pos] = ns;
220 self.total_ns += ns;
221 self.pos = (self.pos + 1) % LATENCY_RING_SIZE;
222 self.count += 1;
223 }
224
225 fn stats(&self) -> LatencyStats {
226 let n = self.count.min(LATENCY_RING_SIZE as u64) as usize;
227 if n == 0 {
228 return LatencyStats::default();
229 }
230
231 let mut samples: Vec<u64> = if self.count >= LATENCY_RING_SIZE as u64 {
232 self.buf.clone()
233 } else {
234 self.buf[..n].to_vec()
235 };
236 samples.sort_unstable();
237
238 LatencyStats {
239 count: self.count,
240 avg_us: (self.total_ns / n as u64) / 1000,
241 p50_us: samples[n / 2] / 1000,
242 p95_us: samples[(n as f64 * 0.95) as usize] / 1000,
243 p99_us: samples[(n as f64 * 0.99).min((n - 1) as f64) as usize] / 1000,
244 max_us: samples[n - 1] / 1000,
245 }
246 }
247}
248
/// Summary of the latency window; every `*_us` field is in whole microseconds.
///
/// The `Default` value (all zeros) is what an empty window reports.
#[derive(Default)]
pub struct LatencyStats {
    /// Number of samples ever recorded (may exceed the window size).
    pub count: u64,
    /// Mean latency over the window.
    pub avg_us: u64,
    /// Median latency over the window.
    pub p50_us: u64,
    /// 95th-percentile latency over the window.
    pub p95_us: u64,
    /// 99th-percentile latency over the window.
    pub p99_us: u64,
    /// Largest latency in the window.
    pub max_us: u64,
}
265
266fn format_hour_row(hb: &HourBreakdown) -> String {
271 let snap = hb.snapshot();
272 let mut out = String::with_capacity(24 * 10);
273 for (i, v) in snap.iter().enumerate() {
274 if i > 0 {
275 out.push(' ');
276 }
277 out.push_str(&format!("h{:02}={}", i, v));
278 }
279 out
280}
281
/// Maps a quote sub-type id to its slot in the 18-entry per-sub-type arrays.
///
/// Ids `0..=17` map to themselves; anything else (negative or too large)
/// is folded into bucket `0`.
fn qot_sub_type_bucket(sub_type: i32) -> usize {
    match sub_type {
        0..=17 => sub_type as usize,
        _ => 0,
    }
}
289
290impl GatewayMetrics {
291 pub fn new() -> Self {
292 Self {
293 start_time: Instant::now(),
294 total_connections: AtomicU64::new(0),
295 total_disconnections: AtomicU64::new(0),
296 rejected_connections: AtomicU64::new(0),
297 total_requests: AtomicU64::new(0),
298 total_request_errors: AtomicU64::new(0),
299 total_response_bytes: AtomicU64::new(0),
300 backend_reconnects: AtomicU64::new(0),
301 backend_reconnect_failures: AtomicU64::new(0),
302 last_reconnect_ms: AtomicU64::new(0),
303 backend_online: AtomicU64::new(1),
304 backend_pushes_received: AtomicU64::new(0),
305 client_pushes_sent: AtomicU64::new(0),
306 client_push_send_failures: AtomicU64::new(0),
307 qot_client_push_backpressure_drops: AtomicU64::new(0),
308 qot_client_push_backpressure_drops_by_sub_type: [const { AtomicU64::new(0) }; 18],
309 backend_pushes_cmd_quote: AtomicU64::new(0),
310 backend_pushes_cmd_trade_legacy: AtomicU64::new(0),
311 backend_pushes_cmd_trade_new: AtomicU64::new(0),
312 backend_pushes_cmd_msg_center: AtomicU64::new(0),
313 backend_pushes_cmd_other: AtomicU64::new(0),
314 backend_pushes_cmd_quote_by_hour: HourBreakdown::new(),
315 backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown::new(),
316 backend_pushes_cmd_trade_new_by_hour: HourBreakdown::new(),
317 backend_pushes_cmd_msg_center_by_hour: HourBreakdown::new(),
318 qot_subscribe_ops: AtomicU64::new(0),
319 qot_unsubscribe_ops: AtomicU64::new(0),
320 cold_cache_wait_total: AtomicU64::new(0),
321 cold_cache_wait_hit: AtomicU64::new(0),
322 cold_cache_wait_timeout: AtomicU64::new(0),
323 resubscribe_ops: AtomicU64::new(0),
324 resubscribe_attempts: AtomicU64::new(0),
325 resubscribe_applied_keys: AtomicU64::new(0),
326 qot_push_dropped_total: AtomicU64::new(0),
328 qot_push_dropped_by_sub_type: [const { AtomicU64::new(0) }; 18],
329 keepalive_timeouts: AtomicU64::new(0),
330 latency_ring: RwLock::new(LatencyRing::new()),
331 }
332 }
333
334 pub fn record_latency_ns(&self, ns: u64) {
336 self.latency_ring.write().push(ns);
337 }
338
339 pub fn record_qot_push_dropped(&self, sub_type: i32) {
344 self.qot_push_dropped_total.fetch_add(1, Ordering::Relaxed);
345 let bucket = qot_sub_type_bucket(sub_type);
346 self.qot_push_dropped_by_sub_type[bucket].fetch_add(1, Ordering::Relaxed);
347 }
348
349 pub fn record_qot_client_push_backpressure_drop(&self, sub_type: i32) {
351 self.qot_client_push_backpressure_drops
352 .fetch_add(1, Ordering::Relaxed);
353 let bucket = qot_sub_type_bucket(sub_type);
354 self.qot_client_push_backpressure_drops_by_sub_type[bucket].fetch_add(1, Ordering::Relaxed);
355 }
356
357 pub fn qot_push_dropped_per_sub_type(&self) -> [u64; 18] {
360 let mut out = [0u64; 18];
361 for (i, slot) in self.qot_push_dropped_by_sub_type.iter().enumerate() {
362 out[i] = slot.load(Ordering::Relaxed);
363 }
364 out
365 }
366
367 pub fn qot_client_push_backpressure_drops_per_sub_type(&self) -> [u64; 18] {
368 let mut out = [0u64; 18];
369 for (i, slot) in self
370 .qot_client_push_backpressure_drops_by_sub_type
371 .iter()
372 .enumerate()
373 {
374 out[i] = slot.load(Ordering::Relaxed);
375 }
376 out
377 }
378
379 pub fn latency_stats(&self) -> LatencyStats {
381 self.latency_ring.read().stats()
382 }
383
384 pub fn uptime_str(&self) -> String {
386 let elapsed = self.start_time.elapsed();
387 let secs = elapsed.as_secs();
388 let days = secs / 86400;
389 let hours = (secs % 86400) / 3600;
390 let mins = (secs % 3600) / 60;
391 let s = secs % 60;
392 if days > 0 {
393 format!("{days}d {hours}h {mins}m {s}s")
394 } else if hours > 0 {
395 format!("{hours}h {mins}m {s}s")
396 } else {
397 format!("{mins}m {s}s")
398 }
399 }
400
401 pub fn report(&self) -> String {
403 let lat = self.latency_stats();
404 let backend_status = if self.backend_online.load(Ordering::Relaxed) == 1 {
405 "ONLINE"
406 } else {
407 "OFFLINE"
408 };
409
410 let total_req = self.total_requests.load(Ordering::Relaxed);
411 let uptime_secs = self.start_time.elapsed().as_secs_f64();
412 let avg_rps = if uptime_secs > 0.0 {
413 total_req as f64 / uptime_secs
414 } else {
415 0.0
416 };
417
418 format!(
419 "=== Gateway Metrics ===\r\n\
420 Uptime: {uptime}\r\n\
421 \r\n\
422 [Connections]\r\n\
423 total_accepted: {total_conn}\r\n\
424 total_disconnected: {total_disconn}\r\n\
425 rejected (limit): {rejected}\r\n\
426 keepalive_timeouts: {ka_timeout}\r\n\
427 \r\n\
428 [Requests]\r\n\
429 total_requests: {total_req}\r\n\
430 total_errors: {total_err}\r\n\
431 avg_rps: {avg_rps:.1}\r\n\
432 response_bytes: {resp_bytes}\r\n\
433 \r\n\
434 [Latency (recent {lat_count} samples)]\r\n\
435 avg: {lat_avg}us p50: {lat_p50}us p95: {lat_p95}us p99: {lat_p99}us max: {lat_max}us\r\n\
436 \r\n\
437 [Backend]\r\n\
438 status: {backend_status}\r\n\
439 reconnects: {reconnects}\r\n\
440 reconnect_failures: {reconnect_fail}\r\n\
441 pushes_received: {push_recv}\r\n\
442 pushes_sent_to_clients: {push_sent}\r\n\
443 push_send_failures_to_clients: {push_send_failures}\r\n\
444 qot_client_push_backpressure_drops: {qot_client_backpressure_drops}\r\n\
445 \r\n\
446 [Pushes by CMD (v1.4.83 §14)]\r\n\
447 cmd_6212_quote: {push_cmd_quote}\r\n\
448 cmd_4716_trade_legacy: {push_cmd_trade_legacy}\r\n\
449 cmd_14716_trade_new: {push_cmd_trade_new}\r\n\
450 cmd_5300_msg_center: {push_cmd_msg_center}\r\n\
451 cmd_other: {push_cmd_other}\r\n\
452 \r\n\
453 [Pushes by CMD × UTC hour (v1.4.84 §14)]\r\n\
454 cmd_14716_trade_new_hour_0..23: {hour_trade_new}\r\n\
455 cmd_6212_quote_hour_0..23: {hour_quote}\r\n\
456 cmd_4716_trade_legacy_hour_0..23: {hour_trade_legacy}\r\n\
457 cmd_5300_msg_center_hour_0..23: {hour_msg_center}\r\n\
458 \r\n\
459 [Subscriptions]\r\n\
460 subscribe_ops: {sub_ops}\r\n\
461 unsubscribe_ops: {unsub_ops}\r\n\
462 resubscribe_ops: {resub_ops}\r\n\
463 \r\n\
464 [Cold-cache wait (v1.4.110 §P3 #19)]\r\n\
465 total: {cc_total} hit: {cc_hit} timeout: {cc_timeout}\r\n",
466 uptime = self.uptime_str(),
467 total_conn = self.total_connections.load(Ordering::Relaxed),
468 total_disconn = self.total_disconnections.load(Ordering::Relaxed),
469 rejected = self.rejected_connections.load(Ordering::Relaxed),
470 ka_timeout = self.keepalive_timeouts.load(Ordering::Relaxed),
471 total_req = total_req,
472 total_err = self.total_request_errors.load(Ordering::Relaxed),
473 resp_bytes = self.total_response_bytes.load(Ordering::Relaxed),
474 lat_count = lat.count.min(LATENCY_RING_SIZE as u64),
475 lat_avg = lat.avg_us,
476 lat_p50 = lat.p50_us,
477 lat_p95 = lat.p95_us,
478 lat_p99 = lat.p99_us,
479 lat_max = lat.max_us,
480 reconnects = self.backend_reconnects.load(Ordering::Relaxed),
481 reconnect_fail = self.backend_reconnect_failures.load(Ordering::Relaxed),
482 push_recv = self.backend_pushes_received.load(Ordering::Relaxed),
483 push_sent = self.client_pushes_sent.load(Ordering::Relaxed),
484 push_send_failures = self.client_push_send_failures.load(Ordering::Relaxed),
485 qot_client_backpressure_drops = self
486 .qot_client_push_backpressure_drops
487 .load(Ordering::Relaxed),
488 push_cmd_quote = self.backend_pushes_cmd_quote.load(Ordering::Relaxed),
489 push_cmd_trade_legacy = self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed),
490 push_cmd_trade_new = self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed),
491 push_cmd_msg_center = self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed),
492 push_cmd_other = self.backend_pushes_cmd_other.load(Ordering::Relaxed),
493 hour_trade_new = format_hour_row(&self.backend_pushes_cmd_trade_new_by_hour),
494 hour_quote = format_hour_row(&self.backend_pushes_cmd_quote_by_hour),
495 hour_trade_legacy = format_hour_row(&self.backend_pushes_cmd_trade_legacy_by_hour),
496 hour_msg_center = format_hour_row(&self.backend_pushes_cmd_msg_center_by_hour),
497 sub_ops = self.qot_subscribe_ops.load(Ordering::Relaxed),
498 unsub_ops = self.qot_unsubscribe_ops.load(Ordering::Relaxed),
499 resub_ops = self.resubscribe_ops.load(Ordering::Relaxed),
500 cc_total = self.cold_cache_wait_total.load(Ordering::Relaxed),
501 cc_hit = self.cold_cache_wait_hit.load(Ordering::Relaxed),
502 cc_timeout = self.cold_cache_wait_timeout.load(Ordering::Relaxed),
503 )
504 }
505}
506
507impl Default for GatewayMetrics {
508 fn default() -> Self {
509 Self::new()
510 }
511}
512
513fn render_hour_breakdown_prom(metric_name: &str, hb: &HourBreakdown) -> String {
518 let snap = hb.snapshot();
519 let mut out = String::with_capacity(24 * 60);
520 for (h, v) in snap.iter().enumerate() {
521 out.push_str(&format!("{}{{hour=\"{:02}\"}} {}\n", metric_name, h, v));
522 }
523 out
524}
525
526impl GatewayMetrics {
527 #[must_use]
540 pub fn render_prometheus(&self) -> String {
541 let mut s = String::with_capacity(8192);
542
543 s.push_str("# HELP futu_gateway_connections_total Total accepted client connections\n");
545 s.push_str("# TYPE futu_gateway_connections_total counter\n");
546 s.push_str(&format!(
547 "futu_gateway_connections_total {}\n",
548 self.total_connections.load(Ordering::Relaxed)
549 ));
550 s.push_str(
551 "# HELP futu_gateway_disconnections_total Total client disconnections\n# TYPE futu_gateway_disconnections_total counter\n",
552 );
553 s.push_str(&format!(
554 "futu_gateway_disconnections_total {}\n",
555 self.total_disconnections.load(Ordering::Relaxed)
556 ));
557 s.push_str(
558 "# HELP futu_gateway_rejected_connections_total Connections rejected (limit hit)\n# TYPE futu_gateway_rejected_connections_total counter\n",
559 );
560 s.push_str(&format!(
561 "futu_gateway_rejected_connections_total {}\n",
562 self.rejected_connections.load(Ordering::Relaxed)
563 ));
564 s.push_str(
565 "# HELP futu_gateway_keepalive_timeouts_total KeepAlive timeout disconnects\n# TYPE futu_gateway_keepalive_timeouts_total counter\n",
566 );
567 s.push_str(&format!(
568 "futu_gateway_keepalive_timeouts_total {}\n",
569 self.keepalive_timeouts.load(Ordering::Relaxed)
570 ));
571
572 s.push_str(
574 "# HELP futu_gateway_requests_total Total handled client requests\n# TYPE futu_gateway_requests_total counter\n",
575 );
576 s.push_str(&format!(
577 "futu_gateway_requests_total {}\n",
578 self.total_requests.load(Ordering::Relaxed)
579 ));
580 s.push_str(
581 "# HELP futu_gateway_request_errors_total Handler-returned-None or decryption errors\n# TYPE futu_gateway_request_errors_total counter\n",
582 );
583 s.push_str(&format!(
584 "futu_gateway_request_errors_total {}\n",
585 self.total_request_errors.load(Ordering::Relaxed)
586 ));
587 s.push_str(
588 "# HELP futu_gateway_response_bytes_total Cumulative response payload bytes\n# TYPE futu_gateway_response_bytes_total counter\n",
589 );
590 s.push_str(&format!(
591 "futu_gateway_response_bytes_total {}\n",
592 self.total_response_bytes.load(Ordering::Relaxed)
593 ));
594
595 s.push_str(
597 "# HELP futu_gateway_backend_online Backend connection state (1=online,0=offline)\n# TYPE futu_gateway_backend_online gauge\n",
598 );
599 s.push_str(&format!(
600 "futu_gateway_backend_online {}\n",
601 self.backend_online.load(Ordering::Relaxed)
602 ));
603 s.push_str(
604 "# HELP futu_gateway_backend_reconnects_total Backend reconnect attempts\n# TYPE futu_gateway_backend_reconnects_total counter\n",
605 );
606 s.push_str(&format!(
607 "futu_gateway_backend_reconnects_total {}\n",
608 self.backend_reconnects.load(Ordering::Relaxed)
609 ));
610 s.push_str(
611 "# HELP futu_gateway_backend_reconnect_failures_total Backend reconnect failures\n# TYPE futu_gateway_backend_reconnect_failures_total counter\n",
612 );
613 s.push_str(&format!(
614 "futu_gateway_backend_reconnect_failures_total {}\n",
615 self.backend_reconnect_failures.load(Ordering::Relaxed)
616 ));
617
618 s.push_str(
620 "# HELP futu_gateway_backend_pushes_received_total Pushes received from backend\n# TYPE futu_gateway_backend_pushes_received_total counter\n",
621 );
622 s.push_str(&format!(
623 "futu_gateway_backend_pushes_received_total {}\n",
624 self.backend_pushes_received.load(Ordering::Relaxed)
625 ));
626 s.push_str(
627 "# HELP futu_gateway_client_pushes_sent_total Pushes forwarded to clients\n# TYPE futu_gateway_client_pushes_sent_total counter\n",
628 );
629 s.push_str(&format!(
630 "futu_gateway_client_pushes_sent_total {}\n",
631 self.client_pushes_sent.load(Ordering::Relaxed)
632 ));
633 s.push_str(
634 "# HELP futu_gateway_client_push_send_failures_total Client push send failures because the downstream channel was closed\n# TYPE futu_gateway_client_push_send_failures_total counter\n",
635 );
636 s.push_str(&format!(
637 "futu_gateway_client_push_send_failures_total {}\n",
638 self.client_push_send_failures.load(Ordering::Relaxed)
639 ));
640 s.push_str(
641 "# HELP futu_gateway_qot_client_push_backpressure_drops_total Quote push frames dropped for clients whose downstream channel is full\n# TYPE futu_gateway_qot_client_push_backpressure_drops_total counter\n",
642 );
643 s.push_str(&format!(
644 "futu_gateway_qot_client_push_backpressure_drops_total {}\n",
645 self.qot_client_push_backpressure_drops
646 .load(Ordering::Relaxed)
647 ));
648 s.push_str(
649 "# HELP futu_gateway_qot_client_push_backpressure_drops_by_sub_type_total Quote push frames dropped by full client channel, grouped by Qot_Common.SubType\n# TYPE futu_gateway_qot_client_push_backpressure_drops_by_sub_type_total counter\n",
650 );
651 for (sub_type, count) in self
652 .qot_client_push_backpressure_drops_per_sub_type()
653 .iter()
654 .enumerate()
655 {
656 s.push_str(&format!(
657 "futu_gateway_qot_client_push_backpressure_drops_by_sub_type_total{{sub_type=\"{}\"}} {}\n",
658 sub_type, count
659 ));
660 }
661
662 s.push_str(
665 "# HELP futu_gateway_backend_pushes_cmd_total Backend pushes by cmd_id (v1.4.83 §14)\n# TYPE futu_gateway_backend_pushes_cmd_total counter\n",
666 );
667 s.push_str(&format!(
668 "futu_gateway_backend_pushes_cmd_total{{cmd=\"6212_quote\"}} {}\n",
669 self.backend_pushes_cmd_quote.load(Ordering::Relaxed)
670 ));
671 s.push_str(&format!(
672 "futu_gateway_backend_pushes_cmd_total{{cmd=\"4716_trade_legacy\"}} {}\n",
673 self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed)
674 ));
675 s.push_str(&format!(
676 "futu_gateway_backend_pushes_cmd_total{{cmd=\"14716_trade_new\"}} {}\n",
677 self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed)
678 ));
679 s.push_str(&format!(
680 "futu_gateway_backend_pushes_cmd_total{{cmd=\"5300_msg_center\"}} {}\n",
681 self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed)
682 ));
683 s.push_str(&format!(
684 "futu_gateway_backend_pushes_cmd_total{{cmd=\"other\"}} {}\n",
685 self.backend_pushes_cmd_other.load(Ordering::Relaxed)
686 ));
687
688 s.push_str(
690 "# HELP futu_gateway_backend_pushes_cmd_quote_by_hour Cmd 6212 quote pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_quote_by_hour counter\n",
691 );
692 s.push_str(&render_hour_breakdown_prom(
693 "futu_gateway_backend_pushes_cmd_quote_by_hour",
694 &self.backend_pushes_cmd_quote_by_hour,
695 ));
696 s.push_str(
697 "# HELP futu_gateway_backend_pushes_cmd_trade_legacy_by_hour Cmd 4716 trade-legacy pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_trade_legacy_by_hour counter\n",
698 );
699 s.push_str(&render_hour_breakdown_prom(
700 "futu_gateway_backend_pushes_cmd_trade_legacy_by_hour",
701 &self.backend_pushes_cmd_trade_legacy_by_hour,
702 ));
703 s.push_str(
704 "# HELP futu_gateway_backend_pushes_cmd_trade_new_by_hour Cmd 14716 trade-new pushes per UTC hour (v1.4.84 §14 tester subject)\n# TYPE futu_gateway_backend_pushes_cmd_trade_new_by_hour counter\n",
705 );
706 s.push_str(&render_hour_breakdown_prom(
707 "futu_gateway_backend_pushes_cmd_trade_new_by_hour",
708 &self.backend_pushes_cmd_trade_new_by_hour,
709 ));
710 s.push_str(
711 "# HELP futu_gateway_backend_pushes_cmd_msg_center_by_hour Cmd 5300 msg-center pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_msg_center_by_hour counter\n",
712 );
713 s.push_str(&render_hour_breakdown_prom(
714 "futu_gateway_backend_pushes_cmd_msg_center_by_hour",
715 &self.backend_pushes_cmd_msg_center_by_hour,
716 ));
717
718 s.push_str(
720 "# HELP futu_gateway_qot_subscribe_ops_total Quote subscribe operations\n# TYPE futu_gateway_qot_subscribe_ops_total counter\n",
721 );
722 s.push_str(&format!(
723 "futu_gateway_qot_subscribe_ops_total {}\n",
724 self.qot_subscribe_ops.load(Ordering::Relaxed)
725 ));
726 s.push_str(
727 "# HELP futu_gateway_qot_unsubscribe_ops_total Quote unsubscribe operations\n# TYPE futu_gateway_qot_unsubscribe_ops_total counter\n",
728 );
729 s.push_str(&format!(
730 "futu_gateway_qot_unsubscribe_ops_total {}\n",
731 self.qot_unsubscribe_ops.load(Ordering::Relaxed)
732 ));
733 s.push_str(
734 "# HELP futu_gateway_resubscribe_ops_total Re-subscribe ops after reconnect (legacy, == resubscribe_applied_keys)\n# TYPE futu_gateway_resubscribe_ops_total counter\n",
735 );
736 s.push_str(&format!(
737 "futu_gateway_resubscribe_ops_total {}\n",
738 self.resubscribe_ops.load(Ordering::Relaxed)
739 ));
740 s.push_str(
742 "# HELP futu_gateway_resubscribe_attempts_total Re-subscribe trigger count (each reconnect/staleness loop +=1)\n# TYPE futu_gateway_resubscribe_attempts_total counter\n",
743 );
744 s.push_str(&format!(
745 "futu_gateway_resubscribe_attempts_total {}\n",
746 self.resubscribe_attempts.load(Ordering::Relaxed)
747 ));
748 s.push_str(
749 "# HELP futu_gateway_resubscribe_applied_keys_total Re-subscribe applied keys total (cache resolve OK + backend ack OK)\n# TYPE futu_gateway_resubscribe_applied_keys_total counter\n",
750 );
751 s.push_str(&format!(
752 "futu_gateway_resubscribe_applied_keys_total {}\n",
753 self.resubscribe_applied_keys.load(Ordering::Relaxed)
754 ));
755
756 s.push_str(
759 "# HELP futu_gateway_cold_cache_wait_total Cold-cache wait entries (cache miss + IsSub)\n# TYPE futu_gateway_cold_cache_wait_total counter\n",
760 );
761 s.push_str(&format!(
762 "futu_gateway_cold_cache_wait_total {}\n",
763 self.cold_cache_wait_total.load(Ordering::Relaxed)
764 ));
765 s.push_str(
766 "# HELP futu_gateway_cold_cache_wait_hit_total Cold-cache wait hits (push filled cache within timeout)\n# TYPE futu_gateway_cold_cache_wait_hit_total counter\n",
767 );
768 s.push_str(&format!(
769 "futu_gateway_cold_cache_wait_hit_total {}\n",
770 self.cold_cache_wait_hit.load(Ordering::Relaxed)
771 ));
772 s.push_str(
773 "# HELP futu_gateway_cold_cache_wait_timeout_total Cold-cache wait timeouts (3s elapsed, cache still miss)\n# TYPE futu_gateway_cold_cache_wait_timeout_total counter\n",
774 );
775 s.push_str(&format!(
776 "futu_gateway_cold_cache_wait_timeout_total {}\n",
777 self.cold_cache_wait_timeout.load(Ordering::Relaxed)
778 ));
779
780 let lat = self.latency_stats();
782 s.push_str(
783 "# HELP futu_gateway_request_latency_us Request latency percentiles (microseconds, recent ring)\n# TYPE futu_gateway_request_latency_us gauge\n",
784 );
785 s.push_str(&format!(
786 "futu_gateway_request_latency_us{{quantile=\"p50\"}} {}\n",
787 lat.p50_us
788 ));
789 s.push_str(&format!(
790 "futu_gateway_request_latency_us{{quantile=\"p95\"}} {}\n",
791 lat.p95_us
792 ));
793 s.push_str(&format!(
794 "futu_gateway_request_latency_us{{quantile=\"p99\"}} {}\n",
795 lat.p99_us
796 ));
797 s.push_str(&format!(
798 "futu_gateway_request_latency_us{{quantile=\"max\"}} {}\n",
799 lat.max_us
800 ));
801
802 s
803 }
804}
805
806pub fn install_prometheus_extension(metrics: Arc<GatewayMetrics>) {
820 futu_auth::metrics::register_global_renderer(move || metrics.render_prometheus());
821}
822
823#[cfg(test)]
824mod tests;