Skip to main content

futu_auth/
audit.rs

1//! 审计事件发射 helpers + JSONL 订阅层
2//!
3//! ## 设计
4//!
5//! 所有 interface(grpc / rest / ws / mcp)把 auth 决策和下单事件都走同一组
6//! helper,emit 的 `tracing::event!` 使用固定 `target = "futu_audit"`。这样:
7//!
8//! - 正常日志订阅(stderr / 文件)照常能看到它们(没过滤就全走)
9//! - 专用 audit JSONL 文件订阅层走 `filter_fn(meta.target() == TARGET)` 只收这些
10//!
11//! JSONL 每条一行,字段固定 —— 字段顺序由 tracing-subscriber 的 json formatter
12//! 决定(level / timestamp / target / message / 其它 fields),方便 `jq`/`duckdb`
13//! 后处理。
14//!
15//! ## 事件形状(field 命名)
16//!
17//! | 字段       | 含义                                        |
18//! |-----------|---------------------------------------------|
19//! | iface     | `"grpc"` / `"rest"` / `"ws"` / `"mcp"`    |
20//! | endpoint  | 具体接口:路径 / proto_id / tool 名         |
21//! | key_id    | API key id,或 `<missing>` / `<invalid>`   |
22//! | outcome   | `"allow"` / `"reject"`                     |
23//! | reason    | reject 时的文字原因(allow 时也可选)       |
24//! | args_hash | 下单工具额外附:args 的 SHA-256 前 8 hex   |
25//! | scope     | 校验的 scope(allow 时可选)                |
26//! | remote_addr | transport 层可得的远端地址(未知时为空) |
27//! | session_id | transport 层可得的连接 / session 标识(未知时为空) |
28
29use std::cell::RefCell;
30use std::path::Path;
31
32use tracing::Level;
33
34/// 固定 target,供 tracing filter 使用
35pub const TARGET: &str = "futu_audit";
36
37const MAX_AUDIT_CONTEXT_FIELD_LEN: usize = 128;
38
39/// Optional transport context attached to audit events.
40///
41/// The context intentionally contains only coarse correlation fields. API key
42/// ids and request args stay in their existing dedicated fields; caller-supplied
43/// session values are length-limited and stripped of control characters before
44/// they are written to JSONL.
45#[derive(Debug, Clone, Default, PartialEq, Eq)]
46pub struct AuditContext {
47    remote_addr: String,
48    session_id: String,
49}
50
51impl AuditContext {
52    #[must_use]
53    pub fn new<R, S>(remote_addr: Option<R>, session_id: Option<S>) -> Self
54    where
55        R: AsRef<str>,
56        S: AsRef<str>,
57    {
58        Self {
59            remote_addr: sanitize_context_field(remote_addr),
60            session_id: sanitize_context_field(session_id),
61        }
62    }
63
64    #[must_use]
65    pub fn empty() -> Self {
66        Self::default()
67    }
68
69    #[must_use]
70    pub fn remote_addr(&self) -> &str {
71        &self.remote_addr
72    }
73
74    #[must_use]
75    pub fn session_id(&self) -> &str {
76        &self.session_id
77    }
78}
79
80thread_local! {
81    static CURRENT_CONTEXT: RefCell<AuditContext> = RefCell::new(AuditContext::empty());
82}
83
84/// Run a synchronous audit-emitting block with transport context.
85///
86/// The auth pipeline is synchronous, so thread-local scoped context lets REST,
87/// gRPC, WS, and MCP adapters attach source metadata without changing every
88/// audit call site. Async code should wrap only the immediate synchronous
89/// `audit::*` / `authenticate_request` call, not an `.await` boundary.
90pub fn with_context<T>(ctx: AuditContext, f: impl FnOnce() -> T) -> T {
91    let previous = CURRENT_CONTEXT.with(|cell| std::mem::replace(&mut *cell.borrow_mut(), ctx));
92    let _guard = AuditContextGuard {
93        previous: Some(previous),
94    };
95    f()
96}
97
98fn current_context() -> AuditContext {
99    CURRENT_CONTEXT.with(|cell| cell.borrow().clone())
100}
101
102struct AuditContextGuard {
103    previous: Option<AuditContext>,
104}
105
106impl Drop for AuditContextGuard {
107    fn drop(&mut self) {
108        if let Some(previous) = self.previous.take() {
109            CURRENT_CONTEXT.with(|cell| {
110                *cell.borrow_mut() = previous;
111            });
112        }
113    }
114}
115
116fn sanitize_context_field<T>(value: Option<T>) -> String
117where
118    T: AsRef<str>,
119{
120    value
121        .map(|v| {
122            v.as_ref()
123                .chars()
124                .filter(|c| !c.is_control())
125                .take(MAX_AUDIT_CONTEXT_FIELD_LEN)
126                .collect()
127        })
128        .unwrap_or_default()
129}
130
131/// auth 拒绝事件
132pub fn reject(iface: &str, endpoint: &str, key_id: &str, reason: &str) {
133    let ctx = current_context();
134    tracing::event!(
135        target: TARGET,
136        Level::WARN,
137        iface = iface,
138        endpoint = endpoint,
139        key_id = key_id,
140        outcome = "reject",
141        reason = reason,
142        remote_addr = ctx.remote_addr(),
143        session_id = ctx.session_id(),
144        "auth reject"
145    );
146    crate::metrics::bump_auth_event(iface, "reject", key_id);
147    // 限额类的 reject 额外分桶计数(reason 以 "limit: " 开头是 guard 产生的)
148    if let Some(rest) = reason.strip_prefix("limit: ") {
149        crate::metrics::bump_limit_reject(iface, key_id, rest);
150    } else if reason.starts_with("rate limit")
151        || reason.starts_with("daily value")
152        || reason.starts_with("order value")
153    {
154        crate::metrics::bump_limit_reject(iface, key_id, reason);
155    }
156}
157
158/// auth 通过事件(用 INFO 级别;debug 模式会看到量比较大,由 EnvFilter 过滤)
159pub fn allow(iface: &str, endpoint: &str, key_id: &str, scope: Option<&str>) {
160    let ctx = current_context();
161    tracing::event!(
162        target: TARGET,
163        Level::INFO,
164        iface = iface,
165        endpoint = endpoint,
166        key_id = key_id,
167        outcome = "allow",
168        scope = scope.unwrap_or(""),
169        remote_addr = ctx.remote_addr(),
170        session_id = ctx.session_id(),
171        "auth allow"
172    );
173    crate::metrics::bump_auth_event(iface, "allow", key_id);
174}
175
176/// 交易事件(下单 / 改单 / 撤单)—— 无论 allow / reject 都记录
177pub fn trade(
178    iface: &str,
179    tool: &str,
180    key_id: &str,
181    args_hash: &str,
182    outcome: &str,
183    reason: Option<&str>,
184) {
185    let ctx = current_context();
186    tracing::event!(
187        target: TARGET,
188        Level::WARN,
189        iface = iface,
190        endpoint = tool,
191        key_id = key_id,
192        outcome = outcome,
193        args_hash = args_hash,
194        reason = reason.unwrap_or(""),
195        remote_addr = ctx.remote_addr(),
196        session_id = ctx.session_id(),
197        "trade event"
198    );
199    crate::metrics::bump_auth_event(iface, outcome, key_id);
200}
201
202// -------- JSONL 层安装 --------
203//
204// 这里不直接返回一个 `impl Layer<S>` 是因为 tracing-subscriber 的类型签名挺拗,
205// 放在 main.rs 里现场拼装反而清爽。下面这对 helper 把 "打开文件 + non-blocking
206// 包装" 抽成一个函数,并在退出时保留 guard 防止 flush 丢失。
207
208/// 打开 audit 输出路径,返回一个非阻塞 writer 和 guard(guard 必须活到进程退出)
209///
210/// - 如果 path 以 `.jsonl` / `.log` 等后缀结尾,直接当成单文件 append 打开
211/// - 否则视为目录,使用每日滚动,文件名 `futu-audit.log`
212pub fn open_writer(
213    path: &Path,
214) -> std::io::Result<(
215    tracing_appender::non_blocking::NonBlocking,
216    tracing_appender::non_blocking::WorkerGuard,
217)> {
218    futu_core::audit_log_writer::open_writer(path)
219}
220
221#[cfg(test)]
222mod tests;