1use std::sync::Arc;
28use std::sync::atomic::{AtomicU32, Ordering};
29
30use bytes::Bytes;
31use tokio::sync::{broadcast, mpsc, watch};
32use tokio_stream::wrappers::ReceiverStream;
33use tonic::{Request, Response, Status};
34
35use futu_auth::{KeyStore, RuntimeCounters, Scope};
36use futu_codec::header::ProtoFmtType;
37use futu_server::conn::IncomingRequest;
38use futu_server::push::ExternalPushSink;
39use futu_server::router::RequestRouter;
40
41use crate::auth::{
42 derive_grpc_conn_id, extract_grpc_idempotency_key, extract_grpc_session_id, extract_grpc_token,
43 grpc_audit_context, grpc_status_for,
44};
45use crate::proto::futu_open_d_server::{FutuOpenD, FutuOpenDServer};
46use crate::proto::{FutuRequest, FutuResponse, PushEvent, SubscribePushRequest};
47use futu_auth_pipeline::{
48 AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, PushEventCtx, RejectKind,
49 SurfaceId, authenticate_request,
50};
51
/// Size cap, in bytes, for a single gRPC message: 12 MiB.
pub const GRPC_MAX_MESSAGE_SIZE_BYTES: usize = 12 << 20;
57
58#[derive(Clone)]
63pub struct GrpcPushBroadcaster {
64 tx: broadcast::Sender<PushEvent>,
65}
66
67impl GrpcPushBroadcaster {
68 pub fn new(capacity: usize) -> Self {
69 let (tx, _) = broadcast::channel(capacity);
70 Self { tx }
71 }
72
73 pub fn subscribe(&self) -> broadcast::Receiver<PushEvent> {
75 self.tx.subscribe()
76 }
77
78 fn has_receivers(&self) -> bool {
79 self.tx.receiver_count() > 0
80 }
81
82 fn send(&self, event: PushEvent) {
83 if !self.has_receivers() {
84 return;
85 }
86 let proto_id = event.proto_id;
87 let event_type = event.event_type.clone();
88 if self.tx.send(event).is_err() {
89 tracing::debug!(
90 proto_id,
91 event_type,
92 receiver_count = self.tx.receiver_count(),
93 "grpc push broadcast send skipped"
94 );
95 }
96 }
97}
98
99impl ExternalPushSink for GrpcPushBroadcaster {
100 fn on_quote_push(
105 &self,
106 sec_key: &str,
107 sub_type: i32,
108 rehab_type: i32,
109 proto_id: u32,
110 body: &[u8],
111 ) {
112 if !self.has_receivers() {
113 return;
114 }
115 self.send(PushEvent {
116 proto_id,
117 sec_key: sec_key.to_string(),
118 sub_type,
119 rehab_type,
120 body: body.to_vec(),
121 event_type: "quote".to_string(),
122 acc_id: 0,
123 trd_market: String::new(), });
125 }
126
127 fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
128 if !self.has_receivers() {
129 return;
130 }
131 self.send(PushEvent {
132 proto_id,
133 sec_key: String::new(),
134 sub_type: 0,
135 rehab_type: 0,
136 body: body.to_vec(),
137 event_type: "notify".to_string(),
138 acc_id: 0,
139 trd_market: String::new(),
140 });
141 }
142
143 fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
147 if !self.has_receivers() {
148 return;
149 }
150 self.send(PushEvent {
151 proto_id,
152 sec_key: String::new(),
153 sub_type: 0,
154 rehab_type: 0,
155 body: body.to_vec(),
156 event_type: "trade".to_string(),
157 acc_id,
158 trd_market: trd_market.unwrap_or("").to_string(),
159 });
160 }
161}
162
163pub struct FutuGrpcService {
165 router: Arc<RequestRouter>,
166 push_broadcaster: Arc<GrpcPushBroadcaster>,
167 key_store: Arc<KeyStore>,
168 counters: Arc<RuntimeCounters>,
169 filter_registry: Arc<FilterRegistry>,
173 serial_counter: AtomicU32,
179}
180
181impl FutuGrpcService {
182 pub fn new(router: Arc<RequestRouter>, push_broadcaster: Arc<GrpcPushBroadcaster>) -> Self {
183 Self::with_auth(
184 router,
185 push_broadcaster,
186 Arc::new(KeyStore::empty()),
187 Arc::new(RuntimeCounters::new()),
188 )
189 }
190
191 pub fn with_auth(
196 router: Arc<RequestRouter>,
197 push_broadcaster: Arc<GrpcPushBroadcaster>,
198 key_store: Arc<KeyStore>,
199 counters: Arc<RuntimeCounters>,
200 ) -> Self {
201 Self {
202 router,
203 push_broadcaster,
204 key_store,
205 counters,
206 filter_registry: Arc::new(FilterRegistry::with_defaults()),
207 serial_counter: AtomicU32::new(1),
208 }
209 }
210
211 fn next_serial(&self) -> u32 {
212 self.serial_counter.fetch_add(1, Ordering::Relaxed)
213 }
214}
215
216#[tonic::async_trait]
217impl FutuOpenD for FutuGrpcService {
218 async fn request(
220 &self,
221 request: Request<FutuRequest>,
222 ) -> Result<Response<FutuResponse>, Status> {
223 let proto_id = request.get_ref().proto_id;
229 if proto_id == 0 {
230 return Err(Status::invalid_argument("proto_id is required"));
231 }
232 if futu_auth::is_internal_proto_id(proto_id) {
236 tracing::warn!(
237 proto_id,
238 "rejecting daemon-internal proto_id at gRPC public surface (audit 0532 F3)"
239 );
240 return Err(Status::permission_denied(
241 "daemon-internal proto_id not allowed on public surface",
242 ));
243 }
244
245 let token = extract_grpc_token(&request);
249 let session_id = extract_grpc_session_id(&request);
250 let idempotency_key = extract_grpc_idempotency_key(&request);
251 let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
252 let audit_ctx = grpc_audit_context(&request, stable_conn_id);
253 let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) else {
256 tracing::warn!(
257 proto_id,
258 "rejecting gRPC request for proto_id not declared in EndpointSpec"
259 );
260 return Err(Status::invalid_argument(
261 "proto_id is not declared in endpoint spec",
262 ));
263 };
264 match spec.grpc_exposure() {
265 futu_surface_spec::SurfaceExposure::Exposed(
266 futu_surface_spec::GrpcSurface::GenericProtoRequest,
267 ) => {}
268 futu_surface_spec::SurfaceExposure::NotExposed { reason } => {
269 tracing::warn!(
270 proto_id,
271 endpoint = spec.canonical_name,
272 reason,
273 "rejecting gRPC request for endpoint not exposed to gRPC"
274 );
275 return Err(Status::permission_denied(reason));
276 }
277 }
278 let needed_scope = Some(spec.runtime.scope);
279 let endpoint_name = spec.canonical_name;
280 tracing::debug!(
281 proto_id,
282 endpoint = endpoint_name,
283 "gRPC request dispatch (Layer 2 spec lookup)"
284 );
285 let req_inner = request.into_inner();
286
287 let credential = match token.as_deref() {
289 Some(t) => Credential::Bearer(t),
290 None => Credential::None,
291 };
292 let env = AuthEnvelope {
293 surface: SurfaceId::Grpc,
294 endpoint: Endpoint::Proto(proto_id),
295 needed_scope,
296 credential,
297 proto_id: Some(proto_id),
298 body: &req_inner.body,
299 explicit_acc_id: None,
300 explicit_ctx: None,
301 commit_rate: true, audit_emit: true,
303 };
304 let (allowed_for_filter, caller_rec) =
305 match futu_auth::audit::with_context(audit_ctx.clone(), || {
306 authenticate_request(&self.key_store, &self.counters, env)
307 }) {
308 AuthDecision::Reject { kind, reason, .. } => {
309 return Err(grpc_status_for(kind, reason));
310 }
311 AuthDecision::Allow {
312 allowed_acc_ids,
313 rec,
314 ..
315 } => (allowed_acc_ids, rec),
316 };
317
318 let caller_allowed = allowed_for_filter
325 .as_ref()
326 .map(|s| std::sync::Arc::new(s.clone()));
327 let caller_key_id = caller_rec.as_ref().map(|r| r.id.clone());
328 let serial_no = self.next_serial();
329 let body = futu_server::trade_packet_id::fill_omitted_trade_packet_id_bytes(
330 proto_id,
331 req_inner.body,
332 stable_conn_id,
333 serial_no,
334 )
335 .map_err(|e| Status::internal(format!("failed to prepare trade PacketID: {e}")))?;
336 let incoming = IncomingRequest::builder(
337 stable_conn_id,
338 proto_id,
339 serial_no,
340 ProtoFmtType::Protobuf,
341 Bytes::from(body),
342 )
343 .with_idempotency_key(idempotency_key)
344 .with_caller_scope(caller_allowed, caller_key_id)
345 .build();
346
347 match self.router.dispatch(incoming.conn_id, &incoming).await {
348 Some(resp_bytes) => {
349 let filtered_body =
353 self.filter_registry
354 .apply(proto_id, resp_bytes, allowed_for_filter.as_ref());
355 Ok(Response::new(FutuResponse {
356 ret_type: 0,
357 ret_msg: String::new(),
358 proto_id,
359 body: filtered_body,
360 }))
361 }
362 None => Ok(Response::new(FutuResponse {
363 ret_type: -1,
364 ret_msg: "handler returned no response".to_string(),
365 proto_id,
366 body: Vec::new(),
367 })),
368 }
369 }
370
371 type SubscribePushStream = ReceiverStream<Result<PushEvent, Status>>;
372
373 async fn subscribe_push(
395 &self,
396 request: Request<SubscribePushRequest>,
397 ) -> Result<Response<Self::SubscribePushStream>, Status> {
398 let notify_subscribe = request.get_ref().notify_subscribe;
402
403 let token = extract_grpc_token(&request);
413 let session_id = extract_grpc_session_id(&request);
414 let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
415 let audit_ctx = grpc_audit_context(&request, stable_conn_id);
416 let credential = match token.as_deref() {
417 Some(t) => Credential::Bearer(t),
418 None => Credential::None,
419 };
420 let env = AuthEnvelope {
421 surface: SurfaceId::Grpc,
422 endpoint: Endpoint::Event("subscribe_push"),
423 needed_scope: None, credential,
425 proto_id: None,
426 body: &[],
427 explicit_acc_id: None,
428 explicit_ctx: None,
429 commit_rate: false,
430 audit_emit: true, };
432 let rec_snapshot = match futu_auth::audit::with_context(audit_ctx.clone(), || {
433 authenticate_request(&self.key_store, &self.counters, env)
434 }) {
435 AuthDecision::Reject { kind, reason, .. } => {
436 return Err(grpc_status_for(kind, reason));
437 }
438 AuthDecision::Allow { rec, .. } => rec,
439 };
440
441 let (scopes, key_id, allowed_acc_ids) = match rec_snapshot.as_ref() {
444 Some(rec) => {
445 let qot_ok = rec.scopes.contains(&Scope::QotRead);
446 let acc_ok = rec.scopes.contains(&Scope::AccRead);
447 if !qot_ok && !acc_ok {
448 futu_auth::audit::with_context(audit_ctx.clone(), || {
449 futu_auth::audit::reject(
450 "grpc",
451 "event=subscribe_push",
452 &rec.id,
453 "missing scope qot:read OR acc:read",
454 );
455 });
456 return Err(grpc_status_for(
457 RejectKind::Forbidden,
458 "missing scope qot:read OR acc:read".to_string(),
459 ));
460 }
461 (
462 rec.scopes.clone(),
463 rec.id.clone(),
464 rec.allowed_acc_ids.clone(),
465 )
466 }
467 None => (
468 [
470 Scope::QotRead,
471 Scope::AccRead,
472 Scope::TradeSimulate,
473 Scope::TradeReal,
474 ]
475 .into_iter()
476 .collect::<std::collections::HashSet<Scope>>(),
477 "<none>".to_string(),
478 None,
479 ),
480 };
481
482 let (tx, rx) = mpsc::channel(256);
483 let mut push_rx = self.push_broadcaster.subscribe();
484
485 tracing::info!(
486 key_id = %key_id,
487 conn_id = stable_conn_id,
490 scopes = ?scopes,
491 allowed_acc_ids = ?allowed_acc_ids.as_ref().map(|s| s.len()),
492 "gRPC client subscribed to push events",
493 );
494
495 let key_store_arc = self.key_store.clone();
505 let counters_arc = self.counters.clone();
506 let filter_registry_arc = self.filter_registry.clone();
509 tokio::spawn(async move {
510 loop {
511 match push_rx.recv().await {
512 Ok(event) => {
513 let allow_event = if let Some(rec) = rec_snapshot.as_ref() {
516 let needed = scope_for_event(&event.event_type);
517 let explicit_acc_id = if event.event_type == "trade" {
518 Some(event.acc_id)
519 } else {
520 None
521 };
522 let env = AuthEnvelope {
523 surface: SurfaceId::Grpc,
524 endpoint: Endpoint::Event(&event.event_type),
525 needed_scope: Some(needed),
526 credential: Credential::PreVerified(rec.clone()),
527 proto_id: None,
528 body: &[],
529 explicit_acc_id,
530 explicit_ctx: None,
531 commit_rate: false, audit_emit: false, };
534 matches!(
535 authenticate_request(&key_store_arc, &counters_arc, env),
536 AuthDecision::Allow { .. }
537 )
538 } else {
539 true
540 };
541
542 if !allow_event {
543 let label = if event.event_type == "trade"
547 && rec_snapshot
548 .as_ref()
549 .and_then(|r| r.allowed_acc_ids.as_ref())
550 .is_some_and(|s| !s.is_empty())
551 {
552 "trade_acc_id"
553 } else {
554 event.event_type.as_str()
555 };
556 futu_auth::metrics::bump_ws_filtered(label, &key_id);
557 continue;
558 }
559
560 let event_trd_market =
574 if event.event_type == "trade" && !event.trd_market.is_empty() {
575 Some(event.trd_market.as_str())
576 } else {
577 None
578 };
579 let allowed_markets_for_filter = rec_snapshot
580 .as_ref()
581 .and_then(|r| r.allowed_markets.as_ref());
582 let push_ctx = PushEventCtx {
583 event_type: &event.event_type,
584 event_acc: if event.event_type == "trade" {
585 Some(event.acc_id)
586 } else {
587 None
588 },
589 allowed_acc_ids: None,
591 sub_state: None,
593 event_trd_market,
595 allowed_markets: allowed_markets_for_filter,
596 };
597 if filter_registry_arc.should_drop_event(&push_ctx) {
598 futu_auth::metrics::bump_ws_filtered("trade_market", &key_id);
599 continue;
600 }
601
602 if event.event_type == "notify" && !notify_subscribe {
606 futu_auth::metrics::bump_ws_filtered("notify_unsub", &key_id);
607 continue;
608 }
609
610 if tx.send(Ok(event)).await.is_err() {
611 break; }
613 }
614 Err(broadcast::error::RecvError::Lagged(n)) => {
615 tracing::warn!(skipped = n, "gRPC push client lagged, skipped events");
616 }
618 Err(broadcast::error::RecvError::Closed) => {
619 break; }
621 }
622 }
623 tracing::info!("gRPC push stream ended");
624 });
625
626 Ok(Response::new(ReceiverStream::new(rx)))
627 }
628}
629
630fn scope_for_event(event_type: &str) -> Scope {
638 match event_type {
639 "trade" => Scope::AccRead, _ => Scope::QotRead, }
642}
643
644#[cfg(test)]
656mod tests;
657
658pub fn build_service(
660 router: Arc<RequestRouter>,
661 push_broadcaster: Arc<GrpcPushBroadcaster>,
662) -> FutuOpenDServer<FutuGrpcService> {
663 apply_grpc_message_limits(FutuOpenDServer::new(FutuGrpcService::new(
664 router,
665 push_broadcaster,
666 )))
667}
668
669pub fn build_service_with_auth(
671 router: Arc<RequestRouter>,
672 push_broadcaster: Arc<GrpcPushBroadcaster>,
673 key_store: Arc<KeyStore>,
674 counters: Arc<RuntimeCounters>,
675) -> FutuOpenDServer<FutuGrpcService> {
676 apply_grpc_message_limits(FutuOpenDServer::new(FutuGrpcService::with_auth(
677 router,
678 push_broadcaster,
679 key_store,
680 counters,
681 )))
682}
683
684fn apply_grpc_message_limits(
685 service: FutuOpenDServer<FutuGrpcService>,
686) -> FutuOpenDServer<FutuGrpcService> {
687 service
688 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE_BYTES)
689 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE_BYTES)
690}
691
692pub async fn start(
694 listen_addr: &str,
695 router: Arc<RequestRouter>,
696 push_broadcaster: Arc<GrpcPushBroadcaster>,
697) -> Result<(), Box<dyn std::error::Error>> {
698 start_with_auth(
699 listen_addr,
700 router,
701 push_broadcaster,
702 Arc::new(KeyStore::empty()),
703 Arc::new(RuntimeCounters::new()),
704 )
705 .await
706}
707
708pub async fn start_with_auth(
710 listen_addr: &str,
711 router: Arc<RequestRouter>,
712 push_broadcaster: Arc<GrpcPushBroadcaster>,
713 key_store: Arc<KeyStore>,
714 counters: Arc<RuntimeCounters>,
715) -> Result<(), Box<dyn std::error::Error>> {
716 let addr = listen_addr
717 .parse()
718 .map_err(|e| format!("invalid addr: {e}"))?;
719 if !key_store.is_configured() {
720 tracing::warn!(
721 "gRPC server running WITHOUT API key auth (legacy mode); \
722 all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
723 );
724 }
725 let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
726 tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
727 tonic::transport::Server::builder()
728 .add_service(service)
729 .serve(addr)
730 .await?;
731 Ok(())
732}
733
734pub async fn start_with_auth_until_shutdown(
737 listen_addr: &str,
738 router: Arc<RequestRouter>,
739 push_broadcaster: Arc<GrpcPushBroadcaster>,
740 key_store: Arc<KeyStore>,
741 counters: Arc<RuntimeCounters>,
742 shutdown_rx: watch::Receiver<bool>,
743) -> Result<(), Box<dyn std::error::Error>> {
744 let addr = listen_addr
745 .parse()
746 .map_err(|e| format!("invalid addr: {e}"))?;
747 if !key_store.is_configured() {
748 tracing::warn!(
749 "gRPC server running WITHOUT API key auth (legacy mode); \
750 all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
751 );
752 }
753 let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
754 tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
755 tonic::transport::Server::builder()
756 .add_service(service)
757 .serve_with_shutdown(addr, grpc_shutdown_requested(shutdown_rx))
758 .await?;
759 Ok(())
760}
761
762async fn grpc_shutdown_requested(mut shutdown_rx: watch::Receiver<bool>) {
763 loop {
764 if *shutdown_rx.borrow() {
765 tracing::info!("gRPC server stopped by shutdown signal");
766 return;
767 }
768 if shutdown_rx.changed().await.is_err() {
769 tracing::info!("gRPC server stopped after shutdown sender dropped");
770 return;
771 }
772 }
773}