From 361b09dde7de3c7314d7cac7447e029a8e54c97b Mon Sep 17 00:00:00 2001 From: Maxim Musayev Date: Mon, 16 Nov 2020 13:28:23 +0200 Subject: [PATCH] Fix commited offset is always behind the real offset by 1 --- faust/transport/consumer.py | 6 +++--- tests/unit/transport/test_consumer.py | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index f8294e78b..6cdb8b884 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -739,7 +739,7 @@ def ack(self, message: Message) -> bool: if self.app.topics.acks_enabled_for(message.topic): committed = self._committed_offset[tp] try: - if committed is None or offset > committed: + if committed is None or offset >= committed: acked_index = self._acked_index[tp] if offset not in acked_index: self._unacked_messages.discard(message) @@ -1028,7 +1028,7 @@ def _new_offset(self, tp: TP) -> Optional[int]: acked[: len(batch) - 1] = [] self._acked_index[tp].difference_update(batch) # return the highest commit offset - return batch[-1] + return batch[-1] + 1 return None async def on_task_error(self, exc: BaseException) -> None: @@ -1081,7 +1081,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover offset = message.offset r_offset = get_read_offset(tp) - if r_offset is None or offset > r_offset: + if r_offset is None or offset >= r_offset: gap = offset - (r_offset or 0) # We have a gap in income messages if gap > 1 and r_offset: diff --git a/tests/unit/transport/test_consumer.py b/tests/unit/transport/test_consumer.py index 4517b58ae..3b79425ab 100644 --- a/tests/unit/transport/test_consumer.py +++ b/tests/unit/transport/test_consumer.py @@ -858,7 +858,8 @@ def test_filter_committable_offsets(self, *, consumer): TP2: 30, } assert consumer._filter_committable_offsets({TP1, TP2}) == { - TP2: 36, + TP1: 5, + TP2: 37, } @pytest.mark.asyncio @@ -1015,10 +1016,10 @@ def test_should_commit(self, tp, offset, committed, should, *, consumer): "tp,acked,expected_offset", [ (TP1, [], None), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 10), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], 8), - (TP1, [1, 2, 3, 4, 6, 7, 8, 10], 4), - (TP1, [1, 3, 4, 6, 7, 8, 10], 1), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 11), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], 9), + (TP1, [1, 2, 3, 4, 6, 7, 8, 10], 5), + (TP1, [1, 3, 4, 6, 7, 8, 10], 2), ], ) def test_new_offset(self, tp, acked, expected_offset, *, consumer): @@ -1029,10 +1030,10 @@ def test_new_offset(self, tp, acked, expected_offset, *, consumer): "tp,acked,gaps,expected_offset", [ (TP1, [], [], None), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 10), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 10), - (TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 8), - (TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 10), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 11), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11), + (TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9), + (TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11), ], ) def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):