1use std::collections::{BTreeMap, HashSet};
15use std::io::ErrorKind;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18
19use anyhow::{Context, Result, anyhow, bail};
20use async_trait::async_trait;
21use clap::Parser;
22use futu_net::client::{FutuClient, PushReceiver};
23use futu_qot::push::{QuoteHandler, QuotePushDispatcher};
24use futu_qot::types::{BasicQot, KLine, OrderBookData, Security, SubType};
25use rustyline::error::ReadlineError;
26use rustyline::{DefaultEditor, ExternalPrinter};
27use tokio::sync::Mutex;
28
29use crate::cli::{Cli, Command};
30use crate::common::{connect_gateway, format_symbol, parse_sub_type, parse_symbol};
31use crate::output::OutputFormat;
32
33#[cfg(test)]
34mod tests;
35
36struct ReplState {
39 gateway: String,
40 output: OutputFormat,
41 client: Mutex<Arc<FutuClient>>,
42 subs: Mutex<BTreeMap<String, HashSet<SubType>>>,
43}
44
45enum PushPrinter {
48 External(Box<dyn ExternalPrinter + Send>),
49 Stderr,
50}
51
52impl PushPrinter {
53 fn print(&mut self, msg: String) {
54 let print_err = match self {
55 PushPrinter::External(p) => p.print(msg.clone()).err(),
56 PushPrinter::Stderr => {
57 eprint!("{msg}");
58 return;
59 }
60 };
61
62 let Some(err) = print_err else {
63 return;
64 };
65
66 eprintln!("warning: REPL push printer failed ({err}); falling back to stderr");
67 eprint!("{msg}");
68 *self = PushPrinter::Stderr;
69 }
70}
71
72type SharedPrinter = Arc<Mutex<PushPrinter>>;
74
75pub async fn run(gateway: &str, output: OutputFormat) -> Result<()> {
76 let (client, push_rx) = connect_gateway(gateway, "futucli-repl").await?;
78 eprintln!("✓ connected to {gateway}");
79
80 let state = Arc::new(ReplState {
81 gateway: gateway.to_string(),
82 output,
83 client: Mutex::new(client),
84 subs: Mutex::new(BTreeMap::new()),
85 });
86
87 let history_path = history_file_path();
89 let mut rl = DefaultEditor::new().context("init rustyline editor")?;
90 if let Some(p) = &history_path {
91 load_history_best_effort(&mut rl, p);
92 }
93
94 let printer_inner = match rl.create_external_printer() {
97 Ok(p) => PushPrinter::External(Box::new(p)),
98 Err(e) => {
99 eprintln!("note: external printer unavailable ({e}); push output will go to stderr");
100 PushPrinter::Stderr
101 }
102 };
103 let printer: SharedPrinter = Arc::new(Mutex::new(printer_inner));
104
105 tokio::spawn(push_loop(push_rx, printer.clone()));
107
108 print_banner(gateway);
110 loop {
111 let prompt = format!("futu ({}) > ", state.gateway);
113 let read_result = tokio::task::spawn_blocking(move || {
114 let res = rl.readline(&prompt);
115 (rl, res)
116 })
117 .await
118 .context("readline task panicked")?;
119 rl = read_result.0;
120
121 let line = match read_result.1 {
122 Ok(l) => l,
123 Err(ReadlineError::Interrupted) => {
124 eprintln!("(Ctrl-C) — type 'exit' or Ctrl-D to quit");
125 continue;
126 }
127 Err(ReadlineError::Eof) => {
128 eprintln!("bye");
129 break;
130 }
131 Err(e) => {
132 eprintln!("readline error: {e}");
133 break;
134 }
135 };
136 let trimmed = line.trim();
137 if trimmed.is_empty() {
138 continue;
139 }
140 if let Err(err) = rl.add_history_entry(trimmed) {
141 eprintln!("warning: add REPL history entry failed: {err}");
142 }
143
144 match handle_line(trimmed, &state, &printer).await {
146 Ok(ShouldContinue::Continue) => {}
147 Ok(ShouldContinue::Exit) => break,
148 Err(e) => eprintln!("error: {e:#}"),
149 }
150 }
151
152 if let Some(p) = &history_path {
154 save_history_best_effort(&mut rl, p);
155 }
156 Ok(())
157}
158
159fn load_history_best_effort(rl: &mut DefaultEditor, path: &Path) {
160 match rl.load_history(path) {
161 Ok(()) => {}
162 Err(ReadlineError::Io(err)) if err.kind() == ErrorKind::NotFound => {
163 }
165 Err(err) => {
166 eprintln!(
167 "warning: load REPL history failed for {}: {err}",
168 path.display()
169 );
170 }
171 }
172}
173
174fn save_history_best_effort(rl: &mut DefaultEditor, path: &Path) {
175 let parent = path.parent().unwrap_or_else(|| Path::new("."));
176 if let Err(err) = std::fs::create_dir_all(parent) {
177 eprintln!(
178 "warning: create REPL history directory failed for {}: {err}",
179 parent.display()
180 );
181 return;
182 }
183 if let Err(err) = rl.save_history(path) {
184 eprintln!(
185 "warning: save REPL history failed for {}: {err}",
186 path.display()
187 );
188 }
189}
190
/// Tells the REPL loop what to do after a line has been handled.
enum ShouldContinue {
    /// Keep reading input.
    Continue,
    /// Leave the REPL loop.
    Exit,
}
195
196async fn handle_line(
198 line: &str,
199 state: &Arc<ReplState>,
200 printer: &SharedPrinter,
201) -> Result<ShouldContinue> {
202 let tokens = shlex::split(line).ok_or_else(|| anyhow!("failed to tokenize input"))?;
203 if tokens.is_empty() {
204 return Ok(ShouldContinue::Continue);
205 }
206
207 match tokens[0].as_str() {
208 "exit" | "quit" | ":q" => return Ok(ShouldContinue::Exit),
209 "help" | "?" => {
210 print_help();
211 return Ok(ShouldContinue::Continue);
212 }
213 "reconnect" => {
214 reconnect(state, printer).await?;
215 return Ok(ShouldContinue::Continue);
216 }
217 "subs" => {
218 list_subs(state).await;
219 return Ok(ShouldContinue::Continue);
220 }
221 "sub" if tokens.len() >= 2 && looks_like_symbol(&tokens[1]) => {
222 return sub_inline(&tokens[1..], state, printer)
224 .await
225 .map(|_| ShouldContinue::Continue);
226 }
227 "unsub" => {
228 return unsub_inline(&tokens[1..], state)
229 .await
230 .map(|_| ShouldContinue::Continue);
231 }
232 _ => {}
233 }
234
235 let argv = std::iter::once("futucli".to_string())
237 .chain(tokens)
238 .collect::<Vec<_>>();
239 let parsed = match Cli::try_parse_from(&argv) {
240 Ok(p) => p,
241 Err(e) => {
242 eprint!("{e}");
244 return Ok(ShouldContinue::Continue);
245 }
246 };
247
248 if matches!(parsed.command, Command::Repl) {
250 bail!("already in REPL");
251 }
252
253 crate::cli::dispatch(&state.gateway, state.output, parsed.command).await?;
255 Ok(ShouldContinue::Continue)
256}
257
/// Prints the two-line REPL welcome banner to stderr.
fn print_banner(gateway: &str) {
    eprintln!(
        "futucli REPL — gateway {gateway}\ntype 'help' for commands, 'exit' or Ctrl-D to quit"
    );
}
264
/// Prints the REPL help text (built-in commands, subcommands, examples) to stderr.
fn print_help() {
    const HELP_LINES: [&str; 17] = [
        "REPL 内置命令:",
        " help | ? 显示本帮助",
        " exit | quit | :q 退出",
        " reconnect 断开并重新连接网关",
        " subs 列出当前活跃订阅",
        " sub <SYMBOL> [-t csv] 后台订阅(推送在 prompt 上方实时显示)",
        " unsub <SYMBOL> [csv] 取消订阅(csv 省略则全部类型)",
        "",
        "子命令(和外层 futucli 一致):",
        " ping / quote / snapshot / kline / orderbook / ticker / rt / static /",
        " broker / plate-list / plate-stocks / account / funds / position /",
        " order / deal / unlock-trade",
        "",
        "示例:",
        " quote HK.00700 US.AAPL",
        " kline HK.00700 -t day -n 5",
        " sub HK.00700 -t basic,orderbook",
    ];
    // One line per entry, each terminated by a newline.
    eprintln!("{}", HELP_LINES.join("\n"));
}
289
290async fn reconnect(state: &Arc<ReplState>, printer: &SharedPrinter) -> Result<()> {
291 let (new_client, new_rx) = connect_gateway(&state.gateway, "futucli-repl").await?;
292
293 *state.client.lock().await = new_client;
294 tokio::spawn(push_loop(new_rx, printer.clone()));
295 eprintln!("✓ reconnected to {}", state.gateway);
296 Ok(())
297}
298
299async fn list_subs(state: &Arc<ReplState>) {
300 let guard = state.subs.lock().await;
301 if guard.is_empty() {
302 println!("(no active subscriptions)");
303 return;
304 }
305 println!("Active subscriptions:");
306 for (sym, types) in guard.iter() {
307 let mut names: Vec<&'static str> = types.iter().copied().map(sub_type_label).collect();
308 names.sort_unstable();
309 println!(" {sym:<14} {}", names.join(","));
310 }
311}
312
/// Cheap syntactic check used to tell symbols from flags and other words:
/// a token counts as a symbol when it contains a `.` and does not start
/// with `-`. No further validation is done here.
fn looks_like_symbol(tok: &str) -> bool {
    if tok.starts_with('-') {
        return false;
    }
    tok.contains('.')
}
316
317async fn sub_inline(
318 args: &[String],
319 state: &Arc<ReplState>,
320 _printer: &SharedPrinter,
321) -> Result<()> {
322 let (symbols, types_csv) = split_sub_args(args)?;
324 let secs: Vec<Security> = symbols
325 .iter()
326 .map(|s| parse_symbol(s))
327 .collect::<Result<_>>()?;
328 let sub_types: Vec<SubType> = types_csv
329 .split(',')
330 .map(parse_sub_type)
331 .collect::<Result<_>>()?;
332
333 let client = state.client.lock().await.clone();
334 futu_qot::sub::subscribe(&client, &secs, &sub_types, true, true).await?;
335 {
336 let mut guard = state.subs.lock().await;
337 for s in &symbols {
338 let entry = guard.entry(s.clone()).or_default();
339 for t in &sub_types {
340 entry.insert(*t);
341 }
342 }
343 }
344 eprintln!("✓ subscribed symbols={symbols:?} types={types_csv}");
345 Ok(())
346}
347
348async fn unsub_inline(args: &[String], state: &Arc<ReplState>) -> Result<()> {
349 if args.is_empty() {
350 bail!("usage: unsub <SYMBOL> [csv]");
351 }
352 let symbol = args[0].clone();
353 let types_csv = args.get(1).cloned();
354
355 let sec = parse_symbol(&symbol)?;
356 let sub_types: Vec<SubType> = match types_csv {
357 Some(csv) if !csv.is_empty() => {
358 csv.split(',').map(parse_sub_type).collect::<Result<_>>()?
359 }
360 _ => {
361 let guard = state.subs.lock().await;
362 guard
363 .get(&symbol)
364 .map(|s| s.iter().copied().collect())
365 .unwrap_or_default()
366 }
367 };
368 if sub_types.is_empty() {
369 bail!("no active subscription for {symbol}");
370 }
371
372 let client = state.client.lock().await.clone();
373 futu_qot::sub::unsubscribe(&client, std::slice::from_ref(&sec), &sub_types).await?;
374 {
375 let mut guard = state.subs.lock().await;
376 if let Some(set) = guard.get_mut(&symbol) {
377 for t in &sub_types {
378 set.remove(t);
379 }
380 if set.is_empty() {
381 guard.remove(&symbol);
382 }
383 }
384 }
385 eprintln!(
386 "✓ unsubscribed {symbol} types=[{}]",
387 sub_types
388 .iter()
389 .map(|t| sub_type_label(*t))
390 .collect::<Vec<_>>()
391 .join(",")
392 );
393 Ok(())
394}
395
396fn split_sub_args(args: &[String]) -> Result<(Vec<String>, String)> {
398 let mut symbols = Vec::new();
399 let mut types = "basic".to_string();
400 let mut i = 0;
401 while i < args.len() {
402 let a = &args[i];
403 if a == "-t" || a == "--type" {
404 i += 1;
405 types = args
406 .get(i)
407 .ok_or_else(|| anyhow!("-t needs a value"))?
408 .clone();
409 } else if let Some(rest) = a.strip_prefix("--type=") {
410 types = rest.to_string();
411 } else if looks_like_symbol(a) {
412 symbols.push(a.clone());
413 } else {
414 bail!("unexpected token {a:?}");
415 }
416 i += 1;
417 }
418 if symbols.is_empty() {
419 bail!("usage: sub <SYMBOL...> [-t csv]");
420 }
421 Ok((symbols, types))
422}
423
424struct PrintHandler {
427 printer: SharedPrinter,
428}
429
430impl PrintHandler {
431 async fn emit(&self, s: String) {
432 let mut g = self.printer.lock().await;
433 g.print(format!("{s}\n"));
434 }
435}
436
437#[async_trait]
438impl QuoteHandler for PrintHandler {
439 async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
440 for q in qot_list {
441 let sym = format_symbol(&q.security);
442 let change = q.cur_price - q.last_close_price;
443 let pct = if q.last_close_price != 0.0 {
444 change / q.last_close_price * 100.0
445 } else {
446 0.0
447 };
448 let sign = if change >= 0.0 { "+" } else { "" };
449 self.emit(format!(
450 "[{}] basic {sym:<12} px={:.3} {sign}{change:.3} ({sign}{pct:.2}%) vol={}",
451 q.update_time, q.cur_price, q.volume
452 ))
453 .await;
454 }
455 }
456
457 async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
458 let sym = format_symbol(&security);
459 for k in kl_list {
460 self.emit(format!(
461 "[{}] kl {sym:<12} O={:.3} H={:.3} L={:.3} C={:.3} V={}",
462 k.time, k.open_price, k.high_price, k.low_price, k.close_price, k.volume
463 ))
464 .await;
465 }
466 }
467
468 async fn on_order_book_update(&self, data: OrderBookData) {
469 let sym = format_symbol(&data.security);
470 let top_bid = data.bid_list.first();
471 let top_ask = data.ask_list.first();
472 let msg = match (top_bid, top_ask) {
473 (Some(b), Some(a)) => format!(
474 " ob {sym:<12} bid={:.3}x{} ask={:.3}x{} spr={:.3}",
475 b.price,
476 b.volume,
477 a.price,
478 a.volume,
479 a.price - b.price
480 ),
481 _ => format!(" ob {sym:<12} (empty)"),
482 };
483 self.emit(msg).await;
484 }
485
486 async fn on_ticker_update(
487 &self,
488 security: Security,
489 ticker_list: Vec<futu_qot::ticker::Ticker>,
490 ) {
491 let sym = format_symbol(&security);
492 for t in ticker_list {
493 self.emit(format!(
494 " tick {sym:<12} px={:.3} vol={} dir={}",
495 t.price, t.volume, t.dir
496 ))
497 .await;
498 }
499 }
500
501 async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
502 let sym = format_symbol(&security);
503 for r in rt_list {
504 self.emit(format!(
505 "[{}] rt {sym:<12} px={:.3} avg={:.3} vol={}",
506 r.time, r.price, r.avg_price, r.volume
507 ))
508 .await;
509 }
510 }
511}
512
513async fn push_loop(mut rx: PushReceiver, printer: SharedPrinter) {
514 let handler = PrintHandler { printer };
515 while let Some(msg) = rx.recv().await {
516 if let Err(e) = QuotePushDispatcher::dispatch(&handler, msg.proto_id, &msg.body).await {
517 let mut g = handler.printer.lock().await;
518 g.print(format!("push dispatch error: {e}\n"));
519 }
520 }
521}
522
523fn sub_type_label(t: SubType) -> &'static str {
526 match t {
527 SubType::Basic => "basic",
528 SubType::OrderBook => "orderbook",
529 SubType::OrderBookOdd => "orderbook_odd",
530 SubType::Ticker => "ticker",
531 SubType::RT => "rt",
532 SubType::KLDay => "kl_day",
533 SubType::KL1Min => "kl_1min",
534 SubType::KL3Min => "kl_3min",
535 SubType::KL5Min => "kl_5min",
536 SubType::KL15Min => "kl_15min",
537 SubType::KL30Min => "kl_30min",
538 SubType::KL60Min => "kl_60min",
539 SubType::KLWeek => "kl_week",
540 SubType::KLMonth => "kl_month",
541 SubType::KLQuarter => "kl_quarter",
542 SubType::KLYear => "kl_year",
543 SubType::Broker => "broker",
544 SubType::OrderDetail => "order_detail",
545 _ => "other",
546 }
547}
548
549fn history_file_path() -> Option<PathBuf> {
550 let base = dirs::cache_dir()?;
551 Some(base.join("futucli").join("history"))
552}