From c487507d4041fd06bb39c61f7dbc3245f12af3f5 Mon Sep 17 00:00:00 2001 From: Frans King Date: Sun, 18 Sep 2022 22:56:56 +0100 Subject: [PATCH] FLINK-27934 - Improving inefficient deserialization/serialization of state variables within a batch --- .../statefun/request_reply_v3.py | 1 + statefun-sdk-python/statefun/storage.py | 34 ++++++++++++++----- statefun-sdk-python/tests/storage_test.py | 20 +++++++++++ 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/statefun-sdk-python/statefun/request_reply_v3.py b/statefun-sdk-python/statefun/request_reply_v3.py index 3cee39645..1c23d2e26 100644 --- a/statefun-sdk-python/statefun/request_reply_v3.py +++ b/statefun-sdk-python/statefun/request_reply_v3.py @@ -212,6 +212,7 @@ def collect_egress(egresses: typing.List[EgressMessage], invocation_result): def collect_mutations(cells: typing.Dict[str, Cell], invocation_result): for key, cell in cells.items(): if not cell.dirty: + cell.reset() continue mutation = invocation_result.state_mutations.add() mutation.state_name = key diff --git a/statefun-sdk-python/statefun/storage.py b/statefun-sdk-python/statefun/storage.py index 21a49feb4..975146b3b 100644 --- a/statefun-sdk-python/statefun/storage.py +++ b/statefun-sdk-python/statefun/storage.py @@ -39,31 +39,49 @@ class Resolution: class Cell(object): - __slots__ = ("tpe", "typed_value", "dirty") + __slots__ = ("tpe", "dirty", "_value", "_typed_value") def __init__(self, tpe: Type, typed_value: typing.Optional[TypedValue]): # read only self.tpe = tpe # mutable - self.typed_value = typed_value self.dirty = False + # private + self._value = None + self._typed_value = typed_value def get(self): - typed_value = self.typed_value - return from_typed_value(self.tpe, typed_value) + if self._value is None: + typed_value = self._typed_value + self._value = from_typed_value(self.tpe, typed_value) + + return self._value def set(self, val): if val is None: raise ValueError('provided value must not be None. To delete a value, please use del.') - tpe = self.tpe - typed_value = to_typed_value(tpe, val) - self.typed_value = typed_value + + self._value = val + self._typed_value = None self.dirty = True def delete(self): - self.typed_value = None + self._value = None + self._typed_value = None self.dirty = True + def reset(self): + self._value = None + + @property + def typed_value(self): + if self._typed_value is None and self._value is not None: + tpe = self.tpe + typed_value = to_typed_value(tpe, self._value) + self._typed_value = typed_value + + return self._typed_value + # self.cells: typing.Dict[str, Cell] = {name: Cell(name, tpe, vals[name]) for name, tpe in types.items()} diff --git a/statefun-sdk-python/tests/storage_test.py b/statefun-sdk-python/tests/storage_test.py index dd4a9ed9c..d7e531dca 100644 --- a/statefun-sdk-python/tests/storage_test.py +++ b/statefun-sdk-python/tests/storage_test.py @@ -146,6 +146,26 @@ def test_stateless(self): store = store_from() self.assertTrue(len(store._cells) == 0) + def test_reference_equals(self): + store = store_from(ValueSpec("a", StringType), PbPersistedValueLike("a", "123", StringType)) + + a = store.a + b = store.a + + self.assertEqual(id(a), id(b)) + + def test_reset(self): + store = store_from(ValueSpec("a", StringType), PbPersistedValueLike("a", "123", StringType)) + + a = store.a + cell = store._cells["a"] + + # reset is required when store.a is accessed in fnA, perhaps modified but not committed (i.e. store.a = a) + # and we want fnB in the same batch to have the correct original value of a + cell.reset() + + b = store.a + self.assertNotEqual(id(a), id(b)) def store_from(*args): """test helper that creates an already resolved store from specs and pb values."""