Skip to main content

futucli/cmd/
repl.rs

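//! `futucli repl` — interactive REPL.
//!
//! Features:
//! - Shares one long-lived `FutuClient` connection, so commands do not have to reconnect to the gateway
//! - Reuses every regular subcommand (through `cli::dispatch`)
//! - rustyline line editing: ↑/↓ history, Ctrl-R reverse search, Ctrl-D to exit
//! - History is persisted to `futucli/history` under the platform cache directory
//!   reported by `dirs::cache_dir()` (e.g. `$XDG_CACHE_HOME` or `~/.cache` on Linux)
//! - Subscription pushes are printed live without disturbing the prompt (`ExternalPrinter`)
//! - REPL-only commands: help / exit / reconnect / subs / sub / unsub
//!
//! Implementation note: rustyline's `readline` is a blocking API, so it is run inside
//! `tokio::task::spawn_blocking` to coexist with the async commands.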
13
14use std::collections::{BTreeMap, HashSet};
15use std::io::ErrorKind;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18
19use anyhow::{Context, Result, anyhow, bail};
20use async_trait::async_trait;
21use clap::Parser;
22use futu_net::client::{FutuClient, PushReceiver};
23use futu_qot::push::{QuoteHandler, QuotePushDispatcher};
24use futu_qot::types::{BasicQot, KLine, OrderBookData, Security, SubType};
25use rustyline::error::ReadlineError;
26use rustyline::{DefaultEditor, ExternalPrinter};
27use tokio::sync::Mutex;
28
29use crate::cli::{Cli, Command};
30use crate::common::{connect_gateway, format_symbol, parse_sub_type, parse_symbol};
31use crate::output::OutputFormat;
32
33#[cfg(test)]
34mod tests;
35
/// Runtime state of the REPL, shared between the REPL-only commands.
///
/// Subscriptions are recorded as `symbol text → set of subscribed SubType`,
/// where the key is the symbol string exactly as the user typed it (not a
/// parsed `Security`). A `BTreeMap` keeps the `subs` command output in a
/// stable, readable order.
struct ReplState {
    /// Gateway address used for the initial connection, for `reconnect`,
    /// for the prompt, and for dispatched subcommands.
    gateway: String,
    /// Output format handed to `cli::dispatch` for regular subcommands.
    output: OutputFormat,
    /// Current gateway client; `reconnect` replaces the inner `Arc`.
    client: Mutex<Arc<FutuClient>>,
    /// Subscription bookkeeping maintained by `sub` / `unsub`.
    subs: Mutex<BTreeMap<String, HashSet<SubType>>>,
}
44
45/// 推送打印器:tty 下走 rustyline ExternalPrinter(不撞 prompt),
46/// 非 tty(pipe / 重定向)下降级到 stderr,保证脚本 / 测试里 REPL 仍可用。
47enum PushPrinter {
48    External(Box<dyn ExternalPrinter + Send>),
49    Stderr,
50}
51
52impl PushPrinter {
53    fn print(&mut self, msg: String) {
54        let print_err = match self {
55            PushPrinter::External(p) => p.print(msg.clone()).err(),
56            PushPrinter::Stderr => {
57                eprint!("{msg}");
58                return;
59            }
60        };
61
62        let Some(err) = print_err else {
63            return;
64        };
65
66        eprintln!("warning: REPL push printer failed ({err}); falling back to stderr");
67        eprint!("{msg}");
68        *self = PushPrinter::Stderr;
69    }
70}
71
72/// 包一层 tokio Mutex,跨 task 共享。
73type SharedPrinter = Arc<Mutex<PushPrinter>>;
74
75pub async fn run(gateway: &str, output: OutputFormat) -> Result<()> {
76    // 1. 建立长连接
77    let (client, push_rx) = connect_gateway(gateway, "futucli-repl").await?;
78    eprintln!("✓ connected to {gateway}");
79
80    let state = Arc::new(ReplState {
81        gateway: gateway.to_string(),
82        output,
83        client: Mutex::new(client),
84        subs: Mutex::new(BTreeMap::new()),
85    });
86
87    // 2. rustyline editor + 历史文件
88    let history_path = history_file_path();
89    let mut rl = DefaultEditor::new().context("init rustyline editor")?;
90    if let Some(p) = &history_path {
91        load_history_best_effort(&mut rl, p);
92    }
93
94    // 3. ExternalPrinter:供 push task 打印,不打断 prompt;
95    //    非 tty 环境(pipe)直接降级到 stderr。
96    let printer_inner = match rl.create_external_printer() {
97        Ok(p) => PushPrinter::External(Box::new(p)),
98        Err(e) => {
99            eprintln!("note: external printer unavailable ({e}); push output will go to stderr");
100            PushPrinter::Stderr
101        }
102    };
103    let printer: SharedPrinter = Arc::new(Mutex::new(printer_inner));
104
105    // 4. 启动 push 消费 task
106    tokio::spawn(push_loop(push_rx, printer.clone()));
107
108    // 5. 主循环:readline → 解析 → dispatch
109    print_banner(gateway);
110    loop {
111        // rustyline 的 readline 是阻塞的,放到 blocking 线程里
112        let prompt = format!("futu ({}) > ", state.gateway);
113        let read_result = tokio::task::spawn_blocking(move || {
114            let res = rl.readline(&prompt);
115            (rl, res)
116        })
117        .await
118        .context("readline task panicked")?;
119        rl = read_result.0;
120
121        let line = match read_result.1 {
122            Ok(l) => l,
123            Err(ReadlineError::Interrupted) => {
124                eprintln!("(Ctrl-C) — type 'exit' or Ctrl-D to quit");
125                continue;
126            }
127            Err(ReadlineError::Eof) => {
128                eprintln!("bye");
129                break;
130            }
131            Err(e) => {
132                eprintln!("readline error: {e}");
133                break;
134            }
135        };
136        let trimmed = line.trim();
137        if trimmed.is_empty() {
138            continue;
139        }
140        if let Err(err) = rl.add_history_entry(trimmed) {
141            eprintln!("warning: add REPL history entry failed: {err}");
142        }
143
144        // 6. 派发:REPL 专属 > 子命令
145        match handle_line(trimmed, &state, &printer).await {
146            Ok(ShouldContinue::Continue) => {}
147            Ok(ShouldContinue::Exit) => break,
148            Err(e) => eprintln!("error: {e:#}"),
149        }
150    }
151
152    // 退出前保存历史
153    if let Some(p) = &history_path {
154        save_history_best_effort(&mut rl, p);
155    }
156    Ok(())
157}
158
159fn load_history_best_effort(rl: &mut DefaultEditor, path: &Path) {
160    match rl.load_history(path) {
161        Ok(()) => {}
162        Err(ReadlineError::Io(err)) if err.kind() == ErrorKind::NotFound => {
163            // 首次运行无历史文件属正常。
164        }
165        Err(err) => {
166            eprintln!(
167                "warning: load REPL history failed for {}: {err}",
168                path.display()
169            );
170        }
171    }
172}
173
174fn save_history_best_effort(rl: &mut DefaultEditor, path: &Path) {
175    let parent = path.parent().unwrap_or_else(|| Path::new("."));
176    if let Err(err) = std::fs::create_dir_all(parent) {
177        eprintln!(
178            "warning: create REPL history directory failed for {}: {err}",
179            parent.display()
180        );
181        return;
182    }
183    if let Err(err) = rl.save_history(path) {
184        eprintln!(
185            "warning: save REPL history failed for {}: {err}",
186            path.display()
187        );
188    }
189}
190
/// Outcome of handling one input line, telling the main loop what to do next.
enum ShouldContinue {
    /// Keep reading further input.
    Continue,
    /// Leave the REPL loop.
    Exit,
}
195
196/// 路由一行输入。REPL 专属命令优先;否则走 clap 解析再转 `cli::dispatch`。
197async fn handle_line(
198    line: &str,
199    state: &Arc<ReplState>,
200    printer: &SharedPrinter,
201) -> Result<ShouldContinue> {
202    let tokens = shlex::split(line).ok_or_else(|| anyhow!("failed to tokenize input"))?;
203    if tokens.is_empty() {
204        return Ok(ShouldContinue::Continue);
205    }
206
207    match tokens[0].as_str() {
208        "exit" | "quit" | ":q" => return Ok(ShouldContinue::Exit),
209        "help" | "?" => {
210            print_help();
211            return Ok(ShouldContinue::Continue);
212        }
213        "reconnect" => {
214            reconnect(state, printer).await?;
215            return Ok(ShouldContinue::Continue);
216        }
217        "subs" => {
218            list_subs(state).await;
219            return Ok(ShouldContinue::Continue);
220        }
221        "sub" if tokens.len() >= 2 && looks_like_symbol(&tokens[1]) => {
222            // 覆盖顶层 `sub` 子命令,改为 REPL 内部后台订阅
223            return sub_inline(&tokens[1..], state, printer)
224                .await
225                .map(|_| ShouldContinue::Continue);
226        }
227        "unsub" => {
228            return unsub_inline(&tokens[1..], state)
229                .await
230                .map(|_| ShouldContinue::Continue);
231        }
232        _ => {}
233    }
234
235    // clap 解析(注入一个伪 argv[0])
236    let argv = std::iter::once("futucli".to_string())
237        .chain(tokens)
238        .collect::<Vec<_>>();
239    let parsed = match Cli::try_parse_from(&argv) {
240        Ok(p) => p,
241        Err(e) => {
242            // clap 的帮助 / 错误消息自己会带换行;直接打出
243            eprint!("{e}");
244            return Ok(ShouldContinue::Continue);
245        }
246    };
247
248    // REPL 里禁止再次进入 repl
249    if matches!(parsed.command, Command::Repl) {
250        bail!("already in REPL");
251    }
252
253    // 使用 REPL 的 gateway/output,忽略解析到的(避免 REPL 里切换网关造成长连接失效)
254    crate::cli::dispatch(&state.gateway, state.output, parsed.command).await?;
255    Ok(ShouldContinue::Continue)
256}
257
258// ========== REPL 专属命令实现 ==========
259
/// Prints the two-line welcome banner for `gateway` to stderr.
fn print_banner(gateway: &str) {
    let headline = format!("futucli REPL — gateway {gateway}");
    eprintln!("{headline}");
    eprintln!("type 'help' for commands, 'exit' or Ctrl-D to quit");
}
264
/// Prints the REPL help text (built-in commands, subcommands, examples) to stderr.
fn print_help() {
    const HELP_LINES: &[&str] = &[
        "REPL 内置命令:",
        "  help | ?              显示本帮助",
        "  exit | quit | :q      退出",
        "  reconnect             断开并重新连接网关",
        "  subs                  列出当前活跃订阅",
        "  sub <SYMBOL> [-t csv] 后台订阅(推送在 prompt 上方实时显示)",
        "  unsub <SYMBOL> [csv]  取消订阅(csv 省略则全部类型)",
        "",
        "子命令(和外层 futucli 一致):",
        "  ping / quote / snapshot / kline / orderbook / ticker / rt / static /",
        "  broker / plate-list / plate-stocks / account / funds / position /",
        "  order / deal / unlock-trade",
        "",
        "示例:",
        "  quote HK.00700 US.AAPL",
        "  kline HK.00700 -t day -n 5",
        "  sub HK.00700 -t basic,orderbook",
    ];

    for l in HELP_LINES {
        eprintln!("{l}");
    }
}
289
290async fn reconnect(state: &Arc<ReplState>, printer: &SharedPrinter) -> Result<()> {
291    let (new_client, new_rx) = connect_gateway(&state.gateway, "futucli-repl").await?;
292
293    *state.client.lock().await = new_client;
294    tokio::spawn(push_loop(new_rx, printer.clone()));
295    eprintln!("✓ reconnected to {}", state.gateway);
296    Ok(())
297}
298
299async fn list_subs(state: &Arc<ReplState>) {
300    let guard = state.subs.lock().await;
301    if guard.is_empty() {
302        println!("(no active subscriptions)");
303        return;
304    }
305    println!("Active subscriptions:");
306    for (sym, types) in guard.iter() {
307        let mut names: Vec<&'static str> = types.iter().copied().map(sub_type_label).collect();
308        names.sort_unstable();
309        println!("  {sym:<14} {}", names.join(","));
310    }
311}
312
/// Heuristic used to tell symbols from flags: a token counts as a symbol
/// when it contains a `.` and does not start with `-`.
fn looks_like_symbol(tok: &str) -> bool {
    if tok.starts_with('-') {
        return false;
    }
    tok.contains('.')
}
316
317async fn sub_inline(
318    args: &[String],
319    state: &Arc<ReplState>,
320    _printer: &SharedPrinter,
321) -> Result<()> {
322    // 用法: sub <SYMBOL...> [-t csv]
323    let (symbols, types_csv) = split_sub_args(args)?;
324    let secs: Vec<Security> = symbols
325        .iter()
326        .map(|s| parse_symbol(s))
327        .collect::<Result<_>>()?;
328    let sub_types: Vec<SubType> = types_csv
329        .split(',')
330        .map(parse_sub_type)
331        .collect::<Result<_>>()?;
332
333    let client = state.client.lock().await.clone();
334    futu_qot::sub::subscribe(&client, &secs, &sub_types, true, true).await?;
335    {
336        let mut guard = state.subs.lock().await;
337        for s in &symbols {
338            let entry = guard.entry(s.clone()).or_default();
339            for t in &sub_types {
340                entry.insert(*t);
341            }
342        }
343    }
344    eprintln!("✓ subscribed symbols={symbols:?} types={types_csv}");
345    Ok(())
346}
347
348async fn unsub_inline(args: &[String], state: &Arc<ReplState>) -> Result<()> {
349    if args.is_empty() {
350        bail!("usage: unsub <SYMBOL> [csv]");
351    }
352    let symbol = args[0].clone();
353    let types_csv = args.get(1).cloned();
354
355    let sec = parse_symbol(&symbol)?;
356    let sub_types: Vec<SubType> = match types_csv {
357        Some(csv) if !csv.is_empty() => {
358            csv.split(',').map(parse_sub_type).collect::<Result<_>>()?
359        }
360        _ => {
361            let guard = state.subs.lock().await;
362            guard
363                .get(&symbol)
364                .map(|s| s.iter().copied().collect())
365                .unwrap_or_default()
366        }
367    };
368    if sub_types.is_empty() {
369        bail!("no active subscription for {symbol}");
370    }
371
372    let client = state.client.lock().await.clone();
373    futu_qot::sub::unsubscribe(&client, std::slice::from_ref(&sec), &sub_types).await?;
374    {
375        let mut guard = state.subs.lock().await;
376        if let Some(set) = guard.get_mut(&symbol) {
377            for t in &sub_types {
378                set.remove(t);
379            }
380            if set.is_empty() {
381                guard.remove(&symbol);
382            }
383        }
384    }
385    eprintln!(
386        "✓ unsubscribed {symbol} types=[{}]",
387        sub_types
388            .iter()
389            .map(|t| sub_type_label(*t))
390            .collect::<Vec<_>>()
391            .join(",")
392    );
393    Ok(())
394}
395
396/// 拆 `sub HK.00700 US.AAPL -t basic,rt` → (["HK.00700","US.AAPL"], "basic,rt")
397fn split_sub_args(args: &[String]) -> Result<(Vec<String>, String)> {
398    let mut symbols = Vec::new();
399    let mut types = "basic".to_string();
400    let mut i = 0;
401    while i < args.len() {
402        let a = &args[i];
403        if a == "-t" || a == "--type" {
404            i += 1;
405            types = args
406                .get(i)
407                .ok_or_else(|| anyhow!("-t needs a value"))?
408                .clone();
409        } else if let Some(rest) = a.strip_prefix("--type=") {
410            types = rest.to_string();
411        } else if looks_like_symbol(a) {
412            symbols.push(a.clone());
413        } else {
414            bail!("unexpected token {a:?}");
415        }
416        i += 1;
417    }
418    if symbols.is_empty() {
419        bail!("usage: sub <SYMBOL...> [-t csv]");
420    }
421    Ok((symbols, types))
422}
423
424// ========== push 异步打印 ==========
425
426struct PrintHandler {
427    printer: SharedPrinter,
428}
429
430impl PrintHandler {
431    async fn emit(&self, s: String) {
432        let mut g = self.printer.lock().await;
433        g.print(format!("{s}\n"));
434    }
435}
436
437#[async_trait]
438impl QuoteHandler for PrintHandler {
439    async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
440        for q in qot_list {
441            let sym = format_symbol(&q.security);
442            let change = q.cur_price - q.last_close_price;
443            let pct = if q.last_close_price != 0.0 {
444                change / q.last_close_price * 100.0
445            } else {
446                0.0
447            };
448            let sign = if change >= 0.0 { "+" } else { "" };
449            self.emit(format!(
450                "[{}] basic {sym:<12} px={:.3} {sign}{change:.3} ({sign}{pct:.2}%) vol={}",
451                q.update_time, q.cur_price, q.volume
452            ))
453            .await;
454        }
455    }
456
457    async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
458        let sym = format_symbol(&security);
459        for k in kl_list {
460            self.emit(format!(
461                "[{}] kl    {sym:<12} O={:.3} H={:.3} L={:.3} C={:.3} V={}",
462                k.time, k.open_price, k.high_price, k.low_price, k.close_price, k.volume
463            ))
464            .await;
465        }
466    }
467
468    async fn on_order_book_update(&self, data: OrderBookData) {
469        let sym = format_symbol(&data.security);
470        let top_bid = data.bid_list.first();
471        let top_ask = data.ask_list.first();
472        let msg = match (top_bid, top_ask) {
473            (Some(b), Some(a)) => format!(
474                "              ob    {sym:<12} bid={:.3}x{} ask={:.3}x{} spr={:.3}",
475                b.price,
476                b.volume,
477                a.price,
478                a.volume,
479                a.price - b.price
480            ),
481            _ => format!("              ob    {sym:<12} (empty)"),
482        };
483        self.emit(msg).await;
484    }
485
486    async fn on_ticker_update(
487        &self,
488        security: Security,
489        ticker_list: Vec<futu_qot::ticker::Ticker>,
490    ) {
491        let sym = format_symbol(&security);
492        for t in ticker_list {
493            self.emit(format!(
494                "              tick  {sym:<12} px={:.3} vol={} dir={}",
495                t.price, t.volume, t.dir
496            ))
497            .await;
498        }
499    }
500
501    async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
502        let sym = format_symbol(&security);
503        for r in rt_list {
504            self.emit(format!(
505                "[{}] rt    {sym:<12} px={:.3} avg={:.3} vol={}",
506                r.time, r.price, r.avg_price, r.volume
507            ))
508            .await;
509        }
510    }
511}
512
513async fn push_loop(mut rx: PushReceiver, printer: SharedPrinter) {
514    let handler = PrintHandler { printer };
515    while let Some(msg) = rx.recv().await {
516        if let Err(e) = QuotePushDispatcher::dispatch(&handler, msg.proto_id, &msg.body).await {
517            let mut g = handler.printer.lock().await;
518            g.print(format!("push dispatch error: {e}\n"));
519        }
520    }
521}
522
523// ========== util ==========
524
/// Returns the short lowercase label used when displaying a `SubType`
/// (in the `subs` listing and the `unsub` confirmation).
///
/// Any variant not listed here is shown as `"other"`.
fn sub_type_label(t: SubType) -> &'static str {
    match t {
        SubType::Basic => "basic",
        SubType::OrderBook => "orderbook",
        SubType::OrderBookOdd => "orderbook_odd",
        SubType::Ticker => "ticker",
        SubType::RT => "rt",
        SubType::KLDay => "kl_day",
        SubType::KL1Min => "kl_1min",
        SubType::KL3Min => "kl_3min",
        SubType::KL5Min => "kl_5min",
        SubType::KL15Min => "kl_15min",
        SubType::KL30Min => "kl_30min",
        SubType::KL60Min => "kl_60min",
        SubType::KLWeek => "kl_week",
        SubType::KLMonth => "kl_month",
        SubType::KLQuarter => "kl_quarter",
        SubType::KLYear => "kl_year",
        SubType::Broker => "broker",
        SubType::OrderDetail => "order_detail",
        // NOTE(review): presumably `SubType` has further variants (or is
        // non-exhaustive) — confirm before replacing this catch-all.
        _ => "other",
    }
}
548
549fn history_file_path() -> Option<PathBuf> {
550    let base = dirs::cache_dir()?;
551    Some(base.join("futucli").join("history"))
552}