Streaming Real Time / CEP#
Real-time enabled databases can be queried for historical and intraday data, and can also be subscribed to for real-time data. You can subscribe to raw market data, or have calculations run in real time and subscribe to their results.
Real Time Subscription#
While historical queries return a dataframe, real-time subscriptions require a callback interface. As new records are retrieved, the callback interface is called.
Additionally, the run method should also specify:
- output_mode=otq.QueryOutputMode.callback: sets the output mode to callback
- start: if set to a time before now(), the historical component is also returned
- end: set to a future timestamp (either absolute, or relative to now)
- callback=InfoCallback(): set to an instance of the callback class
- running_query_flag=True: indicates that this is a running streaming subscription
- bs_time_msec=500: indicates how to batch returned records, in milliseconds
- cancellation_handle=query_cancellation_handle: sets how the streaming subscription can be cancelled
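The start and end values described above can be derived from the current time. The following is a minimal sketch using only the standard library; the helper name subscription_window and its default lookback and duration are illustrative assumptions, not part of the API.

```python
from datetime import datetime, timedelta

def subscription_window(now, lookback=timedelta(minutes=1), duration=timedelta(hours=1)):
    """Return (start, end) around `now`, both truncated to the whole minute.

    A start before `now` requests the historical component as well;
    an end after `now` keeps the subscription open until that time.
    """
    start = (now - lookback).replace(second=0, microsecond=0)
    end = (now + duration).replace(second=0, microsecond=0)
    return start, end

# Fixed timestamp so the result is reproducible
now = datetime(2024, 11, 5, 14, 30, 42, 123456)
start, end = subscription_window(now)
print(start)  # 2024-11-05 14:29:00
print(end)    # 2024-11-05 15:30:00
```

In a live subscription, now would typically be the current UTC time, and the resulting pair would be passed as the start and end arguments.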
import threading
import time
from datetime import datetime, timedelta

# Assumed import path for the web API client; adjust to your installation
import onetick.query_webapi as otq

# rest_url and access_token are assumed to be defined beforehand

def trd():
    # Tick data: pass trades through and tag each tick with its symbol
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL', value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> add_sym)
    return graph

query_cancellation_handle = otq.QueryCancellationHandle()

# Sleep for some time, then cancel the query
def query_cancel_thread():
    time.sleep(30)
    query_cancellation_handle.cancel_query()

# Callback interface
class InfoCallback(otq.CallbackBase):
    def __init__(self):
        super().__init__()
        self.symbol = ""  # instance attribute, set in process_symbol_name

    def replicate(self):
        return InfoCallback()

    def process_symbol_name(self, symbol_name):
        self.symbol = symbol_name

    def process_ticks(self, ticks):
        print("process_ticks: " + self.symbol + " - " + str(ticks))

    def done(self):
        pass

# Run the cancellation timer in a background thread
threading.Thread(target=query_cancel_thread, daemon=True).start()
try:
    result = otq.run(trd(),
        http_address=rest_url, access_token=access_token,
        output_mode=otq.QueryOutputMode.callback,
        start=(datetime.utcnow() - timedelta(minutes=1)).replace(second=0, microsecond=0),
        end=(datetime.utcnow() + timedelta(hours=1)).replace(second=0, microsecond=0),
        timezone='UTC',
        symbols=r'CME_MINI::ES\Z24',  # raw string keeps the backslash in the symbol
        callback=InfoCallback(),
        running_query_flag=True,
        bs_time_msec=500,
        cancellation_handle=query_cancellation_handle
    )
except Exception:
    # Treat any exception from the run call as the end of the subscription
    print("ended")
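The sleep-then-cancel thread above could also be written with threading.Timer, which has the advantage that the pending cancellation can itself be called off if the subscription ends early. This is a sketch only: FakeCancellationHandle is a hypothetical stand-in that mimics the cancel_query() method so the example runs without a server, and schedule_cancel is an illustrative helper name.

```python
import threading

class FakeCancellationHandle:
    """Hypothetical stand-in exposing the same cancel_query() method."""
    def __init__(self):
        self.cancelled = threading.Event()

    def cancel_query(self):
        self.cancelled.set()

def schedule_cancel(handle, delay_seconds):
    """Call handle.cancel_query() after delay_seconds; return the timer."""
    timer = threading.Timer(delay_seconds, handle.cancel_query)
    timer.daemon = True  # do not keep the process alive just for the timer
    timer.start()
    return timer

handle = FakeCancellationHandle()
timer = schedule_cancel(handle, 0.05)
# Wait (up to 2 seconds) for the timer to fire
fired = handle.cancelled.wait(timeout=2.0)
print(fired)
```

With a real cancellation handle, the same helper would be called before the run call, and timer.cancel() could be used if the delayed cancellation is no longer wanted.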
Conflated Real Time Subscription#
High-liquidity real-time sources can emit a large volume of ticks, which can be reduced by conflating the output. A simple conflation approach is to retrieve the last prevailing value periodically, which can be achieved by adding a “LastTick” node to the query graph and setting its bucket_interval (in seconds).
def trd():
    # Tick data (unconflated)
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL', value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> add_sym)
    return graph

def trd_1s():
    # Conflated to the last tick of each 1-second bucket
    pt = otq.Passthrough().tick_type('TRD')
    last_tick = otq.LastTick(bucket_interval=1)
    graph = otq.Graph(pt >> last_tick)
    return graph

def trd_10s():
    # Conflated to the last tick of each 10-second bucket
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL', value='_SYMBOL_NAME')
    last_tick = otq.LastTick(bucket_interval=10)
    graph = otq.Graph(pt >> add_sym >> last_tick)
    return graph
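To make the effect of last-tick conflation concrete, here is a plain-Python illustration of the idea on a small, made-up list of ticks. It is not how the LastTick node is implemented: the function name conflate_last, the bucket alignment to time zero, and the choice to emit nothing for empty buckets are all assumptions of this sketch.

```python
def conflate_last(ticks, bucket_interval):
    """Keep only the last tick in each time bucket.

    ticks: iterable of (timestamp_seconds, value) pairs in time order.
    bucket_interval: bucket width in seconds.
    """
    last_by_bucket = {}
    for ts, value in ticks:
        bucket = int(ts // bucket_interval)
        last_by_bucket[bucket] = (ts, value)  # later ticks overwrite earlier ones
    return [last_by_bucket[b] for b in sorted(last_by_bucket)]

# Made-up sample ticks: (seconds since start, price)
ticks = [(0.1, 100.0), (0.7, 100.5), (1.2, 100.25), (1.9, 100.75), (3.4, 101.0)]

print(conflate_last(ticks, 1))
# [(0.7, 100.5), (1.9, 100.75), (3.4, 101.0)]
print(conflate_last(ticks, 10))
# [(3.4, 101.0)]
```

Five input ticks become three with a 1-second bucket and one with a 10-second bucket, which is the reduction in output volume that conflation is meant to provide.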
Streaming Bars Subscription#
Streaming Bars can be subscribed against by defining a directed graph with aggregations using the Compute node, and defining the bucket_interval.
def trd():
    # Tick Data
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL', value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> add_sym)
    return graph

def trd_bar():
    # Real Time Bar Generated
    pt = otq.Passthrough(fields='PRICE,SIZE').tick_type('TRD')
    comp_fields = otq.ComputeFields({
        'first_price': otq.First(In='PRICE'),
        'low_price': otq.Low(In='PRICE'),
        'high_price': otq.High(In='PRICE'),
        'last_price': otq.Last(In='PRICE', time_series_type='STATE_TS'),
        'vwap_price': otq.Vwap(price_field_name='PRICE', size_field_name='SIZE'),
        'sum_size': otq.Sum(In='SIZE'),
        'trd_count': otq.Count()
    })
    bar = otq.Compute(compute=comp_fields, bucket_interval=1, append_output_field_name=False)
    add_sym = otq.AddField(field='SYMBOL', value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> bar >> add_sym)
    return graph
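As a rough illustration of what the bar fields above represent, the following plain-Python sketch computes the same set of fields over a small, made-up list of trades. It is not the Compute node's implementation: the function name build_bars, the bucket alignment to time zero, and the omission of empty buckets (including any state carry-over for last_price) are assumptions of this sketch.

```python
def build_bars(trades, bucket_interval):
    """Aggregate trades into bars keyed by bucket index.

    trades: iterable of (timestamp_seconds, price, size) in time order.
    bucket_interval: bar width in seconds.
    """
    bars = {}
    for ts, price, size in trades:
        bucket = int(ts // bucket_interval)
        bar = bars.get(bucket)
        if bar is None:
            # First trade in the bucket initializes every field
            bars[bucket] = {
                'first_price': price, 'low_price': price,
                'high_price': price, 'last_price': price,
                'notional': price * size, 'sum_size': size, 'trd_count': 1,
            }
        else:
            bar['low_price'] = min(bar['low_price'], price)
            bar['high_price'] = max(bar['high_price'], price)
            bar['last_price'] = price
            bar['notional'] += price * size
            bar['sum_size'] += size
            bar['trd_count'] += 1
    for bar in bars.values():
        # Volume-weighted average price: sum(price * size) / sum(size)
        notional = bar.pop('notional')
        bar['vwap_price'] = notional / bar['sum_size'] if bar['sum_size'] else None
    return bars

# Made-up sample trades: (seconds since start, price, size)
trades = [(0.2, 100.0, 2), (0.5, 101.0, 1), (0.9, 99.0, 1), (1.3, 100.0, 4)]
bars = build_bars(trades, 1)
print(bars[0]['low_price'], bars[0]['high_price'], bars[0]['last_price'])  # 99.0 101.0 99.0
print(bars[0]['vwap_price'], bars[0]['trd_count'])  # 100.0 3
```

The first bucket contains three trades with a total size of 4 and a total notional of 400, giving a VWAP of 100.0; the second bucket contains a single trade.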