1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
8use std::sync::Arc;
9
10use bytes::Bytes;
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14
15use chrono::Utc;
16use futu_auth::{CheckCtx, KeyRecord, KeyStore, LimitOutcome, RuntimeCounters, Scope};
17use futu_codec::header::ProtoFmtType;
18use futu_proto::{trd_modify_order, trd_place_order};
19use futu_server::conn::IncomingRequest;
20use futu_server::push::ExternalPushSink;
21use futu_server::router::RequestRouter;
22use prost::Message;
23
24use crate::auth::{authenticate, check_scope, scope_for_proto};
25use crate::proto::futu_open_d_server::{FutuOpenD, FutuOpenDServer};
26use crate::proto::{FutuRequest, FutuResponse, PushEvent, SubscribePushRequest};
27
28#[derive(Clone)]
33pub struct GrpcPushBroadcaster {
34 tx: broadcast::Sender<PushEvent>,
35}
36
37impl GrpcPushBroadcaster {
38 pub fn new(capacity: usize) -> Self {
39 let (tx, _) = broadcast::channel(capacity);
40 Self { tx }
41 }
42
43 pub fn subscribe(&self) -> broadcast::Receiver<PushEvent> {
45 self.tx.subscribe()
46 }
47
48 fn send(&self, event: PushEvent) {
49 let _ = self.tx.send(event);
50 }
51}
52
53impl ExternalPushSink for GrpcPushBroadcaster {
54 fn on_quote_push(&self, sec_key: &str, sub_type: i32, proto_id: u32, body: &[u8]) {
55 self.send(PushEvent {
56 proto_id,
57 sec_key: sec_key.to_string(),
58 sub_type,
59 body: body.to_vec(),
60 event_type: "quote".to_string(),
61 acc_id: 0,
62 });
63 }
64
65 fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
66 self.send(PushEvent {
67 proto_id,
68 sec_key: String::new(),
69 sub_type: 0,
70 body: body.to_vec(),
71 event_type: "notify".to_string(),
72 acc_id: 0,
73 });
74 }
75
76 fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8]) {
77 self.send(PushEvent {
78 proto_id,
79 sec_key: String::new(),
80 sub_type: 0,
81 body: body.to_vec(),
82 event_type: "trade".to_string(),
83 acc_id,
84 });
85 }
86}
87
88pub struct FutuGrpcService {
90 router: Arc<RequestRouter>,
91 push_broadcaster: Arc<GrpcPushBroadcaster>,
92 key_store: Arc<KeyStore>,
93 counters: Arc<RuntimeCounters>,
94 conn_id_counter: AtomicU64,
95 serial_counter: AtomicU32,
96}
97
98impl FutuGrpcService {
99 pub fn new(router: Arc<RequestRouter>, push_broadcaster: Arc<GrpcPushBroadcaster>) -> Self {
100 Self::with_auth(
101 router,
102 push_broadcaster,
103 Arc::new(KeyStore::empty()),
104 Arc::new(RuntimeCounters::new()),
105 )
106 }
107
108 pub fn with_key_store(
110 router: Arc<RequestRouter>,
111 push_broadcaster: Arc<GrpcPushBroadcaster>,
112 key_store: Arc<KeyStore>,
113 ) -> Self {
114 Self::with_auth(
115 router,
116 push_broadcaster,
117 key_store,
118 Arc::new(RuntimeCounters::new()),
119 )
120 }
121
122 pub fn with_auth(
127 router: Arc<RequestRouter>,
128 push_broadcaster: Arc<GrpcPushBroadcaster>,
129 key_store: Arc<KeyStore>,
130 counters: Arc<RuntimeCounters>,
131 ) -> Self {
132 Self {
133 router,
134 push_broadcaster,
135 key_store,
136 counters,
137 conn_id_counter: AtomicU64::new(20_000_000), serial_counter: AtomicU32::new(1),
139 }
140 }
141
142 fn next_conn_id(&self) -> u64 {
143 self.conn_id_counter.fetch_add(1, Ordering::Relaxed)
144 }
145
146 fn next_serial(&self) -> u32 {
147 self.serial_counter.fetch_add(1, Ordering::Relaxed)
148 }
149}
150
151#[tonic::async_trait]
152impl FutuOpenD for FutuGrpcService {
153 async fn request(
155 &self,
156 request: Request<FutuRequest>,
157 ) -> Result<Response<FutuResponse>, Status> {
158 let authed = authenticate(&self.key_store, &request)?;
160
161 let req = request.into_inner();
162
163 if req.proto_id == 0 {
164 return Err(Status::invalid_argument("proto_id is required"));
165 }
166
167 if let Some(needed) = scope_for_proto(req.proto_id) {
168 check_scope(&authed, req.proto_id, needed)?;
169
170 if needed == Scope::TradeReal {
173 if let Some(rec) = authed.as_ref() {
174 let ctx = CheckCtx {
175 market: String::new(),
176 symbol: String::new(),
177 order_value: None,
178 trd_side: None,
179 };
180 if let LimitOutcome::Reject(reason) =
181 self.counters
182 .check_and_commit(&rec.id, &rec.limits(), &ctx, Utc::now())
183 {
184 let endpoint = format!("proto_id={}", req.proto_id);
185 futu_auth::audit::reject(
186 "grpc",
187 &endpoint,
188 &rec.id,
189 &format!("limit: {reason}"),
190 );
191 return Err(Status::resource_exhausted(format!(
192 "limit check failed: {reason}"
193 )));
194 }
195
196 grpc_handler_full_check(&self.counters, rec, req.proto_id, &req.body)?;
200 }
201 }
202 }
203
204 let incoming = IncomingRequest {
205 conn_id: self.next_conn_id(),
206 proto_id: req.proto_id,
207 serial_no: self.next_serial(),
208 proto_fmt_type: ProtoFmtType::Protobuf,
209 body: Bytes::from(req.body),
210 };
211
212 match self.router.dispatch(incoming.conn_id, &incoming).await {
213 Some(resp_bytes) => Ok(Response::new(FutuResponse {
214 ret_type: 0,
215 ret_msg: String::new(),
216 proto_id: req.proto_id,
217 body: resp_bytes,
218 })),
219 None => Ok(Response::new(FutuResponse {
220 ret_type: -1,
221 ret_msg: "handler returned no response".to_string(),
222 proto_id: req.proto_id,
223 body: Vec::new(),
224 })),
225 }
226 }
227
228 type SubscribePushStream = ReceiverStream<Result<PushEvent, Status>>;
229
230 async fn subscribe_push(
235 &self,
236 request: Request<SubscribePushRequest>,
237 ) -> Result<Response<Self::SubscribePushStream>, Status> {
238 let authed = authenticate(&self.key_store, &request)?;
240 check_scope(&authed, 0, Scope::QotRead)?;
241
242 let (scopes, key_id) = match authed.as_ref() {
245 Some(rec) => (rec.scopes.clone(), rec.id.clone()),
246 None => (
247 [
248 Scope::QotRead,
249 Scope::AccRead,
250 Scope::TradeSimulate,
251 Scope::TradeReal,
252 ]
253 .into_iter()
254 .collect::<std::collections::HashSet<Scope>>(),
255 "<none>".to_string(),
256 ),
257 };
258
259 let (tx, rx) = mpsc::channel(256);
260 let mut push_rx = self.push_broadcaster.subscribe();
261
262 tracing::info!(key_id = %key_id, scopes = ?scopes, "gRPC client subscribed to push events");
263
264 tokio::spawn(async move {
266 loop {
267 match push_rx.recv().await {
268 Ok(event) => {
269 let needed = scope_for_event(&event.event_type);
270 if !scopes.contains(&needed) {
271 futu_auth::metrics::bump_ws_filtered(&event.event_type, &key_id);
273 continue;
274 }
275 if tx.send(Ok(event)).await.is_err() {
276 break; }
278 }
279 Err(broadcast::error::RecvError::Lagged(n)) => {
280 tracing::warn!(skipped = n, "gRPC push client lagged, skipped events");
281 }
283 Err(broadcast::error::RecvError::Closed) => {
284 break; }
286 }
287 }
288 tracing::info!("gRPC push stream ended");
289 });
290
291 Ok(Response::new(ReceiverStream::new(rx)))
292 }
293}
294
295fn trd_market_str(i: i32) -> &'static str {
299 match i {
300 1 => "HK",
301 2 => "US",
302 3 => "CN",
303 4 => "HKCC",
304 5 => "FUTURES",
305 6 => "SG",
306 7 => "JP",
307 _ => "",
308 }
309}
310
311fn trd_side_str(i: i32) -> &'static str {
313 match i {
314 1 => "BUY",
315 2 => "SELL",
316 3 => "SELL_SHORT",
317 4 => "BUY_BACK",
318 _ => "",
319 }
320}
321
322#[allow(clippy::result_large_err)] fn grpc_handler_full_check(
331 counters: &RuntimeCounters,
332 rec: &KeyRecord,
333 proto_id: u32,
334 body: &[u8],
335) -> Result<(), Status> {
336 let ctx = match proto_id {
337 2202 => {
338 let parsed = match trd_place_order::Request::decode(body) {
340 Ok(p) => p,
341 Err(_) => return Ok(()), };
343 let c2s = &parsed.c2s;
344 let market = trd_market_str(c2s.header.trd_market);
345 let symbol = if market.is_empty() {
346 String::new()
347 } else {
348 format!("{market}.{}", c2s.code)
349 };
350 let trd_side = match trd_side_str(c2s.trd_side) {
351 "" => None,
352 s => Some(s.to_string()),
353 };
354 CheckCtx {
355 market: market.to_string(),
356 symbol,
357 order_value: c2s.price.map(|p| p * c2s.qty),
358 trd_side,
359 }
360 }
361 2205 => {
362 let parsed = match trd_modify_order::Request::decode(body) {
364 Ok(p) => p,
365 Err(_) => return Ok(()),
366 };
367 CheckCtx {
368 market: trd_market_str(parsed.c2s.header.trd_market).to_string(),
369 symbol: String::new(),
370 order_value: None,
371 trd_side: None,
372 }
373 }
374 _ => return Ok(()),
375 };
376
377 let now = Utc::now();
378 if let LimitOutcome::Reject(reason) =
379 counters.check_full_skip_rate(&rec.id, &rec.limits(), &ctx, now)
380 {
381 let endpoint = format!("proto_id={proto_id}");
382 futu_auth::audit::reject("grpc", &endpoint, &rec.id, &format!("limit: {reason}"));
383 return Err(Status::resource_exhausted(format!(
384 "limit check failed: {reason}"
385 )));
386 }
387 Ok(())
388}
389
390fn scope_for_event(event_type: &str) -> Scope {
391 match event_type {
392 "trade" => Scope::AccRead, _ => Scope::QotRead, }
395}
396
397pub fn build_service(
399 router: Arc<RequestRouter>,
400 push_broadcaster: Arc<GrpcPushBroadcaster>,
401) -> FutuOpenDServer<FutuGrpcService> {
402 FutuOpenDServer::new(FutuGrpcService::new(router, push_broadcaster))
403}
404
405pub fn build_service_with_auth(
407 router: Arc<RequestRouter>,
408 push_broadcaster: Arc<GrpcPushBroadcaster>,
409 key_store: Arc<KeyStore>,
410 counters: Arc<RuntimeCounters>,
411) -> FutuOpenDServer<FutuGrpcService> {
412 FutuOpenDServer::new(FutuGrpcService::with_auth(
413 router,
414 push_broadcaster,
415 key_store,
416 counters,
417 ))
418}
419
420pub async fn start(
422 listen_addr: &str,
423 router: Arc<RequestRouter>,
424 push_broadcaster: Arc<GrpcPushBroadcaster>,
425) -> Result<(), Box<dyn std::error::Error>> {
426 start_with_auth(
427 listen_addr,
428 router,
429 push_broadcaster,
430 Arc::new(KeyStore::empty()),
431 Arc::new(RuntimeCounters::new()),
432 )
433 .await
434}
435
436pub async fn start_with_auth(
438 listen_addr: &str,
439 router: Arc<RequestRouter>,
440 push_broadcaster: Arc<GrpcPushBroadcaster>,
441 key_store: Arc<KeyStore>,
442 counters: Arc<RuntimeCounters>,
443) -> Result<(), Box<dyn std::error::Error>> {
444 let addr = listen_addr
445 .parse()
446 .map_err(|e| format!("invalid addr: {e}"))?;
447 if !key_store.is_configured() {
448 tracing::warn!(
449 "gRPC server running WITHOUT API key auth (legacy mode); \
450 all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
451 );
452 }
453 let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
454 tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
455 tonic::transport::Server::builder()
456 .add_service(service)
457 .serve(addr)
458 .await?;
459 Ok(())
460}