Skip to content

資料提供者

DataProvider 向 VM 提供 K 線資料和標的元資料,服務於兩個目的:

  1. 主 chart 資料Instance::run() 從中拉取主 K 線流驅動逐 bar 執行。
  2. request.security() 資料 — 多週期和跨標的呼叫也從同一 provider 拉取。

未設定 provider 時,run() 立即返回不執行任何 bar,所有 request.security() 呼叫均回傳 na

將其作為 Instance::builder() 的第一個參數傳入,然後呼叫 run()

rust
let instance = Instance::builder(MyProvider::new(), source, timeframe, "NASDAQ:AAPL")
    .build().await?;

instance.run_to_end("NASDAQ:AAPL", timeframe).await?;

Trait 定義

rust
#[async_trait(?Send)]
pub trait DataProvider {
    type Error: fmt::Display + fmt::Debug;

    async fn symbol_info(
        &self,
        symbol: String,
    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>>;

    fn candlesticks(
        &self,
        symbol: String,
        timeframe: TimeFrame,
        from_time: i64,
    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>>;
}

symbol_info 使用 async_trait(?Send)——直接實作為 async fn 即可。 candlesticks 返回 LocalBoxStream<'static, Result<CandlestickItem, ...>>,在返回前將所有需要的共享狀態 clone 進 stream 中。

candlesticks

返回指定 (symbol, timeframe)CandlestickItem 串流。

CandlestickItem 是一個列舉,包含三個變體:

  • CandlestickItem::Confirmed(candle) — 已完全收盤的 bar。僅用於歷史 bar。 請勿透過發送 Confirmed 來表示某個即時 bar 已收盤;應改為發送時間戳更晚的下一個 Realtime item,VM 會自動確認上一個 bar。
  • CandlestickItem::Realtime(candle) — 正在形成的 bar。時間戳相同的連續 item 表示原地更新(tick 更新);時間戳更晚的 item 會隱式確認上一個 bar 並開啟新 bar。同一時刻最多只有一個即時 bar 處於開放狀態。
  • CandlestickItem::HistoryEnd — 恰好發送一次,位於所有歷史 Confirmed bar 之後、第一個 Realtime bar 之前。若串流結束時存在未確認的即時 bar,VM 會自動確認它。

串流協議時序

期望的 item 順序如下:

Confirmed(t0)  Confirmed(t1)  …  Confirmed(tN)   ← 歷史已收盤 bar
HistoryEnd                                        ← 邊界標記(發送一次)
Realtime(tA)                                      ← 正在形成的 bar,第一個 tick
Realtime(tA)                                      ← 相同時間戳 → 原地更新
Realtime(tB)   (tB > tA)                          ← 更晚的時間戳:
                                                     VM 自動確認 tA,開啟 tB
Realtime(tB)                                      ← 更新 tB

當即時 bar 收盤時,不要發送 Confirmed(tA)。只需發送時間戳更晚的下一個 Realtime(tB),VM 會在內部自動處理確認邏輯。

要求說明
升序時間K 線必須按 time 嚴格升序返回
起點早於 from_time從第一根圖表 K 線開始時刻(epoch 毫秒)之前的一根 K 線開始,讓 VM 能在第一個 chart bar 之前初始化 MTF 狀態
HistoryEnd在所有歷史 bar 之後、即時 bar 之前 emit
'static streamLocalBoxStream 不能借用 self——在返回前 clone 需要的句柄
InvalidSymbol不支援該標的時,作為第一個 item 返回 Err(DataProviderError::InvalidSymbol(_))
其他錯誤網路故障等使用 Err(DataProviderError::Other(e))

錯誤類型

當唯一可能的失敗是 InvalidSymbol 時,使用 type Error = std::convert::Infallible

rust
type Error = std::convert::Infallible;

ignore_invalid_symbol

candlesticks 返回 Err(DataProviderError::InvalidSymbol(_)) 時:

  • 如果 Pine 腳本以 ignore_invalid_symbol = true 呼叫了 request.security(),該呼叫每個 bar 都靜默返回 na
  • 否則 VM 拋出運行時錯誤。

這樣可以區分「標的不在資料集中」(軟錯誤,可忽略)和「連線失敗」(硬錯誤,必須回報):

rust
fn candlesticks(
    &self,
    symbol: String,
    timeframe: TimeFrame,
    _from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
    if !self.supports(&symbol, timeframe) {
        return Box::pin(stream::iter(vec![Err(DataProviderError::InvalidSymbol(
            format!("{symbol}/{timeframe} not found"),
        ))]));
    }
    // ... 正常路徑
}

symbol_info

返回指定標的的元資料。所有欄位都是 Option——只填寫已知的部分,VM 會用交易所前綴的預設值補全其餘欄位。

沒有額外元資料時,直接返回 PartialSymbolInfo::default()

rust
async fn symbol_info(
    &self,
    _symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
    Ok(PartialSymbolInfo::default())
}

完整欄位列表

欄位說明
market標的所屬交易所/市場(Option<Market>)。變體:NYSENASDAQSHSESZSEHKEXSGX
description標的可讀名稱,如 "Apple Inc."
type_工具類型(Option<SymbolType>),見下方變體列表
country交易所所在國家的 ISO 3166-1 alpha-2 代碼,如 "US""HK"
isin國際證券識別號(ISIN)
root衍生品根代碼,如 "ESZ4" 的根代碼為 "ES"
min_movemintick 公式分子:mintick = min_move / price_scale
price_scalemintick 公式分母。如 min_move=1, price_scale=100 → mintick 0.01
point_value合約點值乘數,通常為 1.0;如 ES 期貨為 50.0
currency標的價格所用貨幣(Option<Currency>),見下方變體列表
expiration_date當前期貨合約到期日
current_contract標的合約的代碼識別符
base_currency外匯/加密貨幣對的基礎貨幣,如 BTCUSD 中的 BTC
employees員工人數(僅限股票)
industry行業分類
sector板塊分類
min_contract最小合約規模
volume_type成交量解釋方式:Base(基礎貨幣)、Quote(計價貨幣)、Tick(成交筆數)
shareholders股東人數
shares_outstanding_float流通股數量
shares_outstanding_total總股本數量
recommendations_buy分析師買入評級數量
recommendations_buy_strong分析師強烈買入評級數量
recommendations_hold分析師持有評級數量
recommendations_sell分析師賣出評級數量
recommendations_sell_strong分析師強烈賣出評級數量
recommendations_date最新分析師評級更新日期
recommendations_total分析師評級總數量
target_price_average分析師平均目標價
target_price_date最新目標價更新日期
target_price_estimates目標價預測的總數量
target_price_high分析師最高目標價
target_price_low分析師最低目標價
target_price_median分析師中位目標價

完整變體列表請參閱 Rust API 文件中的 SymbolTypeCurrency 列舉定義。

標的格式

標的使用 "EXCHANGE:TICKER" 格式,與 TradingView 約定一致。VM 將 Pine 腳本中的原始字串直接傳遞給 provider,不做任何修改:

Pine 腳本傳給 provider 的值
syminfo.tickerid"NASDAQ:AAPL"(來自圖表標的)
"BINANCE:BTCUSDT""BINANCE:BTCUSDT"

Ticker 表達式(如 "NASDAQ:AAPL*0.5+NYSE:SPY*0.5")會被 VM 分解為各個標的的獨立查詢,再分別傳給 provider。

時間框架值

TimeFrame 直接來自 Pine 腳本的時間框架字串參數:

Pine 字串含義
"1"1 分鐘
"5"5 分鐘
"60"60 分鐘
"D"日線
"W"週線
"M"月線

Vec<Candlestick>

Vec<Candlestick> 直接實作了 DataProvider。對於單標的離線回測,將 vec 直接傳給 Instance::builder(),無需任何包裝:

rust
use openpine_vm::{Candlestick, TimeFrame, TradeSession};

let bars = vec![
    Candlestick::new(
        1_700_000_000_000, // epoch 毫秒
        150.0, 155.0, 149.0, 153.0,
        1_000_000.0, 0.0,
        TradeSession::Regular,
    ),
    // ... 更多按時間升序排列的 K 線
];

let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
    .build()
    .await?;

instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;

對於惰性載入或串流資料,先收集為 Vec<Candlestick>

rust
// 從迭代器收集後再建構
let bars: Vec<Candlestick> = load_bars_from_disk().collect();
let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
    .build().await?;

如果只需要腳本元資料(輸入、腳本類型等),可使用獨立函數 script_info()——無需資料提供者、品種或週期:

rust
use openpine_vm::script_info;

let info = script_info(source)?;

自訂內存提供者示例

需要多標的支援(如 request.security())時,直接實作 DataProvider

rust
use std::{collections::HashMap, convert::Infallible};

use futures_util::{stream, stream::LocalBoxStream};
use openpine_vm::{
    Candlestick, CandlestickItem, DataProvider, DataProviderError, PartialSymbolInfo, TimeFrame,
    TradeSession,
};

struct InMemoryProvider {
    data: HashMap<(String, TimeFrame), Vec<Candlestick>>,
}

#[async_trait::async_trait(?Send)]
impl DataProvider for InMemoryProvider {
    type Error = Infallible;

    async fn symbol_info(
        &self,
        _symbol: String,
    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
        Ok(PartialSymbolInfo::default())
    }

    fn candlesticks(
        &self,
        symbol: String,
        timeframe: TimeFrame,
        from_time: i64,
    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
        let bars = self
            .data
            .get(&(symbol.clone(), timeframe))
            .cloned()
            .unwrap_or_default();

        // 從 from_time 前一根 K 線開始,確保 VM 能初始化 MTF 狀態
        let start = bars
            .partition_point(|b| b.time < from_time)
            .saturating_sub(1);

        Box::pin(stream::iter(
            bars[start..]
                .to_vec()
                .into_iter()
                .map(|c| Ok(CandlestickItem::Confirmed(c)))
                .chain(std::iter::once(Ok(CandlestickItem::HistoryEnd))),
        ))
    }
}

使用方式:

rust
let mut provider = InMemoryProvider { data: HashMap::new() };
provider.data.insert(
    ("NASDAQ:AAPL".to_string(), TimeFrame::weeks(1)),
    vec![/* ... */],
);

let mut instance = Instance::builder(provider, source, TimeFrame::days(1), "NASDAQ:AAPL")
    .build()
    .await?;

instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;

異步資料庫提供者示例

適用於從遠端 API 或資料庫獲取資料的場景:

rust
use std::sync::Arc;
use futures_util::stream::LocalBoxStream;

struct DatabaseProvider {
    pool: Arc<DbPool>,
}

#[async_trait::async_trait(?Send)]
impl DataProvider for DatabaseProvider {
    type Error = DbError;

    async fn symbol_info(
        &self,
        symbol: String,
    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
        self.pool
            .query_symbol_info(&symbol)
            .await
            .map_err(DataProviderError::Other)
    }

    fn candlesticks(
        &self,
        symbol: String,
        timeframe: TimeFrame,
        from_time: i64,
    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
        let pool = Arc::clone(&self.pool);
        Box::pin(async_stream::stream! {
            match pool.query_candles(&symbol, timeframe, from_time).await {
                Err(e) => yield Err(DataProviderError::Other(e)),
                Ok(rows) => {
                    for row in rows {
                        // Confirmed 表示歷史 bar;即時 bar 使用 Realtime 變體
                        yield Ok(CandlestickItem::Confirmed(row.into_candlestick()));
                    }
                    // 發出歷史結束標記,讓 VM 切換為非阻塞輪詢
                    yield Ok(CandlestickItem::HistoryEnd);
                }
            }
        })
    }
}

參閱

基於 MIT 許可證發佈。