Skip to content

数据提供者

DataProvider 向 VM 提供 K 线数据和标的元数据,服务于两个目的:

  1. 主 chart 数据Instance::run() 从中拉取主 K 线流驱动逐 bar 执行。
  2. request.security() 数据 — 多周期和跨标的调用也从同一 provider 拉取。

未设置 provider 时,run() 立即返回不执行任何 bar,所有 request.security() 调用均返回 na

将其作为 Instance::builder() 的第一个参数传入,然后调用 run()

rust
let instance = Instance::builder(MyProvider::new(), source, timeframe, "NASDAQ:AAPL")
    .build().await?;

instance.run_to_end("NASDAQ:AAPL", timeframe).await?;

Trait 定义

rust
#[async_trait(?Send)]
pub trait DataProvider {
    type Error: fmt::Display + fmt::Debug;

    async fn symbol_info(
        &self,
        symbol: String,
    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>>;

    fn candlesticks(
        &self,
        symbol: String,
        timeframe: TimeFrame,
        from_time: i64,
    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>>;
}

symbol_info 使用 async_trait(?Send)——直接实现为 async fn 即可。 candlesticks 返回 LocalBoxStream<'static, Result<CandlestickItem, ...>>,在返回前将所有需要的共享状态 clone 进 stream 中。

candlesticks

返回指定 (symbol, timeframe)CandlestickItem 流。

CandlestickItem 是一个枚举,包含三个变体:

  • CandlestickItem::Confirmed(candle) — 已完全收盘的 bar。仅用于历史 bar。 请勿通过发送 Confirmed 来表示某个实时 bar 已收盘;应改为发送时间戳更晚的下一个 Realtime item,VM 会自动确认上一个 bar。
  • CandlestickItem::Realtime(candle) — 正在形成的 bar。时间戳相同的连续 item 表示原地更新(tick 更新);时间戳更晚的 item 会隐式确认上一个 bar 并开启新 bar。同一时刻最多只有一个实时 bar 处于开放状态。
  • CandlestickItem::HistoryEnd — 恰好发送一次,位于所有历史 Confirmed bar 之后、第一个 Realtime bar 之前。若流结束时存在未确认的实时 bar,VM 会自动确认它。

流协议时序

期望的 item 顺序如下:

Confirmed(t0)  Confirmed(t1)  …  Confirmed(tN)   ← 历史已收盘 bar
HistoryEnd                                        ← 边界标记(发送一次)
Realtime(tA)                                      ← 正在形成的 bar,第一个 tick
Realtime(tA)                                      ← 相同时间戳 → 原地更新
Realtime(tB)   (tB > tA)                          ← 更晚的时间戳:
                                                     VM 自动确认 tA,开启 tB
Realtime(tB)                                      ← 更新 tB

当实时 bar 收盘时,不要发送 Confirmed(tA)。只需发送时间戳更晚的下一个 Realtime(tB),VM 会在内部自动处理确认逻辑。

要求说明
升序时间K 线必须按 time 严格升序返回
起点早于 from_time从第一根图表 K 线开始时刻(epoch 毫秒)之前的一根 K 线开始,让 VM 能在第一个 chart bar 之前初始化 MTF 状态
HistoryEnd在所有历史 bar 之后、实时 bar 之前 emit
'static streamLocalBoxStream 不能借用 self——在返回前 clone 需要的句柄
InvalidSymbol不支持该标的时,作为第一个 item 返回 Err(DataProviderError::InvalidSymbol(_))
其他错误网络故障等使用 Err(DataProviderError::Other(e))

错误类型

当唯一可能的失败是 InvalidSymbol 时,使用 type Error = std::convert::Infallible

rust
type Error = std::convert::Infallible;

ignore_invalid_symbol

candlesticks 返回 Err(DataProviderError::InvalidSymbol(_)) 时:

  • 如果 Pine 脚本以 ignore_invalid_symbol = true 调用了 request.security(),该调用每个 bar 都静默返回 na
  • 否则 VM 抛出运行时错误。

这样可以区分"标的不在数据集中"(软错误,可忽略)和"连接失败"(硬错误,必须报告):

rust
fn candlesticks(
    &self,
    symbol: String,
    timeframe: TimeFrame,
    _from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
    if !self.supports(&symbol, timeframe) {
        return Box::pin(stream::iter(vec![Err(DataProviderError::InvalidSymbol(
            format!("{symbol}/{timeframe} not found"),
        ))]));
    }
    // ... 正常路径
}

symbol_info

返回指定标的的元数据。所有字段都是 Option——只填写你已知的部分,VM 会用交易所前缀的默认值补全其余字段。

没有额外元数据时,直接返回 PartialSymbolInfo::default()

rust
async fn symbol_info(
    &self,
    _symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
    Ok(PartialSymbolInfo::default())
}

完整字段列表

字段说明
market标的所属交易所/市场(Option<Market>)。变体:NYSENASDAQSHSESZSEHKEXSGX
description标的可读名称,如 "Apple Inc."
type_工具类型(Option<SymbolType>),见下方变体列表
country交易所所在国家的 ISO 3166-1 alpha-2 代码,如 "US""HK"
isin国际证券识别号(ISIN)
root衍生品根代码,如 "ESZ4" 的根代码为 "ES"
min_movemintick 公式分子:mintick = min_move / price_scale
price_scalemintick 公式分母。如 min_move=1, price_scale=100 → mintick 0.01
point_value合约点值乘数,通常为 1.0;如 ES 期货为 50.0
currency标的价格所用货币(Option<Currency>),见下方变体列表
expiration_date当前期货合约到期日
current_contract标的合约的代码标识
base_currency外汇/加密货币对的基础货币,如 BTCUSD 中的 BTC
employees员工数量(仅限股票)
industry行业分类
sector板块分类
min_contract最小合约规模
volume_type成交量解释方式:Base(基础货币)、Quote(计价货币)、Tick(成交笔数)
shareholders股东人数
shares_outstanding_float流通股数量
shares_outstanding_total总股本数量
recommendations_buy分析师买入评级数量
recommendations_buy_strong分析师强烈买入评级数量
recommendations_hold分析师持有评级数量
recommendations_sell分析师卖出评级数量
recommendations_sell_strong分析师强烈卖出评级数量
recommendations_date最新分析师评级更新日期
recommendations_total分析师评级总数量
target_price_average分析师平均目标价
target_price_date最新目标价更新日期
target_price_estimates目标价预测的总数量
target_price_high分析师最高目标价
target_price_low分析师最低目标价
target_price_median分析师中位目标价

完整变体列表请参阅 Rust API 文档中的 SymbolTypeCurrency 枚举定义。

标的格式

标的使用 "EXCHANGE:TICKER" 格式,与 TradingView 约定一致。VM 将 Pine 脚本中的原始字符串直接传递给 provider,不做任何修改:

Pine 脚本传给 provider 的值
syminfo.tickerid"NASDAQ:AAPL"(来自图表标的)
"BINANCE:BTCUSDT""BINANCE:BTCUSDT"

Ticker 表达式(如 "NASDAQ:AAPL*0.5+NYSE:SPY*0.5")会被 VM 分解为各个标的的独立查询,再分别传给 provider。

时间框架值

TimeFrame 直接来自 Pine 脚本的时间框架字符串参数:

Pine 字符串含义
"1"1 分钟
"5"5 分钟
"60"60 分钟
"D"日线
"W"周线
"M"月线

Vec<Candlestick>

Vec<Candlestick> 直接实现了 DataProvider。对于单标的离线回测,将 vec 直接传给 Instance::builder(),无需任何包装:

rust
use openpine_vm::{Candlestick, TimeFrame, TradeSession};

let bars = vec![
    Candlestick::new(
        1_700_000_000_000, // epoch 毫秒
        150.0, 155.0, 149.0, 153.0,
        1_000_000.0, 0.0,
        TradeSession::Regular,
    ),
    // ... 更多按时间升序排列的 K 线
];

let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
    .build()
    .await?;

instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;

对于懒加载或流式数据源,先收集为 Vec<Candlestick>

rust
// 从迭代器收集后再构建
let bars: Vec<Candlestick> = load_bars_from_disk().collect();
let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
    .build().await?;

如果只需要脚本元数据(输入、脚本类型等),可使用独立函数 script_info()——无需数据提供者、品种或周期:

rust
use openpine_vm::script_info;

let info = script_info(source)?;

自定义内存提供者示例

需要多标的支持(如 request.security())时,直接实现 DataProvider

rust
use std::{collections::HashMap, convert::Infallible};

use futures_util::{stream, stream::LocalBoxStream};
use openpine_vm::{
    Candlestick, CandlestickItem, DataProvider, DataProviderError, PartialSymbolInfo, TimeFrame,
    TradeSession,
};

struct InMemoryProvider {
    data: HashMap<(String, TimeFrame), Vec<Candlestick>>,
}

#[async_trait::async_trait(?Send)]
impl DataProvider for InMemoryProvider {
    type Error = Infallible;

    async fn symbol_info(
        &self,
        _symbol: String,
    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
        Ok(PartialSymbolInfo::default())
    }

    fn candlesticks(
        &self,
        symbol: String,
        timeframe: TimeFrame,
        from_time: i64,
    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
        let bars = self
            .data
            .get(&(symbol.clone(), timeframe))
            .cloned()
            .unwrap_or_default();

        // 从 from_time 前一根 K 线开始,确保 VM 能初始化 MTF 状态
        let start = bars
            .partition_point(|b| b.time < from_time)
            .saturating_sub(1);

        Box::pin(stream::iter(
            bars[start..]
                .to_vec()
                .into_iter()
                .map(|c| Ok(CandlestickItem::Confirmed(c)))
                .chain(std::iter::once(Ok(CandlestickItem::HistoryEnd))),
        ))
    }
}

使用方式:

rust
let mut provider = InMemoryProvider { data: HashMap::new() };
provider.data.insert(
    ("NASDAQ:AAPL".to_string(), TimeFrame::weeks(1)),
    vec![/* ... */],
);

let mut instance = Instance::builder(provider, source, TimeFrame::days(1), "NASDAQ:AAPL")
    .build()
    .await?;

instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;

异步数据库提供者示例

适用于从远程 API 或数据库获取数据的场景:

rust
use std::sync::Arc;
use futures_util::stream::LocalBoxStream;

struct DatabaseProvider {
    pool: Arc<DbPool>,
}

#[async_trait::async_trait(?Send)]
impl DataProvider for DatabaseProvider {
    type Error = DbError;

    async fn symbol_info(
        &self,
        symbol: String,
    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
        self.pool
            .query_symbol_info(&symbol)
            .await
            .map_err(DataProviderError::Other)
    }

    fn candlesticks(
        &self,
        symbol: String,
        timeframe: TimeFrame,
        from_time: i64,
    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
        let pool = Arc::clone(&self.pool);
        Box::pin(async_stream::stream! {
            match pool.query_candles(&symbol, timeframe, from_time).await {
                Err(e) => yield Err(DataProviderError::Other(e)),
                Ok(rows) => {
                    for row in rows {
                        // Confirmed 表示历史 bar;实时 bar 使用 Realtime 变体
                        yield Ok(CandlestickItem::Confirmed(row.into_candlestick()));
                    }
                    // 发出历史结束标记,让 VM 切换为非阻塞轮询
                    yield Ok(CandlestickItem::HistoryEnd);
                }
            }
        })
    }
}

参阅

基于 MIT 许可证发布。