openpine_vm/data_provider.rs
1//! [`DataProvider`] trait and associated types.
2//!
3//! Implement [`DataProvider`] and pass it as the first argument to
4//! [`Instance::builder`](crate::Instance::builder) to supply candlestick data
5//! for both the main chart and `request.security()` calls.
6
7use std::{fmt, rc::Rc};
8
9use async_trait::async_trait;
10use futures_util::{StreamExt, stream::LocalBoxStream};
11use time::OffsetDateTime;
12
13use crate::{
14 Currency, Market, TimeFrame,
15 quote_types::{Candlestick, CandlestickItem},
16 symbol_info::{SymbolType, VolumeType},
17};
18
19/// Partial symbol metadata returned by [`DataProvider::symbol_info`].
20///
21/// All fields are `Option`. Any field left as `None` will fall back to the
22/// default value derived from the symbol string (exchange prefix → market →
23/// `market.default_*()` helpers).
24#[derive(Debug, Clone, Default)]
25pub struct PartialSymbolInfo {
26 /// The market/exchange the symbol belongs to.
27 pub market: Option<Market>,
28 /// Human-readable description of the symbol (e.g. `"Apple Inc."`).
29 pub description: Option<String>,
30 /// The type of instrument (stock, futures, crypto, …).
31 pub type_: Option<SymbolType>,
32 /// ISO 3166-1 alpha-2 country code of the exchange (e.g. `"US"`, `"HK"`).
33 pub country: Option<String>,
34 /// International Securities Identification Number.
35 pub isin: Option<String>,
36 /// Root identifier for derivative instruments (e.g. `"ES"` for `"ESZ4"`).
37 pub root: Option<String>,
38 /// Numerator of the `syminfo.mintick` formula (`min_move / price_scale`).
39 pub min_move: Option<i32>,
40 /// Denominator of the `syminfo.mintick` formula.
41 pub price_scale: Option<i32>,
42 /// Point value multiplier (usually `1.0`; relevant for futures).
43 pub point_value: Option<f64>,
44 /// Currency of the symbol's prices.
45 pub currency: Option<Currency>,
46 /// Expiration date of the current futures contract.
47 pub expiration_date: Option<OffsetDateTime>,
48 /// Ticker identifier of the underlying contract.
49 pub current_contract: Option<String>,
50 /// Base currency for forex/crypto pairs.
51 pub base_currency: Option<Currency>,
52 /// Number of employees (stocks only).
53 pub employees: Option<i64>,
54 /// Industry classification.
55 pub industry: Option<String>,
56 /// Sector classification.
57 pub sector: Option<String>,
58 /// Minimum contract size.
59 pub min_contract: Option<f64>,
60 /// Volume type interpretation.
61 pub volume_type: Option<VolumeType>,
62 /// Number of shareholders.
63 pub shareholders: Option<i64>,
64 /// Free-float shares outstanding.
65 pub shares_outstanding_float: Option<f64>,
66 /// Total shares outstanding.
67 pub shares_outstanding_total: Option<f64>,
68 /// Number of analyst buy recommendations.
69 pub recommendations_buy: Option<i32>,
70 /// Number of analyst strong-buy recommendations.
71 pub recommendations_buy_strong: Option<i32>,
72 /// Number of analyst hold recommendations.
73 pub recommendations_hold: Option<i32>,
74 /// Number of analyst sell recommendations.
75 pub recommendations_sell: Option<i32>,
76 /// Number of analyst strong-sell recommendations.
77 pub recommendations_sell_strong: Option<i32>,
78 /// Date of the latest analyst recommendations update.
79 pub recommendations_date: Option<OffsetDateTime>,
80 /// Total number of analyst recommendations.
81 pub recommendations_total: Option<i32>,
82 /// Average analyst price target.
83 pub target_price_average: Option<f64>,
84 /// Date of the latest price target update.
85 pub target_price_date: Option<OffsetDateTime>,
86 /// Total number of price target estimates.
87 pub target_price_estimates: Option<f64>,
88 /// Highest analyst price target.
89 pub target_price_high: Option<f64>,
90 /// Lowest analyst price target.
91 pub target_price_low: Option<f64>,
92 /// Median analyst price target.
93 pub target_price_median: Option<f64>,
94}
95
96/// Error returned by [`DataProvider`] methods.
97///
98/// Use `std::convert::Infallible` as `E` when the provider can only fail with
99/// [`InvalidSymbol`](DataProviderError::InvalidSymbol) and never with
100/// [`Other`](DataProviderError::Other).
101#[non_exhaustive]
102#[derive(Debug, thiserror::Error)]
103pub enum DataProviderError<E>
104where
105 E: fmt::Display + fmt::Debug,
106{
107 /// The requested symbol does not exist or is not supported.
108 ///
109 /// When `request.security()` is called with `ignore_invalid_symbol = true`,
110 /// this causes the function to return `na` instead of a runtime error.
111 #[error("invalid symbol: {0}")]
112 InvalidSymbol(String),
113 /// Any other provider-side error.
114 #[error("{0}")]
115 Other(E),
116}
117
118/// Supplies candlestick data and symbol metadata to the VM.
119///
120/// Implement this trait and pass it as the first argument to
121/// [`Instance::builder`](crate::Instance::builder). It serves two purposes:
122/// main chart K-lines consumed by [`Instance::run`](crate::Instance::run) and
123/// MTF/cross-symbol data for `request.security()` calls.
124///
125/// For simple single-symbol use cases, pass a `Vec<Candlestick>` directly —
126/// it implements `DataProvider` and wraps bars as
127/// [`CandlestickItem::Confirmed`].
128///
129/// # Contract
130///
131/// - [`candlesticks`](DataProvider::candlesticks) must return bars in **strict
132/// ascending time order**.
133/// - The stream should start at or before `from_time` so that MTF bar state is
134/// initialised even before the first chart bar.
135/// - Returning `Err(DataProviderError::InvalidSymbol(_))` from either method
136/// causes `request.security()` to return `na` when `ignore_invalid_symbol =
137/// true`, or a runtime error otherwise.
138#[async_trait(?Send)]
139pub trait DataProvider {
140 /// The error payload type for [`DataProviderError::Other`].
141 type Error: fmt::Display + fmt::Debug;
142
143 /// Returns partial symbol metadata for the given symbol string.
144 ///
145 /// Any `None` field falls back to a default derived from the symbol's
146 /// exchange prefix. Return `PartialSymbolInfo::default()` if you only want
147 /// the defaults.
148 async fn symbol_info(
149 &self,
150 symbol: String,
151 ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>>;
152
153 /// Returns a stream of candlestick items for the given symbol and
154 /// timeframe.
155 ///
156 /// - `symbol` — e.g. `"BINANCE:BTCUSDT"`, `"NASDAQ:AAPL"`
157 /// - `timeframe` — the requested timeframe
158 /// - `from_time` — epoch-millisecond timestamp of the first chart bar;
159 /// start at or before this time
160 ///
161 /// Items must be yielded in **strict ascending time order**.
162 ///
163 /// # Stream protocol
164 ///
165 /// The expected sequence is:
166 ///
167 /// ```text
168 /// Confirmed(t0) Confirmed(t1) … Confirmed(tN) <- historical closed bars
169 /// HistoryEnd <- boundary marker
170 /// Realtime(tA) <- forming bar, first tick
171 /// Realtime(tA) <- same timestamp = update
172 /// Realtime(tB) (tB > tA) <- new timestamp:
173 /// VM auto-confirms tA,
174 /// then opens tB
175 /// Realtime(tB) <- update tB
176 /// …
177 /// ```
178 ///
179 /// Key rules:
180 ///
181 /// - **[`Confirmed`](crate::CandlestickItem::Confirmed)** — a fully closed
182 /// bar. Only used for historical bars. **Do not** emit `Confirmed` for a
183 /// realtime bar that has just closed; instead, send the next
184 /// `Realtime(tNew)` with a later timestamp and the VM will automatically
185 /// confirm the previous bar internally.
186 /// - **[`Realtime`](crate::CandlestickItem::Realtime)** — a bar that is
187 /// still forming. Multiple consecutive `Realtime` items with the *same*
188 /// timestamp are treated as in-place updates (tick updates). A `Realtime`
189 /// item whose timestamp is *later* than the previous bar implicitly
190 /// confirms the previous bar and starts a new one. At most one realtime
191 /// bar is open at any time.
192 /// - **[`HistoryEnd`](crate::CandlestickItem::HistoryEnd)** — emitted once,
193 /// after all historical `Confirmed` bars and before the first `Realtime`
194 /// bar. If the stream ends after `HistoryEnd` with an open `Realtime`
195 /// bar, the VM confirms it automatically.
196 ///
197 /// Yield `Err(DataProviderError::InvalidSymbol(_))` as the first item when
198 /// the symbol is not recognised.
199 fn candlesticks(
200 &self,
201 symbol: String,
202 timeframe: TimeFrame,
203 from_time: i64,
204 ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>>;
205}
206
207/// Internal error produced by [`InternalProvider`].
208///
209/// Unlike [`DataProviderError`], this always carries full context. It is
210/// produced by the blanket [`InternalProvider`] impl.
211#[derive(Debug, thiserror::Error)]
212pub(crate) enum InternalProviderError {
213 /// The symbol does not exist or is not supported.
214 #[error("invalid symbol: {0}")]
215 InvalidSymbol(String),
216 /// `get_symbol_info` failed with a non-InvalidSymbol error.
217 #[error("error fetching symbol info for {symbol}: {message}")]
218 SymbolInfoError { symbol: String, message: String },
219 /// `get_candlesticks` failed with a non-InvalidSymbol error.
220 #[error("error fetching {symbol}/{timeframe}: {message}")]
221 CandlesticksError {
222 symbol: String,
223 timeframe: TimeFrame,
224 message: String,
225 },
226}
227
228/// Internal trait used to store `DataProvider` as a `Box<dyn
229/// InternalProvider>`.
230///
231/// Automatically implemented for all `T: DataProvider + 'static`. Bridges the
232/// public `DataProvider` API (which uses `DataProviderError<E>`) to the
233/// internal `InternalProviderError` used by the VM.
234#[async_trait(?Send)]
235pub(crate) trait InternalProvider {
236 async fn symbol_info(&self, symbol: String)
237 -> Result<PartialSymbolInfo, InternalProviderError>;
238
239 fn candlesticks(
240 &self,
241 symbol: &str,
242 timeframe: TimeFrame,
243 from_time: i64,
244 ) -> LocalBoxStream<'static, Result<CandlestickItem, InternalProviderError>>;
245}
246
247#[async_trait(?Send)]
248impl<T> InternalProvider for T
249where
250 T: DataProvider + 'static,
251{
252 async fn symbol_info(
253 &self,
254 symbol: String,
255 ) -> Result<PartialSymbolInfo, InternalProviderError> {
256 DataProvider::symbol_info(self, symbol.clone())
257 .await
258 .map_err(|e| match e {
259 DataProviderError::InvalidSymbol(msg) => InternalProviderError::InvalidSymbol(msg),
260 DataProviderError::Other(e) => InternalProviderError::SymbolInfoError {
261 symbol,
262 message: e.to_string(),
263 },
264 })
265 }
266
267 fn candlesticks(
268 &self,
269 symbol: &str,
270 timeframe: TimeFrame,
271 from_time: i64,
272 ) -> LocalBoxStream<'static, Result<CandlestickItem, InternalProviderError>> {
273 let symbol_owned = symbol.to_string();
274 Box::pin(
275 DataProvider::candlesticks(self, symbol_owned.clone(), timeframe, from_time).map(
276 move |result| {
277 result.map_err(|e| match e {
278 DataProviderError::InvalidSymbol(msg) => {
279 InternalProviderError::InvalidSymbol(msg)
280 }
281 DataProviderError::Other(e) => InternalProviderError::CandlesticksError {
282 symbol: symbol_owned.clone(),
283 timeframe,
284 message: e.to_string(),
285 },
286 })
287 },
288 ),
289 )
290 }
291}
292
293// ---------------------------------------------------------------------------
294// Blanket DataProvider implementations for common in-memory types
295// ---------------------------------------------------------------------------
296
297/// `Vec<CandlestickItem>` implements [`DataProvider`] by yielding the items
298/// as-is on every `candlesticks()` call, regardless of symbol/timeframe.
299///
300/// Use this when you need to mix [`CandlestickItem::Confirmed`],
301/// [`CandlestickItem::Realtime`], and/or [`CandlestickItem::HistoryEnd`]
302/// in a single stream. For plain historical bars, prefer `Vec<Candlestick>`.
303#[async_trait(?Send)]
304impl DataProvider for Vec<CandlestickItem> {
305 type Error = std::convert::Infallible;
306
307 async fn symbol_info(
308 &self,
309 _symbol: String,
310 ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
311 Ok(PartialSymbolInfo::default())
312 }
313
314 fn candlesticks(
315 &self,
316 _symbol: String,
317 _timeframe: TimeFrame,
318 _from_time: i64,
319 ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
320 let items = self.clone();
321 Box::pin(futures_util::stream::iter(items.into_iter().map(Ok)))
322 }
323}
324
325/// `Rc<P>` implements [`DataProvider`] by delegating to the inner `P`.
326///
327/// Allows sharing a single provider instance between multiple
328/// [`Instance`](crate::Instance) builders or background tasks without cloning
329/// the provider value itself.
330#[async_trait(?Send)]
331impl<P> DataProvider for Rc<P>
332where
333 P: DataProvider + 'static,
334{
335 type Error = P::Error;
336
337 async fn symbol_info(
338 &self,
339 symbol: String,
340 ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
341 (**self).symbol_info(symbol).await
342 }
343
344 fn candlesticks(
345 &self,
346 symbol: String,
347 timeframe: TimeFrame,
348 from_time: i64,
349 ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
350 (**self).candlesticks(symbol, timeframe, from_time)
351 }
352}
353
354/// `Vec<Candlestick>` implements [`DataProvider`] for the simple case of
355/// running a script against a fixed set of historical bars.
356///
357/// All `candlesticks()` requests (regardless of symbol/timeframe) receive the
358/// same bars wrapped as [`CandlestickItem::Confirmed`], followed by a
359/// [`CandlestickItem::HistoryEnd`] marker.
360///
361/// For `script_info()` queries that need no data, pass `vec![]`.
362#[async_trait(?Send)]
363impl DataProvider for Vec<Candlestick> {
364 type Error = std::convert::Infallible;
365
366 async fn symbol_info(
367 &self,
368 _symbol: String,
369 ) -> Result<PartialSymbolInfo, DataProviderError<Self::Error>> {
370 Ok(PartialSymbolInfo::default())
371 }
372
373 fn candlesticks(
374 &self,
375 _symbol: String,
376 _timeframe: TimeFrame,
377 _from_time: i64,
378 ) -> LocalBoxStream<'static, Result<CandlestickItem, DataProviderError<Self::Error>>> {
379 let confirmed = self
380 .clone()
381 .into_iter()
382 .map(|c| Ok(CandlestickItem::Confirmed(c)));
383 let end = std::iter::once(Ok(CandlestickItem::HistoryEnd));
384 Box::pin(futures_util::stream::iter(confirmed.chain(end)))
385 }
386}