1use std::collections::HashMap;
2
3use gc_arena::{Arena, CollectionPhase};
4use openpine_compiler::{CompileError, loader::LibraryLoader, program::Program};
5use openpine_error::ErrorWithSourceFile;
6use serde::{Deserialize, Serialize};
7
8use crate::{
9 Error, Exception, ExecutionLimits, InputSessions, OutputMode, Series, StrategyState,
10 SymbolInfo, TimeFrame,
11 bar_state::BarState,
12 context::{ExecuteContext, SecurityCapture},
13 currency::CurrencyConverter,
14 data_provider::InternalProvider,
15 events::{BarStartEvent, Event},
16 gc_serde::{SerializedObject, SerializedRawValue},
17 inst_executor::Interrupt,
18 quote_types::{Candlestick, CandlestickItem},
19 script_info::ScriptInfo,
20 security::{CandlestickBuffer, SecurityLowerTfSubState, SecuritySubState},
21 snapshot::save::serialize_value_in_arena,
22 state::{ArenaType, State},
23 strategy::report::StrategyReport,
24 visuals::Chart,
25};
26
/// Allocation-debt threshold used by `Instance::gc_collect`: incremental
/// garbage-collection work is only performed once the arena's reported
/// allocation debt exceeds this value.
const COLLECTOR_GRANULARITY: f64 = 1024.0;
28
/// Selects how `Instance::execute` processes one step of input.
#[derive(Debug, Copy, Clone)]
pub(crate) enum ExecuteMode {
    /// A realtime candlestick. It is executed as `BarState::RealtimeUpdate`
    /// when its time equals the time of the last stored candlestick, and as
    /// `BarState::RealtimeNew` otherwise (after first confirming the previous
    /// bar if that bar was left unconfirmed).
    Realtime(Candlestick),
    /// A confirmed candlestick, executed as `BarState::History`.
    Confirmed(Candlestick),
    /// Re-executes the current bar as `BarState::RealtimeConfirmed` without
    /// supplying new candlestick data.
    Confirm,
}
39
/// An (index, time) pair that is forwarded to the execution context as
/// `last_info`.
///
/// NOTE(review): presumably describes the last bar of the data set being
/// executed — confirm against the code that constructs it.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct LastInfo {
    /// Bar index. (Presumably the index of the last bar — TODO confirm.)
    pub index: usize,
    /// Timestamp. (Unit and epoch are not visible from this file — TODO confirm.)
    pub time: i64,
}
49
impl LastInfo {
    /// Creates a `LastInfo` from the given `index` and `time`.
    #[inline]
    pub const fn new(index: usize, time: i64) -> Self {
        Self { index, time }
    }
}
57
/// A running script instance: the compiled program together with all of the
/// state that is carried from one bar execution to the next.
pub struct Instance {
    /// The compiled program whose instructions are executed for every bar.
    pub(crate) program: Program,
    /// Garbage-collected arena holding the interpreter `State`.
    pub(crate) arena: ArenaType,
    /// Timeframe handed to the execution context.
    pub(crate) timeframe: TimeFrame,
    /// Symbol information handed to the execution context.
    pub(crate) symbol_info: SymbolInfo,
    /// Candlestick series; a slot is appended for each new bar and the
    /// current slot is overwritten with the incoming candlestick.
    pub(crate) candlesticks: Series<Candlestick>,
    /// Optional `LastInfo` forwarded to the execution context.
    pub(crate) last_info: Option<LastInfo>,
    /// Index of the current bar; incremented after a `History` or
    /// `RealtimeConfirmed` execution.
    pub(crate) bar_index: usize,
    /// Counter incremented after every `History`, `RealtimeNew` and
    /// `RealtimeUpdate` execution (i.e. once per consumed input candlestick).
    pub(crate) input_index: usize,
    /// Script metadata handed to the execution context.
    pub(crate) script_info: ScriptInfo,
    /// Visual output of the script.
    pub(crate) chart: Chart,
    /// Events buffered during execution; drained by `run_from`.
    pub(crate) events: Vec<Event>,
    /// Trade sessions accepted by `check_input_candlestick`.
    pub(crate) input_sessions: InputSessions,
    /// Strategy state; when `None`, all strategy processing steps are skipped.
    pub(crate) strategy_state: Option<Box<StrategyState>>,
    /// Currency converter handed to the execution context.
    pub(crate) currency_converter: Box<dyn CurrencyConverter>,
    /// Execution limits (e.g. the per-bar loop-iteration budget).
    pub(crate) execution_limits: ExecutionLimits,
    /// Data provider: the source of candlesticks for `run_from`, also passed
    /// to executions as the security provider.
    pub(crate) data_provider: Option<Box<dyn InternalProvider>>,
    /// Candlestick buffers keyed by a string and then by timeframe.
    /// (Presumably the outer key is the symbol — TODO confirm.)
    pub(crate) candlestick_buffers: HashMap<String, HashMap<TimeFrame, CandlestickBuffer>>,
    /// Security sub-states keyed by `usize`.
    /// (Presumably the key is the security call key — TODO confirm.)
    pub(crate) security_sub_states: HashMap<usize, Box<SecuritySubState>>,
    /// Lower-timeframe security sub-states keyed by `usize`.
    /// (Presumably the key is the security call key — TODO confirm.)
    pub(crate) security_lower_tf_sub_states: HashMap<usize, Box<SecurityLowerTfSubState>>,
    /// Whether the most recently executed bar was confirmed (`History` or
    /// `RealtimeConfirmed`); `false` after a realtime new/update execution.
    pub(crate) last_bar_confirmed: bool,
    /// Security capture handed to the next execution and stored back
    /// afterwards; set by `execute_security_bar` to collect the result.
    pub(crate) pending_security_capture: Option<SecurityCapture>,
    /// Output mode handed to the execution context.
    pub(crate) output_mode: OutputMode,
}
102
103impl Instance {
/// Returns the warnings reported by the compiled program.
#[inline]
pub fn warnings(&self) -> &[CompileError] {
    self.program.warnings()
}
109
110 fn gc_collect(&mut self) {
111 if self.arena.metrics().allocation_debt() > COLLECTOR_GRANULARITY {
112 if self.arena.collection_phase() == CollectionPhase::Collecting {
113 self.arena.collect_debt();
114 } else {
115 self.arena.mark_debt();
116 }
117 }
118 }
119
120 fn emit_bar_start(&mut self, bar_state: BarState) {
122 let timestamp = self.candlesticks.last().map(|c| c.time).unwrap_or_default();
123 self.emit_bar_start_with_timestamp(bar_state, timestamp);
124 }
125
126 fn emit_bar_start_with_timestamp(&mut self, bar_state: BarState, timestamp: i64) {
128 self.events.push(Event::BarStart(BarStartEvent {
129 bar_index: self.bar_index,
130 timestamp,
131 bar_state,
132 }));
133 }
134
135 pub(crate) fn before_execute(&mut self, bar_state: BarState) {
136 match bar_state {
137 BarState::RealtimeNew => {
138 self.arena.mutate_root(|_, state| {
139 for variable_value in state.variables.iter_mut() {
140 variable_value.append_new();
141 }
142 });
143 self.candlesticks.append_new();
144 self.chart.append_new();
145 }
146 BarState::RealtimeUpdate => {
147 self.arena
148 .mutate_root(|_, state| state.do_rollback_actions(&mut self.chart));
149 }
150 BarState::History => {
151 self.arena.mutate_root(|_, state| {
152 for variable_value in state.variables.iter_mut() {
153 variable_value.append_new();
154 }
155 state.rollback_actions.clear();
156 });
157 self.candlesticks.append_new();
158 self.chart.append_new();
159 }
160 BarState::RealtimeConfirmed => self
161 .arena
162 .mutate_root(|_, state| state.do_rollback_actions(&mut self.chart)),
163 }
164 }
165
166 pub(crate) fn after_execute(&mut self, bar_state: BarState) {
167 match bar_state {
168 BarState::History => {
169 self.bar_index += 1;
170 self.input_index += 1;
171 self.last_bar_confirmed = true;
172 }
173 BarState::RealtimeNew | BarState::RealtimeUpdate => {
174 self.input_index += 1;
175 self.last_bar_confirmed = false;
176 }
177 BarState::RealtimeConfirmed => {
178 self.bar_index += 1;
179 self.last_bar_confirmed = true;
180 }
181 }
182 }
183
184 fn process_strategy(&mut self) {
185 if let Some(strategy_state) = &mut self.strategy_state {
186 strategy_state.step(&self.candlesticks[self.bar_index]);
187 strategy_state.process_pending_orders(&mut self.chart, self.bar_index);
188 strategy_state.process_exit_orders(&mut self.chart, self.bar_index);
189 strategy_state.check_margin_call(&mut self.chart, self.bar_index);
190 strategy_state.step_update_trades();
191 strategy_state.update_equity_extremes();
192 strategy_state.record_equity_curve();
193 strategy_state.check_risk_limits(&mut self.chart, self.bar_index);
194 }
195 }
196
197 fn process_strategy_on_close(&mut self) {
201 if let Some(strategy_state) = &mut self.strategy_state
202 && strategy_state.should_process_on_close()
203 {
204 strategy_state.process_pending_orders_on_close(&mut self.chart, self.bar_index);
205 strategy_state.update_equity_extremes();
206 strategy_state.check_risk_limits(&mut self.chart, self.bar_index);
207 strategy_state.clear_immediate_flag();
208 }
209 }
210
211 async fn process_intrabar_fills(&mut self, bar_state: BarState) -> Result<(), Error> {
217 let should = self
218 .strategy_state
219 .as_ref()
220 .is_some_and(|s| s.should_calc_on_order_fills());
221 if !should {
222 return Ok(());
223 }
224
225 let tick_prices = self.strategy_state.as_ref().unwrap().intrabar_tick_prices();
226
227 let max_reexecutions = 3;
231 let mut reexecutions = 0;
232
233 for &tick_price in &tick_prices {
234 if reexecutions >= max_reexecutions {
235 break;
236 }
237
238 let had_fill = self
239 .strategy_state
240 .as_mut()
241 .unwrap()
242 .process_orders_at_tick(&mut self.chart, self.bar_index, tick_price);
243
244 if had_fill {
245 reexecutions += 1;
246
247 self.arena
250 .mutate_root(|_, state| state.do_rollback_actions(&mut self.chart));
251
252 let provider_ptr: Option<*const dyn InternalProvider> =
253 self.data_provider.as_deref().map(|p| p as *const _);
254 let provider = provider_ptr.map(|p| unsafe { &*p as &dyn InternalProvider });
255 self.do_execute(bar_state, provider, 0).await?;
256 }
257 }
258
259 Ok(())
260 }
261
262 pub(crate) async fn do_execute(
273 &mut self,
274 bar_state: BarState,
275 security_provider: Option<&dyn InternalProvider>,
276 security_depth: usize,
277 ) -> Result<(), Error> {
278 let execution_limits = self.execution_limits;
279 let program = self.program.clone();
280 let security_capture = self.pending_security_capture.take();
281 let mut ctx = ExecuteContext {
282 program: &program,
283 arena: &mut self.arena,
284 candlesticks: &self.candlesticks,
285 last_info: self.last_info.as_ref(),
286 bar_state,
287 bar_index: self.bar_index,
288 input_index: self.input_index,
289 partial_script_info: None,
290 script_info: Some(&self.script_info),
291 chart: &mut self.chart,
292 events: &mut self.events,
293 current_span: None,
294 module_stack: vec![],
295 timeframe: &self.timeframe,
296 symbol_info: &self.symbol_info,
297 input_sessions: self.input_sessions,
298 strategy_state: self.strategy_state.as_deref_mut(),
299 currency_converter: Some(self.currency_converter.as_ref()),
300 loop_iterations_remaining: execution_limits.max_loop_iterations_per_bar,
301 security_provider,
302 candlestick_buffers: &mut self.candlestick_buffers,
303 security_sub_states: &mut self.security_sub_states,
304 security_lower_tf_sub_states: &mut self.security_lower_tf_sub_states,
305 security_depth,
306 execution_limits,
307 security_capture,
308 output_mode: self.output_mode,
309 };
310 let res = crate::inst_executor::execute(program.instruction(), &mut ctx).await;
311 self.pending_security_capture = ctx.security_capture.take();
313
314 match res {
315 Ok(_) => Ok(()),
316 Err(Interrupt::RuntimeError { error, backtrace }) => {
317 Err(Error::Exception(Exception::new(
318 ErrorWithSourceFile::new(
319 openpine_error::Error::new(vec![error.span], error.value),
320 self.program.source_files().clone(),
321 ),
322 backtrace,
323 )))
324 }
325 Err(_) => unreachable!("Unhandled interrupt in top-level execution"),
326 }
327 }
328
329 fn check_input_candlestick(&self, candlestick: &Candlestick) -> Result<(), Error> {
330 if !self.input_sessions.allow(candlestick.trade_session) {
331 return Err(Error::SessionNotAllowed(candlestick.trade_session));
332 }
333 Ok(())
334 }
335
/// Executes one step of input according to `mode` and then gives the
/// garbage collector a chance to run.
///
/// Every executed bar follows the same outline: emit `BarStart`, prepare
/// state (`before_execute`), run strategy processing, run the script
/// (`do_execute`), run on-close strategy processing, update counters
/// (`after_execute`) and emit `BarEnd`.
///
/// # Errors
///
/// Returns `Error::SessionNotAllowed` for a candlestick whose trade session
/// is not accepted, and propagates any error from script execution.
async fn execute(&mut self, mode: ExecuteMode) -> Result<(), Error> {
    // A raw pointer is used to detach the provider borrow from `self`, so
    // that `&mut self` methods can be called while the provider is passed in.
    let provider_ptr: Option<*const dyn InternalProvider> =
        self.data_provider.as_deref().map(|p| p as *const _);
    // SAFETY (for the dereference below): the pointer targets the heap
    // allocation owned by `self.data_provider`. This relies on nothing
    // called from here replacing or dropping `self.data_provider` while a
    // reference produced by this closure is in use.
    let provider = |ptr: Option<*const dyn InternalProvider>| -> Option<&dyn InternalProvider> {
        ptr.map(|p| unsafe { &*p })
    };
    match mode {
        ExecuteMode::Realtime(candlestick) => {
            self.check_input_candlestick(&candlestick)?;

            let bar_state = match self.candlesticks.last() {
                // Same timestamp as the last stored bar: this is an update
                // of the bar that is currently open.
                Some(last_candlestick) if last_candlestick.time == candlestick.time => {
                    BarState::RealtimeUpdate
                }
                Some(_) => {
                    // A new bar is starting. If the previous realtime bar
                    // was never confirmed, confirm it first with a full
                    // `RealtimeConfirmed` execution.
                    if !self.last_bar_confirmed {
                        let bar_state = BarState::RealtimeConfirmed;
                        self.emit_bar_start(bar_state);
                        self.before_execute(bar_state);
                        self.process_strategy();
                        self.do_execute(bar_state, provider(provider_ptr), 0)
                            .await?;
                        self.process_strategy_on_close();
                        self.after_execute(bar_state);
                        self.events.push(Event::BarEnd);
                    }
                    BarState::RealtimeNew
                }
                // No bars yet: the very first bar is a new realtime bar.
                None => BarState::RealtimeNew,
            };

            self.emit_bar_start_with_timestamp(bar_state, candlestick.time);
            self.before_execute(bar_state);
            // NOTE(review): here `process_strategy` runs BEFORE the incoming
            // candlestick is stored, whereas the `Confirmed` branch below
            // stores the candlestick first. Confirm this asymmetry is
            // intended.
            self.process_strategy();
            self.candlesticks.update(candlestick);
            self.do_execute(bar_state, provider(provider_ptr), 0)
                .await?;
            self.process_strategy_on_close();
            self.after_execute(bar_state);
            self.events.push(Event::BarEnd);
        }
        ExecuteMode::Confirmed(candlestick) => {
            self.check_input_candlestick(&candlestick)?;
            let bar_state = BarState::History;
            self.emit_bar_start_with_timestamp(bar_state, candlestick.time);
            self.before_execute(bar_state);
            self.candlesticks.update(candlestick);
            self.process_strategy();
            self.do_execute(bar_state, provider(provider_ptr), 0)
                .await?;
            // Only this branch simulates intrabar fills (with possible
            // re-executions of the script).
            self.process_intrabar_fills(bar_state).await?;
            self.process_strategy_on_close();
            self.after_execute(bar_state);
            self.events.push(Event::BarEnd);
        }
        ExecuteMode::Confirm => {
            // Re-execute the current bar as confirmed; no new candlestick
            // data is stored.
            let bar_state = BarState::RealtimeConfirmed;
            self.emit_bar_start(bar_state);
            self.before_execute(bar_state);
            self.process_strategy();
            self.do_execute(bar_state, provider(provider_ptr), 0)
                .await?;
            self.process_strategy_on_close();
            self.after_execute(bar_state);
            self.events.push(Event::BarEnd);
        }
    }

    self.gc_collect();
    Ok(())
}
433
/// Runs the script over the candlesticks of `symbol` at `timeframe`,
/// returning a stream of the produced events.
///
/// Equivalent to [`Instance::run_from`] with a `from_time` of `0`.
pub fn run(
    &mut self,
    symbol: &str,
    timeframe: TimeFrame,
) -> impl futures_util::Stream<Item = Result<Event, Error>> + '_ {
    self.run_from(symbol, timeframe, 0)
}
464
/// Runs the script over the candlesticks that the data provider supplies
/// for `symbol` / `timeframe` / `from_time`, returning a stream of events.
///
/// * If the instance has no data provider the stream ends without items.
/// * Confirmed and realtime candlesticks are executed, and the events
///   buffered during each execution are yielded afterwards.
/// * A `HistoryEnd` item is yielded as `Event::HistoryEnd` without
///   executing anything.
/// * When the provider stream ends and the last executed candlestick was a
///   realtime one, the bar is confirmed with a final execution.
///
/// Provider errors are reported as `Error::DataProvider` carrying the
/// error's string form.
///
/// NOTE(review): the `?` operators rely on `async_stream::stream!` turning
/// an `Err` into a yielded item followed by stream termination — confirm
/// against the async-stream documentation.
pub fn run_from(
    &mut self,
    symbol: &str,
    timeframe: TimeFrame,
    from_time: i64,
) -> impl futures_util::Stream<Item = Result<Event, Error>> + '_ {
    use futures_util::StreamExt;

    // Own the symbol so the returned stream does not borrow the argument.
    let symbol = symbol.to_owned();
    async_stream::stream! {
        let Some(provider) = self.data_provider.as_deref() else {
            return;
        };
        let mut candlestick_stream = provider.candlesticks(&symbol, timeframe, from_time);
        // Tracks whether the most recently executed candlestick was confirmed.
        let mut last_confirmed = true;

        while let Some(result) = candlestick_stream.next().await {
            let item = result.map_err(|e| Error::DataProvider(e.to_string()))?;
            let mode = match item {
                CandlestickItem::Confirmed(c) => {
                    last_confirmed = true;
                    ExecuteMode::Confirmed(c)
                }
                CandlestickItem::Realtime(c) => {
                    last_confirmed = false;
                    ExecuteMode::Realtime(c)
                }
                CandlestickItem::HistoryEnd => {
                    yield Ok(Event::HistoryEnd);
                    continue;
                }
            };
            self.execute(mode).await?;
            for event in self.events.drain(..) {
                yield Ok(event);
            }
        }

        // The input ended on an unconfirmed realtime bar: confirm it.
        if !last_confirmed {
            self.execute(ExecuteMode::Confirm).await?;
            for event in self.events.drain(..) {
                yield Ok(event);
            }
        }
    }
}
517
/// Runs the script to the end of the provider's candlestick stream,
/// discarding the produced events.
///
/// Equivalent to [`Instance::run_from_to_end`] with a `from_time` of `0`.
///
/// # Errors
///
/// Returns the first error produced while running.
pub async fn run_to_end(&mut self, symbol: &str, timeframe: TimeFrame) -> Result<(), Error> {
    self.run_from_to_end(symbol, timeframe, 0).await
}
525
526 pub async fn run_from_to_end(
529 &mut self,
530 symbol: &str,
531 timeframe: TimeFrame,
532 from_time: i64,
533 ) -> Result<(), Error> {
534 use std::pin::pin;
535
536 use futures_util::StreamExt;
537
538 let mut stream = pin!(self.run_from(symbol, timeframe, from_time));
539 while let Some(result) = stream.next().await {
540 result?;
541 }
542 Ok(())
543 }
544
/// Returns a reference to the chart produced by the script so far.
#[inline]
pub fn chart(&self) -> &Chart {
    &self.chart
}
582
/// Returns a reference to the script's metadata.
#[inline]
pub fn script_info(&self) -> &ScriptInfo {
    &self.script_info
}
588
/// Returns a reference to the candlestick series held by the instance.
#[inline]
pub fn candlesticks(&self) -> &Series<Candlestick> {
    &self.candlesticks
}
595
596 pub fn strategy_report(&self) -> Option<StrategyReport> {
599 self.strategy_state.as_ref().map(|s| s.to_report())
600 }
601
/// Consumes the instance and returns its chart.
#[inline]
pub fn into_chart(self) -> Chart {
    self.chart
}
610
611 pub(crate) fn new_for_security(
616 program: Program,
617 timeframe: TimeFrame,
618 symbol_info: SymbolInfo,
619 script_info: ScriptInfo,
620 execution_limits: ExecutionLimits,
621 currency_converter: Box<dyn CurrencyConverter>,
622 ) -> Self {
623 let arena = Arena::new(|mc| State::new(mc, &program, Some(&script_info)));
624 Instance {
625 program,
626 arena,
627 timeframe,
628 symbol_info,
629 candlesticks: Series::new(),
630 last_info: None,
631 bar_index: 0,
632 input_index: 0,
633 script_info,
634 chart: Chart::default(),
635 events: Vec::new(),
636 input_sessions: InputSessions::ALL,
637 strategy_state: None,
638 currency_converter,
639 execution_limits,
640 data_provider: None,
641 candlestick_buffers: HashMap::new(),
642 security_sub_states: HashMap::new(),
643 security_lower_tf_sub_states: HashMap::new(),
644 last_bar_confirmed: true,
645 pending_security_capture: None,
646
647 output_mode: OutputMode::default(),
648 }
649 }
650
651 #[allow(clippy::too_many_arguments)]
669 pub(crate) async fn execute_security_bar(
670 &mut self,
671 bar: Candlestick,
672 bar_state: BarState,
673 call_key: usize,
674 return_type: &openpine_compiler::instructions::TypeDescriptor,
675 security_depth: usize,
676 _execution_limits: ExecutionLimits,
677 provider: Option<&dyn InternalProvider>,
678 ) -> Result<(SerializedRawValue, Vec<SerializedObject>), Error> {
679 use crate::raw_value::RawValue;
680
681 self.before_execute(bar_state);
682 self.candlesticks.update(bar);
683
684 self.pending_security_capture = Some(SecurityCapture {
685 call_key,
686 result: None,
687 });
688
689 self.do_execute(bar_state, provider, security_depth).await?;
690
691 self.after_execute(bar_state);
692
693 let captured = self.pending_security_capture.take().and_then(|c| c.result);
695
696 let inner_type = return_type.instructions_return_type().into_owned();
700 let is_ref = inner_type.is_reference_type();
701 let raw_value = captured.unwrap_or(RawValue::NA);
702 let serialized = serialize_value_in_arena(&mut self.arena, raw_value, is_ref, &inner_type);
703 Ok(serialized)
704 }
705}
706
/// Returns the library loader provided by the `openpine_builtins` crate.
pub(crate) fn builtins_loader() -> impl LibraryLoader {
    openpine_builtins::builtins_loader()
}
710
/// Returns the load options provided by the `openpine_builtins` crate.
pub(crate) fn builtins_load_options() -> openpine_compiler::loader::LoadOptions {
    openpine_builtins::load_options()
}