Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all_or_nothing transactions #590

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 76 additions & 7 deletions flumine/execution/transaction.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from collections import defaultdict
from typing import Optional

from ..order.orderpackage import OrderPackageType, BetfairOrderPackage
from ..events import events
Expand Down Expand Up @@ -28,9 +29,27 @@ class Transaction:
..
t.cancel_order(order)
t.place_order(order) # both executed on transaction __exit__

When atomic==True, orders within a transaction will
only be placed if the all pass validation.
For example, supposed we have:

with market.transaction(atomic=True) as t:
t.place_order(order1) # validation passes
t.place_order(order2) # validation does not pass and raises a ControlError

Neither order1 nor order2 will be executed, as order2 failed validation.

"""

def __init__(self, market, id_: int, async_place_orders: bool, client):
def __init__(
self,
market,
id_: int,
async_place_orders: bool,
client,
atomic: Optional[bool] = False,
):
self.market = market
self._client = client
self._id = id_ # unique per market only
Expand All @@ -40,6 +59,7 @@ def __init__(self, market, id_: int, async_place_orders: bool, client):
self._pending_cancel = [] # list of (<Order>, None)
self._pending_update = [] # list of (<Order>, None)
self._pending_replace = [] # list of (<Order>, market_version)
self.atomic = atomic

def place_order(
self,
Expand All @@ -52,6 +72,7 @@ def place_order(
if (
execute
and not force
and not self.atomic
and self._validate_controls(order, OrderPackageType.PLACE) is False
):
return False
Expand All @@ -77,8 +98,6 @@ def place_order(
if new_trade:
self.market.flumine.log_control(events.TradeEvent(order.trade))
if execute: # handles replaceOrder
runner_context = order.trade.strategy.get_runner_context(*order.lookup)
runner_context.place(order.trade.id)
self._pending_place.append((order, market_version))
self._pending_orders = True
return True
Expand All @@ -94,6 +113,7 @@ def cancel_order(
)
if (
not force
and not self.atomic
and self._validate_controls(order, OrderPackageType.CANCEL) is False
):
return False
Expand All @@ -114,6 +134,7 @@ def update_order(
)
if (
not force
and not self.atomic
and self._validate_controls(order, OrderPackageType.UPDATE) is False
):
return False
Expand All @@ -134,6 +155,7 @@ def replace_order(
)
if (
not force
and not self.atomic
and self._validate_controls(order, OrderPackageType.REPLACE) is False
):
return False
Expand All @@ -146,6 +168,9 @@ def replace_order(
def execute(self) -> int:
packages = []
if self._pending_place:
for order, market_version in self._pending_place:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't delay like this as it will result in the controls that act on live/open/count orders/trades incorrect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that RunnerContext.place updates these fields:

  • invested
  • datetime_last_placed
  • trades
  • live_trades

I've updated the transaction to remove failed trades from live_trades, but am unsure whether the other 3 fields need to be reverted as well.

If a transaction contains multiple orders on the same runner, working out which datetime_last_placed to set it to could be tricky.

Similarly for RunnerContext.trades, if a trade contains multiple orders, but only one of them fails, which element from RunnerContext.trades should we remove?

runner_context = order.trade.strategy.get_runner_context(*order.lookup)
runner_context.place(order.trade.id)
packages += self._create_order_package(
self._pending_place,
OrderPackageType.PLACE,
Expand Down Expand Up @@ -185,15 +210,18 @@ def execute(self) -> int:
def _validate_controls(self, order, package_type: OrderPackageType) -> bool:
# return False on violation
try:
for control in self.market.flumine.trading_controls:
control(order, package_type)
for control in self._client.trading_controls:
control(order, package_type)
self._do_validate_controls(order, package_type)
except ControlError:
return False
else:
return True

def _do_validate_controls(self, order, package_type):
for control in self.market.flumine.trading_controls:
control(order, package_type)
for control in self._client.trading_controls:
control(order, package_type)

def _create_order_package(
self, orders: list, package_type: OrderPackageType, async_: bool = False
) -> list:
Expand Down Expand Up @@ -224,5 +252,46 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):

if self._pending_orders:

if self.atomic:
try:
for order in self._pending_place:
self._do_validate_controls(order, OrderPackageType.PLACE)
for order in self._pending_cancel:
self._do_validate_controls(order, OrderPackageType.CANCEL)
for order in self._pending_update:
self._do_validate_controls(order, OrderPackageType.UPDATE)
for order in self._pending_replace:
self._do_validate_controls(order, OrderPackageType.REPLACE)
except ControlError as e:
if logger.isEnabledFor(logging.INFO):
extra = {
"market_id": self.market.market_id,
"transaction_id": self._id,
"client_username": self._client.username,
}
if self._pending_place:
extra["pending_place"] = self._pending_place
if self._pending_update:
extra["pending_update"] = self._pending_update
if self._pending_cancel:
extra["pending_cancel"] = self._pending_cancel
if self._pending_replace:
extra["pending_replace"] = self._pending_replace
logger.info(
"Failed to execute transaction. Validation failed: %s"
% str(e),
extra=extra,
)
self._clear()
raise
self.execute()

def _clear(self):
self._pending_place.clear()
self._pending_update.clear()
self._pending_replace.clear()
self._pending_cancel.clear()
self._pending_orders = False
8 changes: 7 additions & 1 deletion flumine/markets/market.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ def close_market(self) -> None:
extra=self.info,
)

def transaction(self, async_place_orders: bool = None, client=None) -> Transaction:
def transaction(
self,
async_place_orders: bool = None,
client=None,
all_or_nothing: Optional[bool] = False,
) -> Transaction:
if async_place_orders is None:
async_place_orders = config.async_place_orders
if client is None:
Expand All @@ -73,6 +78,7 @@ def transaction(self, async_place_orders: bool = None, client=None) -> Transacti
id_=self._transaction_id,
async_place_orders=async_place_orders,
client=client,
atomic=all_or_nothing,
)

# order
Expand Down
2 changes: 2 additions & 0 deletions tests/test_markets.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def test_transaction(self, mock_transaction):
id_=self.market._transaction_id,
async_place_orders=False,
client=self.market.flumine.clients.get_default(),
atomic=False,
)
self.assertEqual(transaction, mock_transaction())

Expand All @@ -172,6 +173,7 @@ def test_transaction_async(self, mock_transaction):
id_=self.market._transaction_id,
async_place_orders=True,
client=self.market.flumine.clients.get_default(),
atomic=False,
)
self.assertEqual(transaction, mock_transaction())

Expand Down
Loading