Skip to main content

futu_grpc/
server.rs

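//! gRPC service implementation.
//!
//! The FutuOpenD service exposes a generic `proto_id` + `body` envelope and
//! forwards every request to the existing `RequestRouter`.
//! Streaming push is supported: quote, trade and broadcast events are delivered
//! to clients through the `SubscribePush` RPC.
//!
//! ## v1.4.106 codex 0517 ζ25-redo F2: stateful QOT stable identity
//!
//! gRPC `request()` and `subscribe_push()` share one identity-derivation function,
//! [`auth::derive_grpc_conn_id`]: it derives a deterministic, stable conn_id from
//! `(Bearer token, optional grpc-session-id metadata)`, inside
//! [`auth::GRPC_STABLE_CONN_NAMESPACE`] (the bit-62 namespace).
//!
//! Consecutive RPCs from the same caller therefore hit the same
//! SubscriptionManager / cache state:
//! - subscribe → unsubscribe / get_sub_info / query_subscription all line up
//! - the quote cache hits (the per-conn cache no longer misses on every RPC)
//! - the QOT push fan-out per-conn filter can find the caller
//!
//! A different Bearer or a different session_id is naturally isolated (callers
//! do not affect each other). Legacy mode (KeyStore not configured / Bearer
//! missing) uses one fixed conn_id: all legacy callers share the same sub
//! state, matching the "no auth configured = single tenant" semantics.
//!
//! History: before v1.4.105 an auto-incrementing `conn_id_counter` handed every
//! RPC a new ID, which is equivalent to the permanent quota-leak bug REST had
//! before v1.4.90 P0-B. The F2 fix follows the `REST_SHARED_CONN` design
//! philosophy; the difference is that gRPC isolates per caller (REST shares).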
26
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU32, Ordering};
29
30use bytes::Bytes;
31use tokio::sync::{broadcast, mpsc, watch};
32use tokio_stream::wrappers::ReceiverStream;
33use tonic::{Request, Response, Status};
34
35use futu_auth::{KeyStore, RuntimeCounters, Scope};
36use futu_codec::header::ProtoFmtType;
37use futu_server::conn::IncomingRequest;
38use futu_server::push::ExternalPushSink;
39use futu_server::router::RequestRouter;
40
41use crate::auth::{
42    derive_grpc_conn_id, extract_grpc_idempotency_key, extract_grpc_session_id, extract_grpc_token,
43    grpc_audit_context, grpc_status_for,
44};
45use crate::proto::futu_open_d_server::{FutuOpenD, FutuOpenDServer};
46use crate::proto::{FutuRequest, FutuResponse, PushEvent, SubscribePushRequest};
47use futu_auth_pipeline::{
48    AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, PushEventCtx, RejectKind,
49    SurfaceId, authenticate_request,
50};
51
/// Explicit size ceiling, in bytes, for a single gRPC message.
///
/// Native FTAPI frames are capped at 12 MiB. Pinning gRPC to the same ceiling
/// avoids depending on tonic's defaults while still fitting protocol-sized
/// payloads.
pub const GRPC_MAX_MESSAGE_SIZE_BYTES: usize = 12 << 20;
57
58/// gRPC 推送广播器
59///
60/// 实现 `ExternalPushSink` trait,接收 PushDispatcher 的推送事件,
61/// 通过 broadcast channel 分发给所有 SubscribePush 流式连接。
62#[derive(Clone)]
63pub struct GrpcPushBroadcaster {
64    tx: broadcast::Sender<PushEvent>,
65}
66
67impl GrpcPushBroadcaster {
68    pub fn new(capacity: usize) -> Self {
69        let (tx, _) = broadcast::channel(capacity);
70        Self { tx }
71    }
72
73    /// 创建接收端
74    pub fn subscribe(&self) -> broadcast::Receiver<PushEvent> {
75        self.tx.subscribe()
76    }
77
78    fn has_receivers(&self) -> bool {
79        self.tx.receiver_count() > 0
80    }
81
82    fn send(&self, event: PushEvent) {
83        if !self.has_receivers() {
84            return;
85        }
86        let proto_id = event.proto_id;
87        let event_type = event.event_type.clone();
88        if self.tx.send(event).is_err() {
89            tracing::debug!(
90                proto_id,
91                event_type,
92                receiver_count = self.tx.receiver_count(),
93                "grpc push broadcast send skipped"
94            );
95        }
96    }
97}
98
99impl ExternalPushSink for GrpcPushBroadcaster {
100    /// **v1.4.106 codex 1131 F4 [P1]**: `rehab_type` 透传到 gRPC PushEvent.
101    /// gRPC 当前 broadcast 模型 (所有 qot:read subscriber 收到所有 quote
102    /// event), per-conn (sec_key, sub_type, rehab_type) 三元 filter 是 raw TCP
103    /// 专属. rehab_type 通过 PushEvent 让客户端可见.
104    fn on_quote_push(
105        &self,
106        sec_key: &str,
107        sub_type: i32,
108        rehab_type: i32,
109        proto_id: u32,
110        body: &[u8],
111    ) {
112        if !self.has_receivers() {
113            return;
114        }
115        self.send(PushEvent {
116            proto_id,
117            sec_key: sec_key.to_string(),
118            sub_type,
119            rehab_type,
120            body: body.to_vec(),
121            event_type: "quote".to_string(),
122            acc_id: 0,
123            trd_market: String::new(), // 行情推送无 trd_market
124        });
125    }
126
127    fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
128        if !self.has_receivers() {
129            return;
130        }
131        self.send(PushEvent {
132            proto_id,
133            sec_key: String::new(),
134            sub_type: 0,
135            rehab_type: 0,
136            body: body.to_vec(),
137            event_type: "notify".to_string(),
138            acc_id: 0,
139            trd_market: String::new(),
140        });
141    }
142
143    /// v1.4.105 D3 (Phase 4) T-B3: trade push trd_market 由 PushDispatcher 一
144    /// 次 decode 后透传, 不再各 sink 独立 decode body. 空 / unknown → 空
145    /// 字符串 (PushEvent proto3 默认值, 老 client 解析兼容).
146    fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
147        if !self.has_receivers() {
148            return;
149        }
150        self.send(PushEvent {
151            proto_id,
152            sec_key: String::new(),
153            sub_type: 0,
154            rehab_type: 0,
155            body: body.to_vec(),
156            event_type: "trade".to_string(),
157            acc_id,
158            trd_market: trd_market.unwrap_or("").to_string(),
159        });
160    }
161}
162
163/// gRPC 服务实现
164pub struct FutuGrpcService {
165    router: Arc<RequestRouter>,
166    push_broadcaster: Arc<GrpcPushBroadcaster>,
167    key_store: Arc<KeyStore>,
168    counters: Arc<RuntimeCounters>,
169    /// v1.4.104: response filter 注册中心 (proto 2001 = AccListFilter).
170    /// 加新 filter 在 `FilterRegistry::install_defaults` 注册一次, 4 surface
171    /// 自动生效.
172    filter_registry: Arc<FilterRegistry>,
173    /// v1.4.106 codex 0517 ζ25-redo F2: gRPC `conn_id_counter` 已删除 —
174    /// 自增 conn_id 让同一 caller 连续 RPC 拿不到同 sub state, 等价 REST
175    /// v1.4.90 P0-B 之前的 quota 永久泄漏 bug. 现在改用
176    /// [`auth::derive_grpc_conn_id`] 从 `(bearer, session_id)` 派生
177    /// deterministic stable conn_id, 同 caller 连续 RPC 命中同一 sub state.
178    serial_counter: AtomicU32,
179}
180
181impl FutuGrpcService {
182    pub fn new(router: Arc<RequestRouter>, push_broadcaster: Arc<GrpcPushBroadcaster>) -> Self {
183        Self::with_auth(
184            router,
185            push_broadcaster,
186            Arc::new(KeyStore::empty()),
187            Arc::new(RuntimeCounters::new()),
188        )
189    }
190
191    /// 完整构造:同时接 key_store + counters(v1.0 推荐入口)
192    ///
193    /// `counters` 应由 main 全进程共享:REST / gRPC / MCP 共用一个实例才能保证
194    /// rate limit / 日累计跨接口一致
195    pub fn with_auth(
196        router: Arc<RequestRouter>,
197        push_broadcaster: Arc<GrpcPushBroadcaster>,
198        key_store: Arc<KeyStore>,
199        counters: Arc<RuntimeCounters>,
200    ) -> Self {
201        Self {
202            router,
203            push_broadcaster,
204            key_store,
205            counters,
206            filter_registry: Arc::new(FilterRegistry::with_defaults()),
207            serial_counter: AtomicU32::new(1),
208        }
209    }
210
211    fn next_serial(&self) -> u32 {
212        self.serial_counter.fetch_add(1, Ordering::Relaxed)
213    }
214}
215
216#[tonic::async_trait]
217impl FutuOpenD for FutuGrpcService {
218    /// 通用请求-响应
219    async fn request(
220        &self,
221        request: Request<FutuRequest>,
222    ) -> Result<Response<FutuResponse>, Status> {
223        // v1.4.104: 走 futu_auth_pipeline::authenticate_request 单一函数,
224        // 不再 inline authenticate / check_scope / rate gate / body-aware /
225        // audit. surface adapter 极薄 — 仅 transport extract + RejectKind →
226        // Status 翻译.
227
228        let proto_id = request.get_ref().proto_id;
229        if proto_id == 0 {
230            return Err(Status::invalid_argument("proto_id is required"));
231        }
232        // v1.4.106 codex 0532 F3 (P2): daemon-internal proto_id (高位
233        // 0x8000_0000 bit) 绝不应从 gRPC 公开 surface 进入 — 仅 REST handler
234        // 内部合成给 router. 显式 reject + audit, 防探测 daemon 内部 routing.
235        if futu_auth::is_internal_proto_id(proto_id) {
236            tracing::warn!(
237                proto_id,
238                "rejecting daemon-internal proto_id at gRPC public surface (audit 0532 F3)"
239            );
240            return Err(Status::permission_denied(
241                "daemon-internal proto_id not allowed on public surface",
242            ));
243        }
244
245        // 1. extract token + session (owned String 避免 borrow vs move 冲突).
246        // v1.4.106 codex 0517 ζ25-redo F2: 同时拿 grpc-session-id metadata,
247        // 派生 stateful QOT stable conn_id (derive_grpc_conn_id).
248        let token = extract_grpc_token(&request);
249        let session_id = extract_grpc_session_id(&request);
250        let idempotency_key = extract_grpc_idempotency_key(&request);
251        let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
252        let audit_ctx = grpc_audit_context(&request, stable_conn_id);
253        // v1.4.110 Surface Spec v2: gRPC generic proto path is spec-owned.
254        // Unknown proto_id must not bypass EndpointSpec into auth/router.
255        let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) else {
256            tracing::warn!(
257                proto_id,
258                "rejecting gRPC request for proto_id not declared in EndpointSpec"
259            );
260            return Err(Status::invalid_argument(
261                "proto_id is not declared in endpoint spec",
262            ));
263        };
264        match spec.grpc_exposure() {
265            futu_surface_spec::SurfaceExposure::Exposed(
266                futu_surface_spec::GrpcSurface::GenericProtoRequest,
267            ) => {}
268            futu_surface_spec::SurfaceExposure::NotExposed { reason } => {
269                tracing::warn!(
270                    proto_id,
271                    endpoint = spec.canonical_name,
272                    reason,
273                    "rejecting gRPC request for endpoint not exposed to gRPC"
274                );
275                return Err(Status::permission_denied(reason));
276            }
277        }
278        let needed_scope = Some(spec.runtime.scope);
279        let endpoint_name = spec.canonical_name;
280        tracing::debug!(
281            proto_id,
282            endpoint = endpoint_name,
283            "gRPC request dispatch (Layer 2 spec lookup)"
284        );
285        let req_inner = request.into_inner();
286
287        // 2. 构 credential + envelope, 调 pipeline
288        let credential = match token.as_deref() {
289            Some(t) => Credential::Bearer(t),
290            None => Credential::None,
291        };
292        let env = AuthEnvelope {
293            surface: SurfaceId::Grpc,
294            endpoint: Endpoint::Proto(proto_id),
295            needed_scope,
296            credential,
297            proto_id: Some(proto_id),
298            body: &req_inner.body,
299            explicit_acc_id: None,
300            explicit_ctx: None,
301            commit_rate: true, // gRPC middleware 层 commit rate (trade:real)
302            audit_emit: true,
303        };
304        let (allowed_for_filter, caller_rec) =
305            match futu_auth::audit::with_context(audit_ctx.clone(), || {
306                authenticate_request(&self.key_store, &self.counters, env)
307            }) {
308                AuthDecision::Reject { kind, reason, .. } => {
309                    return Err(grpc_status_for(kind, reason));
310                }
311                AuthDecision::Allow {
312                    allowed_acc_ids,
313                    rec,
314                    ..
315                } => (allowed_acc_ids, rec),
316            };
317
318        // 3. dispatch + response filter
319        // v1.4.105 D2 T-A1 fix: caller_allowed_acc_ids 从 pipeline allow decision
320        // 真填进 IncomingRequest, 让 dispatch handler (e.g. SubAccPushHandler)
321        // 端 enforce per-acc whitelist defense-in-depth.
322        // codex 0522 F1 v1.4.106: 同步填 caller_key_id 让 cross-surface handler
323        // 能识别 gRPC caller.
324        let caller_allowed = allowed_for_filter
325            .as_ref()
326            .map(|s| std::sync::Arc::new(s.clone()));
327        let caller_key_id = caller_rec.as_ref().map(|r| r.id.clone());
328        let serial_no = self.next_serial();
329        let body = futu_server::trade_packet_id::fill_omitted_trade_packet_id_bytes(
330            proto_id,
331            req_inner.body,
332            stable_conn_id,
333            serial_no,
334        )
335        .map_err(|e| Status::internal(format!("failed to prepare trade PacketID: {e}")))?;
336        let incoming = IncomingRequest::builder(
337            stable_conn_id,
338            proto_id,
339            serial_no,
340            ProtoFmtType::Protobuf,
341            Bytes::from(body),
342        )
343        .with_idempotency_key(idempotency_key)
344        .with_caller_scope(caller_allowed, caller_key_id)
345        .build();
346
347        match self.router.dispatch(incoming.conn_id, &incoming).await {
348            Some(resp_bytes) => {
349                // v1.4.104: 用 FilterRegistry (单一注册表) 替代 inline
350                // filter_acc_list_response. 加新 filter (e.g. cash-flow) 只
351                // 在 registry 注册一次, 4 surface 自动生效.
352                let filtered_body =
353                    self.filter_registry
354                        .apply(proto_id, resp_bytes, allowed_for_filter.as_ref());
355                Ok(Response::new(FutuResponse {
356                    ret_type: 0,
357                    ret_msg: String::new(),
358                    proto_id,
359                    body: filtered_body,
360                }))
361            }
362            None => Ok(Response::new(FutuResponse {
363                ret_type: -1,
364                ret_msg: "handler returned no response".to_string(),
365                proto_id,
366                body: Vec::new(),
367            })),
368        }
369    }
370
371    type SubscribePushStream = ReceiverStream<Result<PushEvent, Status>>;
372
373    /// 流式推送:客户端建立连接后持续接收行情、交易、广播推送
374    ///
375    /// v1.1:按订阅 key 的 scope 过滤推送 —— `qot:read`-only 的 key 不会收到
376    /// `trade` 类(账户交易回报),对齐 REST `/ws` v0.9.0 加的 push filter。
377    ///
378    /// ## v1.4.104 阶段 7-2: pipeline 委托
379    ///
380    /// **handshake**: pipeline 调一次 with `Endpoint::Event("subscribe")` +
381    /// `needed_scope=None` (跳 scope check, 走 OR 语义手工 check). Allow → 拿
382    /// rec snapshot. Reject (Bearer invalid / expired) → translate to gRPC Status.
383    ///
384    /// **per-event filter**: stream 内每 event 调一次 pipeline with
385    /// `Credential::PreVerified(rec)` + `Endpoint::Event(event_type)` +
386    /// `needed_scope=Some(scope_for_event(...))` + `explicit_acc_id` (trade event
387    /// 给 event.acc_id, 其他不传) + `audit_emit=false` (避免每 event 一条 audit
388    /// 把日志冲爆) + `commit_rate=false` (push 不计 rate). Reject → silent drop +
389    /// `metrics::bump_ws_filtered`. Allow → forward.
390    ///
391    /// 与 v1.4.103 行为 byte-identical: handshake qot:read OR acc:read, per-event
392    /// scope match + trade acc_id whitelist (受限 key + acc_id=0 仍 drop, 由 pipeline
393    /// `body_aware::CheckCtx { acc_id: Some(0) }` + `allowed_acc_ids` 非空 → reject 实现).
394    async fn subscribe_push(
395        &self,
396        request: Request<SubscribePushRequest>,
397    ) -> Result<Response<Self::SubscribePushStream>, Status> {
398        // v1.4.106 codex 1125 F6 [P2]: 显式 notify subscription opt-in.
399        // 对齐 C++ raw TCP `IsConnSubRecvNotify` 语义 (broadcast push 必须显式
400        // sub 才下发).
401        let notify_subscribe = request.get_ref().notify_subscribe;
402
403        // ── handshake: pipeline 调一次拿 caller-key + audit 一次 ──────────────────
404        //
405        // 用 `needed_scope=None` 跳 pipeline scope check (因为我们要 qot:read OR
406        // acc:read 双 OR 语义, 不是单 scope match). Bearer 提取 / verify / expiry
407        // 都走 pipeline (单一 source).
408        //
409        // v1.4.106 codex 0517 ζ25-redo F2: 与 `request()` 共享 stable conn_id.
410        // 这让 ops/log 能把 push stream 和同 caller 的 subscribe request 串起来:
411        // grep `conn_id=<stable>` 看到 subscribe RPC + push event filter 同 id.
412        let token = extract_grpc_token(&request);
413        let session_id = extract_grpc_session_id(&request);
414        let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
415        let audit_ctx = grpc_audit_context(&request, stable_conn_id);
416        let credential = match token.as_deref() {
417            Some(t) => Credential::Bearer(t),
418            None => Credential::None,
419        };
420        let env = AuthEnvelope {
421            surface: SurfaceId::Grpc,
422            endpoint: Endpoint::Event("subscribe_push"),
423            needed_scope: None, // OR 语义手工 check, pipeline 不参与
424            credential,
425            proto_id: None,
426            body: &[],
427            explicit_acc_id: None,
428            explicit_ctx: None,
429            commit_rate: false,
430            audit_emit: true, // handshake 一次 audit
431        };
432        let rec_snapshot = match futu_auth::audit::with_context(audit_ctx.clone(), || {
433            authenticate_request(&self.key_store, &self.counters, env)
434        }) {
435            AuthDecision::Reject { kind, reason, .. } => {
436                return Err(grpc_status_for(kind, reason));
437            }
438            AuthDecision::Allow { rec, .. } => rec,
439        };
440
441        // OR 语义 scope check: legacy mode (rec=None) 全放行;
442        // scope mode 必须持 qot:read OR acc:read 任一.
443        let (scopes, key_id, allowed_acc_ids) = match rec_snapshot.as_ref() {
444            Some(rec) => {
445                let qot_ok = rec.scopes.contains(&Scope::QotRead);
446                let acc_ok = rec.scopes.contains(&Scope::AccRead);
447                if !qot_ok && !acc_ok {
448                    futu_auth::audit::with_context(audit_ctx.clone(), || {
449                        futu_auth::audit::reject(
450                            "grpc",
451                            "event=subscribe_push",
452                            &rec.id,
453                            "missing scope qot:read OR acc:read",
454                        );
455                    });
456                    return Err(grpc_status_for(
457                        RejectKind::Forbidden,
458                        "missing scope qot:read OR acc:read".to_string(),
459                    ));
460                }
461                (
462                    rec.scopes.clone(),
463                    rec.id.clone(),
464                    rec.allowed_acc_ids.clone(),
465                )
466            }
467            None => (
468                // legacy: 全 scope + 无 acc 限制
469                [
470                    Scope::QotRead,
471                    Scope::AccRead,
472                    Scope::TradeSimulate,
473                    Scope::TradeReal,
474                ]
475                .into_iter()
476                .collect::<std::collections::HashSet<Scope>>(),
477                "<none>".to_string(),
478                None,
479            ),
480        };
481
482        let (tx, rx) = mpsc::channel(256);
483        let mut push_rx = self.push_broadcaster.subscribe();
484
485        tracing::info!(
486            key_id = %key_id,
487            // v1.4.106 codex 0517 ζ25-redo F2: stable conn_id 让 subscribe RPC
488            // 和 push stream 在 log 里可关联.
489            conn_id = stable_conn_id,
490            scopes = ?scopes,
491            allowed_acc_ids = ?allowed_acc_ids.as_ref().map(|s| s.len()),
492            "gRPC client subscribed to push events",
493        );
494
495        // ── per-event filter: pipeline 二次调 (audit_emit=false 防日志冲爆) ───────
496        //
497        // 每 event 调一次 pipeline:
498        //   - Credential::PreVerified(rec) 复用 handshake 已 verify 的 rec
499        //   - Endpoint::Event(event_type) 给 audit label (虽然 audit_emit=false)
500        //   - needed_scope = scope_for_event(event_type)
501        //   - explicit_acc_id = trade event 给 event.acc_id, 其他 None
502        //
503        // legacy mode (rec_snapshot=None) 不调 pipeline (没必要, 全放行).
504        let key_store_arc = self.key_store.clone();
505        let counters_arc = self.counters.clone();
506        // v1.4.105 D3 (Phase 4): clone filter_registry for per-event Layer 3
507        // (allowed_markets) check + Layer 1/2 reuse.
508        let filter_registry_arc = self.filter_registry.clone();
509        tokio::spawn(async move {
510            loop {
511                match push_rx.recv().await {
512                    Ok(event) => {
513                        // legacy fast-path: rec=None → 全放行, scope check / acc_id
514                        // whitelist 都不需要走 pipeline.
515                        let allow_event = if let Some(rec) = rec_snapshot.as_ref() {
516                            let needed = scope_for_event(&event.event_type);
517                            let explicit_acc_id = if event.event_type == "trade" {
518                                Some(event.acc_id)
519                            } else {
520                                None
521                            };
522                            let env = AuthEnvelope {
523                                surface: SurfaceId::Grpc,
524                                endpoint: Endpoint::Event(&event.event_type),
525                                needed_scope: Some(needed),
526                                credential: Credential::PreVerified(rec.clone()),
527                                proto_id: None,
528                                body: &[],
529                                explicit_acc_id,
530                                explicit_ctx: None,
531                                commit_rate: false, // push 不计 rate
532                                audit_emit: false,  // per-event 不 audit, 避免冲爆
533                            };
534                            matches!(
535                                authenticate_request(&key_store_arc, &counters_arc, env),
536                                AuthDecision::Allow { .. }
537                            )
538                        } else {
539                            true
540                        };
541
542                        if !allow_event {
543                            // metric label 区分 trade acc_id reject vs scope reject
544                            // (近似旧行为: trade 类 reject 标 "trade_acc_id" 仅当
545                            // acc_id whitelist 非空; 否则按 event_type)
546                            let label = if event.event_type == "trade"
547                                && rec_snapshot
548                                    .as_ref()
549                                    .and_then(|r| r.allowed_acc_ids.as_ref())
550                                    .is_some_and(|s| !s.is_empty())
551                            {
552                                "trade_acc_id"
553                            } else {
554                                event.event_type.as_str()
555                            };
556                            futu_auth::metrics::bump_ws_filtered(label, &key_id);
557                            continue;
558                        }
559
560                        // v1.4.105 D3 (Phase 4): FilterRegistry::should_drop_event
561                        // Layer 3 (allowed_markets). Layer 1 (allowed_acc_ids) 已在
562                        // 上面 pipeline body-aware 跑过, 此处串行 Layer 3 双重
563                        // 防御 + 未来 Layer 扩展自动覆盖.
564                        // ⚠️ UNVERIFIED — pending real-machine verify (HK+US 双
565                        // 账户跨 market 推送). 这里是 backend-semantic 风险,
566                        // 只能靠真机跨 market trade push 验证后再升 confidence.
567                        //
568                        // v1.4.105 T-B3: trd_market 改读 PushEvent.trd_market 字段
569                        // (PushDispatcher 端一次 decode), 不再各 surface 独立
570                        // decode body. 空字符串 = 非 trade event / decode 失败 /
571                        // market 未知 → 转 None (Layer 3 不 trigger drop, 向后
572                        // 兼容).
573                        let event_trd_market =
574                            if event.event_type == "trade" && !event.trd_market.is_empty() {
575                                Some(event.trd_market.as_str())
576                            } else {
577                                None
578                            };
579                        let allowed_markets_for_filter = rec_snapshot
580                            .as_ref()
581                            .and_then(|r| r.allowed_markets.as_ref());
582                        let push_ctx = PushEventCtx {
583                            event_type: &event.event_type,
584                            event_acc: if event.event_type == "trade" {
585                                Some(event.acc_id)
586                            } else {
587                                None
588                            },
589                            // Layer 1 已 pipeline 跑, 此处传 None 防双重 reject (LE 之间 OR 短路)
590                            allowed_acc_ids: None,
591                            // gRPC 没 sub_state (REST 特有)
592                            sub_state: None,
593                            // Layer 3 — 新加
594                            event_trd_market,
595                            allowed_markets: allowed_markets_for_filter,
596                        };
597                        if filter_registry_arc.should_drop_event(&push_ctx) {
598                            futu_auth::metrics::bump_ws_filtered("trade_market", &key_id);
599                            continue;
600                        }
601
602                        // v1.4.106 codex 1125 F6 [P2]: notify_subscribe gate.
603                        // 对齐 C++ `IsConnSubRecvNotify` (raw TCP). broadcast notify
604                        // 类 push (e.g. price reminder) 必须 client 显式 sub 才下发.
605                        if event.event_type == "notify" && !notify_subscribe {
606                            futu_auth::metrics::bump_ws_filtered("notify_unsub", &key_id);
607                            continue;
608                        }
609
610                        if tx.send(Ok(event)).await.is_err() {
611                            break; // 客户端断开
612                        }
613                    }
614                    Err(broadcast::error::RecvError::Lagged(n)) => {
615                        tracing::warn!(skipped = n, "gRPC push client lagged, skipped events");
616                        // 继续接收,不断开
617                    }
618                    Err(broadcast::error::RecvError::Closed) => {
619                        break; // 广播器关闭
620                    }
621                }
622            }
623            tracing::info!("gRPC push stream ended");
624        });
625
626        Ok(Response::new(ReceiverStream::new(rx)))
627    }
628}
629
// v1.4.104: `grpc_handler_full_check` has been removed. Its functionality moved
// into the body-aware step of `futu_auth_pipeline::pipeline::authenticate_request`.
// A single pipeline call from gRPC `request()` now covers everything (caller key +
// scope + body-aware + audit + rate). All 4 surfaces share the same function, so
// hand-copied variants can no longer diverge.
634
635/// gRPC PushEvent 的 event_type → 客户端必须持有的 scope (与 REST `/ws` 对齐).
636/// 用于 SubscribePush stream 内 per-event filter.
637fn scope_for_event(event_type: &str) -> Scope {
638    match event_type {
639        "trade" => Scope::AccRead, // 账户交易回报
640        _ => Scope::QotRead,       // quote / notify / 未知都按行情门槛
641    }
642}
643
// v1.4.105 D3 (Phase 4) T-B3: the former `extract_trd_market_from_trade_event`
// moved to `futu_server::push::extract_trd_market_from_trade_body`. The
// PushDispatcher decodes once and passes the result through the
// PushEvent.trd_market field, shared by all surfaces (gRPC / REST WS / MCP).
// gRPC subscribe_push now reads event.trd_market and no longer decodes the body.
//
// For reference, the helper would be imported with:
//
//     use futu_server::push::extract_trd_market_from_trade_body;
//
// server.rs no longer calls it (it relies entirely on the PushEvent field), so
// the import is intentionally absent.
654
655#[cfg(test)]
656mod tests;
657
658/// 构建 gRPC 服务(供外部调用 tonic Server 使用)
659pub fn build_service(
660    router: Arc<RequestRouter>,
661    push_broadcaster: Arc<GrpcPushBroadcaster>,
662) -> FutuOpenDServer<FutuGrpcService> {
663    apply_grpc_message_limits(FutuOpenDServer::new(FutuGrpcService::new(
664        router,
665        push_broadcaster,
666    )))
667}
668
669/// 构建 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters)
670pub fn build_service_with_auth(
671    router: Arc<RequestRouter>,
672    push_broadcaster: Arc<GrpcPushBroadcaster>,
673    key_store: Arc<KeyStore>,
674    counters: Arc<RuntimeCounters>,
675) -> FutuOpenDServer<FutuGrpcService> {
676    apply_grpc_message_limits(FutuOpenDServer::new(FutuGrpcService::with_auth(
677        router,
678        push_broadcaster,
679        key_store,
680        counters,
681    )))
682}
683
684fn apply_grpc_message_limits(
685    service: FutuOpenDServer<FutuGrpcService>,
686) -> FutuOpenDServer<FutuGrpcService> {
687    service
688        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE_BYTES)
689        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE_BYTES)
690}
691
692/// 启动 gRPC 服务
693pub async fn start(
694    listen_addr: &str,
695    router: Arc<RequestRouter>,
696    push_broadcaster: Arc<GrpcPushBroadcaster>,
697) -> Result<(), Box<dyn std::error::Error>> {
698    start_with_auth(
699        listen_addr,
700        router,
701        push_broadcaster,
702        Arc::new(KeyStore::empty()),
703        Arc::new(RuntimeCounters::new()),
704    )
705    .await
706}
707
708/// 启动 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters)
709pub async fn start_with_auth(
710    listen_addr: &str,
711    router: Arc<RequestRouter>,
712    push_broadcaster: Arc<GrpcPushBroadcaster>,
713    key_store: Arc<KeyStore>,
714    counters: Arc<RuntimeCounters>,
715) -> Result<(), Box<dyn std::error::Error>> {
716    let addr = listen_addr
717        .parse()
718        .map_err(|e| format!("invalid addr: {e}"))?;
719    if !key_store.is_configured() {
720        tracing::warn!(
721            "gRPC server running WITHOUT API key auth (legacy mode); \
722             all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
723        );
724    }
725    let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
726    tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
727    tonic::transport::Server::builder()
728        .add_service(service)
729        .serve(addr)
730        .await?;
731    Ok(())
732}
733
734/// 启动 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters),并支持 daemon
735/// 统一 shutdown 信号。
736pub async fn start_with_auth_until_shutdown(
737    listen_addr: &str,
738    router: Arc<RequestRouter>,
739    push_broadcaster: Arc<GrpcPushBroadcaster>,
740    key_store: Arc<KeyStore>,
741    counters: Arc<RuntimeCounters>,
742    shutdown_rx: watch::Receiver<bool>,
743) -> Result<(), Box<dyn std::error::Error>> {
744    let addr = listen_addr
745        .parse()
746        .map_err(|e| format!("invalid addr: {e}"))?;
747    if !key_store.is_configured() {
748        tracing::warn!(
749            "gRPC server running WITHOUT API key auth (legacy mode); \
750             all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
751        );
752    }
753    let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
754    tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
755    tonic::transport::Server::builder()
756        .add_service(service)
757        .serve_with_shutdown(addr, grpc_shutdown_requested(shutdown_rx))
758        .await?;
759    Ok(())
760}
761
762async fn grpc_shutdown_requested(mut shutdown_rx: watch::Receiver<bool>) {
763    loop {
764        if *shutdown_rx.borrow() {
765            tracing::info!("gRPC server stopped by shutdown signal");
766            return;
767        }
768        if shutdown_rx.changed().await.is_err() {
769            tracing::info!("gRPC server stopped after shutdown sender dropped");
770            return;
771        }
772    }
773}