diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py
index 636b0a12f3e2..e181beac4a58 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -314,3 +314,11 @@ def delete(self, paths):
     }
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
+
+  def report_lineage(self, path, lineage):
+    try:
+      components = s3io.parse_s3_path(path, get_account=True)
+    except ValueError:
+      # Reporting lineage is fail-safe; ignore unparsable paths.
+      return
+    lineage.add('s3', *components)
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index c446c17247d7..bb56fa09d370 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -316,3 +316,11 @@ def delete(self, paths):
 
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
+
+  def report_lineage(self, path, lineage):
+    try:
+      components = blobstorageio.parse_azfs_path(path, get_account=True)
+    except ValueError:
+      # Reporting lineage is fail-safe; ignore unparsable paths.
+      return
+    lineage.add('abs', *components)
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 816de9d33a05..c708e117c3a1 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -280,6 +280,7 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):
 
       src_files.append(src)
       dst_files.append(dst)
+      FileSystems.report_sink_lineage(dst)
     return src_files, dst_files, delete_files, num_skipped
 
   @check_accessible(['file_path_prefix'])
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 91763ced6e69..efd863810ed7 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -168,6 +168,7 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
             min_bundle_size=self._min_bundle_size,
             splittable=splittable)
         single_file_sources.append(single_file_source)
+        FileSystems.report_source_lineage(file_name)
       self._concat_source = concat_source.ConcatSource(single_file_sources)
     return self._concat_source
 
@@ -351,6 +352,7 @@ def process(self, element: Union[str, FileMetadata], *args,
       match_results = FileSystems.match([element])
       metadata_list = match_results[0].metadata_list
     for metadata in metadata_list:
+      FileSystems.report_source_lineage(metadata.path)
       splittable = (
           self._splittable and _determine_splittability_from_compression_type(
               metadata.path, self._compression_type))
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 550079a482c4..bdc25dcf0fe5 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -933,3 +933,11 @@ def delete(self, paths):
       ``BeamIOError``: if any of the delete operations fail
     """
     raise NotImplementedError
+
+  def report_lineage(self, path, unused_lineage):
+    """
+    Report Lineage metrics for path.
+
+    Unless overridden by a FileSystem implementation, this defaults to a no-op.
+    """
+    pass
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index e7cdf3844979..ccbeac640765 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -26,6 +26,7 @@
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileSystem
+from apache_beam.metrics.metric import Lineage
 from apache_beam.options.value_provider import RuntimeValueProvider
 
 _LOGGER = logging.getLogger(__name__)
@@ -388,3 +389,15 @@ def get_chunk_size(path):
     """
     filesystem = FileSystems.get_filesystem(path)
     return filesystem.CHUNK_SIZE
+
+  @staticmethod
+  def report_source_lineage(path):
+    """Report source :class:`~apache_beam.metrics.metric.Lineage`."""
+    filesystem = FileSystems.get_filesystem(path)
+    filesystem.report_lineage(path, Lineage.sources())
+
+  @staticmethod
+  def report_sink_lineage(path):
+    """Report sink :class:`~apache_beam.metrics.metric.Lineage`."""
+    filesystem = FileSystems.get_filesystem(path)
+    filesystem.report_lineage(path, Lineage.sinks())
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index 3f54e09ee3dd..ffb1852eb0f4 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -47,6 +47,7 @@
 from apache_beam.io.gcp import resource_identifiers
 from apache_beam.metrics import Metrics
 from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.metric import Lineage
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.external import BeamJarExpansionService
@@ -162,6 +163,12 @@ def finish_bundle(self):
     if self.batcher:
       self.batcher.close()
       self.batcher = None
+    # Report Lineage metrics on write
+    Lineage.sinks().add(
+        'bigtable',
+        self.beam_options['project_id'],
+        self.beam_options['instance_id'],
+        self.beam_options['table_id'])
 
   def display_data(self):
     return {
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_test.py
index f97c9bcfbd6a..130f9a714129 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_test.py
@@ -35,12 +35,15 @@
 from apache_beam.io.gcp import resource_identifiers
 from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metric import Lineage
+from apache_beam.testing.test_pipeline import TestPipeline
 
 _LOGGER = logging.getLogger(__name__)
 
 # Protect against environments where bigtable library is not available.
 try:
   from google.cloud.bigtable import client
+  from google.cloud.bigtable.batcher import MutationsBatcher
   from google.cloud.bigtable.row_filters import TimestampRange
   from google.cloud.bigtable.instance import Instance
   from google.cloud.bigtable.row import DirectRow, PartialRowData, Cell
@@ -266,6 +269,18 @@ def setUp(self):
     instance = Instance(self._INSTANCE_ID, client)
     self.table = Table(self._TABLE_ID, instance)
 
+  def test_write(self):
+    direct_rows = [self.generate_row(i) for i in range(5)]
+    with patch.object(MutationsBatcher, 'mutate'), \
+        patch.object(MutationsBatcher, 'close'), TestPipeline() as p:
+      _ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable(
+          self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SINK),
+        set([
+            f"bigtable:{self._PROJECT_ID}.{self._INSTANCE_ID}.{self._TABLE_ID}"
+        ]))
+
   def test_write_metrics(self):
     MetricsEnvironment.process_wide_container().reset()
     write_fn = bigtableio._BigTableWriteFn(
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 47d1997ddc7b..053b02d325a5 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -365,3 +365,11 @@ def delete(self, paths):
 
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
+
+  def report_lineage(self, path, lineage):
+    try:
+      bucket, blob = gcsio.parse_gcs_path(path)
+    except ValueError:
+      # Reporting lineage is fail-safe; ignore unparsable paths.
+      return
+    lineage.add('gcs', bucket, blob)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 32e7fbe5ed58..b6f801c63f79 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -43,8 +43,11 @@
 from apache_beam.io import iobase
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
+from apache_beam.metrics.metric import Lineage
+from apache_beam.transforms import DoFn
 from apache_beam.transforms import Flatten
 from apache_beam.transforms import Map
+from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.annotations import deprecated
@@ -257,7 +260,16 @@ def __init__(
   def expand(self, pvalue):
     # TODO(BEAM-27443): Apply a proper transform rather than Read.
     pcoll = pvalue.pipeline | Read(self._source)
+    # Explicit element_type is required after the native Read, otherwise the
+    # coder errors.
     pcoll.element_type = bytes
+    return self.expand_continued(pcoll)
+
+  def expand_continued(self, pcoll):
+    pcoll = pcoll | ParDo(
+        _AddMetricsPassThrough(
+            project=self._source.project,
+            topic=self._source.topic_name,
+            sub=self._source.subscription_name)).with_output_types(bytes)
     if self.with_attributes:
       pcoll = pcoll | Map(PubsubMessage._from_proto_str)
       pcoll.element_type = PubsubMessage
@@ -269,6 +281,31 @@ def to_runner_api_parameter(self, context):
     return self.to_runner_api_pickled(context)
 
 
+class _AddMetricsPassThrough(DoFn):
+  def __init__(self, project, topic=None, sub=None):
+    self.project = project
+    self.topic = topic
+    self.sub = sub
+    self.reported_lineage = False
+
+  def setup(self):
+    self.reported_lineage = False
+
+  def process(self, element: bytes):
+    self.report_lineage_once()
+    yield element
+
+  def report_lineage_once(self):
+    if not self.reported_lineage:
+      self.reported_lineage = True
+      if self.topic is not None:
+        Lineage.sources().add(
+            'pubsub', self.project, self.topic, subtype='topic')
+      elif self.sub is not None:
+        Lineage.sources().add(
+            'pubsub', self.project, self.sub, subtype='subscription')
+
+
 @deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')
 def ReadStringsFromPubSub(topic=None, subscription=None, id_label=None):
   return _ReadStringsFromPubSub(topic, subscription, id_label)
@@ -314,6 +351,26 @@ def expand(self, pcoll):
     return pcoll | WriteToPubSub(self.topic)
 
 
+class _AddMetricsAndMap(DoFn):
+  def __init__(self, fn, project, topic=None):
+    self.project = project
+    self.topic = topic
+    self.fn = fn
+    self.reported_lineage = False
+
+  def setup(self):
+    self.reported_lineage = False
+
+  def process(self, element):
+    self.report_lineage_once()
+    yield self.fn(element)
+
+  def report_lineage_once(self):
+    if not self.reported_lineage:
+      self.reported_lineage = True
+      Lineage.sinks().add('pubsub', self.project, self.topic, subtype='topic')
+
+
 class WriteToPubSub(PTransform):
   """A ``PTransform`` for writing messages to Cloud Pub/Sub."""
 
@@ -364,9 +421,15 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:
 
   def expand(self, pcoll):
     if self.with_attributes:
-      pcoll = pcoll | 'ToProtobufX' >> Map(self.message_to_proto_str)
+      pcoll = pcoll | 'ToProtobufX' >> ParDo(
+          _AddMetricsAndMap(
+              self.message_to_proto_str, self.project,
+              self.topic_name)).with_input_types(PubsubMessage)
     else:
-      pcoll = pcoll | 'ToProtobufY' >> Map(self.bytes_to_proto_str)
+      pcoll = pcoll | 'ToProtobufY' >> ParDo(
+          _AddMetricsAndMap(
+              self.bytes_to_proto_str, self.project,
+              self.topic_name)).with_input_types(Union[bytes, str])
     pcoll.element_type = bytes
     return pcoll | Write(self._sink)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index f704338626ee..2e3e9b301618 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -38,6 +38,7 @@
 from apache_beam.io.gcp.pubsub import WriteToPubSub
 from apache_beam.io.gcp.pubsub import _PubSubSink
 from apache_beam.io.gcp.pubsub import _PubSubSource
+from apache_beam.metrics.metric import Lineage
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.portability import common_urns
@@ -819,6 +820,30 @@ def test_runner_api_transformation_with_subscription(
         'projects/fakeprj/subscriptions/a_subscription',
         transform_from_proto.source.full_subscription)
+
+  def test_read_from_pubsub_no_overwrite(self, unused_mock):
+    expected_elements = [
+        TestWindowedValue(
+            b'apache',
+            timestamp.Timestamp(1520861826.234567), [window.GlobalWindow()]),
+        TestWindowedValue(
+            b'beam',
+            timestamp.Timestamp(1520861824.234567), [window.GlobalWindow()])
+    ]
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    for test_case in ('topic', 'subscription'):
+      with TestPipeline(options=options) as p:
+        # The direct runner currently overwrites the whole ReadFromPubSub
+        # transform; this exercises the composite's expansion without it.
+        pcoll = p | beam.Create([b'apache', b'beam']) | beam.Map(
+            lambda x: window.TimestampedValue(x, 1520861820.234567 + len(x)))
+        args = {test_case: f'projects/fakeprj/{test_case}s/topic_or_sub'}
+        pcoll = ReadFromPubSub(**args).expand_continued(pcoll)
+        assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
+      self.assertSetEqual(
+          Lineage.query(p.result.metrics(), Lineage.SOURCE),
+          set([f"pubsub:{test_case}:fakeprj.topic_or_sub"]))
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 @mock.patch('google.cloud.pubsub.PublisherClient')
@@ -974,6 +999,38 @@ def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
     self.assertIsNone(transform_from_proto.sink.id_label)
     self.assertIsNone(transform_from_proto.sink.timestamp_attribute)
+
+  def test_write_to_pubsub_no_overwrite(self, unused_mock):
+    data = 'data'
+    payloads = [data]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      pcoll = p | Create(payloads)
+      WriteToPubSub(
+          'projects/fakeprj/topics/a_topic',
+          with_attributes=False).expand(pcoll)
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SINK),
+        set(["pubsub:topic:fakeprj.a_topic"]))
+
+  def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
+    data = b'data'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      pcoll = p | Create(payloads)
+      # expand() is called directly to avoid the direct runner's overwrite
+      WriteToPubSub(
+          'projects/fakeprj/topics/a_topic',
+          with_attributes=True).expand(pcoll)
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SINK),
+        set(["pubsub:topic:fakeprj.a_topic"]))
 
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
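Reviewer note: taken together, the patch routes lineage through two paths. File-based IOs call the new `FileSystems.report_source_lineage` / `report_sink_lineage` static methods, which dispatch to the scheme's registered `FileSystem.report_lineage`; Pub/Sub and Bigtable report directly from worker-side DoFns. Below is a minimal sketch, not part of the patch, of exercising this end to end. The `gs://` path and the exact rendering of the reported string are illustrative assumptions; `Lineage.query`, `Lineage.SOURCE`, and `Lineage.SINK` are the same APIs the new tests use.

```python
import apache_beam as beam
from apache_beam.metrics.metric import Lineage
from apache_beam.testing.test_pipeline import TestPipeline

# Any file-based read now reports source lineage: FileBasedSource calls
# FileSystems.report_source_lineage() for each matched file, and the call
# dispatches to the FileSystem registered for the path's scheme.
with TestPipeline() as p:
  # Hypothetical path; a real run needs GCS dependencies and credentials.
  _ = p | beam.io.ReadFromText('gs://example-bucket/input.txt')

# After the pipeline completes, the reported sources are queryable from the
# result's metrics, exactly as the new unit tests do.
print(Lineage.query(p.result.metrics(), Lineage.SOURCE))
# Assumed rendering, based on the test expectations above:
# {'gcs:example-bucket.input.txt'}
```

Because the Pub/Sub DoFns report from `process()` via `report_lineage_once()`, their lineage only appears once the transform has handled at least one element; the `setup()` reset bounds the report to one per DoFn instance.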