openpine_vm/
data_provider.rs

1//! [`DataProvider`] trait and associated types.
2//!
3//! Implement [`DataProvider`] and pass it as the first argument to
4//! [`Instance::builder`](crate::Instance::builder) to supply candlestick data
5//! for both the main chart and `request.security()` calls.
6
7use std::{fmt, rc::Rc};
8
9use async_trait::async_trait;
10use futures_util::{StreamExt, stream::LocalBoxStream};
11use time::OffsetDateTime;
12
13use crate::{
14    Currency, Market, TimeFrame,
15    quote_types::{Candlestick, CandlestickItem},
16    symbol_info::{SymbolType, VolumeType},
17};
18
19/// Partial symbol metadata returned by [`DataProvider::symbol_info`].
20///
21/// All fields are `Option`. Any field left as `None` will fall back to the
22/// default value derived from the symbol string (exchange prefix → market →
23/// `market.default_*()` helpers).
24#[derive(Debug, Clone, Default)]
25pub struct PartialSymbolInfo {
26    /// The market/exchange the symbol belongs to.
27    pub market: Option<Market>,
28    /// Human-readable description of the symbol (e.g. `"Apple Inc."`).
29    pub description: Option<String>,
30    /// The type of instrument (stock, futures, crypto, …).
31    pub type_: Option<SymbolType>,
32    /// ISO 3166-1 alpha-2 country code of the exchange (e.g. `"US"`, `"HK"`).
33    pub country: Option<String>,
34    /// International Securities Identification Number.
35    pub isin: Option<String>,
36    /// Root identifier for derivative instruments (e.g. `"ES"` for `"ESZ4"`).
37    pub root: Option<String>,
38    /// Numerator of the `syminfo.mintick` formula (`min_move / price_scale`).
39    pub min_move: Option<i32>,
40    /// Denominator of the `syminfo.mintick` formula.
41    pub price_scale: Option<i32>,
42    /// Point value multiplier (usually `1.0`; relevant for futures).
43    pub point_value: Option<f64>,
44    /// Currency of the symbol's prices.
45    pub currency: Option<Currency>,
46    /// Expiration date of the current futures contract.
47    pub expiration_date: Option<OffsetDateTime>,
48    /// Ticker identifier of the underlying contract.
49    pub current_contract: Option<String>,
50    /// Base currency for forex/crypto pairs.
51    pub base_currency: Option<Currency>,
52    /// Number of employees (stocks only).
53    pub employees: Option<i64>,
54    /// Industry classification.
55    pub industry: Option<String>,
56    /// Sector classification.
57    pub sector: Option<String>,
58    /// Minimum contract size.
59    pub min_contract: Option<f64>,
60    /// Volume type interpretation.
61    pub volume_type: Option<VolumeType>,
62    /// Number of shareholders.
63    pub shareholders: Option<i64>,
64    /// Free-float shares outstanding.
65    pub shares_outstanding_float: Option<f64>,
66    /// Total shares outstanding.
67    pub shares_outstanding_total: Option<f64>,
68    /// Number of analyst buy recommendations.
69    pub recommendations_buy: Option<i32>,
70    /// Number of analyst strong-buy recommendations.
71    pub recommendations_buy_strong: Option<i32>,
72    /// Number of analyst hold recommendations.
73    pub recommendations_hold: Option<i32>,
74    /// Number of analyst sell recommendations.
75    pub recommendations_sell: Option<i32>,
76    /// Number of analyst strong-sell recommendations.
77    pub recommendations_sell_strong: Option<i32>,
78    /// Date of the latest analyst recommendations update.
79    pub recommendations_date: Option<OffsetDateTime>,
80    /// Total number of analyst recommendations.
81    pub recommendations_total: Option<i32>,
82    /// Average analyst price target.
83    pub target_price_average: Option<f64>,
84    /// Date of the latest price target update.
85    pub target_price_date: Option<OffsetDateTime>,
86    /// Total number of price target estimates.
87    pub target_price_estimates: Option<f64>,
88    /// Highest analyst price target.
89    pub target_price_high: Option<f64>,
90    /// Lowest analyst price target.
91    pub target_price_low: Option<f64>,
92    /// Median analyst price target.
93    pub target_price_median: Option<f64>,
94}
95
96/// Error returned by [`DataProvider`] methods.
97///
98/// Use `std::convert::Infallible` as `E` when the provider can only fail with
99/// [`InvalidSymbol`](DataProviderError::InvalidSymbol) and never with
100/// [`Other`](DataProviderError::Other).
101#[non_exhaustive]
102#[derive(Debug, thiserror::Error)]
103pub enum DataProviderError<E>
104where
105    E: fmt::Display + fmt::Debug,
106{
107    /// The requested symbol does not exist or is not supported.
108    ///
109    /// When `request.security()` is called with `ignore_invalid_symbol = true`,
110    /// this causes the function to return `na` instead of a runtime error.
111    #[error("invalid symbol: {0}")]
112    InvalidSymbol(String),
113    /// Any other provider-side error.
114    #[error("{0}")]
115    Other(E),
116}
117
118/// Supplies candlestick data and symbol metadata to the VM.
119///
120/// Implement this trait and pass it as the first argument to
121/// [`Instance::builder`](crate::Instance::builder). It serves two purposes:
122/// main chart K-lines consumed by [`Instance::run`](crate::Instance::run) and
123/// MTF/cross-symbol data for `request.security()` calls.
124///
125/// For simple single-symbol use cases, pass a `Vec<Candlestick>` directly —
126/// it implements `DataProvider` and wraps bars as
127/// [`CandlestickItem::Confirmed`].
128///
129/// # Contract
130///
131/// - [`candlesticks`](DataProvider::candlesticks) must return bars in **strict
132///   ascending time order**.
133/// - The stream should start at or before `from_time` so that MTF bar state is
134///   initialised even before the first chart bar.
135/// - Returning `Err(DataProviderError::InvalidSymbol(_))` from either method
136///   causes `request.security()` to return `na` when `ignore_invalid_symbol =
137///   true`, or a runtime error otherwise.
138#[async_trait(?Send)]
139pub trait DataProvider {
140    /// The error payload type for [`DataProviderError::Other`].
141    type Error: fmt::Display + fmt::Debug;
142
143    /// Returns partial symbol metadata for the given symbol string.
144    ///
145    /// Any `None` field falls back to a default derived from the symbol's
146    /// exchange prefix. Return `PartialSymbolInfo::default()` if you only want
147    /// the defaults.
148    async fn symbol_info(
149        &self,
150        symbol: String,
151    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>>;
152
153    /// Returns a stream of candlestick items for the given symbol and
154    /// timeframe.
155    ///
156    /// - `symbol`    — e.g. `"BINANCE:BTCUSDT"`, `"NASDAQ:AAPL"`
157    /// - `timeframe` — the requested timeframe
158    /// - `from_time` — epoch-millisecond timestamp of the first chart bar;
159    ///   start at or before this time
160    ///
161    /// Items must be yielded in **strict ascending time order**.
162    ///
163    /// # Stream protocol
164    ///
165    /// The expected sequence is:
166    ///
167    /// ```text
168    /// Confirmed(t0) Confirmed(t1) … Confirmed(tN)   <- historical closed bars
169    /// HistoryEnd                                     <- boundary marker
170    /// Realtime(tA)                                   <- forming bar, first tick
171    /// Realtime(tA)                                   <- same timestamp = update
172    /// Realtime(tB)  (tB > tA)                        <- new timestamp:
173    ///                                                    VM auto-confirms tA,
174    ///                                                    then opens tB
175    /// Realtime(tB)                                   <- update tB
176    /// …
177    /// ```
178    ///
179    /// Key rules:
180    ///
181    /// - **[`Confirmed`](crate::CandlestickItem::Confirmed)** — a fully closed
182    ///   bar. Only used for historical bars. **Do not** emit `Confirmed` for a
183    ///   realtime bar that has just closed; instead, send the next
184    ///   `Realtime(tNew)` with a later timestamp and the VM will automatically
185    ///   confirm the previous bar internally.
186    /// - **[`Realtime`](crate::CandlestickItem::Realtime)** — a bar that is
187    ///   still forming. Multiple consecutive `Realtime` items with the *same*
188    ///   timestamp are treated as in-place updates (tick updates). A `Realtime`
189    ///   item whose timestamp is *later* than the previous bar implicitly
190    ///   confirms the previous bar and starts a new one. At most one realtime
191    ///   bar is open at any time.
192    /// - **[`HistoryEnd`](crate::CandlestickItem::HistoryEnd)** — emitted once,
193    ///   after all historical `Confirmed` bars and before the first `Realtime`
194    ///   bar. If the stream ends after `HistoryEnd` with an open `Realtime`
195    ///   bar, the VM confirms it automatically.
196    ///
197    /// Yield `Err(DataProviderError::InvalidSymbol(_))` as the first item when
198    /// the symbol is not recognised.
199    fn candlesticks(
200        &self,
201        symbol: String,
202        timeframe: TimeFrame,
203        from_time: i64,
204    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>>;
205}
206
207/// Internal error produced by [`InternalProvider`].
208///
209/// Unlike [`DataProviderError`], this always carries full context. It is
210/// produced by the blanket [`InternalProvider`] impl.
211#[derive(Debug, thiserror::Error)]
212pub(crate) enum InternalProviderError {
213    /// The symbol does not exist or is not supported.
214    #[error("invalid symbol: {0}")]
215    InvalidSymbol(String),
216    /// `get_symbol_info` failed with a non-InvalidSymbol error.
217    #[error("error fetching symbol info for {symbol}: {message}")]
218    SymbolInfoError { symbol: String, message: String },
219    /// `get_candlesticks` failed with a non-InvalidSymbol error.
220    #[error("error fetching {symbol}/{timeframe}: {message}")]
221    CandlesticksError {
222        symbol: String,
223        timeframe: TimeFrame,
224        message: String,
225    },
226}
227
228/// Internal trait used to store `DataProvider` as a `Box<dyn
229/// InternalProvider>`.
230///
231/// Automatically implemented for all `T: DataProvider + 'static`. Bridges the
232/// public `DataProvider` API (which uses `DataProviderError<E>`) to the
233/// internal `InternalProviderError` used by the VM.
234#[async_trait(?Send)]
235pub(crate) trait InternalProvider {
236    async fn symbol_info(&self, symbol: String)
237    -> Result<PartialSymbolInfo, InternalProviderError>;
238
239    fn candlesticks(
240        &self,
241        symbol: &str,
242        timeframe: TimeFrame,
243        from_time: i64,
244    ) -> LocalBoxStream<'static, Result<CandlestickItem, InternalProviderError>>;
245}
246
247#[async_trait(?Send)]
248impl<T> InternalProvider for T
249where
250    T: DataProvider + 'static,
251{
252    async fn symbol_info(
253        &self,
254        symbol: String,
255    ) -> Result<PartialSymbolInfo, InternalProviderError> {
256        DataProvider::symbol_info(self, symbol.clone())
257            .await
258            .map_err(|e| match e {
259                DataProviderError::InvalidSymbol(msg) => InternalProviderError::InvalidSymbol(msg),
260                DataProviderError::Other(e) => InternalProviderError::SymbolInfoError {
261                    symbol,
262                    message: e.to_string(),
263                },
264            })
265    }
266
267    fn candlesticks(
268        &self,
269        symbol: &str,
270        timeframe: TimeFrame,
271        from_time: i64,
272    ) -> LocalBoxStream<'static, Result<CandlestickItem, InternalProviderError>> {
273        let symbol_owned = symbol.to_string();
274        Box::pin(
275            DataProvider::candlesticks(self, symbol_owned.clone(), timeframe, from_time).map(
276                move |result| {
277                    result.map_err(|e| match e {
278                        DataProviderError::InvalidSymbol(msg) => {
279                            InternalProviderError::InvalidSymbol(msg)
280                        }
281                        DataProviderError::Other(e) => InternalProviderError::CandlesticksError {
282                            symbol: symbol_owned.clone(),
283                            timeframe,
284                            message: e.to_string(),
285                        },
286                    })
287                },
288            ),
289        )
290    }
291}
292
// ---------------------------------------------------------------------------
// DataProvider implementations for in-memory collections and `Rc` wrappers
// ---------------------------------------------------------------------------
296
297/// `Vec<CandlestickItem>` implements [`DataProvider`] by yielding the items
298/// as-is on every `candlesticks()` call, regardless of symbol/timeframe.
299///
300/// Use this when you need to mix [`CandlestickItem::Confirmed`],
301/// [`CandlestickItem::Realtime`], and/or [`CandlestickItem::HistoryEnd`]
302/// in a single stream.  For plain historical bars, prefer `Vec<Candlestick>`.
303#[async_trait(?Send)]
304impl DataProvider for Vec<CandlestickItem> {
305    type Error = std::convert::Infallible;
306
307    async fn symbol_info(
308        &self,
309        _symbol: String,
310    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
311        Ok(PartialSymbolInfo::default())
312    }
313
314    fn candlesticks(
315        &self,
316        _symbol: String,
317        _timeframe: TimeFrame,
318        _from_time: i64,
319    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
320        let items = self.clone();
321        Box::pin(futures_util::stream::iter(items.into_iter().map(Ok)))
322    }
323}
324
325/// `Rc<P>` implements [`DataProvider`] by delegating to the inner `P`.
326///
327/// Allows sharing a single provider instance between multiple
328/// [`Instance`](crate::Instance) builders or background tasks without cloning
329/// the provider value itself.
330#[async_trait(?Send)]
331impl<P> DataProvider for Rc<P>
332where
333    P: DataProvider + 'static,
334{
335    type Error = P::Error;
336
337    async fn symbol_info(
338        &self,
339        symbol: String,
340    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
341        (**self).symbol_info(symbol).await
342    }
343
344    fn candlesticks(
345        &self,
346        symbol: String,
347        timeframe: TimeFrame,
348        from_time: i64,
349    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
350        (**self).candlesticks(symbol, timeframe, from_time)
351    }
352}
353
354/// `Vec<Candlestick>` implements [`DataProvider`] for the simple case of
355/// running a script against a fixed set of historical bars.
356///
357/// All `candlesticks()` requests (regardless of symbol/timeframe) receive the
358/// same bars wrapped as [`CandlestickItem::Confirmed`], followed by a
359/// [`CandlestickItem::HistoryEnd`] marker.
360///
361/// For `script_info()` queries that need no data, pass `vec![]`.
362#[async_trait(?Send)]
363impl DataProvider for Vec<Candlestick> {
364    type Error = std::convert::Infallible;
365
366    async fn symbol_info(
367        &self,
368        _symbol: String,
369    ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
370        Ok(PartialSymbolInfo::default())
371    }
372
373    fn candlesticks(
374        &self,
375        _symbol: String,
376        _timeframe: TimeFrame,
377        _from_time: i64,
378    ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
379        let confirmed = self
380            .clone()
381            .into_iter()
382            .map(|c| Ok(CandlestickItem::Confirmed(c)));
383        let end = std::iter::once(Ok(CandlestickItem::HistoryEnd));
384        Box::pin(futures_util::stream::iter(confirmed.chain(end)))
385    }
386}