From 6d0952bfa17b146cc413501538e3123740e23e91 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 30 Mar 2023 14:11:47 -0400 Subject: [PATCH 01/19] Created using Colaboratory --- ...auto_model_updates_using_side_inputs.ipynb | 333 ++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100644 beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb diff --git a/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb b/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb new file mode 100644 index 000000000000..a59d307d5f29 --- /dev/null +++ b/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb @@ -0,0 +1,333 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [], + "include_colab_link": true + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "view-in-github", + "colab_type": "text" + }, + "source": [ + "\"Open" + ] + }, + { + "cell_type": "code", + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ], + "metadata": { + "cellView": "form", + "id": "OsFaZscKSPvo" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Use WatchFilePattern to auto-update ML models in RunInference\n", + "\n", + "The pipeline in this notebook uses a `RunInference` PTransform to run inference on images using TensorFlow models. It uses a side input PCollection that emits `ModelMetadata` to update the model.\n", + "\n", + "Using side inputs, you can update your model (which is passed in a ModelHandler configuration object) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the WatchFilePattern, or by configuring a custom side input PCollection that defines the logic for the model update.\n", + "\n", + "For more information about side inputs, see the Side inputs section in the Apache Beam Programming Guide.\n", + "\n", + "This notebook uses `WatchFilePattern` as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` based on timestamps. 
It emits the latest `ModelMetadata`, which is used in the `RunInference` PTransform to automatically update the ML model without stopping the Beam pipeline.\n" + ], + "metadata": { + "id": "ZUSiAR62SgO8" + } + }, + { + "cell_type": "markdown", + "source": [ + "### Before you begin\n", + "Install the necessary dependencies that are used to run this notebook.\n", + "\n", + "To use RunInference with side inputs for automatic model updates, install `Apache Beam` version `2.46.0` or later." + ], + "metadata": { + "id": "SPuXFowiTpWx" + } + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "1RyTYsFEIOlA" + }, + "outputs": [], + "source": [ + "!pip install apache_beam[gcp]>=2.46.0 --quiet\n", + "!pip install tensorflow\n", + "!pip install tensorflow_hub" + ] + }, + { + "cell_type": "code", + "source": [ + "# Imports required for the notebook.\n", + "import logging\n", + "import time\n", + "from typing import Iterable\n", + "\n", + "import apache_beam as beam\n", + "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import PostProcessor\n", + "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import read_image\n", + "from apache_beam.ml.inference.base import PredictionResult\n", + "from apache_beam.ml.inference.base import RunInference\n", + "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor\n", + "from apache_beam.ml.inference.utils import WatchFilePattern\n", + "from apache_beam.options.pipeline_options import GoogleCloudOptions\n", + "from apache_beam.options.pipeline_options import PipelineOptions\n", + "from apache_beam.options.pipeline_options import SetupOptions\n", + "from apache_beam.options.pipeline_options import StandardOptions\n", + "from apache_beam.transforms.periodicsequence import PeriodicImpulse" + ], + "metadata": { + "id": "Rs4cwwNrIV9H" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# authenticate to your gcp account.\n", + "from google.colab import auth\n", + "auth.authenticate_user()" + ], + "metadata": { + "id": "jAKpPcmmGm03" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Pipeline options\n", + "\n", + "Configure the pipeline options for the pipeline to run on Dataflow. Make sure the streaming mode is on for this pipeline." + ], + "metadata": { + "id": "ORYNKhH3WQyP" + } + }, + { + "cell_type": "code", + "source": [ + "options = PipelineOptions()\n", + "options.view_as(StandardOptions).streaming = True\n", + "\n", + "# provide required pipeline options for DataflowRunner\n", + "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n", + "\n", + "# Sets the project to the default project in your current Google Cloud environment.\n", + "options.view_as(GoogleCloudOptions).project = ''\n", + "\n", + "# Sets the Google Cloud Region in which Cloud Dataflow runs.\n", + "options.view_as(GoogleCloudOptions).region = 'us-central1'\n", + "\n", + "# IMPORTANT! Adjust the following to choose a Cloud Storage location.\n", + "dataflow_gcs_location = \"gs:///tmp/\"\n", + "\n", + "# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.\n", + "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location\n", + "\n", + "# Dataflow Temp Location. 
This location is used to store temporary files or intermediate results before finally outputting to the sink.\n",
+ "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "wWjbnq6X-4uE"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "We need to install the `tensorflow` and `tensorflow_hub` dependencies on Dataflow. We can pass them via the `requirements_file` pipeline option."
+ ],
+ "metadata": {
+ "id": "HTJV8pO2Wcw4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define the dependencies required for the pipeline in a requirements file.\n",
+ "deps_required_for_pipeline = ['tensorflow>=2.12.0', 'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+ "requirements_file_path = './requirements.txt'\n",
+ "# Write the dependencies to the requirements file.\n",
+ "with open(requirements_file_path, 'w') as f:\n",
+ " for dep in deps_required_for_pipeline:\n",
+ " f.write(dep + '\\n')\n",
+ "\n",
+ "# These dependencies need to be installed on the Dataflow workers.\n",
+ "options.view_as(SetupOptions).requirements_file = requirements_file_path"
+ ],
+ "metadata": {
+ "id": "lEy4PkluWbdm"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Let's define the configuration for the `PeriodicImpulse`.\n",
+ "\n",
+ " * The `PeriodicImpulse` transform generates an infinite sequence of elements at a given runtime interval.\n",
+ "\n",
+ "We use `PeriodicImpulse` in this notebook to mimic a `Pub/Sub` source. Because the inputs in a streaming pipeline arrive at intervals, we use `PeriodicImpulse` to emit an element every `m` seconds.\n",
+ "\n",
+ "To learn more about `PeriodicImpulse`, take a look at the [code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)."
+ ],
+ "metadata": {
+ "id": "wVRmOh_fRBwd"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "start_timestamp = time.time()\n",
+ "end_timestamp = start_timestamp + 60 * 20\n",
+ "main_input_fire_interval = 60 # interval at which the main input PCollection is emitted.\n",
+ "side_input_fire_interval = 60 # interval at which the side input PCollection is emitted."
+ ],
+ "metadata": {
+ "id": "E7zsnoxFQ_7L"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## TensorFlow ModelHandler\n",
+ " In this notebook, we will use `TFModelHandlerTensor` as the ModelHandler. We will use a `resnet_101` model trained on ImageNet.\n",
+ "\n",
+ " Download the model from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5 and place it in the directory that you will use for automatic model updates."
+ ],
+ "metadata": {
+ "id": "_AUNH_GJk_NE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model_handler = TFModelHandlerTensor(\n",
+ " model_uri=\"gs:///resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+ ],
+ "metadata": {
+ "id": "kkSnsxwUk-Sp"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Now, let's jump into the pipeline code.\n",
+ "\n",
+ "**Pipeline steps**:\n",
+ "1. Create a `PeriodicImpulse`, which emits output every `n` seconds.\n",
+ "2. Read and pre-process the images using the `read_image` function.\n",
+ "3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
+ "\n",
+ "\n",
+ "The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. This side input is used to update the models in the `model_handler` without needing to stop the Beam pipeline.\n",
+ "We will use `WatchFilePattern` as a side input to watch a glob pattern matching `.h5` files.\n",
+ "\n",
+ "`model_metadata_pcoll` expects a `PCollection` of `ModelMetadata` compatible with the [AsSingleton](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Because the pipeline uses `WatchFilePattern` as a side input, it takes care of windowing and wrapping the output into `ModelMetadata`.\n",
+ "\n",
+ "**How to watch for automatic model updates**\n",
+ "\n",
+ "After the pipeline starts processing data and you see outputs emitted from the RunInference `PTransform`, upload a `.h5` `TensorFlow` model (for example, [resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5)) that matches the `file_pattern` to the Google Cloud Storage bucket. RunInference will update the `model_uri` of `TFModelHandlerTensor` using `WatchFilePattern` as a side input.\n",
+ "\n",
+ "**Note**: Side input update frequency is non-deterministic, and there can be longer intervals between updates.\n",
+ "\n",
+ "When the inference is complete, RunInference outputs a `PredictionResult` object that contains the `example`, `inference`, and `model_id` fields. The `model_id` identifies which model was used to run the inference."
+ ],
+ "metadata": {
+ "id": "kjnb2Ib3ZpJN"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# file_pattern used in WatchFilePattern to watch for the latest model files.\n",
+ "file_pattern = 'gs:///*.h5'\n",
+ "with beam.Pipeline(options=options) as pipeline:\n",
+ "\n",
+ " # Side input used to watch for .h5 files and auto-update the model_uri of the TFModelHandlerTensor.\n",
+ " side_input_pcoll = (\n",
+ " pipeline\n",
+ " | \"WatchFilePattern\" >> WatchFilePattern(file_pattern=file_pattern,\n",
+ " interval=side_input_fire_interval,\n",
+ " stop_timestamp=end_timestamp))\n",
+ "\n",
+ " read_images = (\n",
+ " pipeline\n",
+ " | \"MainInputPcoll\" >> PeriodicImpulse(\n",
+ " start_timestamp=start_timestamp,\n",
+ " stop_timestamp=end_timestamp,\n",
+ " fire_interval=main_input_fire_interval)\n",
+ " # Since this example focuses on automatic model updates, we use only one image for every prediction.\n",
+ " | beam.Map(lambda x: \"Cat-with-beanie.jpg\")\n",
+ " | \"ReadImage\" >> beam.Map(lambda image_name: read_image(\n",
+ " image_name=image_name, image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))\n",
+ "\n",
+ " inferences = (read_images | \"ApplyWindowing\" >> beam.WindowInto(beam.window.FixedWindows(10))\n",
+ " | \"RunInference\" >> RunInference(model_handler=model_handler,\n",
+ " model_metadata_pcoll=side_input_pcoll))\n",
+ "\n",
+ " post_processor = (inferences | \"PostProcessResults\" >> beam.ParDo(PostProcessor()))\n",
+ "\n",
+ " post_processor | \"print\" >> beam.Map(logging.info)\n"
+ ],
+ "metadata": {
+ "id": "ZWcNlixlQ8-Z"
+ },
+ "execution_count": null,
+ "outputs": []
+ }
+ ]
+}
\ No newline at end of file
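A note on the custom side-input alternative mentioned in the notebook above: instead of `WatchFilePattern`, you can configure a custom side-input `PCollection` that defines the model-update logic. The following is a minimal, illustrative sketch of such a side input, not part of the patch; the `gs://<bucket>` path, the latest-model lookup, and the polling cadence are placeholder assumptions.

    import apache_beam as beam
    from apache_beam.ml.inference.base import ModelMetadata
    from apache_beam.transforms import trigger, window
    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    def model_update_side_input(pipeline, start, stop, interval=60):
        # Poll on a timer, resolve the newest model URI, and keep only the latest
        # ModelMetadata in the global window, so that the resulting PCollection
        # stays AsSingleton-compatible for RunInference's model_metadata_pcoll.
        return (
            pipeline
            | 'PollForModels' >> PeriodicImpulse(
                start_timestamp=start, stop_timestamp=stop, fire_interval=interval)
            # Placeholder: replace this Map with custom logic that finds the newest model.
            | 'LatestModelPath' >> beam.Map(lambda _: 'gs://<bucket>/latest_model.h5')
            | 'ToModelMetadata' >> beam.Map(
                lambda path: ModelMetadata(model_id=path, model_name='resnet'))
            | 'UpdateWindow' >> beam.WindowInto(
                window.GlobalWindows(),
                trigger=trigger.Repeatedly(trigger.AfterProcessingTime(interval)),
                accumulation_mode=trigger.AccumulationMode.DISCARDING))

From 1d988cd2fe5c1f99f55cc04068c9b11bfdae9ddd Mon Sep 17 00:00:00 2001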
From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 6 Apr 2023 19:03:46 +0000 Subject: [PATCH 02/19] Add `--pre` flag and a ValidateContainerTest with released candidates. --- ..._Python_ValidatesContainer_Dataflow.groovy | 22 +++++++++++++++++++ sdks/python/container/Dockerfile | 8 ++++--- sdks/python/container/common.gradle | 2 ++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy index bc43ecb366fc..9dd328a5c014 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy @@ -43,3 +43,25 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_ValCont', } } } + +PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_ValCont_with_RC', + 'Run Python RC Dataflow ValidatesContainer', 'Google Cloud Dataflow Runner Python ValidatesContainer Tests with RC Dependencies', this) { + description('Runs Python ValidatesContainer suite on the Dataflow runner by installing Release Candidates.') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(delegate) + + publishers { + archiveJunit('**/pytest*.xml') + } + + // Execute shell command to test Python SDK. + steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + tasks(':sdks:python:test-suites:dataflow:validatesContainerTests') + switches('-PinstallReleaseCandidates=--pre') + commonJobProperties.setGradleSwitches(delegate) + } + } + } \ No newline at end of file diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile index c4ea6d58417e..2baa83e4d88d 100644 --- a/sdks/python/container/Dockerfile +++ b/sdks/python/container/Dockerfile @@ -37,11 +37,11 @@ RUN apt-get update && \ #### # Install required packages for Beam Python SDK and common dependencies used by users. #### - +ARG install_release_candidates COPY target/base_image_requirements.txt /tmp/base_image_requirements.txt RUN \ # use --no-deps to ensure the list includes all transitive dependencies. - pip install --no-deps -r /tmp/base_image_requirements.txt && \ + pip install "${install_release_candidates}" --no-deps -r /tmp/base_image_requirements.txt && \ python -c "import nltk; nltk.download('stopwords')" && \ rm /root/nltk_data/corpora/stopwords.zip && \ # Check that the protobuf upb(also called micro protobuf) is used. @@ -70,9 +70,11 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa #### # Install Apache Beam SDK. Use --no-deps and pip check to verify that all # necessary dependencies are specified in base_image_requirements.txt. +# use ARG install_release_candidates to install release candidates(RCs) of Beam' +# dependencies. #### COPY target/apache-beam.tar.gz /opt/apache/beam/tars/ -RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp] +RUN pip install ${install_release_candidates} --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp] RUN pip check || (echo "Container does not include required Beam dependencies or has conflicting dependencies. If Beam dependencies have changed, you need to regenerate base_image_requirements.txt files. See: https://s.apache.org/beam-python-requirements-generate" && exit 1) # Log complete list of what exact packages and versions are installed. 
RUN pip freeze --all diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle index 75be45326f2d..69d5f9c98f1b 100644 --- a/sdks/python/container/common.gradle +++ b/sdks/python/container/common.gradle @@ -74,9 +74,11 @@ docker { tag: project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version) // tags used by dockerTag task + def installReleaseCandidates = project.rootProject.findProperty(['installReleaseCandidates']) tags containerImageTags() files "../Dockerfile", "./build" buildArgs(['py_version': "${project.ext.pythonVersion}", + 'install_release_candidates': project.rootProject.findProperty(["installReleaseCandidates"]), 'pull_licenses': project.rootProject.hasProperty(["docker-pull-licenses"]) || project.rootProject.hasProperty(["isRelease"])]) buildx project.containerPlatforms() != [project.nativeArchitecture()] From 791e93fcdccf4048af28266ea0567abb5b00d3a1 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 6 Apr 2023 19:04:01 +0000 Subject: [PATCH 03/19] Change Python version in BeamModulePlugin --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 041435b0808c..a388c65cfec1 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -2738,7 +2738,7 @@ class BeamModulePlugin implements Plugin { // If none of them applied, version set here will be used as default value. // TODO(BEAM-12000): Move default value to Py3.9. project.ext.pythonVersion = project.hasProperty('pythonVersion') ? - project.pythonVersion : '3.8' + project.pythonVersion : '3.10' def setupVirtualenv = project.tasks.register('setupVirtualenv') { doLast { From 4fe20963e24b5fbf798adeccc56e002d3c3aebd4 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 6 Apr 2023 15:14:59 -0400 Subject: [PATCH 04/19] Delete beam-ml auto_model_updates_using_side_inputs.ipynb --- ...auto_model_updates_using_side_inputs.ipynb | 333 ------------------ 1 file changed, 333 deletions(-) delete mode 100644 beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb diff --git a/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb b/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb deleted file mode 100644 index a59d307d5f29..000000000000 --- a/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb +++ /dev/null @@ -1,333 +0,0 @@ -{ - "nbformat": 4, - "nbformat_minor": 0, - "metadata": { - "colab": { - "provenance": [], - "include_colab_link": true - }, - "kernelspec": { - "name": "python3", - "display_name": "Python 3" - }, - "language_info": { - "name": "python" - } - }, - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "id": "view-in-github", - "colab_type": "text" - }, - "source": [ - "\"Open" - ] - }, - { - "cell_type": "code", - "source": [ - "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", - "\n", - "# Licensed to the Apache Software Foundation (ASF) under one\n", - "# or more contributor license agreements. 
See the NOTICE file\n", - "# distributed with this work for additional information\n", - "# regarding copyright ownership. The ASF licenses this file\n", - "# to you under the Apache License, Version 2.0 (the\n", - "# \"License\"); you may not use this file except in compliance\n", - "# with the License. You may obtain a copy of the License at\n", - "#\n", - "# http://www.apache.org/licenses/LICENSE-2.0\n", - "#\n", - "# Unless required by applicable law or agreed to in writing,\n", - "# software distributed under the License is distributed on an\n", - "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", - "# KIND, either express or implied. See the License for the\n", - "# specific language governing permissions and limitations\n", - "# under the License" - ], - "metadata": { - "cellView": "form", - "id": "OsFaZscKSPvo" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "# Use WatchFilePattern to auto-update ML models in RunInference\n", - "\n", - "The pipeline in this notebook uses a `RunInference` PTransform to run inference on images using TensorFlow models. It uses a side input PCollection that emits `ModelMetadata` to update the model.\n", - "\n", - "Using side inputs, you can update your model (which is passed in a ModelHandler configuration object) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the WatchFilePattern, or by configuring a custom side input PCollection that defines the logic for the model update.\n", - "\n", - "For more information about side inputs, see the Side inputs section in the Apache Beam Programming Guide.\n", - "\n", - "This notebook uses `WatchFilePattern` as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which is used in the `RunInference` PTransform to automatically update the ML model without stopping the Beam pipeline.\n" - ], - "metadata": { - "id": "ZUSiAR62SgO8" - } - }, - { - "cell_type": "markdown", - "source": [ - "### Before you begin\n", - "Install the necessary dependencies that are used to run this notebook.\n", - "\n", - "To use RunInference with side inputs for automatic model updates, install `Apache Beam` version `2.46.0` or later." 
- ], - "metadata": { - "id": "SPuXFowiTpWx" - } - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "1RyTYsFEIOlA" - }, - "outputs": [], - "source": [ - "!pip install apache_beam[gcp]>=2.46.0 --quiet\n", - "!pip install tensorflow\n", - "!pip install tensorflow_hub" - ] - }, - { - "cell_type": "code", - "source": [ - "# Imports required for the notebook.\n", - "import logging\n", - "import time\n", - "from typing import Iterable\n", - "\n", - "import apache_beam as beam\n", - "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import PostProcessor\n", - "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import read_image\n", - "from apache_beam.ml.inference.base import PredictionResult\n", - "from apache_beam.ml.inference.base import RunInference\n", - "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor\n", - "from apache_beam.ml.inference.utils import WatchFilePattern\n", - "from apache_beam.options.pipeline_options import GoogleCloudOptions\n", - "from apache_beam.options.pipeline_options import PipelineOptions\n", - "from apache_beam.options.pipeline_options import SetupOptions\n", - "from apache_beam.options.pipeline_options import StandardOptions\n", - "from apache_beam.transforms.periodicsequence import PeriodicImpulse" - ], - "metadata": { - "id": "Rs4cwwNrIV9H" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# authenticate to your gcp account.\n", - "from google.colab import auth\n", - "auth.authenticate_user()" - ], - "metadata": { - "id": "jAKpPcmmGm03" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "# Pipeline options\n", - "\n", - "Configure the pipeline options for the pipeline to run on Dataflow. Make sure the streaming mode is on for this pipeline." - ], - "metadata": { - "id": "ORYNKhH3WQyP" - } - }, - { - "cell_type": "code", - "source": [ - "options = PipelineOptions()\n", - "options.view_as(StandardOptions).streaming = True\n", - "\n", - "# provide required pipeline options for DataflowRunner\n", - "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n", - "\n", - "# Sets the project to the default project in your current Google Cloud environment.\n", - "options.view_as(GoogleCloudOptions).project = ''\n", - "\n", - "# Sets the Google Cloud Region in which Cloud Dataflow runs.\n", - "options.view_as(GoogleCloudOptions).region = 'us-central1'\n", - "\n", - "# IMPORTANT! Adjust the following to choose a Cloud Storage location.\n", - "dataflow_gcs_location = \"gs:///tmp/\"\n", - "\n", - "# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.\n", - "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location\n", - "\n", - "# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.\n", - "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location\n", - "\n" - ], - "metadata": { - "id": "wWjbnq6X-4uE" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "We need to install the `tensorflow` and `tensorflow_hub` dependencies on Dataflow. We can pass them via `requirements_file` pipeline option." 
- ], - "metadata": { - "id": "HTJV8pO2Wcw4" - } - }, - { - "cell_type": "code", - "source": [ - "# define dependencies in a requirements file required for the pipeline.\n", - "deps_required_for_pipeline = ['tensorflow>=2.12.0', 'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n", - "requirements_file_path = './requirements.txt'\n", - "# write the depencies to a requirements file.\n", - "with open(requirements_file_path, 'w') as f:\n", - " for dep in deps_required_for_pipeline:\n", - " f.write(dep + '\\n')\n", - "\n", - "# the pipeline needs dependencies needed to be installed on Dataflow.\n", - "options.view_as(SetupOptions).requirements_file = requirements_file_path" - ], - "metadata": { - "id": "lEy4PkluWbdm" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "Let's define configuration for the `PeriodicImpulse`.\n", - "\n", - " * `PeriodicImpulse` transform generates an infinite sequence of elements with given runtime interval.\n", - "\n", - "We use `PeriodicImpulse` in this notebook to mimic the `Pub/Sub` source. Since the inputs in a streaming pipleine arrives in intervals, we use `PeriodicImpulse` to output element at `m` intervals.\n", - "\n", - "To learn more about PeriodicImpulse, please take a look at the [code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)" - ], - "metadata": { - "id": "wVRmOh_fRBwd" - } - }, - { - "cell_type": "code", - "source": [ - "start_timestamp = time.time()\n", - "end_timestamp = start_timestamp + 60 * 20\n", - "main_input_fire_interval = 60 # interval at which the main input PCollection is emitted.\n", - "side_input_fire_interval = 60 # interval at which the side input PCollection is emitted." - ], - "metadata": { - "id": "E7zsnoxFQ_7L" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## TensorFlow ModelHandler\n", - " In this notebook, we will use `TFModelHandlerTensor` as the ModelHandler. We will use `resnet_101` model trained on imagenet.\n", - "\n", - " Download the model from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5 and place it in a directory that you would use to auto model updates." - ], - "metadata": { - "id": "_AUNH_GJk_NE" - } - }, - { - "cell_type": "code", - "source": [ - "model_handler = TFModelHandlerTensor(\n", - " model_uri=\"gs:///resnet101_weights_tf_dim_ordering_tf_kernels.h5\")" - ], - "metadata": { - "id": "kkSnsxwUk-Sp" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "Now, let's jump into the pipeline code.\n", - "\n", - "**Pipeline steps**:\n", - "1. Create a `PeriodImpulse`, which emits output every `n` seconds.\n", - "2. Read and pre-process the images using the `read_image` function.\n", - "3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.\n", - "\n", - "\n", - "The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. 
This side input is used to update the models in the `model_handler` without needing to stop the beam pipeline.\n", - "We will use `WatchFilePattern` as side input to watch a glob pattern matching `.h5` files.\n", - "\n", - "`model_metadata_pcoll` expects a `PCollection` of ModelMetadata compatible with [AsSingleton](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Because the pipeline uses `WatchFilePattern` as side input, it will take care of windowing and wrapping the output into `ModelMetadata`.\n", - "\n", - "**How to watch for auto model update**\n", - "\n", - "After the pipeline starts processing data and when you see some outputs emitted from the RunInference `PTransform`, upload a `.h5` `TensorFlow` model(for example, [resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5)) that matches the `file_pattern` to the Google Cloud Storage bucket. RunInference will update the `model_uri` of `TFModelHandlerTensor` using `WatchFilePattern` as a side input.\n", - "\n", - "**Note**: Side input update frequency is non-deterministic and can have longer intervals between updates.\n", - "\n", - "When the inference is complete, RunInference outputs a `PredictionResult` object that contains `example`, `inference`, and `model_id` fields. The `model_id` is used to identify which model is used for running the inference." - ], - "metadata": { - "id": "kjnb2Ib3ZpJN" - } - }, - { - "cell_type": "code", - "source": [ - "pipeline = beam.Pipeline(options=options)\n", - "\n", - "# file_pattern used in WatchFilePattern to watch for latest model files.\n", - "file_pattern = 'gs:///*.h5'\n", - "with beam.Pipeline(options=options) as pipeline:\n", - "\n", - " # side input used to watch for .h5 file and auto update the model_uri of the TFModelHandlerTensor.\n", - " side_input_pcoll = (\n", - " pipeline\n", - " | \"WatchFilePattern\" >> WatchFilePattern(file_pattern=file_pattern,\n", - " interval=side_input_fire_interval,\n", - " stop_timestamp=end_timestamp))\n", - "\n", - " read_images = (\n", - " pipeline\n", - " | \"MainInputPcoll\" >> PeriodicImpulse(\n", - " start_timestamp=start_timestamp,\n", - " stop_timestamp=end_timestamp,\n", - " fire_interval=main_input_fire_interval)\n", - " # since this example focuses on the auto model updates, we will use only one image for every prediction.\n", - " | beam.Map(lambda x: \"Cat-with-beanie.jpg\")\n", - " | \"ReadImage\" >> beam.Map(lambda image_name: read_image(\n", - " image_name=image_name, image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))\n", - "\n", - " inferences = (read_images | \"ApplyWindowing\" >> beam.WindowInto(beam.window.FixedWindows(10))\n", - " | \"RunInference\" >> RunInference(model_handler=model_handler,\n", - " model_metadata_pcoll=side_input_pcoll))\n", - "\n", - " post_processor = (inferences | \"PostProcessResults\" >> beam.ParDo(PostProcessor()))\n", - "\n", - " post_processor | \"print\" >> beam.Map(logging.info)\n" - ], - "metadata": { - "id": "ZWcNlixlQ8-Z" - }, - "execution_count": null, - "outputs": [] - } - ] -} \ No newline at end of file From 39fbb152ca17afa2e808ebffc22016edf353a184 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 6 Apr 2023 19:30:58 +0000 Subject: [PATCH 05/19] Fix null value --- sdks/python/container/common.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle index 69d5f9c98f1b..1cf124887bce 100644 --- a/sdks/python/container/common.gradle +++ b/sdks/python/container/common.gradle @@ -74,11 +74,10 @@ docker { tag: project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version) // tags used by dockerTag task - def installReleaseCandidates = project.rootProject.findProperty(['installReleaseCandidates']) tags containerImageTags() files "../Dockerfile", "./build" buildArgs(['py_version': "${project.ext.pythonVersion}", - 'install_release_candidates': project.rootProject.findProperty(["installReleaseCandidates"]), + 'install_release_candidates': project.rootProject.findProperty(['installReleaseCandidates']) ?: "", 'pull_licenses': project.rootProject.hasProperty(["docker-pull-licenses"]) || project.rootProject.hasProperty(["isRelease"])]) buildx project.containerPlatforms() != [project.nativeArchitecture()] From 43efb825aea14972639235c1dc79480a69760a8e Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 6 Apr 2023 21:35:49 +0000 Subject: [PATCH 06/19] Remove double quotes --- sdks/python/container/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile index 2baa83e4d88d..a2e193b33a39 100644 --- a/sdks/python/container/Dockerfile +++ b/sdks/python/container/Dockerfile @@ -41,7 +41,7 @@ ARG install_release_candidates COPY target/base_image_requirements.txt /tmp/base_image_requirements.txt RUN \ # use --no-deps to ensure the list includes all transitive dependencies. - pip install "${install_release_candidates}" --no-deps -r /tmp/base_image_requirements.txt && \ + pip install ${install_release_candidates} --no-deps -r /tmp/base_image_requirements.txt && \ python -c "import nltk; nltk.download('stopwords')" && \ rm /root/nltk_data/corpora/stopwords.zip && \ # Check that the protobuf upb(also called micro protobuf) is used. From 9edf6adc5eac1c0f298500e545a092c89aac0a2a Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 12 Apr 2023 14:40:20 -0400 Subject: [PATCH 07/19] refactor --- sdks/python/container/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile index a2e193b33a39..ea98283a6208 100644 --- a/sdks/python/container/Dockerfile +++ b/sdks/python/container/Dockerfile @@ -37,11 +37,11 @@ RUN apt-get update && \ #### # Install required packages for Beam Python SDK and common dependencies used by users. #### -ARG install_release_candidates +ARG pip_extra_options="" COPY target/base_image_requirements.txt /tmp/base_image_requirements.txt RUN \ # use --no-deps to ensure the list includes all transitive dependencies. - pip install ${install_release_candidates} --no-deps -r /tmp/base_image_requirements.txt && \ + pip install ${pip_extra_options} --no-deps -r /tmp/base_image_requirements.txt && \ python -c "import nltk; nltk.download('stopwords')" && \ rm /root/nltk_data/corpora/stopwords.zip && \ # Check that the protobuf upb(also called micro protobuf) is used. @@ -70,11 +70,11 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa #### # Install Apache Beam SDK. Use --no-deps and pip check to verify that all # necessary dependencies are specified in base_image_requirements.txt. 
-# use ARG install_release_candidates to install release candidates(RCs) of Beam' +# use ARG pip_extra_options to install release candidates(RCs) of Beam' # dependencies. #### COPY target/apache-beam.tar.gz /opt/apache/beam/tars/ -RUN pip install ${install_release_candidates} --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp] +RUN pip install ${pip_extra_options} --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp] RUN pip check || (echo "Container does not include required Beam dependencies or has conflicting dependencies. If Beam dependencies have changed, you need to regenerate base_image_requirements.txt files. See: https://s.apache.org/beam-python-requirements-generate" && exit 1) # Log complete list of what exact packages and versions are installed. RUN pip freeze --all From 3f69fe2f072f1dfc674ed70b86072b22ccb3aabf Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 12 Apr 2023 14:43:54 -0400 Subject: [PATCH 08/19] Refactor name --- .../job_PostCommit_Python_ValidatesContainer_Dataflow.groovy | 4 ++-- sdks/python/container/common.gradle | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy index 9dd328a5c014..742d6f854062 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy @@ -60,8 +60,8 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_ValCont_with_RC', gradle { rootBuildScriptDir(commonJobProperties.checkoutDir) tasks(':sdks:python:test-suites:dataflow:validatesContainerTests') - switches('-PinstallReleaseCandidates=--pre') + switches('-PpipExtraOptions=--pre') commonJobProperties.setGradleSwitches(delegate) } } - } \ No newline at end of file + } diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle index 1cf124887bce..751eb7208bb6 100644 --- a/sdks/python/container/common.gradle +++ b/sdks/python/container/common.gradle @@ -77,7 +77,7 @@ docker { tags containerImageTags() files "../Dockerfile", "./build" buildArgs(['py_version': "${project.ext.pythonVersion}", - 'install_release_candidates': project.rootProject.findProperty(['installReleaseCandidates']) ?: "", + 'pip_extra_options': project.rootProject.findProperty(['pipExtraOptions']) ?: "", 'pull_licenses': project.rootProject.hasProperty(["docker-pull-licenses"]) || project.rootProject.hasProperty(["isRelease"])]) buildx project.containerPlatforms() != [project.nativeArchitecture()] From 35e7ba41155f05afcdb976d4f5f4442b9af028ba Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Wed, 12 Apr 2023 17:17:23 -0400 Subject: [PATCH 09/19] Update sdks/python/container/Dockerfile --- sdks/python/container/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile index ea98283a6208..ff8a5f69e21e 100644 --- a/sdks/python/container/Dockerfile +++ b/sdks/python/container/Dockerfile @@ -70,7 +70,7 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa #### # Install Apache Beam SDK. Use --no-deps and pip check to verify that all # necessary dependencies are specified in base_image_requirements.txt. 
-# use ARG pip_extra_options to install release candidates(RCs) of Beam' +# use ARG pip_extra_options to pass additional flags to the pip installation.' # dependencies. #### COPY target/apache-beam.tar.gz /opt/apache/beam/tars/ From eabb07c31d59e9bf5cf19c8b21e8337614c009dd Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 13 Apr 2023 16:34:27 +0000 Subject: [PATCH 10/19] Make changes to generateRequirements --- sdks/python/container/Dockerfile | 20 +++++++------------ sdks/python/container/common.gradle | 8 +++++--- .../container/run_generate_requirements.sh | 7 ++++--- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile index ff8a5f69e21e..ff3b44849504 100644 --- a/sdks/python/container/Dockerfile +++ b/sdks/python/container/Dockerfile @@ -18,7 +18,7 @@ ARG py_version FROM python:"${py_version}"-bullseye as beam -MAINTAINER "Apache Beam " +LABEL Author "Apache Beam " ARG TARGETOS ARG TARGETARCH @@ -37,11 +37,11 @@ RUN apt-get update && \ #### # Install required packages for Beam Python SDK and common dependencies used by users. #### -ARG pip_extra_options="" + COPY target/base_image_requirements.txt /tmp/base_image_requirements.txt RUN \ # use --no-deps to ensure the list includes all transitive dependencies. - pip install ${pip_extra_options} --no-deps -r /tmp/base_image_requirements.txt && \ + pip install --no-deps -r /tmp/base_image_requirements.txt && \ python -c "import nltk; nltk.download('stopwords')" && \ rm /root/nltk_data/corpora/stopwords.zip && \ # Check that the protobuf upb(also called micro protobuf) is used. @@ -70,11 +70,9 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa #### # Install Apache Beam SDK. Use --no-deps and pip check to verify that all # necessary dependencies are specified in base_image_requirements.txt. -# use ARG pip_extra_options to pass additional flags to the pip installation.' -# dependencies. #### COPY target/apache-beam.tar.gz /opt/apache/beam/tars/ -RUN pip install ${pip_extra_options} --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp] +RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp] RUN pip check || (echo "Container does not include required Beam dependencies or has conflicting dependencies. If Beam dependencies have changed, you need to regenerate base_image_requirements.txt files. See: https://s.apache.org/beam-python-requirements-generate" && exit 1) # Log complete list of what exact packages and versions are installed. RUN pip freeze --all @@ -95,11 +93,9 @@ ENTRYPOINT ["/opt/apache/beam/boot"] FROM beam as third_party_licenses ARG pull_licenses COPY target/license_scripts /tmp/license_scripts/ -# Add golang licenses. Because the go-license directory may be empty if -# pull_licenses is false, and COPY fails if there are no files, -# copy an extra LICENSE file then remove it. -COPY target/LICENSE target/go-licenses/* /opt/apache/beam/third_party_licenses/golang/ -RUN rm /opt/apache/beam/third_party_licenses/golang/LICENSE + +# Add golang licenses. 
+COPY target/go-licenses/* /opt/apache/beam/third_party_licenses/golang/ COPY target/license_scripts /tmp/license_scripts/ RUN if [ "$pull_licenses" = "true" ] ; then \ @@ -113,5 +109,3 @@ COPY --from=third_party_licenses /opt/apache/beam/third_party_licenses /opt/apac RUN if [ "$pull_licenses" != "true" ] ; then \ rm -rf /opt/apache/beam/third_party_licenses ; \ fi - - diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle index 751eb7208bb6..64dbefae8356 100644 --- a/sdks/python/container/common.gradle +++ b/sdks/python/container/common.gradle @@ -32,13 +32,14 @@ dependencies { def generatePythonRequirements = tasks.register("generatePythonRequirements") { dependsOn ':sdks:python:sdist' + def pipExtraOptions = project.findProperty("pipExtraOptions") ?: "" def runScriptsPath = "${rootDir}/sdks/python/container/run_generate_requirements.sh" doLast { exec { executable 'sh' args '-c', "cd ${rootDir} && ${runScriptsPath} " + "${project.ext.pythonVersion} " + - "${files(configurations.sdkSourceTarball.files).singleFile}" + "${files(configurations.sdkSourceTarball.files).singleFile}" + "${pipExtraOptions}" } } } @@ -77,13 +78,13 @@ docker { tags containerImageTags() files "../Dockerfile", "./build" buildArgs(['py_version': "${project.ext.pythonVersion}", - 'pip_extra_options': project.rootProject.findProperty(['pipExtraOptions']) ?: "", 'pull_licenses': project.rootProject.hasProperty(["docker-pull-licenses"]) || project.rootProject.hasProperty(["isRelease"])]) buildx project.containerPlatforms() != [project.nativeArchitecture()] platform(*project.containerPlatforms()) } + dockerPrepare.dependsOn copyLauncherDependencies dockerPrepare.dependsOn copyDockerfileDependencies dockerPrepare.dependsOn copyLicenseScripts @@ -98,7 +99,8 @@ if (project.rootProject.hasProperty(["docker-pull-licenses"])) { } else { def skipPullLicenses = tasks.register("skipPullLicenses", Exec) { executable "sh" - args "-c", "mkdir -p build/target/go-licenses" + // Touch a dummy file to ensure the directory exists. + args "-c", "mkdir -p build/target/go-licenses && touch build/target/go-licenses/skip" } dockerPrepare.dependsOn skipPullLicenses } diff --git a/sdks/python/container/run_generate_requirements.sh b/sdks/python/container/run_generate_requirements.sh index 55173bb8cd93..1a401a85b06d 100755 --- a/sdks/python/container/run_generate_requirements.sh +++ b/sdks/python/container/run_generate_requirements.sh @@ -30,7 +30,7 @@ # You will need Python interpreters for all versions supported by Beam, see: # https://s.apache.org/beam-python-dev-wiki -if [[ $# != 2 ]]; then +if [[ $# != 3 ]]; then printf "Example usage: \n$> ./sdks/python/container/run_generate_requirements.sh 3.8 " printf "\n\twhere 3.8 is the Python major.minor version." exit 1 @@ -38,6 +38,7 @@ fi PY_VERSION=$1 SDK_TARBALL=$2 +PIP_EXTRA_OPTION=$3 if ! python$PY_VERSION --version > /dev/null 2>&1 ; then echo "Please install a python${PY_VERSION} interpreter. See s.apache.org/beam-python-dev-wiki for Python installation tips." @@ -61,8 +62,8 @@ pip install --upgrade pip setuptools wheel # Install dataframe deps to add have Dataframe support in released images. # Install test deps since some integration tests need dependencies, # such as pytest, installed in the runner environment. 
-pip install --no-cache-dir $SDK_TARBALL[gcp,dataframe,test] -pip install --no-cache-dir -r $PWD/sdks/python/container/base_image_requirements_manual.txt +pip install --pre --no-cache-dir $SDK_TARBALL[gcp,dataframe,test] +pip install --pre --no-cache-dir -r $PWD/sdks/python/container/base_image_requirements_manual.txt pip uninstall -y apache-beam echo "Checking for broken dependencies:" pip check From 6c992655ca0b76447aa26413b24ebbdad2623ed4 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 13 Apr 2023 16:43:16 +0000 Subject: [PATCH 11/19] Sync with master --- sdks/python/container/common.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle index 64dbefae8356..bc51a29872f3 100644 --- a/sdks/python/container/common.gradle +++ b/sdks/python/container/common.gradle @@ -84,7 +84,6 @@ docker { platform(*project.containerPlatforms()) } - dockerPrepare.dependsOn copyLauncherDependencies dockerPrepare.dependsOn copyDockerfileDependencies dockerPrepare.dependsOn copyLicenseScripts From 9aa741e8fb98da16ec3e8640369b8333858e9add Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 13 Apr 2023 13:09:16 -0400 Subject: [PATCH 12/19] Add dependency on generateRequirements --- sdks/python/test-suites/dataflow/common.gradle | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 2552eac431a7..fe7f8e06a4d2 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -351,6 +351,8 @@ task chicagoTaxiExample { task validatesContainer() { def pyversion = "${project.ext.pythonVersion.replace('.', '')}" + dependsOn ":sdks:python:container:py${pyversion}:generatePythonRequirements" + mustRunAfter ":sdks:python:container:py${pyversion}:generatePythonRequirements" dependsOn 'initializeForDataflowJob' dependsOn ":sdks:python:container:py${pyversion}:docker" def runScriptsPath = "${rootDir}/sdks/python/container/run_validatescontainer.sh" @@ -457,4 +459,4 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> ], pytestOptions: basicPytestOpts ) -} \ No newline at end of file +} From 2ccff5b0375069740b288022fa0bfa8b89562ca4 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 13 Apr 2023 14:16:07 -0400 Subject: [PATCH 13/19] Comment out if condition --- sdks/python/container/run_generate_requirements.sh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/container/run_generate_requirements.sh b/sdks/python/container/run_generate_requirements.sh index 1a401a85b06d..94dbcf2c33df 100755 --- a/sdks/python/container/run_generate_requirements.sh +++ b/sdks/python/container/run_generate_requirements.sh @@ -30,11 +30,11 @@ # You will need Python interpreters for all versions supported by Beam, see: # https://s.apache.org/beam-python-dev-wiki -if [[ $# != 3 ]]; then - printf "Example usage: \n$> ./sdks/python/container/run_generate_requirements.sh 3.8 " - printf "\n\twhere 3.8 is the Python major.minor version." - exit 1 -fi +#if [[ $# != 3 ]]; then +# printf "Example usage: \n$> ./sdks/python/container/run_generate_requirements.sh 3.8 " +# printf "\n\twhere 3.8 is the Python major.minor version." 
+# exit 1
+#fi

PY_VERSION=$1
SDK_TARBALL=$2
PIP_EXTRA_OPTION=$3

From 0fe767ca381799ec77e7c293fba709464402a2fa Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Thu, 13 Apr 2023 16:30:40 -0400
Subject: [PATCH 14/19] Add pipExtraOptions for validate Container suite

---
 sdks/python/container/common.gradle | 2 +-
 .../container/run_generate_requirements.sh | 40 +++++++++++--------
 .../python/test-suites/dataflow/common.gradle | 6 ++-
 3 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle
index bc51a29872f3..f973a11c7ff2 100644
--- a/sdks/python/container/common.gradle
+++ b/sdks/python/container/common.gradle
@@ -39,7 +39,7 @@ def generatePythonRequirements = tasks.register("generatePythonRequirements") {
 executable 'sh'
 args '-c', "cd ${rootDir} && ${runScriptsPath} " +
 "${project.ext.pythonVersion} " +
- "${files(configurations.sdkSourceTarball.files).singleFile}" + "${pipExtraOptions}"
+ "${files(configurations.sdkSourceTarball.files).singleFile} " + "${pipExtraOptions}"
 }
 }
 }
diff --git a/sdks/python/container/run_generate_requirements.sh b/sdks/python/container/run_generate_requirements.sh
index 94dbcf2c33df..1569e8d7e945 100755
--- a/sdks/python/container/run_generate_requirements.sh
+++ b/sdks/python/container/run_generate_requirements.sh
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
-# Generates requirements files, which list PyPI depenedncies to install in
+# Generates requirements files, which list PyPI dependencies to install in
 # Apache Beam Python SDK container images. To generate the list,
 # we use two sources of information:
 # 1) Requirements of Apache Beam itself, as defined by setup.py.
@@ -30,22 +30,22 @@
 # You will need Python interpreters for all versions supported by Beam, see:
 # https://s.apache.org/beam-python-dev-wiki
-#if [[ $# != 3 ]]; then
-# printf "Example usage: \n$> ./sdks/python/container/run_generate_requirements.sh 3.8 "
-# printf "\n\twhere 3.8 is the Python major.minor version."
-# exit 1
-#fi
+if [[ $# -lt 2 ]]; then
+ printf "Example usage: \n$> ./sdks/python/container/run_generate_requirements.sh 3.8 "
+ printf "\n\twhere 3.8 is the Python major.minor version."
+ exit 1
+fi
 PY_VERSION=$1
 SDK_TARBALL=$2
-PIP_EXTRA_OPTION=$3
+PIP_EXTRA_OPTIONS=$3
-if ! python$PY_VERSION --version > /dev/null 2>&1 ; then
+if ! python"$PY_VERSION" --version > /dev/null 2>&1 ; then
 echo "Please install a python${PY_VERSION} interpreter. See s.apache.org/beam-python-dev-wiki for Python installation tips."
 exit 1
 fi
-if ! python$PY_VERSION -m venv --help > /dev/null 2>&1 ; then
+if ! python"$PY_VERSION" -m venv --help > /dev/null 2>&1 ; then
 echo "Your python${PY_VERSION} installation does not have a required venv module. See s.apache.org/beam-python-dev-wiki for Python installation tips."
 exit 1
 fi
@@ -53,17 +53,23 @@ fi
 set -ex
 ENV_PATH="$PWD/build/python${PY_VERSION/./}_requirements_gen"
-rm -rf $ENV_PATH 2>/dev/null || true
-python${PY_VERSION} -m venv $ENV_PATH
-source $ENV_PATH/bin/activate
+rm -rf "$ENV_PATH" 2>/dev/null || true
+python"${PY_VERSION}" -m venv "$ENV_PATH"
+source "$ENV_PATH"/bin/activate
 pip install --upgrade pip setuptools wheel
 # Install gcp extra deps since these deps are commonly used with Apache Beam.
 # Install dataframe deps to add have Dataframe support in released images.
 # Install test deps since some integration tests need dependencies,
 # such as pytest, installed in the runner environment.
-pip install --pre --no-cache-dir $SDK_TARBALL[gcp,dataframe,test] -pip install --pre --no-cache-dir -r $PWD/sdks/python/container/base_image_requirements_manual.txt +if [ -z "$PIP_EXTRA_OPTIONS" ]; then + pip install "$PIP_EXTRA_OPTIONS" --no-cache-dir "$SDK_TARBALL"[gcp,dataframe,test] + pip install "$PIP_EXTRA_OPTIONS" --no-cache-dir -r "$PWD"/sdks/python/container/base_image_requirements_manual.txt +else + pip install --no-cache-dir "$SDK_TARBALL"[gcp,dataframe,test] + pip install --no-cache-dir -r "$PWD"/sdks/python/container/base_image_requirements_manual.txt +fi + pip uninstall -y apache-beam echo "Checking for broken dependencies:" pip check @@ -72,7 +78,7 @@ pip freeze PY_IMAGE="py${PY_VERSION//.}" REQUIREMENTS_FILE=$PWD/sdks/python/container/$PY_IMAGE/base_image_requirements.txt -cat < $REQUIREMENTS_FILE +cat < "$REQUIREMENTS_FILE" # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -99,5 +105,5 @@ cat < $REQUIREMENTS_FILE EOT # Remove pkg_resources to guard against # https://stackoverflow.com/questions/39577984/what-is-pkg-resources-0-0-0-in-output-of-pip-freeze-command -pip freeze | grep -v pkg_resources >> $REQUIREMENTS_FILE -rm -rf $ENV_PATH +pip freeze | grep -v pkg_resources >> "$REQUIREMENTS_FILE" +rm -rf "$ENV_PATH" diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index fe7f8e06a4d2..6b6e65a12df9 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -351,8 +351,10 @@ task chicagoTaxiExample { task validatesContainer() { def pyversion = "${project.ext.pythonVersion.replace('.', '')}" - dependsOn ":sdks:python:container:py${pyversion}:generatePythonRequirements" - mustRunAfter ":sdks:python:container:py${pyversion}:generatePythonRequirements" + if (project.hasProperty("pipExtraOptions")) { + dependsOn ":sdks:python:container:py${pyversion}:generatePythonRequirements" + mustRunAfter ":sdks:python:container:py${pyversion}:generatePythonRequirements" + } dependsOn 'initializeForDataflowJob' dependsOn ":sdks:python:container:py${pyversion}:docker" def runScriptsPath = "${rootDir}/sdks/python/container/run_validatescontainer.sh" From b945621aefa2da4a5524660f359f976931e72969 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 13 Apr 2023 17:34:05 -0400 Subject: [PATCH 15/19] Refactoring --- ...tCommit_Python_ValidatesContainer_Dataflow.groovy | 2 +- sdks/python/container/common.gradle | 4 ++-- sdks/python/container/run_generate_requirements.sh | 12 +++++------- sdks/python/container/tmp.sh | 0 sdks/python/test-suites/dataflow/common.gradle | 4 +++- 5 files changed, 11 insertions(+), 11 deletions(-) create mode 100644 sdks/python/container/tmp.sh diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy index 742d6f854062..254096448668 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy @@ -60,7 +60,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_ValCont_with_RC', gradle { rootBuildScriptDir(commonJobProperties.checkoutDir) tasks(':sdks:python:test-suites:dataflow:validatesContainerTests') - switches('-PpipExtraOptions=--pre') + 
From 49357a1f192665df79e5f6dc997adf9e0c2d7073 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Thu, 13 Apr 2023 17:38:37 -0400
Subject: [PATCH 16/19] Update python version

---
 .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4f173a726b04..aa0734dae228 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -2737,9 +2737,8 @@ class BeamModulePlugin implements Plugin<Project> {
     // Python interpreter version for virtualenv setup and test run. This value can be
     // set from commandline with -PpythonVersion, or in build script of certain project.
     // If none of them applied, version set here will be used as default value.
-    // TODO(BEAM-12000): Move default value to Py3.9.
     project.ext.pythonVersion = project.hasProperty('pythonVersion') ?
-        project.pythonVersion : '3.8'
+        project.pythonVersion : '3.11'
 
     def setupVirtualenv = project.tasks.register('setupVirtualenv') {
       doLast {

From 2a261de51eb579143e1477d06b13b2f00f14a197 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Thu, 13 Apr 2023 17:58:14 -0400
Subject: [PATCH 17/19] remove redundant file

---
 sdks/python/container/tmp.sh | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 sdks/python/container/tmp.sh

diff --git a/sdks/python/container/tmp.sh b/sdks/python/container/tmp.sh
deleted file mode 100644
index e69de29bb2d1..000000000000
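Editor's note: the default bumped in patch 16 only applies when no `-PpythonVersion` property is supplied, so callers can always pin the interpreter themselves. Hypothetical invocations as a sketch; the task path is borrowed from the Jenkins job earlier in this series, and the exact task graph may differ by checkout:

```sh
# Uses the default interpreter version hard-coded in BeamModulePlugin.groovy.
./gradlew :sdks:python:test-suites:dataflow:validatesContainerTests

# Pins the interpreter explicitly; hasProperty('pythonVersion') then takes precedence.
./gradlew :sdks:python:test-suites:dataflow:validatesContainerTests -PpythonVersion=3.10
```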
From 6d2a04263fad71aa5062a3f99bfefc7adaa2e39e Mon Sep 17 00:00:00 2001
From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com>
Date: Thu, 13 Apr 2023 19:51:56 -0400
Subject: [PATCH 18/19] Apply suggestions from code review

Co-authored-by: tvalentyn
---
 sdks/python/container/common.gradle | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/container/common.gradle b/sdks/python/container/common.gradle
index 90bebbef57d8..48c059ae5ecd 100644
--- a/sdks/python/container/common.gradle
+++ b/sdks/python/container/common.gradle
@@ -32,14 +32,14 @@ dependencies {
 
 def generatePythonRequirements = tasks.register("generatePythonRequirements") {
   dependsOn ':sdks:python:sdist'
-  def enableRCInstallation = project.hasProperty("testRCDependencies") ? "--pre" : ""
+  def pipExtraOptions = project.hasProperty("testRCDependencies") ? "--pre" : ""
   def runScriptsPath = "${rootDir}/sdks/python/container/run_generate_requirements.sh"
   doLast {
     exec {
       executable 'sh'
       args '-c', "cd ${rootDir} && ${runScriptsPath} " +
           "${project.ext.pythonVersion} " +
-          "${files(configurations.sdkSourceTarball.files).singleFile} " + "${enableRCInstallation}"
+          "${files(configurations.sdkSourceTarball.files).singleFile} " + "${pipExtraOptions}"
     }
   }
 }
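Editor's note: patch 19 below backs out the interpreter bump from patch 16. For readers unfamiliar with the workflow, a revert patch with exactly this commit-message shape is what `git revert` produces (hash taken from the series above):

```sh
# Creates a commit titled 'Revert "Update python version"' whose body
# carries the "This reverts commit ..." line seen in the patch below.
git revert 49357a1f192665df79e5f6dc997adf9e0c2d7073
```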
From b96b8d94481a01b87b5065714e842a39a96f40a5 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Thu, 13 Apr 2023 17:38:37 -0400
Subject: [PATCH 19/19] Revert "Update python version"

This reverts commit 49357a1f192665df79e5f6dc997adf9e0c2d7073.

---
 .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index aa0734dae228..9f8537aed3fc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -2737,8 +2737,9 @@ class BeamModulePlugin implements Plugin<Project> {
     // Python interpreter version for virtualenv setup and test run. This value can be
     // set from commandline with -PpythonVersion, or in build script of certain project.
     // If none of them applied, version set here will be used as default value.
+    // TODO(BEAM-12000): Move default value to Py3.9.
     project.ext.pythonVersion = project.hasProperty('pythonVersion') ?
-        project.pythonVersion : '3.11'
+        project.pythonVersion : '3.8'
 
     def setupVirtualenv = project.tasks.register('setupVirtualenv') {
       doLast {
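Editor's note: taken together, the series threads a single switch through three layers. Jenkins passes `-PtestRCDependencies=true`; `common.gradle` maps that property to pip's `--pre` flag; and `run_generate_requirements.sh` forwards the flag to `pip install` via the `:+` expansion. A hedged sketch of the equivalent local invocation, mirroring the Jenkins job's task and switch (runner prerequisites such as GCP credentials are assumed):

```sh
# Regenerate base_image_requirements.txt against release-candidate
# dependencies, then run the Dataflow container validation suite.
./gradlew :sdks:python:test-suites:dataflow:validatesContainerTests \
    -PtestRCDependencies=true
```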