diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py index 67adda5aec03..117c7f7c9b93 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider.py @@ -18,6 +18,8 @@ import logging import re from collections import namedtuple +from inspect import Parameter +from inspect import Signature from typing import Dict from typing import List from typing import Tuple @@ -58,6 +60,22 @@ def get_config_with_descriptions( return fields_with_descriptions +def _generate_signature(schematransform: SchemaTransformsConfig) -> Signature: + schema = named_tuple_to_schema(schematransform.configuration_schema) + descriptions = schematransform.configuration_schema._field_descriptions + params: List[Parameter] = [] + for field in schema.fields: + annotation = str(typing_from_runner_api(field.type)) + description = descriptions[field.name] + if description: + annotation = annotation + f": {description}" + params.append( + Parameter( + field.name, Parameter.POSITIONAL_OR_KEYWORD, annotation=annotation)) + + return Signature(params) + + class ExternalTransform(PTransform): """Template for a wrapper class of an external SchemaTransform @@ -69,7 +87,6 @@ class ExternalTransform(PTransform): # These attributes need to be set when # creating an ExternalTransform type default_expansion_service = None - description: str = "" identifier: str = "" configuration_schema: Dict[str, ParamInfo] = {} @@ -138,18 +155,20 @@ class ExternalTransformProvider: ... 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1') >>> provider.BigqueryStorageRead - To know more about the usage of a given transform, take a look at the - `description` attribute. This returns some documentation IF the underlying - SchemaTransform provides any. - >>> provider.BigqueryStorageRead.description + You can inspect the transform's documentation to know more about it. This + returns some documentation only IF the underlying SchemaTransform + implementation provides any. + >>> import inspect + >>> inspect.getdoc(provider.BigqueryStorageRead) - Similarly, the `configuration_schema` attribute returns information about the + Similarly, you can inspect the transform's signature to know more about its parameters, including their names, types, and any documentation that the underlying SchemaTransform may provide: - >>> provider.BigqueryStorageRead.configuration_schema - {'query': ParamInfo(type=typing.Optional[str], description='The SQL query to - be executed to read from the BigQuery table.', original_name='query'), - 'row_restriction': ParamInfo(type=typing.Optional[str]...} + >>> inspect.signature(provider.BigqueryStorageRead) + (query: 'typing.Union[str, NoneType]: The SQL query to be executed to...', + row_restriction: 'typing.Union[str, NoneType]: Read only rows that match...', + selected_fields: 'typing.Union[typing.Sequence[str], NoneType]: Read ...', + table_spec: 'typing.Union[str, NoneType]: The fully-qualified name of ...') The retrieved external transform can be used as a normal PTransform like so:: @@ -213,14 +232,19 @@ def _create_wrappers(self): skipped_urns.append(identifier) continue - self._transforms[identifier] = type( - name, (ExternalTransform, ), + transform = type( + name, + (ExternalTransform, ), dict( identifier=identifier, default_expansion_service=service, schematransform=config, - description=config.description, + # configuration_schema is used by the auto-wrapper generator configuration_schema=get_config_with_descriptions(config))) + transform.__doc__ = config.description + transform.__signature__ = _generate_signature(config) + + self._transforms[identifier] = transform self._name_to_urn[name] = identifier if skipped_urns: diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py index 95720cee7eee..df30611598fe 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py @@ -15,6 +15,7 @@ # limitations under the License. # import importlib +import inspect import logging import os import secrets @@ -92,7 +93,7 @@ def test_infer_name_from_identifier(self): "EXPANSION_JARS environment var is not provided, " "indicating that jars have not been built") class ExternalTransformProviderIT(unittest.TestCase): - def test_generate_sequence_config_schema_and_description(self): + def test_generate_sequence_signature_and_doc(self): provider = ExternalTransformProvider( BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) @@ -102,14 +103,14 @@ def test_generate_sequence_config_schema_and_description(self): ) in provider.get_available()) GenerateSequence = provider.get('GenerateSequence') - config_schema = GenerateSequence.configuration_schema + signature = inspect.signature(GenerateSequence) for param in ['start', 'end', 'rate']: - self.assertTrue(param in config_schema) + self.assertTrue(param in signature.parameters.keys()) - description_substring = ( + doc_substring = ( "Outputs a PCollection of Beam Rows, each " "containing a single INT64") - self.assertTrue(description_substring in GenerateSequence.description) + self.assertTrue(doc_substring in inspect.getdoc(GenerateSequence)) def test_run_generate_sequence(self): provider = ExternalTransformProvider( diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py index ea4f496c2d04..22adf0cd354b 100644 --- a/sdks/python/gen_xlang_wrappers.py +++ b/sdks/python/gen_xlang_wrappers.py @@ -22,6 +22,7 @@ import argparse import datetime +import inspect import logging import os import shutil @@ -168,7 +169,7 @@ class name. This can be overriden by manually providing a name. 'destinations': transform_destinations, 'default_service': target, 'fields': fields, - 'description': wrapper.description + 'description': inspect.getdoc(wrapper) } transform_list.append(transform)