
[Bug]: The DataflowRunner behavior is changed when removing the runner v1 code #28399

Closed
1 of 15 tasks
liferoad opened this issue Sep 11, 2023 · 5 comments
Labels: bug, done & done (issue has been reviewed after it was closed for verification, followups, etc.), P2, python

Comments

@liferoad (Collaborator)

What happened?

The attached notebook is broken now due to #27196.
Dataflow_Word_Count.ipynb.zip

The issue is that p = beam.Pipeline(InteractiveRunner()) is used to create the pipeline, but later, when switching to DataflowRunner with DataflowRunner().run_pipeline(p, options=options), DataflowRunner treats the Dataflow job as a legacy-runner job and disables Runner V2.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad (Collaborator, Author)

@robertwb FYI

@robertwb (Contributor)

DataflowRunner().run_pipeline(p, options=options) should work just as well as before. Could you clarify exactly what is going wrong?

@tvalentyn (Contributor) commented Sep 15, 2023

Repro:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

def run():
  # PipelineOptions() with no flags picks up sys.argv, so the
  # command-line arguments below are applied here.
  options = PipelineOptions()
  # The pipeline is constructed without a runner; the runner is
  # only supplied later via run_pipeline().
  p = beam.Pipeline()
  p | beam.Create([1])
  DataflowRunner().run_pipeline(p, options=options).wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

python pipeline.py  --project google.com:clouddfe --temp_location=gs://clouddfe-valentyn --staging_location=gs://clouddfe-valentyn --region us-central1

fails with

ERROR:apache_beam.runners.dataflow.dataflow_runner:2023-09-15T18:10:32.288Z: JOB_MESSAGE_ERROR: Runnable workflow has no steps specified.
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2023-09-15_11_10_28-12637281573769828865 is in state JOB_STATE_FAILED

@liferoad (Collaborator, Author)

I could not figure out what causes this issue. I am wondering whether we need to call _check_and_add_missing_options somewhere.

@robertwb (Contributor)

This boils down to https://s.apache.org/no-beam-pipeline :).

It looks like manually setting these flags after construction may be sufficient. Right now we hijack apply() on every PTransform to set them, which it would be good to avoid if we can. It should also be possible to fix this service-side so the client doesn't need to pass these arguments.
