1use anyhow::{Context, Result};
9use std::sync::Arc;
10
11use futu_gateway_core::bridge::GatewayBridge;
12use futu_server::ws_listener::{WsServer, WsServerDeps};
13
14use crate::config::RuntimeConfig;
15use crate::startup::phase1::Phase1Out;
16use crate::startup::phase3::Phase3Out;
17
18mod shutdown;
19#[cfg(test)]
20mod tests;
21
22use shutdown::{
23 BackgroundJoinHandle, SHUTDOWN_GRACE_PERIOD, SurfaceJoinHandle, await_background_shutdown,
24 await_surface_shutdown, log_surface_task_result, request_surface_shutdown,
25 surface_task_result_or_pending,
26};
27
28fn merge_push_health_snapshots_for_rest(
29 push: serde_json::Value,
30 qot_login: serde_json::Value,
31 backend_connected: bool,
32) -> serde_json::Value {
33 let mut combined = match push {
34 serde_json::Value::Object(map) => map,
35 other => {
36 let mut map = serde_json::Map::new();
37 map.insert(
38 "error".to_string(),
39 serde_json::Value::String(format!(
40 "push_health snapshot serialized to non-object {}",
41 json_value_kind(&other)
42 )),
43 );
44 map
45 }
46 };
47 combined.insert("qot_login_health".to_string(), qot_login);
48 combined.insert(
49 "backend_connected".to_string(),
50 serde_json::Value::Bool(backend_connected),
51 );
52 serde_json::Value::Object(combined)
53}
54
55fn json_value_kind(value: &serde_json::Value) -> &'static str {
56 match value {
57 serde_json::Value::Null => "null",
58 serde_json::Value::Bool(_) => "bool",
59 serde_json::Value::Number(_) => "number",
60 serde_json::Value::String(_) => "string",
61 serde_json::Value::Array(_) => "array",
62 serde_json::Value::Object(_) => "object",
63 }
64}
65
/// One "surface is reachable without API-key auth" finding.
///
/// Produced by `legacy_network_exposure_warnings` and rendered to the log and
/// to stderr by `warn_legacy_network_exposure`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct LegacyNetworkExposureWarning {
    /// Human-readable name of the exposed surface (e.g. `"REST"`, `"FTAPI TCP"`).
    surface: &'static str,
    /// Command-line flag(s) the operator can set to configure keys for it.
    key_flag: &'static str,
}
71
/// Reports whether the configured bind host refers to the loopback interface.
///
/// Accepted forms:
/// * the host name `localhost` (ASCII case-insensitive);
/// * any IPv4 literal in `127.0.0.0/8`;
/// * the IPv6 loopback `::1` in any textual spelling, optionally wrapped in
///   `[` `]` (the form needed when the host is later joined with `:port`).
///
/// The address is parsed rather than prefix-matched, so strings that merely
/// start with `127.` but are not valid loopback literals (for example
/// `127.example.com` or `127.0.0.999`) are NOT treated as loopback. Anything
/// unrecognised is reported as non-loopback, which errs on the side of
/// emitting the exposure warning. Note that the inet_aton-style shorthand
/// `127.1` is not a valid literal for `std::net` and is therefore also
/// reported as non-loopback.
fn is_loopback_bind(ip: &str) -> bool {
    if ip.eq_ignore_ascii_case("localhost") {
        return true;
    }

    // Brackets are only meaningful around IPv6 literals, so parse the inner
    // text strictly as IPv6 in that case.
    let parsed = match ip.strip_prefix('[').and_then(|rest| rest.strip_suffix(']')) {
        Some(inner) => inner
            .parse::<std::net::Ipv6Addr>()
            .map(std::net::IpAddr::V6),
        None => ip.parse::<std::net::IpAddr>(),
    };

    matches!(parsed, Ok(addr) if addr.is_loopback())
}
75
76fn legacy_network_exposure_warnings(
77 ip: &str,
78 any_keys_configured: bool,
79 allow_tcp_unauthenticated: bool,
80 rest_key_configured: Option<bool>,
81 grpc_key_configured: Option<bool>,
82 ws_key_configured: Option<bool>,
83) -> Vec<LegacyNetworkExposureWarning> {
84 if is_loopback_bind(ip) {
85 return Vec::new();
86 }
87
88 let mut warnings = Vec::new();
89 if !any_keys_configured || allow_tcp_unauthenticated {
90 warnings.push(LegacyNetworkExposureWarning {
91 surface: "FTAPI TCP",
92 key_flag: "--rest-keys-file/--grpc-keys-file/--ws-keys-file",
93 });
94 }
95 for (enabled_keyed, surface, key_flag) in [
96 (rest_key_configured, "REST", "--rest-keys-file"),
97 (grpc_key_configured, "gRPC", "--grpc-keys-file"),
98 (ws_key_configured, "WS", "--ws-keys-file"),
99 ] {
100 if enabled_keyed == Some(false) {
101 warnings.push(LegacyNetworkExposureWarning { surface, key_flag });
102 }
103 }
104 warnings
105}
106
107fn warn_legacy_network_exposure(config: &RuntimeConfig, any_keys_configured: bool) {
108 for warning in legacy_network_exposure_warnings(
109 &config.ip,
110 any_keys_configured,
111 config.allow_tcp_unauthenticated,
112 config.rest_port.map(|_| config.rest_keys_file.is_some()),
113 config.grpc_port.map(|_| config.grpc_keys_file.is_some()),
114 config.websocket_port.map(|_| config.ws_keys_file.is_some()),
115 ) {
116 tracing::warn!(
117 bind_ip = %config.ip,
118 surface = warning.surface,
119 key_flag = warning.key_flag,
120 "legacy network exposure: surface is reachable on a non-loopback bind without API-key auth"
121 );
122 eprintln!(
123 "⚠️ {} legacy mode is reachable on {} without API-key auth. \
124 For production, bind 127.0.0.1 or configure {}.",
125 warning.surface, config.ip, warning.key_flag
126 );
127 }
128}
129
130pub(super) async fn run_phase4(
131 config: &RuntimeConfig,
132 phase1: Phase1Out,
133 bridge: Arc<GatewayBridge>,
134 phase3: Phase3Out,
135 shutdown_tx: tokio::sync::watch::Sender<bool>,
136 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
137) -> Result<()> {
138 let Phase1Out {
139 _audit_guard,
140 shared_counters,
141 listen_addr,
142 rest_keys_file,
143 ws_keys_file,
144 grpc_keys_file,
145 allow_tcp_unauthenticated,
146 } = phase1;
147 let Phase3Out {
148 server,
149 server_config,
150 ws_broadcaster,
151 grpc_broadcaster,
152 } = phase3;
153
154 let mut ws_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
158 let mut rest_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
159 let mut grpc_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
160 let any_keys_configured =
161 rest_keys_file.is_some() || grpc_keys_file.is_some() || ws_keys_file.is_some();
162 warn_legacy_network_exposure(config, any_keys_configured);
163
164 let mut ws_handle = if let Some(ws_port) = config.websocket_port {
166 let ws_addr = format!("{}:{}", config.ip, ws_port);
167 let ws_key_store = match &ws_keys_file {
175 Some(path) => match futu_auth::KeyStore::load(path) {
176 Ok(ks) => {
177 tracing::info!(
178 path = %path.display(),
179 keys_loaded = ks.len(),
180 "WS keys file loaded (Bearer/?token auth enabled)"
181 );
182 Some(std::sync::Arc::new(ks))
183 }
184 Err(e) => {
185 tracing::error!(
187 error = %e,
188 path = %path.display(),
189 "failed to load WS keys file (--ws-keys-file 明确指定 → fail-closed). \
190 daemon abort. fix the keys file then restart. \
191 不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
192 );
193 return Err(anyhow::anyhow!(
194 "failed to load WS keys file {}: {e}. \
195 --ws-keys-file 明确指定 → fail-closed (BUG-007).",
196 path.display()
197 ));
198 }
199 },
200 None => None,
201 };
202 let ws_counters = std::sync::Arc::clone(&shared_counters);
203 ws_key_store_holder = ws_key_store.as_ref().map(std::sync::Arc::clone);
205 let ws_server = WsServer::with_auth(
206 ws_addr.clone(),
207 server_config.clone(),
208 WsServerDeps::new(
209 std::sync::Arc::clone(server.connections()),
210 std::sync::Arc::clone(server.router()),
211 Some(bridge.subscription_runtime().manager()),
212 ),
213 ws_key_store,
214 Some(ws_counters),
215 )
216 .with_server_time_offset_secs(bridge.server_clock().offset_secs());
217 tracing::info!(addr = %ws_addr, "starting WebSocket server");
218 let ws_shutdown_rx = shutdown_rx.clone();
219 Some(tokio::spawn(async move {
220 ws_server
221 .run_until_shutdown(ws_shutdown_rx)
222 .await
223 .context("WebSocket server error")
224 }))
225 } else {
226 None
227 };
228
229 let mut rest_handle = if let Some(rest_port) = config.rest_port {
231 let rest_addr = format!("{}:{}", config.ip, rest_port);
232 let router = std::sync::Arc::clone(server.router());
233 let broadcaster = std::sync::Arc::clone(&ws_broadcaster);
234 let rest_key_store = match &rest_keys_file {
236 Some(path) => match futu_auth::KeyStore::load(path) {
237 Ok(ks) => {
238 tracing::info!(
239 path = %path.display(),
240 keys_loaded = ks.len(),
241 "REST keys file loaded (Bearer auth enabled)"
242 );
243 std::sync::Arc::new(ks)
244 }
245 Err(e) => {
246 tracing::error!(
247 error = %e,
248 path = %path.display(),
249 "failed to load REST keys file (--rest-keys-file 明确指定 → fail-closed). \
250 daemon abort. fix the keys file then restart. \
251 不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
252 );
253 return Err(anyhow::anyhow!(
254 "failed to load REST keys file {}: {e}. \
255 --rest-keys-file 明确指定 → fail-closed (BUG-007).",
256 path.display()
257 ));
258 }
259 },
260 None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
261 };
262 tracing::info!(addr = %rest_addr, "starting REST API server (WebSocket: /ws)");
263
264 if rest_key_store.is_configured() {
268 rest_key_store_holder = Some(std::sync::Arc::clone(&rest_key_store));
269 }
270
271 let rest_counters = std::sync::Arc::clone(&shared_counters);
278 let bridge_for_status = std::sync::Arc::clone(&bridge);
282 let admin_status_provider: futu_rest::adapter::AdminStatusProvider =
283 std::sync::Arc::new(move || {
284 serde_json::to_value(bridge_for_status.snapshot_status())
285 .unwrap_or_else(|_| serde_json::json!({"error": "snapshot serialize failed"}))
286 });
287 let rest_shutdown_tx = shutdown_tx.clone();
288 let admin_shutdown_handler: futu_rest::adapter::AdminShutdownHandler =
289 std::sync::Arc::new(move || {
290 rest_shutdown_tx
291 .send(true)
292 .map_err(|e| format!("shutdown receiver dropped: {e}"))
293 });
294 let bridge_for_reload = std::sync::Arc::clone(&bridge);
302 let admin_reload_handler: futu_rest::adapter::AdminReloadHandler =
303 std::sync::Arc::new(move || {
304 let bridge = std::sync::Arc::clone(&bridge_for_reload);
305 Box::pin(async move {
306 serde_json::to_value(bridge.reload())
307 .unwrap_or_else(|_| serde_json::json!({"error": "reload serialize failed"}))
308 })
309 });
310 let bridge_for_push_health = std::sync::Arc::clone(&bridge);
317 let push_health_snapshot_provider: futu_rest::adapter::PushHealthSnapshotProvider =
318 std::sync::Arc::new(move || {
319 let push = serde_json::to_value(
320 bridge_for_push_health
321 .push_runtime()
322 .push_health()
323 .snapshot(),
324 )
325 .unwrap_or_else(
326 |_| serde_json::json!({"error": "push_health snapshot serialize failed"}),
327 );
328 let qot_login = serde_json::to_value(
329 bridge_for_push_health
330 .push_runtime()
331 .qot_login_health()
332 .snapshot(),
333 )
334 .unwrap_or_else(
335 |_| serde_json::json!({"error": "qot_login_health snapshot serialize failed"}),
336 );
337 let backend_connected =
338 bridge_for_push_health.broker_runtime().platform_connected();
339 merge_push_health_snapshots_for_rest(push, qot_login, backend_connected)
340 });
341 let bridge_for_card_num = std::sync::Arc::clone(&bridge);
347 let card_num_resolver: futu_rest::adapter::CardNumResolver =
348 std::sync::Arc::new(move |cn: &str| {
349 bridge_for_card_num
350 .caches()
351 .trd_cache
352 .find_acc_ids_by_card_num(cn)
353 });
354 let rest_shutdown_rx = shutdown_rx.clone();
355 Some(tokio::spawn(async move {
356 futu_rest::server::start_with_auth_full_admin_until_shutdown(
357 &rest_addr,
358 router,
359 broadcaster,
360 rest_key_store,
361 rest_counters,
362 futu_rest::server::RestAdminHooks {
363 admin_status_provider: Some(admin_status_provider),
364 admin_shutdown_handler: Some(admin_shutdown_handler),
365 admin_reload_handler: Some(admin_reload_handler),
366 push_health_snapshot_provider: Some(push_health_snapshot_provider),
367 card_num_resolver: Some(card_num_resolver),
368 },
369 rest_shutdown_rx,
370 )
371 .await
372 .context("REST API server error")
373 }))
374 } else {
375 None
376 };
377
378 let mut grpc_handle = if let Some(grpc_port) = config.grpc_port {
380 let grpc_addr = format!("{}:{}", config.ip, grpc_port);
381 let router = std::sync::Arc::clone(server.router());
382 let broadcaster = std::sync::Arc::clone(&grpc_broadcaster);
383 let grpc_key_store = match &grpc_keys_file {
385 Some(path) => match futu_auth::KeyStore::load(path) {
386 Ok(ks) => {
387 tracing::info!(
388 path = %path.display(),
389 keys_loaded = ks.len(),
390 "gRPC keys file loaded (Bearer auth enabled)"
391 );
392 std::sync::Arc::new(ks)
393 }
394 Err(e) => {
395 tracing::error!(
396 error = %e,
397 path = %path.display(),
398 "failed to load gRPC keys file (--grpc-keys-file 明确指定 → fail-closed). \
399 daemon abort. fix the keys file then restart. \
400 不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
401 );
402 return Err(anyhow::anyhow!(
403 "failed to load gRPC keys file {}: {e}. \
404 --grpc-keys-file 明确指定 → fail-closed (BUG-007).",
405 path.display()
406 ));
407 }
408 },
409 None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
410 };
411 tracing::info!(addr = %grpc_addr, "starting gRPC server (SubscribePush: streaming)");
412
413 if grpc_key_store.is_configured() {
415 grpc_key_store_holder = Some(std::sync::Arc::clone(&grpc_key_store));
416 }
417
418 let grpc_counters = std::sync::Arc::clone(&shared_counters);
423 let grpc_shutdown_rx = shutdown_rx.clone();
424 Some(tokio::spawn(async move {
425 futu_grpc::server::start_with_auth_until_shutdown(
426 &grpc_addr,
427 router,
428 broadcaster,
429 grpc_key_store,
430 grpc_counters,
431 grpc_shutdown_rx,
432 )
433 .await
434 .map_err(|e| anyhow::anyhow!("gRPC server error: {e}"))
435 }))
436 } else {
437 None
438 };
439
440 let mut telnet_handle = if let Some(telnet_port) = config.telnet_port {
442 let telnet_addr = format!("{}:{}", config.ip, telnet_port);
443 let bridge_for_relogin = std::sync::Arc::clone(&bridge);
447 let relogin_fn: futu_server::telnet::ReloginFn = std::sync::Arc::new(move || {
448 tracing::warn!(
449 "v1.4.97 P1-D-F: telnet relogin clearing login_cache; \
450 next P1-D tick will trigger AuthRefresher relogin"
451 );
452 bridge_for_relogin.caches().login_cache.clear();
453 });
454 let telnet_server = futu_server::telnet::TelnetServer::new(
455 telnet_addr.clone(),
456 std::sync::Arc::clone(server.connections()),
457 Some(bridge.subscription_runtime().manager()),
458 Some(std::sync::Arc::clone(server.metrics())),
459 shutdown_tx.clone(),
460 )
461 .with_relogin_fn(relogin_fn);
462 tracing::info!(addr = %telnet_addr, "starting Telnet server");
463 let telnet_shutdown_rx = shutdown_rx.clone();
464 Some(tokio::spawn(async move {
465 telnet_server
466 .run_until_shutdown(telnet_shutdown_rx)
467 .await
468 .context("Telnet server error")
469 }))
470 } else {
471 None
472 };
473
474 tracing::info!("gateway ready, accepting connections on {listen_addr}");
475 tracing::info!("press Ctrl+C to exit");
476
477 let card_num_reload_and_expand_fn: std::sync::Arc<dyn Fn(bool) + Send + Sync> = {
495 let bridge_for_expand = std::sync::Arc::clone(&bridge);
496 let ws_ks = ws_key_store_holder.clone();
497 let rest_ks = rest_key_store_holder.clone();
498 let grpc_ks = grpc_key_store_holder.clone();
499 std::sync::Arc::new(move |do_reload: bool| {
500 if do_reload {
502 for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
503 let Some(ks) = ks_opt.as_ref() else { continue };
504 match ks.reload() {
505 Ok(()) => tracing::warn!(
506 ks = ks_name,
507 keys_loaded = ks.len(),
508 "v1.4.103 F3.1: keys reloaded on SIGHUP (before card_num expand)"
509 ),
510 Err(e) => tracing::error!(
511 ks = ks_name,
512 error = %e,
513 "v1.4.103 F3.1: keys reload failed (skipping expand for this store)"
514 ),
515 }
516 }
517 }
518 let trd_cache = std::sync::Arc::clone(&bridge_for_expand.caches().trd_cache);
520 let resolver = {
521 let cache_clone = std::sync::Arc::clone(&trd_cache);
522 move |cn: &str| cache_clone.find_acc_ids_by_card_num(cn)
523 };
524 for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
525 let Some(ks) = ks_opt.as_ref() else { continue };
526 let (resolved, unresolved, ambiguous) = ks.expand_allowed_card_nums(
527 &resolver,
528 |key_id, cn| {
529 tracing::warn!(
530 key_id = %key_id,
531 card_num = %cn,
532 "v1.4.103 B10/F1 fail-closed: card_num not found in trd_cache; \
533 writing sentinel acc_id=0 to enforce restrictive denylist \
534 (limits.contains check 永远 false → reject 真账户)"
535 );
536 },
537 |key_id, cn, candidates| {
538 tracing::warn!(
539 key_id = %key_id,
540 card_num = %cn,
541 candidates = ?candidates,
542 "v1.4.103 B10/F1 fail-closed: ambiguous card_num suffix \
543 matched multiple accounts (skipped, write 完整 16 位 / specific 4 位)"
544 );
545 },
546 );
547 tracing::info!(
548 ks = ks_name,
549 resolved,
550 unresolved,
551 ambiguous,
552 "v1.4.103 B10: expanded allowed_card_nums into allowed_acc_ids"
553 );
554 }
555 })
556 };
557 let card_num_expand_fn: std::sync::Arc<dyn Fn() + Send + Sync> = {
559 let inner = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
560 std::sync::Arc::new(move || (inner)(false))
561 };
562
563 (card_num_expand_fn)();
565
566 let card_num_retry_handle: BackgroundJoinHandle = {
568 let card_num_expand_fn_loop = std::sync::Arc::clone(&card_num_expand_fn);
569 let bridge_for_check = std::sync::Arc::clone(&bridge);
570 let mut card_num_retry_shutdown_rx = shutdown_rx.clone();
571 tokio::spawn(async move {
572 let trd_cache = std::sync::Arc::clone(&bridge_for_check.caches().trd_cache);
573 let mut attempts = 0u32;
574 let max_attempts = 6u32; loop {
576 tokio::select! {
577 changed = card_num_retry_shutdown_rx.changed() => {
578 if changed.is_err() || *card_num_retry_shutdown_rx.borrow() {
579 tracing::debug!(
580 "v1.4.111: card_num retry loop received shutdown signal"
581 );
582 return;
583 }
584 }
585 _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {}
586 }
587 attempts += 1;
588 let accounts = trd_cache.get_accounts();
589 if accounts.is_empty() {
590 if attempts >= max_attempts {
591 tracing::warn!(
592 "v1.4.103 B10: trd_cache 仍空 (after {max_attempts} × 10s); \
593 受限 key 仍走 fail-closed sentinel reject 直到 SIGHUP / cache 加载."
594 );
595 return;
596 }
597 continue;
598 }
599 (card_num_expand_fn_loop)();
600 return;
601 }
602 })
603 };
604
605 #[cfg(unix)]
614 let sighup_handle: Option<BackgroundJoinHandle> = {
615 let unified_sighup_fn = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
616 let mut sighup_shutdown_rx = shutdown_rx.clone();
617 Some(tokio::spawn(async move {
618 use tokio::signal::unix::{SignalKind, signal};
619 let mut sig = match signal(SignalKind::hangup()) {
620 Ok(s) => s,
621 Err(e) => {
622 tracing::error!(error = %e, "SIGHUP install failed (unified reload+expand)");
623 return;
624 }
625 };
626 tracing::info!(
627 "v1.4.103 F3.1: unified SIGHUP handler installed (reload all keys + expand card_num)"
628 );
629 loop {
630 tokio::select! {
631 signal = sig.recv() => {
632 if signal.is_none() {
633 return;
634 }
635 tracing::info!(
636 "v1.4.103 F3.1: SIGHUP received — running reload_all_stores + \
637 expand_allowed_card_nums (single ordered op, no race)"
638 );
639 (unified_sighup_fn)(true); }
641 changed = sighup_shutdown_rx.changed() => {
642 if changed.is_err() || *sighup_shutdown_rx.borrow() {
643 tracing::debug!(
644 "v1.4.111: unified SIGHUP handler received shutdown signal"
645 );
646 return;
647 }
648 }
649 }
650 }
651 }))
652 };
653 #[cfg(not(unix))]
654 let sighup_handle: Option<BackgroundJoinHandle> = None;
655
656 let tcp_disabled = any_keys_configured && !allow_tcp_unauthenticated;
667
668 if tcp_disabled {
669 tracing::warn!(
670 listen_addr = %listen_addr,
671 "v1.4.104 external report S-001 (P0) fix: TCP listener (port {}) NOT started — \
672 keys file configured but --allow-tcp-unauthenticated not set. \
673 native TCP FTAPI protocol has no Bearer field, cannot enforce \
674 caller-specific scope check; defaulting to fail-closed (skip TCP). \
675 Use REST/gRPC/WS endpoints for authenticated access. \
676 To restore TCP (legacy Python SDK clients) add --allow-tcp-unauthenticated, \
677 but be aware that port {} will accept ANY local connection without \
678 scope check (跨账户 leak risk).",
679 config.port,
680 config.port,
681 );
682 eprintln!(
683 "⚠️ TCP listener (port {}) DISABLED (v1.4.104 external report S-001 fix): \
684 keys file configured + no --allow-tcp-unauthenticated. \
685 Pass --allow-tcp-unauthenticated to restore (with security warning).",
686 config.port,
687 );
688 } else if any_keys_configured && allow_tcp_unauthenticated {
689 tracing::warn!(
690 listen_addr = %listen_addr,
691 "⚠️ v1.4.104: TCP listener running WITHOUT scope check despite keys configured \
692 (--allow-tcp-unauthenticated set). Port {} accepts ANY local connection — \
693 跨账户 leak risk. Use REST/gRPC/WS for authenticated clients; reserve \
694 TCP only for legacy Python SDK / C++ OpenD where Bearer not feasible.",
695 config.port,
696 );
697 eprintln!(
698 "⚠️ TCP port {} ACCEPTS UNAUTHENTICATED connections (--allow-tcp-unauthenticated). \
699 受限 keys 不在该 surface 强制. 推荐改用 REST/gRPC/WS.",
700 config.port,
701 );
702 }
703
704 let tcp_shutdown_rx = shutdown_rx.clone();
705 let mut tcp_handle: Option<SurfaceJoinHandle> = if tcp_disabled {
706 None
707 } else {
708 Some(tokio::spawn(async move {
709 server
710 .run_until_shutdown(tcp_shutdown_rx)
711 .await
712 .context("API server error")
713 }))
714 };
715
716 let phase4_result: anyhow::Result<()> = tokio::select! {
718 result = surface_task_result_or_pending(&mut tcp_handle) => {
719 tcp_handle = None;
720 log_surface_task_result("API server", result)
721 }
722 result = surface_task_result_or_pending(&mut ws_handle) => {
723 ws_handle = None;
724 log_surface_task_result("WebSocket", result)
725 }
726 result = surface_task_result_or_pending(&mut rest_handle) => {
727 rest_handle = None;
728 log_surface_task_result("REST API", result)
729 }
730 result = surface_task_result_or_pending(&mut grpc_handle) => {
731 grpc_handle = None;
732 log_surface_task_result("gRPC", result)
733 }
734 result = surface_task_result_or_pending(&mut telnet_handle) => {
735 telnet_handle = None;
736 log_surface_task_result("Telnet", result)
737 }
738 _ = tokio::signal::ctrl_c() => {
739 tracing::info!("received Ctrl+C, shutting down gracefully...");
740 Ok(())
741 }
742 _ = async {
743 while shutdown_rx.changed().await.is_ok() {
744 if *shutdown_rx.borrow() {
745 break;
746 }
747 }
748 } => {
749 tracing::info!("shutdown requested via telnet");
750 Ok(())
751 }
752 };
753
754 request_surface_shutdown(&shutdown_tx, "phase4 exit");
756 await_background_shutdown(
757 "card_num retry",
758 card_num_retry_handle,
759 SHUTDOWN_GRACE_PERIOD,
760 )
761 .await;
762 if let Some(handle) = sighup_handle {
763 await_background_shutdown("SIGHUP reload", handle, SHUTDOWN_GRACE_PERIOD).await;
764 }
765 if let Some(handle) = tcp_handle {
766 await_surface_shutdown("API server", handle, SHUTDOWN_GRACE_PERIOD).await;
767 }
768 if let Some(handle) = ws_handle {
769 await_surface_shutdown("WebSocket", handle, SHUTDOWN_GRACE_PERIOD).await;
770 }
771 if let Some(handle) = rest_handle {
772 await_surface_shutdown("REST API", handle, SHUTDOWN_GRACE_PERIOD).await;
773 }
774 if let Some(handle) = grpc_handle {
775 await_surface_shutdown("gRPC", handle, SHUTDOWN_GRACE_PERIOD).await;
776 }
777 if let Some(handle) = telnet_handle {
778 await_surface_shutdown("Telnet", handle, SHUTDOWN_GRACE_PERIOD).await;
779 }
780
781 phase4_result?;
782 tracing::info!("gateway stopped");
783 Ok(())
784}