Streaming Real Time / CEP

Real-time enabled databases can be queried for historical and intraday data and can also be subscribed to for real-time data. You can subscribe to the raw market data or, alternatively, run calculations in real time and subscribe to their results.

Real Time Subscription

While historical queries return a dataframe, real-time subscriptions require a callback interface. As new records arrive, the methods of the callback interface are called. In addition, the run method should specify:

  • output_mode=otq.QueryOutputMode.callback - Output mode is set to Callback

  • start - If set to a time before now(), the historical component will also be returned

  • end - Set to a future timestamp (either absolute, or relative to now)

  • callback=InfoCallback() - Set to an instance of the callback class

  • running_query_flag=True - Indicating this is a running streaming subscription.

  • bs_time_msec=500 - The interval, in milliseconds, over which returned records are batched

  • cancellation_handle=query_cancellation_handle - Setting how to cancel the streaming subscription
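
The start and end settings above can be sketched with the standard library alone. This is a minimal illustration, not part of the OneTick API: `subscription_window` is a hypothetical helper name, and the minute alignment and naive UTC timestamps are assumptions chosen to pair with a query run with timezone='UTC'.

```python
from datetime import datetime, timedelta, timezone


def subscription_window(lookback, horizon, now=None):
    """Return (start, end) as naive UTC datetimes aligned down to the minute.

    Hypothetical helper: a start before "now" requests the historical
    component, and an end after "now" keeps the subscription open.
    """
    if now is None:
        # Naive UTC timestamp (assumption: the query uses timezone='UTC')
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    start = (now - lookback).replace(second=0, microsecond=0)
    end = (now + horizon).replace(second=0, microsecond=0)
    return start, end


# A fixed "now" keeps the example reproducible
now = datetime(2024, 11, 5, 14, 30, 42, 123456)
start, end = subscription_window(timedelta(minutes=1), timedelta(hours=1), now=now)
print(start)  # 2024-11-05 14:29:00
print(end)    # 2024-11-05 15:30:00
```

The resulting pair could then be passed as the start and end arguments of the run method.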

Subscribing to Real Time Trades for ESZ24
# Assumes `otq`, `rest_url` and `access_token` are already defined earlier in this guide
import threading
import time
from datetime import datetime, timedelta

def trd():
    # Tick Data
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL',value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> add_sym)
    return graph

query_cancellation_handle = otq.QueryCancellationHandle()

# Sleep for some time, then cancel the query
def query_cancel_thread():
    time.sleep(30)
    query_cancellation_handle.cancel_query()

# Callback Interface
class InfoCallback(otq.CallbackBase):
    def __init__(self):
        super().__init__()
        # Store the symbol on the instance so process_ticks can read it
        self.symbol = ""

    def replicate(self):
        return InfoCallback()

    def process_symbol_name(self, symbol_name):
        self.symbol = symbol_name

    def process_ticks(self, ticks):
        print("process_ticks: " + self.symbol + "- " + str(ticks))

    def done(self):
        pass


# Run the cancellation timer in a background thread
# (the Python 2 `thread` module does not exist in Python 3)
threading.Thread(target=query_cancel_thread, daemon=True).start()

try:
    result = otq.run(trd(),
        http_address=rest_url,access_token=access_token,
        output_mode=otq.QueryOutputMode.callback,
        start=(datetime.utcnow() - timedelta(minutes=1)).replace(second=0,microsecond=0),
        end=(datetime.utcnow() + timedelta(hours=1)).replace(second=0,microsecond=0),
        timezone='UTC',
        symbols=r'CME_MINI::ES\Z24',  # raw string keeps the backslash literal
        callback=InfoCallback(),
        running_query_flag=True,
        bs_time_msec=500,
        cancellation_handle=query_cancellation_handle
        )
except Exception as exc:
    # Reached when the running query is cancelled or fails
    print(f"ended: {exc}")
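
Printing from the callback is convenient for a demo, but a consumer usually wants to hand ticks to other code. The following is a minimal sketch of a callback that pushes records onto a thread-safe queue instead. It uses the same method names as the callback above but is deliberately a stand-in class with no OneTick imports so that it runs on its own; in real use it would presumably subclass otq.CallbackBase. The class name `CollectingCallback`, the shared `sink` queue, the `None` end-of-stream sentinel and the sample tick payload are all assumptions made for illustration, and the structure of `ticks` is treated as opaque.

```python
import queue


class CollectingCallback:
    """Stand-in with the same method names as the callback above.

    Assumption: a real implementation would subclass otq.CallbackBase.
    """

    def __init__(self, sink=None):
        # Shared, thread-safe queue so every replica feeds the same consumer
        self.sink = sink if sink is not None else queue.Queue()
        self.symbol = ""

    def replicate(self):
        # Each replica writes into the same sink
        return CollectingCallback(self.sink)

    def process_symbol_name(self, symbol_name):
        self.symbol = symbol_name

    def process_ticks(self, ticks):
        # `ticks` is passed through untouched; its layout is not assumed
        self.sink.put((self.symbol, ticks))

    def done(self):
        # Sentinel (an assumption of this sketch) marking the end of a stream
        self.sink.put((self.symbol, None))


# Simulate the calls by hand; in a real query the engine decides the call order
cb = CollectingCallback()
worker = cb.replicate()
worker.process_symbol_name(r"CME_MINI::ES\Z24")
worker.process_ticks({"PRICE": [5000.25], "SIZE": [2]})  # hypothetical payload
worker.done()

first = cb.sink.get_nowait()
last = cb.sink.get_nowait()
print(first)
print(last)
```

A separate consumer thread can then read from `cb.sink` at its own pace, which keeps slow processing out of the callback itself.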

Conflated Real Time Subscription

Highly liquid real-time sources can emit a large volume of ticks, which can be reduced by conflating the output. A simple conflation approach is to retrieve only the last prevailing value in each time interval, which can be achieved by adding a “LastTick” node to the query graph.

Graphs for Tick and Conflated Output
def trd():
    # Tick Data
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL',value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> add_sym)
    return graph

def trd_1s():
    # Conflated to Last Second
    pt = otq.Passthrough().tick_type('TRD')
    last_tick = otq.LastTick(bucket_interval=1)
    graph = otq.Graph(pt >> last_tick)
    return graph

def trd_10s():
    # Conflated to Last 10 Seconds
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL',value='_SYMBOL_NAME')
    last_tick = otq.LastTick(bucket_interval=10)
    graph = otq.Graph(pt >> add_sym >> last_tick)
    return graph
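
To make the effect of conflation concrete, the sketch below applies the same idea, keeping the last tick per fixed-width bucket, to a small in-memory list. It is a conceptual illustration in plain Python and not OneTick's implementation: `conflate_last` is a hypothetical function, the sample prices are made up, and details such as how empty buckets or output timestamps are handled by the LastTick node may differ (this sketch simply skips empty buckets).

```python
def conflate_last(ticks, bucket_seconds):
    """Keep only the last tick seen in each fixed-width time bucket.

    `ticks` is an iterable of (seconds, price) pairs in time order.
    """
    last_in_bucket = {}
    for ts, price in ticks:
        bucket = int(ts // bucket_seconds)
        # Later ticks overwrite earlier ones in the same bucket
        last_in_bucket[bucket] = (ts, price)
    return [last_in_bucket[b] for b in sorted(last_in_bucket)]


# Hypothetical trade prints: (seconds since start, price)
ticks = [(0.1, 100.0), (0.4, 100.5), (0.9, 100.25),
         (1.2, 101.0), (1.8, 100.75),
         (12.5, 102.0)]

one_second = conflate_last(ticks, 1)
ten_second = conflate_last(ticks, 10)
print(one_second)  # [(0.9, 100.25), (1.8, 100.75), (12.5, 102.0)]
print(ten_second)  # [(1.8, 100.75), (12.5, 102.0)]
```

Six input ticks shrink to three at a 1-second interval and to two at a 10-second interval, which is the trade-off between update frequency and data volume that the bucket interval controls.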

Streaming Bars Subscription

Streaming bars can be subscribed to by defining a directed graph that aggregates ticks with the Compute node and by setting its bucket_interval, which determines the bar length.

Graphs for Tick and Real Time Bars Output
def trd():
    # Tick Data
    pt = otq.Passthrough().tick_type('TRD')
    add_sym = otq.AddField(field='SYMBOL',value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> add_sym)
    return graph

def trd_bar():
    # Real Time Bar Generated
    pt = otq.Passthrough(fields='PRICE,SIZE').tick_type('TRD')
    comp_fields = otq.ComputeFields({
            'first_price':otq.First(In='PRICE'),
            'low_price':otq.Low(In='PRICE'),
            'high_price':otq.High(In='PRICE'),
            'last_price':otq.Last(In='PRICE',time_series_type='STATE_TS'),
            'vwap_price':otq.Vwap(price_field_name='PRICE',size_field_name='SIZE'),
            'sum_size':otq.Sum(In='SIZE'),
            'trd_count':otq.Count()
            })
    bar = otq.Compute(compute=comp_fields,bucket_interval=1,append_output_field_name=False)
    add_sym = otq.AddField(field='SYMBOL',value='_SYMBOL_NAME')
    graph = otq.Graph(pt >> bar >> add_sym)
    return graph
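
The aggregations in the bar graph above can be mirrored in plain Python to show what each output field represents. This is a hedged sketch rather than a description of how the Compute node works internally: `build_bars` and the `bucket_start` field are hypothetical, the sample trades are made up, and empty buckets are skipped here, whereas the graph above requests time_series_type='STATE_TS' for last_price, which may treat them differently.

```python
def build_bars(trades, bucket_seconds):
    """Aggregate (seconds, price, size) trades into fixed-width bars."""
    bars = {}
    notional = {}  # running sum of price * size per bucket, used for VWAP
    for ts, price, size in trades:
        bucket = int(ts // bucket_seconds)
        if bucket not in bars:
            bars[bucket] = {
                "bucket_start": bucket * bucket_seconds,
                "first_price": price, "low_price": price,
                "high_price": price, "last_price": price,
                "sum_size": 0, "trd_count": 0,
            }
            notional[bucket] = 0.0
        bar = bars[bucket]
        bar["low_price"] = min(bar["low_price"], price)
        bar["high_price"] = max(bar["high_price"], price)
        bar["last_price"] = price
        bar["sum_size"] += size
        bar["trd_count"] += 1
        notional[bucket] += price * size
    out = []
    for bucket in sorted(bars):
        bar = bars[bucket]
        # Volume-weighted average price; undefined if no size traded
        bar["vwap_price"] = (notional[bucket] / bar["sum_size"]
                             if bar["sum_size"] else None)
        out.append(bar)
    return out


# Hypothetical trades: (seconds since start, price, size)
trades = [(0.2, 100.0, 1), (0.5, 101.0, 3), (0.8, 99.0, 1), (1.1, 102.0, 2)]
bars = build_bars(trades, 1)
for bar in bars:
    print(bar)
```

With these inputs the first one-second bar opens at 100.0, ranges from 99.0 to 101.0, closes at 99.0 and has a VWAP of 502 / 5 = 100.4 over three trades.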