diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 089767d2a99e..530a270308d9 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -30,9 +30,8 @@
 import json
 import logging
 import random
+from collections.abc import Iterable
 from typing import Any
-from typing import Iterable
-from typing import Tuple
 
 import apache_beam as beam
 from apache_beam.io import WriteToText
@@ -40,7 +39,7 @@
 from apache_beam.options.pipeline_options import SetupOptions
 
 
-@beam.typehints.with_output_types(Tuple[int, int, int])
+@beam.typehints.with_output_types(tuple[int, int, int])
 @beam.typehints.with_input_types(int)
 def run_trials(runs):
   """Run trials and return a 3-tuple representing the results.
@@ -62,8 +61,8 @@ def run_trials(runs):
   return runs, inside_runs, 0
 
 
-@beam.typehints.with_output_types(Tuple[int, int, float])
-@beam.typehints.with_input_types(Iterable[Tuple[int, int, Any]])
+@beam.typehints.with_output_types(tuple[int, int, float])
+@beam.typehints.with_input_types(Iterable[tuple[int, int, Any]])
 def combine_results(results):
   """Combiner function to sum up trials and compute the estimate.
diff --git a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
index 6b5573aa4569..0a8c55d17d3a 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
@@ -25,7 +25,6 @@
 import unittest
 import uuid
 from typing import TYPE_CHECKING
-from typing import List
 
 import pytest
 import pytz
@@ -53,7 +52,7 @@
 if TYPE_CHECKING:
   import google.cloud.bigtable.instance
 
-EXISTING_INSTANCES: List['google.cloud.bigtable.instance.Instance'] = []
+EXISTING_INSTANCES: list['google.cloud.bigtable.instance.Instance'] = []
 LABEL_KEY = 'python-bigtable-beam'
 label_stamp = datetime.datetime.utcnow().replace(tzinfo=UTC)
 label_stamp_micros = _microseconds_from_datetime(label_stamp)
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 65ea7990a2d8..9d71ac32aff2 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -59,7 +59,7 @@
 import logging
 import re
 import sys
-from typing import Iterable
+from collections.abc import Iterable
 from typing import Optional
 from typing import Text
 import uuid
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 3ce7836b491a..8a959138d3da 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -30,7 +30,6 @@
 import argparse
 import logging
 import sys
-import typing
 
 import apache_beam as beam
 from apache_beam import coders
@@ -71,7 +70,7 @@ def is_deterministic(self):
 # Annotate the get_players function so that the typehint system knows that the
 # input to the CombinePerKey operation is a key-value pair of a Player object
 # and an integer.
-@with_output_types(typing.Tuple[Player, int])
+@with_output_types(tuple[Player, int])
 def get_players(descriptor):
   name, points = descriptor.split(',')
   return Player(name), int(points)
diff --git a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
index 5eb57c8fc080..69c2eacc593d 100644
--- a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
+++ b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
@@ -27,10 +27,8 @@
 import argparse
 import logging
-from typing import Dict
-from typing import Iterable
-from typing import Iterator
-from typing import Tuple
+from collections.abc import Iterable
+from collections.abc import Iterator
 
 import apache_beam as beam
 import torch
@@ -45,14 +43,14 @@
 from transformers import AutoTokenizer
 
 
-def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+def add_mask_to_last_word(text: str) -> tuple[str, str]:
   text_list = text.split()
   return text, ' '.join(text_list[:-2] + ['<mask>', text_list[-1]])
 
 
 def tokenize_sentence(
-    text_and_mask: Tuple[str, str],
-    tokenizer: AutoTokenizer) -> Tuple[str, Dict[str, torch.Tensor]]:
+    text_and_mask: tuple[str, str],
+    tokenizer: AutoTokenizer) -> tuple[str, dict[str, torch.Tensor]]:
   text, masked_text = text_and_mask
   tokenized_sentence = tokenizer.encode_plus(masked_text, return_tensors="pt")
@@ -81,7 +79,7 @@ def __init__(self, tokenizer: AutoTokenizer):
     super().__init__()
     self.tokenizer = tokenizer
 
-  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]:
     text, prediction_result = element
     inputs = prediction_result.example
     logits = prediction_result.inference['logits']
diff --git a/sdks/python/apache_beam/examples/inference/huggingface_question_answering.py b/sdks/python/apache_beam/examples/inference/huggingface_question_answering.py
index 9005ea5d11d7..7d4899cc38d9 100644
--- a/sdks/python/apache_beam/examples/inference/huggingface_question_answering.py
+++ b/sdks/python/apache_beam/examples/inference/huggingface_question_answering.py
@@ -28,8 +28,7 @@
 import argparse
 import logging
-from typing import Iterable
-from typing import Tuple
+from collections.abc import Iterable
 
 import apache_beam as beam
 from apache_beam.ml.inference.base import KeyedModelHandler
@@ -49,7 +48,7 @@ class PostProcessor(beam.DoFn):
   Hugging Face Pipeline for Question Answering returns a dictionary
   with score, start and end index of answer and the answer.
""" - def process(self, result: Tuple[str, PredictionResult]) -> Iterable[str]: + def process(self, result: tuple[str, PredictionResult]) -> Iterable[str]: text, prediction = result predicted_answer = prediction.inference['answer'] yield text + ';' + predicted_answer diff --git a/sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py b/sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py index 18f697f673bf..0e62ab865431 100644 --- a/sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py +++ b/sdks/python/apache_beam/examples/inference/onnx_sentiment_classification.py @@ -28,9 +28,8 @@ import argparse import logging -from typing import Iterable -from typing import Iterator -from typing import Tuple +from collections.abc import Iterable +from collections.abc import Iterator import numpy as np @@ -47,7 +46,7 @@ def tokenize_sentence(text: str, - tokenizer: RobertaTokenizer) -> Tuple[str, torch.Tensor]: + tokenizer: RobertaTokenizer) -> tuple[str, torch.Tensor]: tokenized_sentence = tokenizer.encode(text, add_special_tokens=True) # Workaround to manually remove batch dim until we have the feature to @@ -63,7 +62,7 @@ def filter_empty_lines(text: str) -> Iterator[str]: class PostProcessor(beam.DoFn): - def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]: + def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]: filename, prediction_result = element prediction = np.argmax(prediction_result.inference, axis=0) yield filename + ';' + str(prediction) diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py b/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py index d627001bcb82..c24a6d0a910e 100644 --- a/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py +++ b/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py @@ -21,9 +21,8 @@ import io import logging import os -from typing import Iterator +from collections.abc import Iterator from typing import Optional -from typing import Tuple import apache_beam as beam import torch @@ -41,7 +40,7 @@ def read_image(image_file_name: str, - path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]: + path_to_dir: Optional[str] = None) -> tuple[str, Image.Image]: if path_to_dir is not None: image_file_name = os.path.join(path_to_dir, image_file_name) with FileSystems().open(image_file_name, 'r') as file: @@ -122,13 +121,13 @@ def run( model_class = models.mobilenet_v2 model_params = {'num_classes': 1000} - def preprocess(image_name: str) -> Tuple[str, torch.Tensor]: + def preprocess(image_name: str) -> tuple[str, torch.Tensor]: image_name, image = read_image( image_file_name=image_name, path_to_dir=known_args.images_dir) return (image_name, preprocess_image(image)) - def postprocess(element: Tuple[str, PredictionResult]) -> str: + def postprocess(element: tuple[str, PredictionResult]) -> str: filename, prediction_result = element prediction = torch.argmax(prediction_result.inference, dim=0) return filename + ',' + str(prediction.item()) diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_classification_with_side_inputs.py b/sdks/python/apache_beam/examples/inference/pytorch_image_classification_with_side_inputs.py index 2a4e6e9a9bc6..787341263fde 100644 --- a/sdks/python/apache_beam/examples/inference/pytorch_image_classification_with_side_inputs.py +++ 
@@ -62,10 +62,9 @@
 import io
 import logging
 import os
-from typing import Iterable
-from typing import Iterator
+from collections.abc import Iterable
+from collections.abc import Iterator
 from typing import Optional
-from typing import Tuple
 
 import apache_beam as beam
 import torch
@@ -84,7 +83,7 @@
 
 
 def read_image(image_file_name: str,
-               path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]:
+               path_to_dir: Optional[str] = None) -> tuple[str, Image.Image]:
   if path_to_dir is not None:
     image_file_name = os.path.join(path_to_dir, image_file_name)
   with FileSystems().open(image_file_name, 'r') as file:
@@ -116,7 +115,7 @@ class PostProcessor(beam.DoFn):
   Return filename, prediction and the model id used to perform the
   prediction
   """
-  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]:
     filename, prediction_result = element
     prediction = torch.argmax(prediction_result.inference, dim=0)
     yield filename, prediction, prediction_result.model_id
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_segmentation.py b/sdks/python/apache_beam/examples/inference/pytorch_image_segmentation.py
index cdecb826d6e3..5e5f77a679c3 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_segmentation.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_segmentation.py
@@ -21,10 +21,9 @@
 import io
 import logging
 import os
-from typing import Iterable
-from typing import Iterator
+from collections.abc import Iterable
+from collections.abc import Iterator
 from typing import Optional
-from typing import Tuple
 
 import apache_beam as beam
 import torch
@@ -138,7 +137,7 @@
 
 
 def read_image(image_file_name: str,
-               path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]:
+               path_to_dir: Optional[str] = None) -> tuple[str, Image.Image]:
   if path_to_dir is not None:
     image_file_name = os.path.join(path_to_dir, image_file_name)
   with FileSystems().open(image_file_name, 'r') as file:
@@ -161,7 +160,7 @@ def filter_empty_lines(text: str) -> Iterator[str]:
 
 
 class PostProcessor(beam.DoFn):
-  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]:
     filename, prediction_result = element
     prediction_labels = prediction_result.inference['labels']
     classes = [CLASS_ID_TO_NAME[label.item()] for label in prediction_labels]
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
index 9de10e73e11b..a616998d2c73 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
@@ -26,10 +26,8 @@
 import argparse
 import logging
-from typing import Dict
-from typing import Iterable
-from typing import Iterator
-from typing import Tuple
+from collections.abc import Iterable
+from collections.abc import Iterator
 
 import apache_beam as beam
 import torch
@@ -45,14 +43,14 @@
 from transformers import BertTokenizer
 
 
-def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+def add_mask_to_last_word(text: str) -> tuple[str, str]:
   text_list = text.split()
   return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
 
 
 def tokenize_sentence(
-    text_and_mask: Tuple[str, str],
-    bert_tokenizer: BertTokenizer) -> Tuple[str, Dict[str, torch.Tensor]]:
+    text_and_mask: tuple[str, str],
+    bert_tokenizer: BertTokenizer) -> tuple[str, dict[str, torch.Tensor]]:
   text, masked_text = text_and_mask
   tokenized_sentence = bert_tokenizer.encode_plus(
       masked_text, return_tensors="pt")
@@ -84,7 +82,7 @@ def __init__(self, bert_tokenizer: BertTokenizer):
     super().__init__()
     self.bert_tokenizer = bert_tokenizer
 
-  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]:
     text, prediction_result = element
     inputs = prediction_result.example
     logits = prediction_result.inference['logits']
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py b/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py
index f0b5462d5335..18c4c3e653b4 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py
@@ -24,10 +24,9 @@
 import io
 import logging
 import os
-from typing import Iterable
-from typing import Iterator
+from collections.abc import Iterable
+from collections.abc import Iterator
 from typing import Optional
-from typing import Tuple
 
 import apache_beam as beam
 import torch
@@ -143,7 +142,7 @@
 
 
 def read_image(image_file_name: str,
-               path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]:
+               path_to_dir: Optional[str] = None) -> tuple[str, Image.Image]:
   if path_to_dir is not None:
     image_file_name = os.path.join(path_to_dir, image_file_name)
   with FileSystems().open(image_file_name, 'r') as file:
@@ -168,15 +167,15 @@ def filter_empty_lines(text: str) -> Iterator[str]:
 class KeyExamplesForEachModelType(beam.DoFn):
   """Duplicate data to run against each model type"""
   def process(
-      self, element: Tuple[torch.Tensor,
-                           str]) -> Iterable[Tuple[str, torch.Tensor]]:
+      self, element: tuple[torch.Tensor,
+                           str]) -> Iterable[tuple[str, torch.Tensor]]:
     yield 'v1', element[0]
     yield 'v2', element[0]
 
 
 class PostProcessor(beam.DoFn):
   def process(
-      self, element: Tuple[str, PredictionResult]) -> Tuple[torch.Tensor, str]:
+      self, element: tuple[str, PredictionResult]) -> tuple[torch.Tensor, str]:
     model, prediction_result = element
     prediction_labels = prediction_result.inference['labels']
     classes = [CLASS_ID_TO_NAME[label.item()] for label in prediction_labels]
diff --git a/sdks/python/apache_beam/examples/inference/run_inference_side_inputs.py b/sdks/python/apache_beam/examples/inference/run_inference_side_inputs.py
index a6e4dc2bdb03..755eff17c163 100644
--- a/sdks/python/apache_beam/examples/inference/run_inference_side_inputs.py
+++ b/sdks/python/apache_beam/examples/inference/run_inference_side_inputs.py
@@ -22,9 +22,9 @@
 import argparse
 import logging
 import time
-from typing import Iterable
+from collections.abc import Iterable
+from collections.abc import Sequence
 from typing import Optional
-from typing import Sequence
 
 import apache_beam as beam
 from apache_beam.ml.inference import base
diff --git a/sdks/python/apache_beam/examples/inference/sklearn_japanese_housing_regression.py b/sdks/python/apache_beam/examples/inference/sklearn_japanese_housing_regression.py
index 3aa2f362fa64..0a527e88dec2 100644
--- a/sdks/python/apache_beam/examples/inference/sklearn_japanese_housing_regression.py
+++ b/sdks/python/apache_beam/examples/inference/sklearn_japanese_housing_regression.py
@@ -31,7 +31,7 @@
 import argparse
 import os
-from typing import Iterable
+from collections.abc import Iterable
 
 import pandas
diff --git a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
index 5392cdf7ddae..d7d08e294e9d 100644
--- a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
+++ b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
@@ -27,9 +27,7 @@
 import argparse
 import logging
 import os
-from typing import Iterable
-from typing import List
-from typing import Tuple
+from collections.abc import Iterable
 
 import apache_beam as beam
 from apache_beam.ml.inference.base import KeyedModelHandler
@@ -42,7 +40,7 @@
 from apache_beam.runners.runner import PipelineResult
 
 
-def process_input(row: str) -> Tuple[int, List[int]]:
+def process_input(row: str) -> tuple[int, list[int]]:
   data = row.split(',')
   label, pixels = int(data[0]), data[1:]
   pixels = [int(pixel) for pixel in pixels]
@@ -53,7 +51,7 @@ class PostProcessor(beam.DoFn):
   """Process the PredictionResult to get the predicted label.
   Returns a comma separated string with true label and predicted label.
   """
-  def process(self, element: Tuple[int, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[int, PredictionResult]) -> Iterable[str]:
     label, prediction_result = element
     prediction = prediction_result.inference
     yield '{},{}'.format(label, prediction)
diff --git a/sdks/python/apache_beam/examples/inference/tensorflow_imagenet_segmentation.py b/sdks/python/apache_beam/examples/inference/tensorflow_imagenet_segmentation.py
index a0f249dcfbf0..b44d775f4ad3 100644
--- a/sdks/python/apache_beam/examples/inference/tensorflow_imagenet_segmentation.py
+++ b/sdks/python/apache_beam/examples/inference/tensorflow_imagenet_segmentation.py
@@ -17,8 +17,8 @@
 import argparse
 import logging
-from typing import Iterable
-from typing import Iterator
+from collections.abc import Iterable
+from collections.abc import Iterator
 
 import numpy
diff --git a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
index 6cf746e77cd2..bf85bb1aef16 100644
--- a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
+++ b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
@@ -17,8 +17,7 @@
 import argparse
 import logging
-from typing import Iterable
-from typing import Tuple
+from collections.abc import Iterable
 
 import numpy
@@ -33,7 +32,7 @@
 from apache_beam.runners.runner import PipelineResult
 
 
-def process_input(row: str) -> Tuple[int, numpy.ndarray]:
+def process_input(row: str) -> tuple[int, numpy.ndarray]:
   data = row.split(',')
   label, pixels = int(data[0]), data[1:]
   pixels = [int(pixel) for pixel in pixels]
@@ -46,7 +45,7 @@ class PostProcessor(beam.DoFn):
   """Process the PredictionResult to get the predicted label.
   Returns a comma separated string with true label and predicted label.
""" - def process(self, element: Tuple[int, PredictionResult]) -> Iterable[str]: + def process(self, element: tuple[int, PredictionResult]) -> Iterable[str]: label, prediction_result = element prediction = numpy.argmax(prediction_result.inference, axis=0) yield '{},{}'.format(label, prediction) diff --git a/sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py b/sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py index 1faf502c71af..677d36b9b767 100644 --- a/sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py +++ b/sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py @@ -22,9 +22,8 @@ import argparse import io import os -from typing import Iterable +from collections.abc import Iterable from typing import Optional -from typing import Tuple import numpy as np @@ -134,14 +133,14 @@ def attach_im_size_to_key( - data: Tuple[str, Image.Image]) -> Tuple[Tuple[str, int, int], Image.Image]: + data: tuple[str, Image.Image]) -> tuple[tuple[str, int, int], Image.Image]: filename, image = data width, height = image.size return ((filename, width, height), image) def read_image(image_file_name: str, - path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]: + path_to_dir: Optional[str] = None) -> tuple[str, Image.Image]: if path_to_dir is not None: image_file_name = os.path.join(path_to_dir, image_file_name) with FileSystems().open(image_file_name, 'r') as file: @@ -168,7 +167,7 @@ class PostProcessor(beam.DoFn): an integer that we can transform into actual string class using COCO_OBJ_DET_CLASSES as reference. """ - def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]: + def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]: key, prediction_result = element filename, im_width, im_height = key num_detections = prediction_result.inference[0] diff --git a/sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py b/sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py index 09a70caa4ede..5df0b51e36d7 100644 --- a/sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py +++ b/sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py @@ -32,10 +32,9 @@ import io import logging import os -from typing import Iterable -from typing import Iterator +from collections.abc import Iterable +from collections.abc import Iterator from typing import Optional -from typing import Tuple import apache_beam as beam import tensorflow as tf @@ -60,7 +59,7 @@ def filter_empty_lines(text: str) -> Iterator[str]: def read_and_process_image( image_file_name: str, - path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]: + path_to_dir: Optional[str] = None) -> tuple[str, tf.Tensor]: if path_to_dir is not None: image_file_name = os.path.join(path_to_dir, image_file_name) with FileSystems().open(image_file_name, 'r') as file: @@ -97,7 +96,7 @@ def convert_image_to_example_proto(tensor: tf.Tensor) -> tf.train.Example: class ProcessInferenceToString(beam.DoFn): def process( - self, element: Tuple[str, + self, element: tuple[str, prediction_log_pb2.PredictionLog]) -> Iterable[str]: """ Args: diff --git a/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py b/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py index 73126569e988..20312e7d3c88 100644 --- a/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py +++ 
@@ -27,9 +27,7 @@
 import argparse
 import io
 import logging
-from typing import Iterable
-from typing import List
-from typing import Tuple
+from collections.abc import Iterable
 
 import apache_beam as beam
 import tensorflow as tf
@@ -102,13 +100,13 @@ def parse_known_args(argv):
 COLUMNS = ['dandelion', 'daisy', 'tulips', 'sunflowers', 'roses']
 
 
-def read_image(image_file_name: str) -> Tuple[str, bytes]:
+def read_image(image_file_name: str) -> tuple[str, bytes]:
   with FileSystems().open(image_file_name, 'r') as file:
     data = io.BytesIO(file.read()).getvalue()
   return image_file_name, data
 
 
-def preprocess_image(data: bytes) -> List[float]:
+def preprocess_image(data: bytes) -> list[float]:
   """Preprocess the image, resizing it and normalizing it before converting
   to a list.
   """
@@ -119,7 +117,7 @@
 
 
 class PostProcessor(beam.DoFn):
-  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]:
     img_name, prediction_result = element
     prediction_vals = prediction_result.inference
     index = prediction_vals.index(max(prediction_vals))
diff --git a/sdks/python/apache_beam/examples/inference/vllm_text_completion.py b/sdks/python/apache_beam/examples/inference/vllm_text_completion.py
index 3cf7d04cb03e..2708c0f3d1a1 100644
--- a/sdks/python/apache_beam/examples/inference/vllm_text_completion.py
+++ b/sdks/python/apache_beam/examples/inference/vllm_text_completion.py
@@ -25,7 +25,7 @@
 import argparse
 import logging
-from typing import Iterable
+from collections.abc import Iterable
 
 import apache_beam as beam
 from apache_beam.ml.inference.base import PredictionResult
diff --git a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
index 963187fd210d..498511a5a2cf 100644
--- a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
+++ b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
@@ -17,10 +17,8 @@
 import argparse
 import logging
-from typing import Callable
-from typing import Iterable
-from typing import List
-from typing import Tuple
+from collections.abc import Callable
+from collections.abc import Iterable
 from typing import Union
 
 import numpy
@@ -48,7 +46,7 @@ class PostProcessor(beam.DoFn):
   """Process the PredictionResult to get the predicted label.
   Returns a comma separated string with true label and predicted label.
   """
-  def process(self, element: Tuple[int, PredictionResult]) -> Iterable[str]:
+  def process(self, element: tuple[int, PredictionResult]) -> Iterable[str]:
     label, prediction_result = element
     prediction = prediction_result.inference
     yield '{},{}'.format(label, prediction)
@@ -89,7 +87,7 @@ def parse_known_args(argv):
 def load_sklearn_iris_test_data(
     data_type: Callable,
     split: bool = True,
-    seed: int = 999) -> List[Union[numpy.array, pandas.DataFrame]]:
+    seed: int = 999) -> list[Union[numpy.array, pandas.DataFrame]]:
   """
   Loads test data from the sklearn Iris dataset in a given format,
   either in a single or multiple batches.
diff --git a/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py b/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
index 1cdd266c3df4..9b4889017077 100644
--- a/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
+++ b/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
@@ -26,7 +26,6 @@
 import logging
 import sys
-import typing
 
 import apache_beam as beam
 from apache_beam.io.kafka import ReadFromKafka
@@ -97,7 +96,7 @@ def convert_kafka_record_to_dictionary(record):
             topic='projects/pubsub-public-data/topics/taxirides-realtime').
         with_output_types(bytes)
         | beam.Map(lambda x: (b'', x)).with_output_types(
-            typing.Tuple[bytes, bytes])  # Kafka write transforms expects KVs.
+            tuple[bytes, bytes])  # Kafka write transforms expect KVs.
         | beam.WindowInto(beam.window.FixedWindows(window_size))
         | WriteToKafka(
             producer_config={'bootstrap.servers': bootstrap_servers},
diff --git a/sdks/python/apache_beam/examples/wordcount_xlang_sql.py b/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
index 9d7d756f223f..632e90303010 100644
--- a/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
+++ b/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
@@ -24,7 +24,7 @@
 import argparse
 import logging
 import re
-import typing
+from typing import NamedTuple
 
 import apache_beam as beam
 from apache_beam import coders
@@ -41,7 +41,7 @@
 #
 # Here we create and register a simple NamedTuple with a single str typed
 # field named 'word' which we will use below.
-MyRow = typing.NamedTuple('MyRow', [('word', str)])
+MyRow = NamedTuple('MyRow', [('word', str)])
 coders.registry.register_coder(MyRow, coders.RowCoder)
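Note: the migration pattern applied throughout this diff can be sanity-checked with a
small standalone pipeline. The sketch below is hypothetical (not a file touched by
this change) and assumes Python 3.9+ with apache_beam installed; the names to_kv and
FormatCounts are illustrative only.

    # Hedged sketch of the post-migration style: builtin generics (tuple[...])
    # and collections.abc replace typing.Tuple/List/Dict/Iterable.
    from collections.abc import Iterable

    import apache_beam as beam


    @beam.typehints.with_input_types(str)
    @beam.typehints.with_output_types(tuple[str, int])
    def to_kv(word: str) -> tuple[str, int]:
      # Pair each word with a count of 1, as in the wordcount examples.
      return word, 1


    class FormatCounts(beam.DoFn):
      def process(self, element: tuple[int, int]) -> Iterable[str]:
        # Format a (word, count) pair as a comma separated string.
        word, count = element
        yield '{},{}'.format(word, count)


    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(['a', 'b', 'a'])
          | beam.Map(to_kv)
          | beam.CombinePerKey(sum)
          | beam.ParDo(FormatCounts())
          | beam.Map(print))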