Skip to content

Commit

Permalink
[prism] Dev prism builds for python and Python Direct Runner fallback…
Browse files Browse the repository at this point in the history
…s. (#32876)
  • Loading branch information
lostluck authored Oct 21, 2024
1 parent e845caa commit 3767eda
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 28 deletions.
56 changes: 56 additions & 0 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,38 @@ def visit_transform(self, applied_ptransform):
if timer.time_domain == TimeDomain.REAL_TIME:
self.supported_by_fnapi_runner = False

class _PrismRunnerSupportVisitor(PipelineVisitor):
  """Pipeline visitor that decides whether the PrismRunner can be used.

  ``accept`` starts from "supported" and ``visit_transform`` clears the flag
  whenever a visited transform matches one of the known-unsupported cases.
  """
  def accept(self, pipeline):
    """Visit ``pipeline`` with this visitor and return the support flag."""
    self.supported_by_prism_runner = True
    pipeline.visit(self)
    return self.supported_by_prism_runner

  def visit_transform(self, applied_ptransform):
    """Clear the support flag if this transform is a known blocker."""
    xform = applied_ptransform.transform

    # Python SDK assumes the direct runner TestStream implementation is
    # being used.
    if isinstance(xform, TestStream):
      self.supported_by_prism_runner = False

    # Every remaining check only applies to ParDo transforms.
    if not isinstance(xform, beam.ParDo):
      return

    fn = xform.dofn

    # It's uncertain if the Prism Runner supports execution of CombineFns
    # with deferred side inputs.
    if isinstance(fn, CombineValuesDoFn):
      positional, keyword = xform.raw_side_inputs
      has_deferred_side_input = any(
          isinstance(side_input, ArgumentPlaceholder)
          for side_input in itertools.chain(positional, keyword.values()))
      if has_deferred_side_input:
        self.supported_by_prism_runner = False

    if userstate.is_stateful_dofn(fn):
      # https://github.com/apache/beam/issues/32786 -
      # Remove once Real time clock is used.
      _, timers = userstate.get_dofn_specs(fn)
      for spec in timers:
        if spec.time_domain == TimeDomain.REAL_TIME:
          self.supported_by_prism_runner = False

tryingPrism = False
# Check whether all transforms used in the pipeline are supported by the
# FnApiRunner, and the pipeline was not meant to be run as streaming.
if _FnApiRunnerSupportVisitor().accept(pipeline):
Expand All @@ -122,9 +154,33 @@ def visit_transform(self, applied_ptransform):
beam_provision_api_pb2.ProvisionInfo(
pipeline_options=encoded_options))
runner = fn_runner.FnApiRunner(provision_info=provision_info)
elif _PrismRunnerSupportVisitor().accept(pipeline):
_LOGGER.info('Running pipeline with PrismRunner.')
from apache_beam.runners.portability import prism_runner
runner = prism_runner.PrismRunner()
tryingPrism = True
else:
runner = BundleBasedDirectRunner()

if tryingPrism:
try:
pr = runner.run_pipeline(pipeline, options)
# This is non-blocking, so if the state is *already* finished, something
# probably failed on job submission.
if pr.state.is_terminal() and pr.state != PipelineState.DONE:
_LOGGER.info(
'Pipeline failed on PrismRunner, falling back toDirectRunner.')
runner = BundleBasedDirectRunner()
else:
return pr
except Exception as e:
# If prism fails in Preparing the portable job, then the PortableRunner
# code raises an exception. Catch it, log it, and use the Direct runner
# instead.
_LOGGER.info('Exception with PrismRunner:\n %s\n' % (e))
_LOGGER.info('Falling back to DirectRunner')
runner = BundleBasedDirectRunner()

return runner.run_pipeline(pipeline, options)


Expand Down
112 changes: 84 additions & 28 deletions sdks/python/apache_beam/runners/portability/prism_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import platform
import shutil
import stat
import subprocess
import typing
import urllib
import zipfile
Expand Down Expand Up @@ -167,38 +168,93 @@ def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:

def path_to_binary(self) -> str:
  """Return a local path or a download URL for the prism binary.

  Resolution order:
    1. ``self._path`` is set: use it directly if it exists locally or is a
       GitHub download URL; if it is a GitHub release page URL, derive the
       download URL from its tag.
    2. ``self._version`` is not a ``.dev`` version: construct the release
       download URL for this platform.
    3. Development version: build prism with ``go install`` into
       ``self.BIN_CACHE``, first from the current Go module and, if Go
       reports that the package is not available there, from ``@latest``.

  Returns:
    A local file path or a URL string.

  Raises:
    ValueError: if ``self._path`` is neither an existing file nor a
      recognized URL, if the ``go`` tool cannot be executed, or if
      ``go install`` fails.
  """
  if self._path is not None:
    # The path is overridden, check various cases.
    if os.path.exists(self._path):
      # The path is local and exists, use directly.
      return self._path

    # Check if the path is a URL.
    url = urllib.parse.urlparse(self._path)
    if not url.scheme:
      raise ValueError(
          'Unable to parse binary URL "%s". If using a full URL, make '
          'sure the scheme is specified. If using a local file path, '
          'make sure the file exists; you may have to first build prism '
          'using `go build `.' % (self._path))

    # We have a URL, see if we need to construct a valid file name.
    if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
      # If this URL starts with the download prefix, let it through.
      return self._path
    # The only other valid option is a github release page.
    if not self._path.startswith(GITHUB_TAG_PREFIX):
      raise ValueError(
          'Provided --prism_location URL is not an Apache Beam Github '
          'Release page URL or download URL: %s' % (self._path))
    # Get the root tag for this URL
    root_tag = os.path.basename(os.path.normpath(self._path))
    return self.construct_download_url(
        root_tag, platform.system(), platform.machine())

  if '.dev' not in self._version:
    # Not a development version, so construct the production download URL
    return self.construct_download_url(
        self._version, platform.system(), platform.machine())

  # This is a development version! Try to build prism with Go.
  # Set the install directory to the cache location.
  envdict = {**os.environ, "GOBIN": self.BIN_CACHE}
  PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism"
  installed_binary = '%s/prism' % (self.BIN_CACHE)

  def go_install(target):
    """Run `go install <target>`, capturing combined stdout/stderr."""
    try:
      return subprocess.run(["go", "install", target],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            env=envdict,
                            check=False)
    except OSError as e:
      # subprocess.run raises (rather than returning a non-zero exit code)
      # when the executable cannot be found or started, i.e. Go is missing.
      raise ValueError(
          'Unable to run `go install %s`: %s\n'
          'Likely Go is not installed.\nPlease install Go (see '
          'https://go.dev/doc/install) to enable automatic local builds.\n'
          'Alternatively provide a binary with the --prism_location flag.' %
          (target, e)) from e

  process = go_install(PRISMPKG)
  if process.returncode == 0:
    # Successfully installed
    return installed_binary

  # We failed to build for some reason. Tool output is only used for
  # diagnostics, so never let undecodable bytes mask the real failure.
  output = process.stdout.decode("utf-8", errors="replace")
  if ("not in a module" not in output) and (
      "no required module provides" not in output):
    # Go ran but failed for a reason other than "prism is not available in
    # this module", e.g. a compile error in a local change to Prism that the
    # developer should address. The @latest fallback would either fail too
    # or hide the error, so fail now.
    _LOGGER.info(output)
    raise ValueError(
        'Unable to install a local build of Prism: "%s";\n'
        'Likely Go is not installed, or a local change to Prism did not '
        'compile.\nPlease install Go (see https://go.dev/doc/install) to '
        'enable automatic local builds.\n'
        'Alternatively provide a binary with the --prism_location flag.'
        '\nCaptured output:\n %s' % (self._version, output))

  # Go is installed and claims we're not in a Go module that has access to
  # the Prism package.

  # Fallback to using the @latest version of prism, which works everywhere.
  process = go_install(PRISMPKG + "@latest")
  if process.returncode == 0:
    return installed_binary

  output = process.stdout.decode("utf-8", errors="replace")
  raise ValueError(
      'We were unable to execute the subprocess "%s" to automatically '
      'build prism.\nAlternatively provide an alternate binary with the '
      '--prism_location flag.'
      '\nCaptured output:\n %s' % (process.args, output))

def subprocess_cmd_and_endpoint(
self) -> typing.Tuple[typing.List[typing.Any], str]:
bin_path = self.local_bin(
Expand Down

0 comments on commit 3767eda

Please sign in to comment.