Skip to content

Commit

Permalink
Add a mostly actor aware API to IB backend
Browse files Browse the repository at this point in the history
Infected `asyncio` support is being added to `tractor` in
goodboy/tractor#121 so delegate to all that new machinery.

Start building out an "actor-aware" api which takes care of all the
`trio`-`asyncio` interaction for data streaming and request handling.
Add a little (shudder) method proxy system which can be used to invoke
client methods from another actor. Start on a streaming api in
preparation for real-time charting.
  • Loading branch information
goodboy committed Sep 2, 2020
1 parent d2e12f6 commit f3800e3
Showing 1 changed file with 209 additions and 93 deletions.
302 changes: 209 additions & 93 deletions piker/brokers/ib.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
"""
Interactive Brokers API backend.
Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware apis must be spawned with
``infected_aio==True``.
"""
import asyncio
from dataclasses import asdict
from typing import List, Dict, Any
from functools import partial
import inspect
from typing import List, Dict, Any, Tuple
from contextlib import asynccontextmanager
import time

import trio
import tractor
from async_generator import aclosing
import ib_insync as ibis
from ib_insync.ticker import Ticker
from ib_insync.contract import Contract, ContractDetails

from ..log import get_logger, get_console_log


log = get_logger(__name__)


_time_frames = {
'1s': '1 Sec',
Expand All @@ -35,14 +48,14 @@

class Client:
"""IB wrapped for our broker backend API.
Note: this client requires running inside an ``asyncio`` loop.
"""
def __init__(
self,
ib: ibis.IB,
) -> None:
self.ib = ib
# connect data feed callback...
self.ib.pendingTickersEvent.connect(self.on_tickers)

async def bars(
self,
Expand All @@ -57,7 +70,7 @@ async def bars(
"""
contract = ibis.ContFuture('ES', exchange='GLOBEX')
# contract = ibis.Stock('WEED', 'SMART', 'CAD')
bars = self.ib.reqHistoricalData(
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime='',
# durationStr='60 S',
Expand Down Expand Up @@ -88,16 +101,25 @@ async def search_stocks(
Return a dictionary of ``upto`` entries worth of contract details.
"""
descriptions = self.ib.reqMatchingSymbols(pattern)
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)

futs = []
for d in descriptions:
con = d.contract
futs.append(self.ib.reqContractDetailsAsync(con))

# batch request all details
results = await asyncio.gather(*futs)

# XXX: if there is more then one entry in the details list
details = {}
for description in descriptions:
con = description.contract
deats = self.ib.reqContractDetails(con)
# XXX: if there is more then one entry in the details list
for details_set in results:
# then the contract is so called "ambiguous".
for d in deats:
for d in details_set:
con = d.contract
unique_sym = f'{con.symbol}.{con.primaryExchange}'
details[unique_sym] = asdict(d) if asdicts else d

if len(details) == upto:
return details

Expand All @@ -118,23 +140,38 @@ def get_cont_fute(
) -> Contract:
raise NotImplementedError

async def stream_ticker(
self,
symbol: str,
to_trio,
opts: Tuple[int] = ('233', '375'),
) -> None:
"""Stream a ticker using the std L1 api.
"""
sym, exch = symbol.split('.')
contract = ibis.Stock(sym.upper(), exchange=exch.upper())
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))

# let the engine run and stream
await self.ib.disconnectedEvent


# default config ports
_tws_port: int = 7497
_gw_port: int = 4002


@asynccontextmanager
async def get_client(
async def _aio_get_client(
host: str = '127.0.0.1',
port: int = None,
client_id: int = 1,
) -> Client:
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
"""
ib = ibis.IB()
# TODO: some detection magic to figure out if tws vs. the
# gateway is up ad choose the appropriate port

if port is None:
ports = [_tws_port, _gw_port]
else:
Expand All @@ -152,91 +189,170 @@ async def get_client(
else:
raise ConnectionRefusedError(_err)

yield Client(ib)
ib.disconnect()

try:
yield Client(ib)
except BaseException:
ib.disconnect()
raise


async def _aio_run_client_method(
meth: str,
to_trio,
from_trio,
**kwargs,
) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:

async_meth = getattr(client, meth)

# handle streaming methods
args = tuple(inspect.getfullargspec(async_meth).args)
if 'to_trio' in args:
kwargs['to_trio'] = to_trio

return await async_meth(**kwargs)


async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
ca = tractor.current_actor()
assert ca.is_infected_aio()

# if the method is an async gen stream for it
meth = getattr(Client, method)
if inspect.isasyncgenfunction(meth):
kwargs['_treat_as_stream'] = True

# if the method is an async func but streams back results
# make sure to also stream from it
args = tuple(inspect.getfullargspec(meth).args)
if 'to_trio' in args:
kwargs['_treat_as_stream'] = True

result = await tractor.to_asyncio.run_task(
_aio_run_client_method,
meth=method,
**kwargs
)
return result


def get_method_proxy(portal):

class MethodProxy:
def __init__(self, portal: tractor._portal.Portal):
self._portal = portal

async def _run_method(
self,
*,
meth: str = None,
**kwargs
) -> Any:
return await self._portal.run(
__name__,
'_trio_run_client_method',
method=meth,
**kwargs
)

proxy = MethodProxy(portal)

# mock all remote methods
for name, method in inspect.getmembers(
Client, predicate=inspect.isfunction
):
if '_' == name[0]:
continue
setattr(proxy, name, partial(proxy._run_method, meth=name))

return proxy

if __name__ == '__main__':

con_es = ibis.ContFuture('ES', exchange='GLOBEX')
es = ibis.Future('ES', '20200918', exchange='GLOBEX')
spy = ibis.Stock('SPY', exchange='ARCA')

# ticker = client.ib.reqTickByTickData(
# contract,
# tickType='Last',
# numberOfTicks=1,
# )
# client.ib.reqTickByTickData(
# contract,
# tickType='AllLast',
# numberOfTicks=1,
# )
# client.ib.reqTickByTickData(
# contract,
# tickType='BidAsk',
# numberOfTicks=1,
# )

# ITC (inter task comms)
from_trio = asyncio.Queue()
to_trio, from_aio = trio.open_memory_channel(float("inf"))

async def start_ib(from_trio, to_trio):
print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with get_client() as client:

# stream ticks to trio task
def ontick(ticker: Ticker):
for t in ticker.ticks:
# send tick data to trio
to_trio.send_nowait(t)

ticker = client.ib.reqMktData(spy, '588', False, False, None)
ticker.updateEvent += ontick

n = await from_trio.get()
assert n == 0

# sleep and let the engine run
await asyncio.sleep(float('inf'))

# TODO: cmd processing from trio
# while True:
# n = await from_trio.get()
# print(f"aio got: {n}")
# to_trio.send_nowait(n + 1)

async def trio_main():
print("trio_main!")

asyncio.create_task(
start_ib(from_trio, to_trio)
)
@asynccontextmanager
async def maybe_spawn_brokerd(
**kwargs,
) -> tractor._portal.Portal:
async with tractor.find_actor('brokerd_ib') as portal:
if portal is None: # no broker daemon created yet

async with tractor.open_nursery() as n:
# XXX: this needs to somehow be hidden
portal = await n.start_actor(
'brokerd_ib',
rpc_module_paths=[__name__],
infect_asyncio=True,
)
async with tractor.wait_for_actor(
'brokerd_ib'
) as portal:
yield portal

# client code may block indefinitely so cancel when
# teardown is invoked
await n.cancel()

from_trio.put_nowait(0)

async for tick in from_aio:
@asynccontextmanager
async def get_client(
**kwargs,
) -> Client:
"""Init the ``ib_insync`` client in another actor and return
a method proxy to it.
"""
async with maybe_spawn_brokerd(**kwargs) as portal:
yield get_method_proxy(portal)


async def trio_stream_ticker(sym):
get_console_log('info')

# con_es = ibis.ContFuture('ES', exchange='GLOBEX')
# es = ibis.Future('ES', '20200918', exchange='GLOBEX')

stream = await tractor.to_asyncio.run_task(
_trio_run_client_method,
method='stream_ticker',
symbol=sym,
)
async with aclosing(stream):
async for ticker in stream:
lft = ticker.lastFillTime
for tick_data in ticker.ticks:
value = tick_data._asdict()
now = time.time()
value['time'] = now
value['last_fill_time'] = lft
if lft:
value['latency'] = now - lft
yield value


async def stream_from_brokerd(sym):

async with maybe_spawn_brokerd() as portal:
stream = await portal.run(
__name__,
'trio_stream_ticker',
sym=sym,
)
async for tick in stream:
print(f"trio got: {tick}")

# TODO: send cmds to asyncio
# from_trio.put_nowait(n + 1)

async def aio_main():
loop = asyncio.get_running_loop()

trio_done_fut = asyncio.Future()

def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)

trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
if __name__ == '__main__':
import sys

(await trio_done_fut).unwrap()
sym = sys.argv[1]

asyncio.run(aio_main())
tractor.run(
stream_from_brokerd,
sym,
# XXX: must be multiprocessing
start_method='forkserver',
loglevel='info'
)

0 comments on commit f3800e3

Please sign in to comment.