Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log warning on unexpected conflation in stream #583

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def subscribe_to_markets(
self.listener.update_unique_id(unique_id)
else:
self.listener.register_stream(unique_id, "marketSubscription")
self.listener.conflate_ms = conflate_ms
self._send(message)
return unique_id

Expand Down Expand Up @@ -177,6 +178,7 @@ def subscribe_to_orders(
self.listener.update_unique_id(unique_id)
else:
self.listener.register_stream(unique_id, "orderSubscription")
self.listener.conflate_ms = conflate_ms
self._send(message)
return unique_id

Expand Down
2 changes: 2 additions & 0 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self, max_latency: Optional[float] = 0.5):
self.stream_unique_id = None
self.connections_available = None # connection throttling

self.conflate_ms = None

def register_stream(self, unique_id: int, operation: str) -> None:
logger.info("[Register: %s]: %s", unique_id, operation)
if self.stream is not None:
Expand Down
5 changes: 5 additions & 0 deletions betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ def on_update(self, data: dict) -> None:
"[%s: %s]: Latency high: %s", self, self.unique_id, latency
)

if data.get("con") and (
self._listener.conflate_ms is None or self._listener.conflate_ms == 0
):
logger.warning("[%s: %s]: unexpected conflation", self, self.unique_id)

if self._lookup in data:
img = self._process(data[self._lookup], publish_time)

Expand Down
25 changes: 25 additions & 0 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,31 @@ def test_on_update_no_latency(
mock_calc_latency.assert_called_with(data.get("pt"))
mock_process.assert_called_with(data.get("mc"), data.get("pt"))

@mock.patch("betfairlightweight.streaming.stream.logger.warning")
def test_on_update_conflation(self, mock_warning):
self.stream.update_clk = False
self.stream._max_latency = None
self.stream._lookup = ""

data = {"pt": 12345, "con": True}

self.stream._listener.conflate_ms = 10000
self.stream.on_update(data)
mock_warning.assert_not_called()

self.stream._listener.conflate_ms = None
self.stream.on_update(data)
mock_warning.assert_called_once_with(
"[%s: %s]: unexpected conflation", self.stream, 123
)

self.stream._listener.conflate_ms = 0
mock_warning.reset_mock()
self.stream.on_update(data)
mock_warning.assert_called_once_with(
"[%s: %s]: unexpected conflation", self.stream, 123
)

def test_clear_cache(self):
self.stream._caches = {1: "abc"}
self.stream.clear_cache()
Expand Down
Loading