Skip to content

Commit

Permalink
Fix stateful processing using direct runner with type checks enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
sadovnychyi committed Aug 5, 2023
1 parent c654dc0 commit b742fa2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from apache_beam.metrics import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.pvalue import AsList
Expand Down Expand Up @@ -1041,6 +1042,14 @@ def test_output_typehints(self):
ShardedKeyType[typehints.Tuple[int, int]], # type: ignore[misc]
typehints.Iterable[str]])

def test_with_type_hints(self):
options = PipelineOptions()
options.view_as(TypeOptions).runtime_type_check = True
with TestPipeline(options=options) as pipeline:
pipeline \
| beam.Create(GroupIntoBatchesTest._create_test_data()) \
| util.GroupIntoBatches(GroupIntoBatchesTest.BATCH_SIZE)

def _test_runner_api_round_trip(self, transform, urn):
context = pipeline_context.PipelineContext()
proto = transform.to_runner_api(context)
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/typehints/typecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class AbstractDoFnWrapper(DoFn):
def __init__(self, dofn):
super().__init__()
self.dofn = dofn
if hasattr(dofn, 'on_window_timer'):
self.on_window_timer = dofn.on_window_timer
if hasattr(dofn, 'on_buffering_timer'):
self.on_buffering_timer = dofn.on_buffering_timer

def _inspect_start_bundle(self):
return self.dofn.get_function_arguments('start_bundle')
Expand Down

0 comments on commit b742fa2

Please sign in to comment.