資料提供者
DataProvider 向 VM 提供 K 線資料和標的元資料,服務於兩個目的:
- 主 chart 資料 —
Instance::run()從中拉取主 K 線流驅動逐 bar 執行。 request.security()資料 — 多週期和跨標的呼叫也從同一 provider 拉取。
未設定 provider 時,run() 立即返回不執行任何 bar,所有 request.security() 呼叫均回傳 na。
將其作為 Instance::builder() 的第一個參數傳入,然後呼叫 run():
let instance = Instance::builder(MyProvider::new(), source, timeframe, "NASDAQ:AAPL")
.build().await?;
instance.run_to_end("NASDAQ:AAPL", timeframe).await?;Trait 定義
#[async_trait(?Send)]
pub trait DataProvider {
type Error: fmt::Display + fmt::Debug;
async fn symbol_info(
&self,
symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>>;
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>>;
}symbol_info 使用 async_trait(?Send)——直接實作為 async fn 即可。 candlesticks 返回 LocalBoxStream<'static, Result<CandlestickItem, ...>>,在返回前將所有需要的共享狀態 clone 進 stream 中。
candlesticks
返回指定 (symbol, timeframe) 的 CandlestickItem 串流。
CandlestickItem 是一個列舉,包含三個變體:
CandlestickItem::Confirmed(candle)— 已完全收盤的 bar。僅用於歷史 bar。 請勿透過發送Confirmed來表示某個即時 bar 已收盤;應改為發送時間戳更晚的下一個Realtimeitem,VM 會自動確認上一個 bar。CandlestickItem::Realtime(candle)— 正在形成的 bar。時間戳相同的連續 item 表示原地更新(tick 更新);時間戳更晚的 item 會隱式確認上一個 bar 並開啟新 bar。同一時刻最多只有一個即時 bar 處於開放狀態。CandlestickItem::HistoryEnd— 恰好發送一次,位於所有歷史Confirmedbar 之後、第一個Realtimebar 之前。若串流結束時存在未確認的即時 bar,VM 會自動確認它。
串流協議時序
期望的 item 順序如下:
Confirmed(t0) Confirmed(t1) … Confirmed(tN) ← 歷史已收盤 bar
HistoryEnd ← 邊界標記(發送一次)
Realtime(tA) ← 正在形成的 bar,第一個 tick
Realtime(tA) ← 相同時間戳 → 原地更新
Realtime(tB) (tB > tA) ← 更晚的時間戳:
VM 自動確認 tA,開啟 tB
Realtime(tB) ← 更新 tB
…當即時 bar 收盤時,不要發送 Confirmed(tA)。只需發送時間戳更晚的下一個 Realtime(tB),VM 會在內部自動處理確認邏輯。
| 要求 | 說明 |
|---|---|
| 升序時間 | K 線必須按 time 嚴格升序返回 |
起點早於 from_time | 從第一根圖表 K 線開始時刻(epoch 毫秒)之前的一根 K 線開始,讓 VM 能在第一個 chart bar 之前初始化 MTF 狀態 |
HistoryEnd | 在所有歷史 bar 之後、即時 bar 之前 emit |
'static stream | LocalBoxStream 不能借用 self——在返回前 clone 需要的句柄 |
InvalidSymbol | 不支援該標的時,作為第一個 item 返回 Err(DataProviderError::InvalidSymbol(_)) |
| 其他錯誤 | 網路故障等使用 Err(DataProviderError::Other(e)) |
錯誤類型
當唯一可能的失敗是 InvalidSymbol 時,使用 type Error = std::convert::Infallible:
type Error = std::convert::Infallible;ignore_invalid_symbol
當 candlesticks 返回 Err(DataProviderError::InvalidSymbol(_)) 時:
- 如果 Pine 腳本以
ignore_invalid_symbol = true呼叫了request.security(),該呼叫每個 bar 都靜默返回na。 - 否則 VM 拋出運行時錯誤。
這樣可以區分「標的不在資料集中」(軟錯誤,可忽略)和「連線失敗」(硬錯誤,必須回報):
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
_from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
if !self.supports(&symbol, timeframe) {
return Box::pin(stream::iter(vec![Err(DataProviderError::InvalidSymbol(
format!("{symbol}/{timeframe} not found"),
))]));
}
// ... 正常路徑
}symbol_info
返回指定標的的元資料。所有欄位都是 Option——只填寫已知的部分,VM 會用交易所前綴的預設值補全其餘欄位。
沒有額外元資料時,直接返回 PartialSymbolInfo::default():
async fn symbol_info(
&self,
_symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
Ok(PartialSymbolInfo::default())
}完整欄位列表
| 欄位 | 說明 |
|---|---|
market | 標的所屬交易所/市場(Option<Market>)。變體:NYSE、NASDAQ、SHSE、SZSE、HKEX、SGX |
description | 標的可讀名稱,如 "Apple Inc." |
type_ | 工具類型(Option<SymbolType>),見下方變體列表 |
country | 交易所所在國家的 ISO 3166-1 alpha-2 代碼,如 "US"、"HK" |
isin | 國際證券識別號(ISIN) |
root | 衍生品根代碼,如 "ESZ4" 的根代碼為 "ES" |
min_move | mintick 公式分子:mintick = min_move / price_scale |
price_scale | mintick 公式分母。如 min_move=1, price_scale=100 → mintick 0.01 |
point_value | 合約點值乘數,通常為 1.0;如 ES 期貨為 50.0 |
currency | 標的價格所用貨幣(Option<Currency>),見下方變體列表 |
expiration_date | 當前期貨合約到期日 |
current_contract | 標的合約的代碼識別符 |
base_currency | 外匯/加密貨幣對的基礎貨幣,如 BTCUSD 中的 BTC |
employees | 員工人數(僅限股票) |
industry | 行業分類 |
sector | 板塊分類 |
min_contract | 最小合約規模 |
volume_type | 成交量解釋方式:Base(基礎貨幣)、Quote(計價貨幣)、Tick(成交筆數) |
shareholders | 股東人數 |
shares_outstanding_float | 流通股數量 |
shares_outstanding_total | 總股本數量 |
recommendations_buy | 分析師買入評級數量 |
recommendations_buy_strong | 分析師強烈買入評級數量 |
recommendations_hold | 分析師持有評級數量 |
recommendations_sell | 分析師賣出評級數量 |
recommendations_sell_strong | 分析師強烈賣出評級數量 |
recommendations_date | 最新分析師評級更新日期 |
recommendations_total | 分析師評級總數量 |
target_price_average | 分析師平均目標價 |
target_price_date | 最新目標價更新日期 |
target_price_estimates | 目標價預測的總數量 |
target_price_high | 分析師最高目標價 |
target_price_low | 分析師最低目標價 |
target_price_median | 分析師中位目標價 |
完整變體列表請參閱 Rust API 文件中的 SymbolType 和 Currency 列舉定義。
標的格式
標的使用 "EXCHANGE:TICKER" 格式,與 TradingView 約定一致。VM 將 Pine 腳本中的原始字串直接傳遞給 provider,不做任何修改:
| Pine 腳本 | 傳給 provider 的值 |
|---|---|
syminfo.tickerid | "NASDAQ:AAPL"(來自圖表標的) |
"BINANCE:BTCUSDT" | "BINANCE:BTCUSDT" |
Ticker 表達式(如 "NASDAQ:AAPL*0.5+NYSE:SPY*0.5")會被 VM 分解為各個標的的獨立查詢,再分別傳給 provider。
時間框架值
TimeFrame 直接來自 Pine 腳本的時間框架字串參數:
| Pine 字串 | 含義 |
|---|---|
"1" | 1 分鐘 |
"5" | 5 分鐘 |
"60" | 60 分鐘 |
"D" | 日線 |
"W" | 週線 |
"M" | 月線 |
Vec<Candlestick>
Vec<Candlestick> 直接實作了 DataProvider。對於單標的離線回測,將 vec 直接傳給 Instance::builder(),無需任何包裝:
use openpine_vm::{Candlestick, TimeFrame, TradeSession};
let bars = vec![
Candlestick::new(
1_700_000_000_000, // epoch 毫秒
150.0, 155.0, 149.0, 153.0,
1_000_000.0, 0.0,
TradeSession::Regular,
),
// ... 更多按時間升序排列的 K 線
];
let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
.build()
.await?;
instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;對於惰性載入或串流資料,先收集為 Vec<Candlestick>:
// 從迭代器收集後再建構
let bars: Vec<Candlestick> = load_bars_from_disk().collect();
let mut instance = Instance::builder(bars, source, TimeFrame::days(1), "NASDAQ:AAPL")
.build().await?;如果只需要腳本元資料(輸入、腳本類型等),可使用獨立函數 script_info()——無需資料提供者、品種或週期:
use openpine_vm::script_info;
let info = script_info(source)?;自訂內存提供者示例
需要多標的支援(如 request.security())時,直接實作 DataProvider:
use std::{collections::HashMap, convert::Infallible};
use futures_util::{stream, stream::LocalBoxStream};
use openpine_vm::{
Candlestick, CandlestickItem, DataProvider, DataProviderError, PartialSymbolInfo, TimeFrame,
TradeSession,
};
struct InMemoryProvider {
data: HashMap<(String, TimeFrame), Vec<Candlestick>>,
}
#[async_trait::async_trait(?Send)]
impl DataProvider for InMemoryProvider {
type Error = Infallible;
async fn symbol_info(
&self,
_symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
Ok(PartialSymbolInfo::default())
}
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
let bars = self
.data
.get(&(symbol.clone(), timeframe))
.cloned()
.unwrap_or_default();
// 從 from_time 前一根 K 線開始,確保 VM 能初始化 MTF 狀態
let start = bars
.partition_point(|b| b.time < from_time)
.saturating_sub(1);
Box::pin(stream::iter(
bars[start..]
.to_vec()
.into_iter()
.map(|c| Ok(CandlestickItem::Confirmed(c)))
.chain(std::iter::once(Ok(CandlestickItem::HistoryEnd))),
))
}
}使用方式:
let mut provider = InMemoryProvider { data: HashMap::new() };
provider.data.insert(
("NASDAQ:AAPL".to_string(), TimeFrame::weeks(1)),
vec![/* ... */],
);
let mut instance = Instance::builder(provider, source, TimeFrame::days(1), "NASDAQ:AAPL")
.build()
.await?;
instance.run_to_end("NASDAQ:AAPL", TimeFrame::days(1)).await?;異步資料庫提供者示例
適用於從遠端 API 或資料庫獲取資料的場景:
use std::sync::Arc;
use futures_util::stream::LocalBoxStream;
struct DatabaseProvider {
pool: Arc<DbPool>,
}
#[async_trait::async_trait(?Send)]
impl DataProvider for DatabaseProvider {
type Error = DbError;
async fn symbol_info(
&self,
symbol: String,
) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
self.pool
.query_symbol_info(&symbol)
.await
.map_err(DataProviderError::Other)
}
fn candlesticks(
&self,
symbol: String,
timeframe: TimeFrame,
from_time: i64,
) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
let pool = Arc::clone(&self.pool);
Box::pin(async_stream::stream! {
match pool.query_candles(&symbol, timeframe, from_time).await {
Err(e) => yield Err(DataProviderError::Other(e)),
Ok(rows) => {
for row in rows {
// Confirmed 表示歷史 bar;即時 bar 使用 Realtime 變體
yield Ok(CandlestickItem::Confirmed(row.into_candlestick()));
}
// 發出歷史結束標記,讓 VM 切換為非阻塞輪詢
yield Ok(CandlestickItem::HistoryEnd);
}
}
})
}
}