diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 00000000..e69de29b diff --git a/404.html b/404.html new file mode 100644 index 00000000..024ca6a9 --- /dev/null +++ b/404.html @@ -0,0 +1,576 @@ + + + +
+ + + + + + + + + + + + + + + +Functions:
+add_client
Adds a client to the frameworkadd_strategy
Adds a strategy to the frameworkadd_worker
Adds a worker to the frameworkadd_client_control
Adds a client control to the frameworkadd_trading_control
Adds a trading control to the frameworkadd_market_middleware
Adds market middleware to the frameworkadd_logging_control
Adds a logging control to the frameworkThe Flumine class can be adapted by overriding the following functions:
+_process_market_books()
called on MarketBook event_process_sports_data()
called on SportsData event_process_market_orders()
called when market has pending orders_process_order_package()
called on new OrderPackage_add_market()
called when new Market received through streams_remove_market()
called when Market removed from framework_process_raw_data()
called on RawData event_process_market_catalogues
called on MarketCatalogue event_process_current_orders
called on currentOrders event_process_custom_event
called on CustomEvent event see here_process_close_market
called on Market closure_process_cleared_orders()
called on ClearedOrders event_process_cleared_markets()
called on ClearedMarkets event_process_end_flumine()
called on Flumine terminationFlumine handles market streams by taking the parameters provided in the strategies, a strategy will then subscribe to the stream. This means strategies can share streams reducing load or create new if they require different markets or data filter.
+Similar to Market Streams but the raw streaming data is passed back, this reduces ram/CPU and allows recording of the data for future playback, see the example marketrecorder.py
+This is created on a per market basis when simulating.
+Subscribes to all orders per running instance using the config.customer_strategy_ref
Custom streams (aka threads) can be added as per:
+from flumine.streams.basestream import BaseStream
+from flumine.events.events import CustomEvent
+
+
+class CustomStream(BaseStream):
+ def run(self) -> None:
+ # connect to stream / make API requests etc.
+ response = api_call()
+
+ # callback func
+ def callback(framework, event):
+ for strategy in framework.strategies:
+ strategy.process_my_event(event)
+
+ # push results through using custom event
+ event = CustomEvent(response, callback)
+
+ # put in main queue
+ self.flumine.handler_queue.put(event)
+
+
+custom_stream = CustomStream(framework, custom=True)
+framework.streams.add_custom_stream(custom_stream)
+
Flumine will catch all errors that occur in strategy.check_market
and strategy.process_market_book
, and log either error or critical errors.
Tip
+You can remove this error handling by setting config.raise_errors = True
jsonlogger is used to log extra detail, see below for a typical setup:
+import time
+import logging
+from pythonjsonlogger import jsonlogger
+
+logger = logging.getLogger()
+
+custom_format = "%(asctime) %(levelname) %(message)"
+log_handler = logging.StreamHandler()
+formatter = jsonlogger.JsonFormatter(custom_format)
+formatter.converter = time.gmtime
+log_handler.setFormatter(formatter)
+logger.addHandler(log_handler)
+logger.setLevel(logging.INFO)
+
Updated to True when simulating or paper trading
+Defaults to True to match orders per strategy, when False prevents double counting of passive liquidity on all orders regardless of strategy.
+When True will simulate matches against available prices after initial execution, note this will double count liquidity.
+Store server id or similar (e.g. AWS ec2 instanceId)
+Used as customerStrategyRefs so that only orders created by the running instance are returned.
+OS process id of running application.
+Used for simulation
+Raises errors on strategy functions, see Error Handling
+Max number of workers in execution thread pool
+Place orders sent with place orders flag, prevents waiting for bet delay
+Place latency used for simulation / simulation execution
+Cancel latency used for simulation / simulation execution
+Update latency used for simulation / simulation execution
+Replace latency used for simulation / simulation execution
+customer_order_ref separator
+ + + + + + +Typical to most trading frameworks flumine uses an event driven design with the main thread handling these events through a FIFO queue.
+MARKET_CATALOGUE
Betfair MarketCatalogue objectMARKET_BOOK
Betfair MarketBook objectRAW_DATA
Raw streaming dataCURRENT_ORDERS
Betfair CurrentOrders objectCLEARED_MARKETS
Betfair ClearedMarkets objectCLEARED_ORDERS
Betfair ClearedOrders objectCLOSE_MARKET
flumine Close Market updateSTRATEGY_RESET
flumine Strategy Reset updateCUSTOM_EVENT
flumine Custom event updateTERMINATOR
flumine End instance updateThe above events are handled in the flumine class
+Simulation is achieved by monkeypatching the datetime function utcnow(), this allows strategies to be simulated as if they were being executed in real time. Functions such as market.seconds_to_start and fillKill.seconds work as per a live execution.
+Flumine is capable of using multiple clients, these can be of the same ExchangeType
, a variation depending on use case or your own custom client/wrapper. The default workers handle login/keep-alive/logout and market closure for all clients added to the framework automatically.
BetfairClient
SimulatedClient
BetconnectClient
To add a client use the add_client
this will allow use via framework.clients
or strategy.clients
from flumine import Flumine, clients
+
+framework = Flumine()
+
+client = clients.BetfairClient(trading)
+framework.add_client(client)
+
or when simulating:
+from flumine import FlumineSimulation, clients
+
+framework = FlumineSimulation()
+
+client = clients.SimulatedClient(username="123")
+framework.add_client(client)
+
To access clients within a strategy use the helper functions:
+betfair_client = self.clients.get_betfair_default()
+
+client = self.clients.get_client(ExchangeType.SIMULATED, username="123")
+
Tip
+get_default
and get_betfair_default
will use the first client added via add_client
(ordered list)
By default a transaction will use clients.get_default()
however you can use a particular client:
client = self.clients.get_client(ExchangeType.SIMULATED, username="123")
+
+market.place_order(order, client=client)
+
or using a transaction directly:
+client = self.clients.get_client(ExchangeType.SIMULATED, username="123")
+
+with market.transaction(client=client) as t:
+ t.place_order(order)
+
Before placing an order flumine will check the client and trading controls, this allows validation to occur before execution. If an order does not meet any of these validations it is not executed and status is updated to Violation
.
MaxTransactionCount
: Checks transaction count is not over betfair transaction limit (5000 per hour) OrderValidation
: Checks order is valid (size/odds)StrategyExposure
: Checks order does not invalidate strategy.validate_order
, strategy.max_order_exposure
or strategy.max_selection_exposure
Sometimes it is desirable to skip the controls, for example when canceling an open order even if the transaction count has already reached the betfair transaction limit. This can be done by passing force=True
when placing or changing an order:
market.place_order(order, force=True)
+transaction.place_order(order, force=True)
+
This works for markets and transactions and is supported by the operations place_order
, cancel_order
, update_order
, and replace_order
.
Custom logging is available using the LoggingControl
class, the base class creates debug logs and can be used as follows:
from flumine.controls.loggingcontrols import LoggingControl
+
+control = LoggingControl()
+
+framework.add_logging_control(control)
+
Tip
+More than one control can be added, for example a csv logger and db logger.
+Please try the following channels for any support:
+Betfair trading framework with a focus on:
+Support for market, order and custom streaming data.
+ +Tested on Python 3.7, 3.8, 3.9, 3.10 and 3.11.
+$ pip install flumine
+
flumine requires Python 3.7+
+Get started...
+import betfairlightweight
+from flumine import Flumine, clients
+
+trading = betfairlightweight.APIClient("username")
+client = clients.BetfairClient(trading)
+
+framework = Flumine(
+ client=client,
+)
+
Example strategy:
+from flumine import BaseStrategy
+from flumine.order.trade import Trade
+from flumine.order.order import LimitOrder, OrderStatus
+from flumine.markets.market import Market
+from betfairlightweight.filters import streaming_market_filter
+from betfairlightweight.resources import MarketBook
+
+
+class ExampleStrategy(BaseStrategy):
+ def start(self) -> None:
+ print("starting strategy 'ExampleStrategy'")
+
+ def check_market_book(self, market: Market, market_book: MarketBook) -> bool:
+ # process_market_book only executed if this returns True
+ if market_book.status != "CLOSED":
+ return True
+
+ def process_market_book(self, market: Market, market_book: MarketBook) -> None:
+ # process marketBook object
+ for runner in market_book.runners:
+ if runner.status == "ACTIVE" and runner.last_price_traded < 1.5:
+ trade = Trade(
+ market_id=market_book.market_id,
+ selection_id=runner.selection_id,
+ handicap=runner.handicap,
+ strategy=self,
+ )
+ order = trade.create_order(
+ side="LAY", order_type=LimitOrder(price=1.01, size=2.00)
+ )
+ market.place_order(order)
+
+ def process_orders(self, market: Market, orders: list) -> None:
+ for order in orders:
+ if order.status == OrderStatus.EXECUTABLE:
+ if order.size_remaining == 2.00:
+ market.cancel_order(order, 0.02) # reduce size to 1.98
+ if order.order_type.persistence_type == "LAPSE":
+ market.update_order(order, "PERSIST")
+ if order.size_remaining > 0:
+ market.replace_order(order, 1.02) # move
+
+
+strategy = ExampleStrategy(
+ market_filter=streaming_market_filter(
+ event_type_ids=["7"],
+ country_codes=["GB"],
+ market_types=["WIN"],
+ )
+)
+
+framework.add_strategy(strategy)
+
Run framework:
+framework.run()
+
Danger
+By default flumine will try to prevent coding errors which result in flash crashes and burnt fingers but use at your own risk as per the MIT license.
+Recommendation is not to remove the trading controls and carry out extensive testing before executing on live markets, even then only use new strategies on an account with a small balance (transfer balance to games wallet).
+flumine relies on these libraries:
+betfairlightweight
- Betfair API supporttenacity
- Used for connection retrying (streaming)python-json-logger
- JSON loggingrequests
- HTTP supportWithin markets you have market objects which contains current up to date market data.
+flumine
Frameworkmarket_id
MarketBook idclosed
Closed booldate_time_closed
Closed datetimemarket_book
Latest MarketBook objectmarket_catalogue
Latest MarketCatalogue objectcontext
Market context, store market specific context e.g. simulated data storeblotter
Holds all order data and positionplace_order(order)
Place new order from order objectcancel_order(order, size_reduction)
Cancel orderupdate_order(order, new_persistance_type)
Update orderreplace_order(order, new_price)
Replace orderevent
Dictionary containing all event related markets (assumes markets have been subscribed)event_type_id
Betfair event type id (horse racing: 7)event_id
Market event id (12345)market_type
Market type ('WIN')seconds_to_start
Seconds to scheduled market start time (123.45)elapsed_seconds_closed
Seconds since market was closed (543.21)market_start_datetime
Market scheduled start timeThe transaction class is used by default when orders are executed, however it is possible to control the execution behaviour using the transaction class like so:
+with market.transaction() as t:
+ market.place_order(order) # executed immediately in separate transaction
+ t.place_order(order) # executed on transaction __exit__
+
+with market.transaction() as t:
+ t.place_order(order)
+ ..
+ t.execute() # above order executed
+ ..
+ t.cancel_order(order)
+ t.place_order(order) # both executed on transaction __exit__
+
The blotter is a simple and fast class to hold all orders for a particular market.
+strategy_orders(strategy)
Returns all orders related to a strategystrategy_selection_orders(strategy, selection_id, handicap)
Returns all orders related to a strategy selectionselection_exposure(strategy, lookup)
Returns strategy/selection exposuremarket_exposure(strategy, market_book)
Returns strategy/market exposurelive_orders
List of live ordershas_live_orders
Bool on live ordersIt is common that you want to carry about analysis on a market before passing through to strategies, similar to Django's middleware design flumine allows middleware to be executed.
+For example simulation uses simulated middleware in order to calculate order matching.
+Note
+Middleware will be executed in the order it is added and before the strategies are processed.
+Please see below for the example middleware class if you wish to create your own:
+from flumine.markets.middleware import Middleware
+
+class CustomMiddleware(Middleware):
+ def __call__(self, market) -> None:
+ pass # called on each MarketBook update
+
+ def add_market(self, market) -> None:
+ print("market {0} added".format(market.market_id))
+
+ def remove_market(self, market) -> None:
+ print("market {0} removed".format(market.market_id))
+
The above middleware can then be added to the framework:
+framework.add_logging_control(CustomMiddleware())
+
Flumine is heavily optimised out of the box to be as quick as possible however there are various ways to improve the performance further with minimal effort.
+This is one of the most powerful options available as the variables are passed down to betfairlightweight limiting the number of updates to process, examples:
+strategy = ExampleStrategy(
+ market_filter={
+ "markets": ["/tmp/marketdata/1.170212754"],
+ "listener_kwargs": {"seconds_to_start": 600, "inplay": False},
+ }
+)
+
strategy = ExampleStrategy(
+ market_filter={
+ "markets": ["/tmp/marketdata/1.170212754"],
+ "listener_kwargs": {"inplay": True},
+ }
+)
+
Logging in python can add a lot of function calls, it is therefore recommended to switch it off once you are comfortable with the outputs from a strategy:
+logger.setLevel(logging.CRITICAL)
+
This might sound obvious but having the market files stored locally on your machine will allow much quicker processing. A common pattern is to use s3 to store all market files but a local cache for common markets processed.
+smart_open
is a commonly used package for processing gz/s3 files:
with patch("builtins.open", smart_open.open):
+ framework.add_strategy(strategy)
+ framework.run()
+
Tip
+Note that add_strategy
needs to be in the patch as well.
Sometimes a download from the betfair site will include market and event files in the same directory resulting in duplicate processing, flumine will log a warning on this but it is worth checking if you are seeing slow processing times.
+Simulation is CPU bound so can therefore be improved through the use of multiprocessing, threading offers no improvement due to the limitations of the GIL.
+The multiprocessing example code below will:
+run_process
will process 8 markets at a time (prevents memory leaks)import os
+import math
+import smart_open
+from concurrent import futures
+from unittest.mock import patch as mock_patch
+from flumine import FlumineSimulation, clients, utils
+from strategies.lowestlayer import LowestLayer
+
+
+def run_process(markets):
+ client = clients.SimulatedClient()
+ framework = FlumineSimulation(client=client)
+ strategy = LowestLayer(
+ market_filter={"markets": markets},
+ context={"stake": 2},
+ )
+ with mock_patch("builtins.open", smart_open.open):
+ framework.add_strategy(strategy)
+ framework.run()
+
+
+if __name__ == "__main__":
+ all_markets = [...]
+ processes = os.cpu_count()
+ markets_per_process = 8 # optimal
+
+ _process_jobs = []
+ with futures.ProcessPoolExecutor(max_workers=processes) as p:
+ chunk = min(
+ markets_per_process, math.ceil(len(all_markets) / processes)
+ )
+ for m in (utils.chunks(all_markets, chunk)):
+ _process_jobs.append(
+ p.submit(
+ run_process,
+ markets=m,
+ )
+ )
+ for job in futures.as_completed(_process_jobs):
+ job.result() # wait for result
+
Tip
+If the code above is failing add logging to the run_process
function to find the error or run the strategy in a single process with logging
The heaviest load on CPU comes from reading the files and processing into py objects before processing through flumine, after this the bottleneck becomes the number of orders that need to be processed. Therefore anything that can be done to limit the number of redundant or control blocked orders will see an improvement.
+Profiling code is always the best option for finding improvements, cprofilev
is a commonly used python library for this:
python -m cprofilev examples/simulate.py
+
If you don't need the simulation middleware remove it from framework._market_middleware
, this is useful when processing markets for data collection. This can dramatically improve processing time due to the heavy functions contained in the simulation logic.
Installing betfairlightweight[speed] will have a big impact on processing speed due to the inclusion of C and Rust libraries for datetime and json decoding.
+For improving live trading 'Strategy' and 'cprofile' tips above will help although CPU load tends to be considerably lower compared to simulating.
+ + + + + + +Tip
+flumine uses betfairlightweight
for communicating with the Betfair API, please see docs for how to use/setup before proceeding.
First, start by importing flumine/bflw and creating a trading and framework client:
+import betfairlightweight
+from flumine import Flumine, clients
+
+trading = betfairlightweight.APIClient("username")
+client = clients.BetfairClient(trading)
+
+framework = Flumine(client=client)
+
Note
+flumine will handle login, logout and keep alive whilst the framework is running using the keep_alive
worker.
A strategy can now be created by using the BaseStrategy class:
+from flumine import BaseStrategy
+
+
+class ExampleStrategy(BaseStrategy):
+ def start(self):
+ # subscribe to streams
+ print("starting strategy 'ExampleStrategy'")
+
+ def check_market_book(self, market, market_book):
+ # process_market_book only executed if this returns True
+ if market_book.status != "CLOSED":
+ return True
+
+ def process_market_book(self, market, market_book):
+ # process marketBook object
+ print(market_book.status)
+
This strategy can now be initiated with the market and data filter before being added to the framework:
+from betfairlightweight.filters import (
+ streaming_market_filter,
+ streaming_market_data_filter,
+)
+
+strategy = ExampleStrategy(
+ market_filter=streaming_market_filter(
+ event_type_ids=["7"],
+ country_codes=["GB"],
+ market_types=["WIN"],
+ ),
+ market_data_filter=streaming_market_data_filter(fields=["EX_ALL_OFFERS"])
+)
+
+framework.add_strategy(strategy)
+
The framework can now be started:
+framework.run()
+
Orders can be placed as followed:
+from flumine.order.trade import Trade
+from flumine.order.order import LimitOrder
+
+
+class ExampleStrategy(BaseStrategy):
+ def process_market_book(self, market, market_book):
+ for runner in market_book.runners:
+ if runner.selection_id == 123:
+ trade = Trade(
+ market_id=market_book.market_id,
+ selection_id=runner.selection_id,
+ handicap=runner.handicap,
+ strategy=self
+ )
+ order = trade.create_order(
+ side="LAY",
+ order_type=LimitOrder(price=1.01, size=2.00)
+ )
+ market.place_order(order)
+
This order will be validated through controls, stored in the blotter and sent straight to the execution thread pool for execution. It is also possible to batch orders into transactions as follows:
+with market.transaction() as t:
+ market.place_order(order) # executed immediately in separate transaction
+ t.place_order(order) # executed on transaction __exit__
+
+with market.transaction() as t:
+ t.place_order(order)
+
+ t.execute() # above order executed
+
+ t.cancel_order(order)
+ t.place_order(order) # both executed on transaction __exit__
+
By default the stream class will be a MarketStream which provides a MarketBook python object, if collecting data this can be changed to a DataStream class however process_raw_data will be called and not process_market_book:
+from flumine import BaseStrategy
+from flumine.streams.datastream import DataStream
+
+
+class ExampleDataStrategy(BaseStrategy):
+ def process_raw_data(self, publish_time, data):
+ print(publish_time, data)
+
+strategy = ExampleDataStrategy(
+ market_filter=streaming_market_filter(
+ event_type_ids=["7"],
+ country_codes=["GB"],
+ market_types=["WIN"],
+ ),
+ stream_class=DataStream
+)
+
+flumine.add_strategy(strategy)
+
The OrderDataStream class can be used to record order data as per market:
+from flumine.streams.datastream import OrderDataStream
+
+strategy = ExampleDataStrategy(
+ market_filter=None,
+ stream_class=OrderDataStream
+)
+
+flumine.add_strategy(strategy)
+
Flumine can be used to paper trade strategies live using the following code:
+from flumine import clients
+
+client = clients.BetfairClient(trading, paper_trade=True)
+
Market data will be recieved as per live but any orders will use Simulated execution and Simulated order polling to replicate live trading.
+Tip
+This can be handy when testing strategies as the betfair website can be used to validate the market.
+Flumine can be used to simulate strategies using the following code:
+from flumine import FlumineSimulation, clients
+
+client = clients.SimulatedClient()
+framework = FlumineSimulation(client=client)
+
+strategy = ExampleStrategy(
+ market_filter={"markets": ["/tmp/marketdata/1.170212754"]}
+)
+framework.add_strategy(strategy)
+
+framework.run()
+
Note the use of market filter to pass the file directories.
+Sometimes a subset of the market lifetime is required, this can be optimised by limiting the number of updates to process resulting in faster simulation:
+strategy = ExampleStrategy(
+ market_filter={
+ "markets": ["/tmp/marketdata/1.170212754"],
+ "listener_kwargs": {"inplay": False, "seconds_to_start": 600},
+ }
+)
+
The extra kwargs above will limit processing to preplay in the final 10 minutes.
+Tip
+Multiple strategies and markets can be passed, flumine will pass the MarketBooks to the correct strategy via its subscription.
+It is also possible to process events with multiple markets such as win/place in racing or all football markets as per live by adding the following flag:
+strategy = ExampleStrategy(
+ market_filter={"markets": [..], "event_processing": True}
+)
+
The Market
object contains a helper method for accessing other event linked markets:
place_market = market.event["PLACE"]
+
When simulating you can filter markets to be processed by using the market_type
and country_code
filter as per live:
strategy = ExampleStrategy(
+ market_filter={"markets": [..], "market_types": ["MATCH_ODDS"], "country_codes": ["GB"]}
+)
+
Simulation uses the SimulatedExecution
execution class and tries to accurately simulate matching with the following:
Limitations #192:
+Betfair trading framework with a focus on:
Support for market, order and custom streaming data.
join slack group
Tested on Python 3.7, 3.8, 3.9, 3.10 and 3.11.
"},{"location":"#installation","title":"installation","text":"$ pip install flumine\n
flumine requires Python 3.7+
"},{"location":"#setup","title":"setup","text":"Get started...
import betfairlightweight\nfrom flumine import Flumine, clients\n\ntrading = betfairlightweight.APIClient(\"username\")\nclient = clients.BetfairClient(trading)\n\nframework = Flumine(\n client=client,\n)\n
Example strategy:
from flumine import BaseStrategy\nfrom flumine.order.trade import Trade\nfrom flumine.order.order import LimitOrder, OrderStatus\nfrom flumine.markets.market import Market\nfrom betfairlightweight.filters import streaming_market_filter\nfrom betfairlightweight.resources import MarketBook\n\n\nclass ExampleStrategy(BaseStrategy):\n def start(self) -> None:\n print(\"starting strategy 'ExampleStrategy'\")\n\n def check_market_book(self, market: Market, market_book: MarketBook) -> bool:\n # process_market_book only executed if this returns True\n if market_book.status != \"CLOSED\":\n return True\n\n def process_market_book(self, market: Market, market_book: MarketBook) -> None:\n # process marketBook object\n for runner in market_book.runners:\n if runner.status == \"ACTIVE\" and runner.last_price_traded < 1.5:\n trade = Trade(\n market_id=market_book.market_id,\n selection_id=runner.selection_id,\n handicap=runner.handicap,\n strategy=self,\n )\n order = trade.create_order(\n side=\"LAY\", order_type=LimitOrder(price=1.01, size=2.00)\n )\n market.place_order(order)\n\n def process_orders(self, market: Market, orders: list) -> None:\n for order in orders:\n if order.status == OrderStatus.EXECUTABLE:\n if order.size_remaining == 2.00:\n market.cancel_order(order, 0.02) # reduce size to 1.98\n if order.order_type.persistence_type == \"LAPSE\":\n market.update_order(order, \"PERSIST\")\n if order.size_remaining > 0:\n market.replace_order(order, 1.02) # move\n\n\nstrategy = ExampleStrategy(\n market_filter=streaming_market_filter(\n event_type_ids=[\"7\"],\n country_codes=[\"GB\"],\n market_types=[\"WIN\"],\n )\n)\n\nframework.add_strategy(strategy)\n
Run framework:
framework.run()\n
Danger
By default flumine will try to prevent coding errors which result in flash crashes and burnt fingers but use at your own risk as per the MIT license.
Recommendation is not to remove the trading controls and carry out extensive testing before executing on live markets, even then only use new strategies on an account with a small balance (transfer balance to games wallet).
"},{"location":"#features","title":"Features","text":"flumine relies on these libraries:
betfairlightweight
- Betfair API supporttenacity
- Used for connection retrying (streaming)python-json-logger
- JSON loggingrequests
- HTTP supportFunctions:
add_client
Adds a client to the frameworkadd_strategy
Adds a strategy to the frameworkadd_worker
Adds a worker to the frameworkadd_client_control
Adds a client control to the frameworkadd_trading_control
Adds a trading control to the frameworkadd_market_middleware
Adds market middleware to the frameworkadd_logging_control
Adds a logging control to the frameworkThe Flumine class can be adapted by overriding the following functions:
_process_market_books()
called on MarketBook event_process_sports_data()
called on SportsData event_process_market_orders()
called when market has pending orders_process_order_package()
called on new OrderPackage_add_market()
called when new Market received through streams_remove_market()
called when Market removed from framework_process_raw_data()
called on RawData event_process_market_catalogues
called on MarketCatalogue event_process_current_orders
called on currentOrders event_process_custom_event
called on CustomEvent event see here_process_close_market
called on Market closure_process_cleared_orders()
called on ClearedOrders event_process_cleared_markets()
called on ClearedMarkets event_process_end_flumine()
called on Flumine terminationFlumine handles market streams by taking the parameters provided in the strategies, a strategy will then subscribe to the stream. This means strategies can share streams reducing load or create new if they require different markets or data filter.
"},{"location":"advanced/#data-stream","title":"Data Stream","text":"Similar to Market Streams but the raw streaming data is passed back, this reduces ram/CPU and allows recording of the data for future playback, see the example marketrecorder.py
"},{"location":"advanced/#historical-stream","title":"Historical Stream","text":"This is created on a per market basis when simulating.
"},{"location":"advanced/#order-stream","title":"Order Stream","text":"Subscribes to all orders per running instance using the config.customer_strategy_ref
Custom streams (aka threads) can be added as per:
from flumine.streams.basestream import BaseStream\nfrom flumine.events.events import CustomEvent\n\n\nclass CustomStream(BaseStream):\n def run(self) -> None:\n # connect to stream / make API requests etc.\n response = api_call()\n\n # callback func\n def callback(framework, event):\n for strategy in framework.strategies:\n strategy.process_my_event(event)\n\n # push results through using custom event\n event = CustomEvent(response, callback)\n\n # put in main queue\n self.flumine.handler_queue.put(event)\n\n\ncustom_stream = CustomStream(framework, custom=True)\nframework.streams.add_custom_stream(custom_stream)\n
"},{"location":"advanced/#error-handling","title":"Error Handling","text":"Flumine will catch all errors that occur in strategy.check_market
and strategy.process_market_book
, and log either error or critical errors.
Tip
You can remove this error handling by setting config.raise_errors = True
jsonlogger is used to log extra detail, see below for a typical setup:
import time\nimport logging\nfrom pythonjsonlogger import jsonlogger\n\nlogger = logging.getLogger()\n\ncustom_format = \"%(asctime) %(levelname) %(message)\"\nlog_handler = logging.StreamHandler()\nformatter = jsonlogger.JsonFormatter(custom_format)\nformatter.converter = time.gmtime\nlog_handler.setFormatter(formatter)\nlogger.addHandler(log_handler)\nlogger.setLevel(logging.INFO)\n
"},{"location":"advanced/#config","title":"Config","text":""},{"location":"advanced/#simulated","title":"simulated","text":"Updated to True when simulating or paper trading
"},{"location":"advanced/#simulated_strategy_isolation","title":"simulated_strategy_isolation","text":"Defaults to True to match orders per strategy, when False prevents double counting of passive liquidity on all orders regardless of strategy.
"},{"location":"advanced/#simulation_available_prices","title":"simulation_available_prices","text":"When True will simulate matches against available prices after initial execution, note this will double count liquidity.
"},{"location":"advanced/#instance_id","title":"instance_id","text":"Store server id or similar (e.g. AWS ec2 instanceId)
"},{"location":"advanced/#customer_strategy_ref","title":"customer_strategy_ref","text":"Used as customerStrategyRefs so that only orders created by the running instance are returned.
"},{"location":"advanced/#process_id","title":"process_id","text":"OS process id of running application.
"},{"location":"advanced/#current_time","title":"current_time","text":"Used for simulation
"},{"location":"advanced/#raise_errors","title":"raise_errors","text":"Raises errors on strategy functions, see Error Handling
"},{"location":"advanced/#max_execution_workers","title":"max_execution_workers","text":"Max number of workers in execution thread pool
"},{"location":"advanced/#async_place_orders","title":"async_place_orders","text":"Place orders sent with place orders flag, prevents waiting for bet delay
"},{"location":"advanced/#place_latency","title":"place_latency","text":"Place latency used for simulation / simulation execution
"},{"location":"advanced/#cancel_latency","title":"cancel_latency","text":"Cancel latency used for simulation / simulation execution
"},{"location":"advanced/#update_latency","title":"update_latency","text":"Update latency used for simulation / simulation execution
"},{"location":"advanced/#replace_latency","title":"replace_latency","text":"Replace latency used for simulation / simulation execution
"},{"location":"advanced/#order_sep","title":"order_sep","text":"customer_order_ref separator
"},{"location":"architecture/","title":"Design","text":""},{"location":"architecture/#main-loop","title":"Main loop","text":"Typical to most trading frameworks flumine uses an event driven design with the main thread handling these events through a FIFO queue.
MARKET_CATALOGUE
Betfair MarketCatalogue objectMARKET_BOOK
Betfair MarketBook objectRAW_DATA
Raw streaming dataCURRENT_ORDERS
Betfair CurrentOrders objectCLEARED_MARKETS
Betfair ClearedMarkets objectCLEARED_ORDERS
Betfair ClearedOrders objectCLOSE_MARKET
flumine Close Market updateSTRATEGY_RESET
flumine Strategy Reset updateCUSTOM_EVENT
flumine Custom event updateTERMINATOR
flumine End instance updateThe above events are handled in the flumine class
"},{"location":"architecture/#marketbook-cycle","title":"MarketBook Cycle","text":""},{"location":"architecture/#simulation","title":"Simulation","text":"Simulation is achieved by monkeypatching the datetime function utcnow(), this allows strategies to be simulated as if they were being executed in real time. Functions such as market.seconds_to_start and fillKill.seconds work as per a live execution.
"},{"location":"architecture/#streams","title":"Streams","text":"Flumine is capable of using multiple clients, these can be of the same ExchangeType
, a variation depending on use case or your own custom client/wrapper. The default workers handle login/keep-alive/logout and market closure for all clients added to the framework automatically.
BetfairClient
SimulatedClient
BetconnectClient
To add a client use the add_client
this will allow use via framework.clients
or strategy.clients
from flumine import Flumine, clients\n\nframework = Flumine()\n\nclient = clients.BetfairClient(trading)\nframework.add_client(client)\n
or when simulating:
from flumine import FlumineSimulation, clients\n\nframework = FlumineSimulation()\n\nclient = clients.SimulatedClient(username=\"123\")\nframework.add_client(client)\n
To access clients within a strategy use the helper functions:
betfair_client = self.clients.get_betfair_default()\n\nclient = self.clients.get_client(ExchangeType.SIMULATED, username=\"123\")\n
Tip
get_default
and get_betfair_default
will use the first client added via add_client
(ordered list)
By default a transaction will use clients.get_default()
however you can use a particular client:
client = self.clients.get_client(ExchangeType.SIMULATED, username=\"123\")\n\nmarket.place_order(order, client=client)\n
or using a transaction directly:
client = self.clients.get_client(ExchangeType.SIMULATED, username=\"123\")\n\nwith market.transaction(client=client) as t:\n t.place_order(order)\n
"},{"location":"clients/#future-development","title":"Future Development","text":"Before placing an order flumine will check the client and trading controls, this allows validation to occur before execution. If an order does not meet any of these validations it is not executed and status is updated to Violation
.
MaxTransactionCount
: Checks transaction count is not over betfair transaction limit (5000 per hour) OrderValidation
: Checks order is valid (size/odds)StrategyExposure
: Checks order does not invalidate strategy.validate_order
, strategy.max_order_exposure
or strategy.max_selection_exposure
Sometimes it is desirable to skip the controls, for example when canceling an open order even if the transaction count has already reached the betfair transaction limit. This can be done by passing force=True
when placing or changing an order:
market.place_order(order, force=True)\ntransaction.place_order(order, force=True)\n
This works for markets and transactions and is supported by the operations place_order
, cancel_order
, update_order
, and replace_order
.
Custom logging is available using the LoggingControl
class, the base class creates debug logs and can be used as follows:
from flumine.controls.loggingcontrols import LoggingControl\n\ncontrol = LoggingControl()\n\nframework.add_logging_control(control)\n
Tip
More than one control can be added, for example a csv logger and db logger.
"},{"location":"help/","title":"Help","text":"Please try the following channels for any support:
Within markets you have market objects which contains current up to date market data.
"},{"location":"markets/#class-variables","title":"Class variables","text":"flumine
Frameworkmarket_id
MarketBook idclosed
Closed booldate_time_closed
Closed datetimemarket_book
Latest MarketBook objectmarket_catalogue
Latest MarketCatalogue objectcontext
Market context, store market specific context e.g. simulated data storeblotter
Holds all order data and positionplace_order(order)
Place new order from order objectcancel_order(order, size_reduction)
Cancel orderupdate_order(order, new_persistance_type)
Update orderreplace_order(order, new_price)
Replace orderevent
Dictionary containing all event related markets (assumes markets have been subscribed)event_type_id
Betfair event type id (horse racing: 7)event_id
Market event id (12345)market_type
Market type ('WIN')seconds_to_start
Seconds to scheduled market start time (123.45)elapsed_seconds_closed
Seconds since market was closed (543.21)market_start_datetime
Market scheduled start timeThe transaction class is used by default when orders are executed, however it is possible to control the execution behaviour using the transaction class like so:
with market.transaction() as t:\n market.place_order(order) # executed immediately in separate transaction\n t.place_order(order) # executed on transaction __exit__\n\nwith market.transaction() as t:\n t.place_order(order)\n ..\n t.execute() # above order executed\n ..\n t.cancel_order(order)\n t.place_order(order) # both executed on transaction __exit__\n
"},{"location":"markets/#blotter","title":"Blotter","text":"The blotter is a simple and fast class to hold all orders for a particular market.
"},{"location":"markets/#functions_1","title":"Functions","text":"strategy_orders(strategy)
Returns all orders related to a strategystrategy_selection_orders(strategy, selection_id, handicap)
Returns all orders related to a strategy selectionselection_exposure(strategy, lookup)
Returns strategy/selection exposuremarket_exposure(strategy, market_book)
Returns strategy/market exposurelive_orders
List of live ordershas_live_orders
Bool on live ordersIt is common that you want to carry about analysis on a market before passing through to strategies, similar to Django's middleware design flumine allows middleware to be executed.
For example simulation uses simulated middleware in order to calculate order matching.
Note
Middleware will be executed in the order it is added and before the strategies are processed.
Please see below for the example middleware class if you wish to create your own:
from flumine.markets.middleware import Middleware\n\nclass CustomMiddleware(Middleware):\n def __call__(self, market) -> None:\n pass # called on each MarketBook update\n\n def add_market(self, market) -> None:\n print(\"market {0} added\".format(market.market_id))\n\n def remove_market(self, market) -> None:\n print(\"market {0} removed\".format(market.market_id))\n
The above middleware can then be added to the framework:
framework.add_logging_control(CustomMiddleware())\n
"},{"location":"performance/","title":"Performance","text":"Flumine is heavily optimised out of the box to be as quick as possible however there are various ways to improve the performance further with minimal effort.
"},{"location":"performance/#simulation","title":"Simulation","text":""},{"location":"performance/#listener-kwargs","title":"Listener Kwargs","text":"This is one of the most powerful options available as the variables are passed down to betfairlightweight limiting the number of updates to process, examples:
"},{"location":"performance/#600s-before-scheduled-start-and-no-inplay","title":"600s before scheduled start and no inplay","text":"strategy = ExampleStrategy(\n market_filter={\n \"markets\": [\"/tmp/marketdata/1.170212754\"],\n \"listener_kwargs\": {\"seconds_to_start\": 600, \"inplay\": False},\n }\n)\n
"},{"location":"performance/#inplay-only","title":"inplay only","text":"strategy = ExampleStrategy(\n market_filter={\n \"markets\": [\"/tmp/marketdata/1.170212754\"],\n \"listener_kwargs\": {\"inplay\": True},\n }\n)\n
"},{"location":"performance/#logging","title":"Logging","text":"Logging in python can add a lot of function calls, it is therefore recommended to switch it off once you are comfortable with the outputs from a strategy:
logger.setLevel(logging.CRITICAL)\n
"},{"location":"performance/#file-location","title":"File location","text":"This might sound obvious but having the market files stored locally on your machine will allow much quicker processing. A common pattern is to use s3 to store all market files but a local cache for common markets processed.
smart_open
is a commonly used package for processing gz/s3 files:
with patch(\"builtins.open\", smart_open.open):\n framework.add_strategy(strategy)\n framework.run()\n
Tip
Note that add_strategy
needs to be in the patch as well.
Sometimes a download from the betfair site will include market and event files in the same directory resulting in duplicate processing, flumine will log a warning on this but it is worth checking if you are seeing slow processing times.
"},{"location":"performance/#multiprocessing","title":"Multiprocessing","text":"Simulation is CPU bound so can therefore be improved through the use of multiprocessing, threading offers no improvement due to the limitations of the GIL.
The multiprocessing example code below will:
run_process
will process 8 markets at a time (prevents memory leaks)import os\nimport math\nimport smart_open\nfrom concurrent import futures\nfrom unittest.mock import patch as mock_patch\nfrom flumine import FlumineSimulation, clients, utils\nfrom strategies.lowestlayer import LowestLayer\n\n\ndef run_process(markets):\n client = clients.SimulatedClient()\n framework = FlumineSimulation(client=client)\n strategy = LowestLayer(\n market_filter={\"markets\": markets},\n context={\"stake\": 2},\n )\n with mock_patch(\"builtins.open\", smart_open.open):\n framework.add_strategy(strategy)\n framework.run()\n\n\nif __name__ == \"__main__\":\n all_markets = [...]\n processes = os.cpu_count()\n markets_per_process = 8 # optimal\n\n _process_jobs = []\n with futures.ProcessPoolExecutor(max_workers=processes) as p:\n chunk = min(\n markets_per_process, math.ceil(len(all_markets) / processes)\n )\n for m in (utils.chunks(all_markets, chunk)):\n _process_jobs.append(\n p.submit(\n run_process,\n markets=m,\n )\n )\n for job in futures.as_completed(_process_jobs):\n job.result() # wait for result\n
Tip
If the code above is failing add logging to the run_process
function to find the error or run the strategy in a single process with logging
The heaviest load on CPU comes from reading the files and processing into py objects before processing through flumine, after this the bottleneck becomes the number of orders that need to be processed. Therefore anything that can be done to limit the number of redundant or control blocked orders will see an improvement.
"},{"location":"performance/#cprofile","title":"cprofile","text":"Profiling code is always the best option for finding improvements, cprofilev
is a commonly used python library for this:
python -m cprofilev examples/simulate.py\n
"},{"location":"performance/#middleware","title":"Middleware","text":"If you don't need the simulation middleware remove it from framework._market_middleware
, this is useful when processing markets for data collection. This can dramatically improve processing time due to the heavy functions contained in the simulation logic.
Installing betfairlightweight[speed] will have a big impact on processing speed due to the inclusion of C and Rust libraries for datetime and json decoding.
"},{"location":"performance/#live","title":"Live","text":"For improving live trading 'Strategy' and 'cprofile' tips above will help although CPU load tends to be considerably lower compared to simulating.
"},{"location":"quickstart/","title":"QuickStart","text":""},{"location":"quickstart/#live","title":"Live","text":"Tip
flumine uses betfairlightweight
for communicating with the Betfair API, please see docs for how to use/setup before proceeding.
First, start by importing flumine/bflw and creating a trading and framework client:
import betfairlightweight\nfrom flumine import Flumine, clients\n\ntrading = betfairlightweight.APIClient(\"username\")\nclient = clients.BetfairClient(trading)\n\nframework = Flumine(client=client)\n
Note
flumine will handle login, logout and keep alive whilst the framework is running using the keep_alive
worker.
A strategy can now be created by using the BaseStrategy class:
from flumine import BaseStrategy\n\n\nclass ExampleStrategy(BaseStrategy):\n def start(self):\n # subscribe to streams\n print(\"starting strategy 'ExampleStrategy'\")\n\n def check_market_book(self, market, market_book):\n # process_market_book only executed if this returns True\n if market_book.status != \"CLOSED\":\n return True\n\n def process_market_book(self, market, market_book):\n # process marketBook object\n print(market_book.status)\n
This strategy can now be initiated with the market and data filter before being added to the framework:
from betfairlightweight.filters import (\n streaming_market_filter, \n streaming_market_data_filter,\n)\n\nstrategy = ExampleStrategy(\n market_filter=streaming_market_filter(\n event_type_ids=[\"7\"],\n country_codes=[\"GB\"],\n market_types=[\"WIN\"],\n ),\n market_data_filter=streaming_market_data_filter(fields=[\"EX_ALL_OFFERS\"])\n)\n\nframework.add_strategy(strategy)\n
The framework can now be started:
framework.run()\n
"},{"location":"quickstart/#order-placement","title":"Order placement","text":"Orders can be placed as followed:
from flumine.order.trade import Trade\nfrom flumine.order.order import LimitOrder\n\n\nclass ExampleStrategy(BaseStrategy):\n def process_market_book(self, market, market_book):\n for runner in market_book.runners:\n if runner.selection_id == 123:\n trade = Trade(\n market_id=market_book.market_id, \n selection_id=runner.selection_id,\n handicap=runner.handicap,\n strategy=self\n )\n order = trade.create_order(\n side=\"LAY\", \n order_type=LimitOrder(price=1.01, size=2.00)\n )\n market.place_order(order)\n
This order will be validated through controls, stored in the blotter and sent straight to the execution thread pool for execution. It is also possible to batch orders into transactions as follows:
with market.transaction() as t:\n market.place_order(order) # executed immediately in separate transaction\n t.place_order(order) # executed on transaction __exit__\n\nwith market.transaction() as t:\n t.place_order(order)\n\n t.execute() # above order executed\n\n t.cancel_order(order)\n t.place_order(order) # both executed on transaction __exit__\n
"},{"location":"quickstart/#stream-class","title":"Stream class","text":"By default the stream class will be a MarketStream which provides a MarketBook python object, if collecting data this can be changed to a DataStream class however process_raw_data will be called and not process_market_book:
from flumine import BaseStrategy\nfrom flumine.streams.datastream import DataStream\n\n\nclass ExampleDataStrategy(BaseStrategy):\n def process_raw_data(self, publish_time, data):\n print(publish_time, data)\n\nstrategy = ExampleDataStrategy(\n market_filter=streaming_market_filter(\n event_type_ids=[\"7\"],\n country_codes=[\"GB\"],\n market_types=[\"WIN\"],\n ),\n stream_class=DataStream\n)\n\nflumine.add_strategy(strategy)\n
The OrderDataStream class can be used to record order data as per market:
from flumine.streams.datastream import OrderDataStream\n\nstrategy = ExampleDataStrategy(\n market_filter=None,\n stream_class=OrderDataStream\n)\n\nflumine.add_strategy(strategy)\n
"},{"location":"quickstart/#paper-trading","title":"Paper Trading","text":"Flumine can be used to paper trade strategies live using the following code:
from flumine import clients\n\nclient = clients.BetfairClient(trading, paper_trade=True)\n
Market data will be recieved as per live but any orders will use Simulated execution and Simulated order polling to replicate live trading.
Tip
This can be handy when testing strategies as the betfair website can be used to validate the market.
"},{"location":"quickstart/#simulation","title":"Simulation","text":"Flumine can be used to simulate strategies using the following code:
from flumine import FlumineSimulation, clients\n\nclient = clients.SimulatedClient()\nframework = FlumineSimulation(client=client)\n\nstrategy = ExampleStrategy(\n market_filter={\"markets\": [\"/tmp/marketdata/1.170212754\"]}\n)\nframework.add_strategy(strategy)\n\nframework.run()\n
Note the use of market filter to pass the file directories.
"},{"location":"quickstart/#listener-kwargs","title":"Listener kwargs","text":"Sometimes a subset of the market lifetime is required, this can be optimised by limiting the number of updates to process resulting in faster simulation:
strategy = ExampleStrategy(\n market_filter={\n \"markets\": [\"/tmp/marketdata/1.170212754\"],\n \"listener_kwargs\": {\"inplay\": False, \"seconds_to_start\": 600},\n }\n)\n
The extra kwargs above will limit processing to preplay in the final 10 minutes.
Tip
Multiple strategies and markets can be passed, flumine will pass the MarketBooks to the correct strategy via its subscription.
"},{"location":"quickstart/#event-processing","title":"Event Processing","text":"It is also possible to process events with multiple markets such as win/place in racing or all football markets as per live by adding the following flag:
strategy = ExampleStrategy(\n market_filter={\"markets\": [..], \"event_processing\": True}\n)\n
The Market
object contains a helper method for accessing other event linked markets:
place_market = market.event[\"PLACE\"]\n
"},{"location":"quickstart/#market-filter","title":"Market Filter","text":"When simulating you can filter markets to be processed by using the market_type
and country_code
filter as per live:
strategy = ExampleStrategy(\n market_filter={\"markets\": [..], \"market_types\": [\"MATCH_ODDS\"], \"country_codes\": [\"GB\"]}\n)\n
"},{"location":"quickstart/#simulation_1","title":"Simulation","text":"Simulation uses the SimulatedExecution
execution class and tries to accurately simulate matching with the following:
Limitations #192:
Flumine is able to connect to the sports-data-stream provided by Betfair for live data on cricket and races.
Tip
Your appKey must be authorised to access the sports-data stream, contact bdp@betfair.com
"},{"location":"sportsdata/#cricket-subscription","title":"Cricket Subscription","text":"A cricket subscription can be added via the sports_data_filter
on a strategy
strategy = ExampleStrategy(\n market_filter=streaming_market_filter(\n event_type_ids=[\"4\"], market_types=[\"MATCH_ODDS\"]\n ),\n sports_data_filter=[\"cricketSubscription\"],\n)\n
"},{"location":"sportsdata/#race-subscription","title":"Race Subscription","text":"A race subscription can be added via the sports_data_filter
on a strategy
strategy = ExampleStrategy(\n market_filter=streaming_market_filter(\n event_type_ids=[\"7\"], market_types=[\"WIN\"]\n ),\n sports_data_filter=[\"raceSubscription\"],\n)\n
"},{"location":"sportsdata/#strategy","title":"Strategy","text":"Any sports data stream updates will be available in the strategy via the process_sports_data
function
class ExampleStrategy(BaseStrategy):\n def process_sports_data(\n self, market: Market, sports_data: Union[Race, CricketMatch]\n ) -> None:\n # called on each update from sports-data-stream\n print(market, sports_data)\n
"},{"location":"sportsdata/#data-recorder","title":"Data Recorder","text":"The example marketrecorder.py
can be modified to record race and cricket data by updating the process_raw_data
with the matching op
and data keys.
mcm
and mc
ocm
and oc
ccm
and cc
rcm
and rc
And using the correct stream class:
"},{"location":"sportsdata/#cricket-recorder","title":"Cricket Recorder","text":"from flumine.streams.datastream import CricketDataStream\n\nstrategy= MarketRecorder(\n market_filter=None,\n stream_class=CricketDataStream,\n context={\n \"local_dir\": \"/tmp\",\n \"force_update\": False,\n \"remove_file\": True,\n \"remove_gz_file\": False,\n },\n)\n
"},{"location":"sportsdata/#race-recorder","title":"Race Recorder","text":"from flumine.streams.datastream import RaceDataStream\n\nstrategy= MarketRecorder(\n market_filter=None,\n stream_class=RaceDataStream,\n context={\n \"local_dir\": \"/tmp\",\n \"force_update\": False,\n \"remove_file\": True,\n \"remove_gz_file\": False,\n },\n)\n
"},{"location":"strategies/","title":"Strategies","text":""},{"location":"strategies/#basestrategy","title":"BaseStrategy","text":"The base strategy class should be used for all strategies and contains the following parameters / functions for order/trade execution.
"},{"location":"strategies/#parameters","title":"Parameters","text":"market_filter
Streaming market filter or list of filters requiredmarket_data_filter
Streaming market data filter requiredstreaming_timeout
Streaming timeout, will call snap() on cache every x secondsconflate_ms
Streaming conflatestream_class
MarketStream or RawDataStreamname
Strategy name, if None will default to class namecontext
Dictionary object where any extra data can be stored here such as triggersmax_selection_exposure
Max exposure per selection (including new order), note this does not handle reduction in exposure due to laying another runnermax_order_exposure
Max exposure per orderclients
flumine.clientsmax_trade_count
Max total number of trades per runnermax_live_trade_count
Max live (with executable orders) trades per runnermulti_order_trades
Allow multiple live orders per tradeThe following functions can be overridden dependent on the strategy:
add()
Function called when strategy is added to frameworkstart()
Function called when framework startsprocess_new_market()
Process Market when it gets added to the framework for the first timecheck_market_book()
Function called with marketBook, process_market_book
is only executed if this returns Trueprocess_market_book()
Processes market book updates, called on every update that is receivedprocess_raw_data()
As per process_market_book
but handles raw dataprocess_orders()
Process list of Order objects for strategy and Marketprocess_closed_market()
Process Market after closurefinish()
Function called when framework endsEach strategy stores a RunnerContext
object which contains the state of a runner based on all and current active trades. This is used by controls to calculate exposure and control the number of live or total trades.
runner_context = self.get_runner_context(\n market.market_id, runner.selection_id, runner.handicap\n)\n\nrunner_context.live_trade_count\n
"},{"location":"trades/","title":"Trades / Orders","text":""},{"location":"trades/#trade","title":"Trade","text":"A trade object is used to handle order execution.
from flumine.order.trade import Trade\nfrom flumine.order.ordertype import LimitOrder\n\ntrade = Trade(\n market_id=\"1.2345678\",\n selection_id=123456,\n handicap=1.0,\n strategy=strategy\n)\ntrade.orders # []\ntrade.status # TradeStatus.LIVE\n\norder = trade.create_order(\n side=\"LAY\",\n order_type=LimitOrder(price=1.01, size=2.00)\n)\ntrade.orders # [<BetfairOrder>]\n
"},{"location":"trades/#parameters","title":"Parameters","text":"market_id
Market Idselection_id
Selection Idhandicap
Runner handicapstrategy
Strategy objectnotes
Trade notes, used to store market / trigger info for later analysisplace_reset_seconds
Seconds to wait since runner_context.reset
before allowing another orderreset_seconds
Seconds to wait since runner_context.place
before allowing another orderYou can create your own trade classes and then handle the logic within the strategy.process_orders
function.
Order objects store all order data locally allowing trade logic to be applied.
from flumine.order.order import BetfairOrder, LimitOrder\n\norder = BetfairOrder(\n trade=trade,\n side=\"LAY\",\n order_type=LimitOrder(price=1.01, size=2.00)\n)\n\norder.status # OrderStatus.PENDING\norder.executable()\norder.execution_complete()\n
"},{"location":"workers/","title":"Workers","text":""},{"location":"workers/#background-workers","title":"Background Workers","text":"Background workers run in their own thread allowing cleanup / cron like workers to run in the background, by default flumine adds the following workers:
keep_alive
: runs every 1200s (or session_timeout/2) to make sure clients are logged and kept alivepoll_account_balance
: runs every 120s to poll account balance endpointpoll_market_catalogue
: runs every 60s to poll listMarketCatalogue endpointpoll_market_closure
: checks for closed markets to get cleared orders at order and market levelflumine
: Frameworkfunction
: Function to be calledinterval
: Interval in seconds, set to None for single callfunc_args
: Function argsfunc_kwargs
: Function kwargsstart_delay
: Start delay in secondscontext
: Worker contextname
: Worker nameFurther workers can be added as per:
from flumine.worker import BackgroundWorker\n\ndef func(context: dict, flumine, name=\"\"):\n print(name)\n\n\nworker = BackgroundWorker(\n framework, interval=10, function=func, func_args=(\"hello\",)\n)\n\nframework.add_worker(\n worker\n)\n
"}]}
\ No newline at end of file
diff --git a/sitemap.xml b/sitemap.xml
new file mode 100644
index 00000000..0f8724ef
--- /dev/null
+++ b/sitemap.xml
@@ -0,0 +1,3 @@
+
+Flumine is able to connect to the sports-data-stream provided by Betfair for live data on cricket and races.
+Tip
+Your appKey must be authorised to access the sports-data stream, contact bdp@betfair.com
+A cricket subscription can be added via the sports_data_filter
on a strategy
strategy = ExampleStrategy(
+ market_filter=streaming_market_filter(
+ event_type_ids=["4"], market_types=["MATCH_ODDS"]
+ ),
+ sports_data_filter=["cricketSubscription"],
+)
+
A race subscription can be added via the sports_data_filter
on a strategy
strategy = ExampleStrategy(
+ market_filter=streaming_market_filter(
+ event_type_ids=["7"], market_types=["WIN"]
+ ),
+ sports_data_filter=["raceSubscription"],
+)
+
Any sports data stream updates will be available in the strategy via the process_sports_data
function
class ExampleStrategy(BaseStrategy):
+ def process_sports_data(
+ self, market: Market, sports_data: Union[Race, CricketMatch]
+ ) -> None:
+ # called on each update from sports-data-stream
+ print(market, sports_data)
+
The example marketrecorder.py
can be modified to record race and cricket data by updating the process_raw_data
with the matching op
and data keys.
mcm
and mc
ocm
and oc
ccm
and cc
rcm
and rc
And using the correct stream class:
+from flumine.streams.datastream import CricketDataStream
+
+strategy= MarketRecorder(
+ market_filter=None,
+ stream_class=CricketDataStream,
+ context={
+ "local_dir": "/tmp",
+ "force_update": False,
+ "remove_file": True,
+ "remove_gz_file": False,
+ },
+)
+
from flumine.streams.datastream import RaceDataStream
+
+strategy= MarketRecorder(
+ market_filter=None,
+ stream_class=RaceDataStream,
+ context={
+ "local_dir": "/tmp",
+ "force_update": False,
+ "remove_file": True,
+ "remove_gz_file": False,
+ },
+)
+
The base strategy class should be used for all strategies and contains the following parameters / functions for order/trade execution.
+market_filter
Streaming market filter or list of filters requiredmarket_data_filter
Streaming market data filter requiredstreaming_timeout
Streaming timeout, will call snap() on cache every x secondsconflate_ms
Streaming conflatestream_class
MarketStream or RawDataStreamname
Strategy name, if None will default to class namecontext
Dictionary object where any extra data can be stored here such as triggersmax_selection_exposure
Max exposure per selection (including new order), note this does not handle reduction in exposure due to laying another runnermax_order_exposure
Max exposure per orderclients
flumine.clientsmax_trade_count
Max total number of trades per runnermax_live_trade_count
Max live (with executable orders) trades per runnermulti_order_trades
Allow multiple live orders per tradeThe following functions can be overridden dependent on the strategy:
+add()
Function called when strategy is added to frameworkstart()
Function called when framework startsprocess_new_market()
Process Market when it gets added to the framework for the first timecheck_market_book()
Function called with marketBook, process_market_book
is only executed if this returns Trueprocess_market_book()
Processes market book updates, called on every update that is receivedprocess_raw_data()
As per process_market_book
but handles raw dataprocess_orders()
Process list of Order objects for strategy and Marketprocess_closed_market()
Process Market after closurefinish()
Function called when framework endsEach strategy stores a RunnerContext
object which contains the state of a runner based on all and current active trades. This is used by controls to calculate exposure and control the number of live or total trades.
runner_context = self.get_runner_context(
+ market.market_id, runner.selection_id, runner.handicap
+)
+
+runner_context.live_trade_count
+
A trade object is used to handle order execution.
+from flumine.order.trade import Trade
+from flumine.order.ordertype import LimitOrder
+
+trade = Trade(
+ market_id="1.2345678",
+ selection_id=123456,
+ handicap=1.0,
+ strategy=strategy
+)
+trade.orders # []
+trade.status # TradeStatus.LIVE
+
+order = trade.create_order(
+ side="LAY",
+ order_type=LimitOrder(price=1.01, size=2.00)
+)
+trade.orders # [<BetfairOrder>]
+
market_id
Market Idselection_id
Selection Idhandicap
Runner handicapstrategy
Strategy objectnotes
Trade notes, used to store market / trigger info for later analysisplace_reset_seconds
Seconds to wait since runner_context.reset
before allowing another orderreset_seconds
Seconds to wait since runner_context.place
before allowing another orderYou can create your own trade classes and then handle the logic within the strategy.process_orders
function.
Order objects store all order data locally allowing trade logic to be applied.
+from flumine.order.order import BetfairOrder, LimitOrder
+
+order = BetfairOrder(
+ trade=trade,
+ side="LAY",
+ order_type=LimitOrder(price=1.01, size=2.00)
+)
+
+order.status # OrderStatus.PENDING
+order.executable()
+order.execution_complete()
+
Background workers run in their own thread allowing cleanup / cron like workers to run in the background, by default flumine adds the following workers:
+keep_alive
: runs every 1200s (or session_timeout/2) to make sure clients are logged and kept alivepoll_account_balance
: runs every 120s to poll account balance endpointpoll_market_catalogue
: runs every 60s to poll listMarketCatalogue endpointpoll_market_closure
: checks for closed markets to get cleared orders at order and market levelflumine
: Frameworkfunction
: Function to be calledinterval
: Interval in seconds, set to None for single callfunc_args
: Function argsfunc_kwargs
: Function kwargsstart_delay
: Start delay in secondscontext
: Worker contextname
: Worker nameFurther workers can be added as per:
+from flumine.worker import BackgroundWorker
+
+def func(context: dict, flumine, name=""):
+ print(name)
+
+
+worker = BackgroundWorker(
+ framework, interval=10, function=func, func_args=("hello",)
+)
+
+framework.add_worker(
+ worker
+)
+