数据提供者
DataProvider 向 VM 提供 K 线数据和标的元数据,服务于两个目的:
- 主 chart 数据 —
Instance::run()从中拉取主 K 线流驱动逐 bar 执行。 request.security()数据 — 多周期和跨标的调用也从同一 provider 拉取。
未设置 provider 时,run() 立即返回不执行任何 bar,所有 request.security() 调用均返回 na。
将其作为 Instance::builder() 的第一个参数传入,然后调用 run():
let instance = Instance::builder(MyProvider::new(), source, timeframe, "NASDAQ:AAPL")
.build().await?;
instance.run_to_end("NASDAQ:AAPL", timeframe).await?;Trait 定义
#[async_trait(?Send)]
pub trait DataProvider {
type Error: fmt::Display + fmt::Debug;
async fn symbol_info(
&self,
symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>>;
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>>;
}symbol_info 使用 async_trait(?Send)——直接实现为 async fn 即可。 candlesticks 返回 LocalBoxStream<'static, Result<CandlestickItem, ...>>,在返回前将所有需要的共享状态 clone 进 stream 中。
candlesticks
返回指定 (symbol, timeframe) 的 CandlestickItem 流。
CandlestickItem 是一个枚举,包含三个变体:
CandlestickItem::Confirmed(candle)— 已完全收盘的 bar。仅用于历史 bar。 请勿通过发送Confirmed来表示某个实时 bar 已收盘;应改为发送时间戳更晚的下一个Realtimeitem,VM 会自动确认上一个 bar。CandlestickItem::Realtime(candle)— 正在形成的 bar。时间戳相同的连续 item 表示原地更新(tick 更新);时间戳更晚的 item 会隐式确认上一个 bar 并开启新 bar。同一时刻最多只有一个实时 bar 处于开放状态。CandlestickItem::HistoryEnd— 恰好发送一次,位于所有历史Confirmedbar 之后、第一个Realtimebar 之前。若流结束时存在未确认的实时 bar,VM 会自动确认它。
流协议时序
期望的 item 顺序如下:
Confirmed(t0) Confirmed(t1) … Confirmed(tN) ← 历史已收盘 bar
HistoryEnd ← 边界标记(发送一次)
Realtime(tA) ← 正在形成的 bar,第一个 tick
Realtime(tA) ← 相同时间戳 → 原地更新
Realtime(tB) (tB > tA) ← 更晚的时间戳:
VM 自动确认 tA,开启 tB
Realtime(tB) ← 更新 tB
…当实时 bar 收盘时,不要发送 Confirmed(tA)。只需发送时间戳更晚的下一个 Realtime(tB),VM 会在内部自动处理确认逻辑。
| 要求 | 说明 |
|---|---|
| 升序时间 | K 线必须按 time 严格升序返回 |
起点早于 from_time | 从第一根图表 K 线开始时刻(epoch 毫秒)之前的一根 K 线开始,让 VM 能在第一个 chart bar 之前初始化 MTF 状态 |
HistoryEnd | 在所有历史 bar 之后、实时 bar 之前 emit |
'static stream | LocalBoxStream 不能借用 self——在返回前 clone 需要的句柄 |
InvalidSymbol | 不支持该标的时,作为第一个 item 返回 Err(DataProviderError::InvalidSymbol(_)) |
| 其他错误 | 网络故障等使用 Err(DataProviderError::Other(e)) |
错误类型
当唯一可能的失败是 InvalidSymbol 时,使用 type Error = std::convert::Infallible:
type Error = std::convert::Infallible;ignore_invalid_symbol
当 candlesticks 返回 Err(DataProviderError::InvalidSymbol(_)) 时:
- 如果 Pine 脚本以
ignore_invalid_symbol = true调用了request.security(),该调用每个 bar 都静默返回na。 - 否则 VM 抛出运行时错误。
这样可以区分"标的不在数据集中"(软错误,可忽略)和"连接失败"(硬错误,必须报告):
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
_from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
if !self.supports(&symbol, timeframe) {
return Box::pin(stream::iter(vec![Err(DataProviderError::InvalidSymbol(
format!("{symbol}/{timeframe} not found"),
))]));
}
// ... 正常路径
}symbol_info
返回指定标的的元数据。所有字段都是 Option——只填写你已知的部分,VM 会用交易所前缀的默认值补全其余字段。
没有额外元数据时,直接返回 PartialSymbolInfo::default():
async fn symbol_info(
&self,
_symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
Ok(PartialSymbolInfo::default())
}完整字段列表
| 字段 | 说明 |
|---|---|
market | 标的所属交易所/市场(Option<Market>)。变体:NYSE、NASDAQ、SHSE、SZSE、HKEX、SGX |
description | 标的可读名称,如 "Apple Inc." |
type_ | 工具类型(Option<SymbolType>),见下方变体列表 |
country | 交易所所在国家的 ISO 3166-1 alpha-2 代码,如 "US"、"HK" |
isin | 国际证券识别号(ISIN) |
root | 衍生品根代码,如 "ESZ4" 的根代码为 "ES" |
min_move | mintick 公式分子:mintick = min_move / price_scale |
price_scale | mintick 公式分母。如 min_move=1, price_scale=100 → mintick 0.01 |
point_value | 合约点值乘数,通常为 1.0;如 ES 期货为 50.0 |
currency | 标的价格所用货币(Option<Currency>),见下方变体列表 |
expiration_date | 当前期货合约到期日 |
current_contract | 标的合约的代码标识 |
base_currency | 外汇/加密货币对的基础货币,如 BTCUSD 中的 BTC |
employees | 员工数量(仅限股票) |
industry | 行业分类 |
sector | 板块分类 |
min_contract | 最小合约规模 |
volume_type | 成交量解释方式:Base(基础货币)、Quote(计价货币)、Tick(成交笔数) |
shareholders | 股东人数 |
shares_outstanding_float | 流通股数量 |
shares_outstanding_total | 总股本数量 |
recommendations_buy | 分析师买入评级数量 |
recommendations_buy_strong | 分析师强烈买入评级数量 |
recommendations_hold | 分析师持有评级数量 |
recommendations_sell | 分析师卖出评级数量 |
recommendations_sell_strong | 分析师强烈卖出评级数量 |
recommendations_date | 最新分析师评级更新日期 |
recommendations_total | 分析师评级总数量 |
target_price_average | 分析师平均目标价 |
target_price_date | 最新目标价更新日期 |
target_price_estimates | 目标价预测的总数量 |
target_price_high | 分析师最高目标价 |
target_price_low | 分析师最低目标价 |
target_price_median | 分析师中位目标价 |
完整变体列表请参阅 Rust API 文档中的 SymbolType 和 Currency 枚举定义。
标的格式
标的使用 "EXCHANGE:TICKER" 格式,与 TradingView 约定一致。VM 将 Pine 脚本中的原始字符串直接传递给 provider,不做任何修改:
| Pine 脚本 | 传给 provider 的值 |
|---|---|
syminfo.tickerid | "NASDAQ:AAPL"(来自图表标的) |
"BINANCE:BTCUSDT" | "BINANCE:BTCUSDT" |
Ticker 表达式(如 "NASDAQ:AAPL*0.5+NYSE:SPY*0.5")会被 VM 分解为各个标的的独立查询,再分别传给 provider。
时间框架值
TimeFrame 直接来自 Pine 脚本的时间框架字符串参数:
| Pine 字符串 | 含义 |
|---|---|
"1" | 1 分钟 |
"5" | 5 分钟 |
"60" | 60 分钟 |
"D" | 日线 |
"W" | 周线 |
"M" | 月线 |
Vec<Candlestick>
Vec<Candlestick> 直接实现了 DataProvider。对于单标的离线回测,将 vec 直接传给 Instance::builder(),无需任何包装:
use openpine_vm::{Candlestick, TimeFrame, TradeSession};
let bars = vec![
Candlestick::new(
1_700_000_000_000, // epoch 毫秒
150.0, 155.0, 149.0, 153.0,
1_000_000.0, 0.0,
TradeSession::Regular,
),
// ... 更多按时间升序排列的 K 线
];
let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
.build()
.await?;
instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;对于懒加载或流式数据源,先收集为 Vec<Candlestick>:
// 从迭代器收集后再构建
let bars: Vec<Candlestick> = load_bars_from_disk().collect();
let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
.build().await?;如果只需要脚本元数据(输入、脚本类型等),可使用独立函数 script_info()——无需数据提供者、品种或周期:
use openpine_vm::script_info;
let info = script_info(source)?;自定义内存提供者示例
需要多标的支持(如 request.security())时,直接实现 DataProvider:
use std::{collections::HashMap, convert::Infallible};
use futures_util::{stream, stream::LocalBoxStream};
use openpine_vm::{
Candlestick, CandlestickItem, DataProvider, DataProviderError, PartialSymbolInfo, TimeFrame,
TradeSession,
};
struct InMemoryProvider {
data: HashMap<(String, TimeFrame), Vec<Candlestick>>,
}
#[async_trait::async_trait(?Send)]
impl DataProvider for InMemoryProvider {
type Error = Infallible;
async fn symbol_info(
&self,
_symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
Ok(PartialSymbolInfo::default())
}
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
let bars = self
.data
.get(&(symbol.clone(), timeframe))
.cloned()
.unwrap_or_default();
// 从 from_time 前一根 K 线开始,确保 VM 能初始化 MTF 状态
let start = bars
.partition_point(|b| b.time < from_time)
.saturating_sub(1);
Box::pin(stream::iter(
bars[start..]
.to_vec()
.into_iter()
.map(|c| Ok(CandlestickItem::Confirmed(c)))
.chain(std::iter::once(Ok(CandlestickItem::HistoryEnd))),
))
}
}使用方式:
let mut provider = InMemoryProvider { data: HashMap::new() };
provider.data.insert(
("NASDAQ:AAPL".to_string(), TimeFrame::weeks(1)),
vec![/* ... */],
);
let mut instance = Instance::builder(provider, source, TimeFrame::days(1), "NASDAQ:AAPL")
.build()
.await?;
instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;异步数据库提供者示例
适用于从远程 API 或数据库获取数据的场景:
use std::sync::Arc;
use futures_util::stream::LocalBoxStream;
struct DatabaseProvider {
pool: Arc<DbPool>,
}
#[async_trait::async_trait(?Send)]
impl DataProvider for DatabaseProvider {
type Error = DbError;
async fn symbol_info(
&self,
symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
self.pool
.query_symbol_info(&symbol)
.await
.map_err(DataProviderError::Other)
}
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
let pool = Arc::clone(&self.pool);
Box::pin(async_stream::stream! {
match pool.query_candles(&symbol, timeframe, from_time).await {
Err(e) => yield Err(DataProviderError::Other(e)),
Ok(rows) => {
for row in rows {
// Confirmed 表示历史 bar;实时 bar 使用 Realtime 变体
yield Ok(CandlestickItem::Confirmed(row.into_candlestick()));
}
// 发出历史结束标记,让 VM 切换为非阻塞轮询
yield Ok(CandlestickItem::HistoryEnd);
}
}
})
}
}