Add Lineage metrics to Python PubsubIO, BigtableIO, FileIO (#32430)
Abacn authored Sep 13, 2024
1 parent 98410ca commit 9a524cc
Showing 11 changed files with 192 additions and 2 deletions.
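
For context, a minimal sketch (not part of this commit) of how the new Lineage metrics can be read back from a pipeline result, following the pattern used by the tests in this change; the pipeline contents and FQN values shown are illustrative:

import apache_beam as beam
from apache_beam.metrics.metric import Lineage

p = beam.Pipeline()
# ... build a pipeline that uses one of the IOs touched here (Pub/Sub, Bigtable, file-based IO) ...
result = p.run()
result.wait_until_finish()

# Lineage.query returns a set of fully qualified names, e.g.
# {"pubsub:topic:<project>.<topic>"} or {"bigtable:<project>.<instance>.<table>"}.
sources = Lineage.query(result.metrics(), Lineage.SOURCE)
sinks = Lineage.query(result.metrics(), Lineage.SINK)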
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -314,3 +314,11 @@ def delete(self, paths):
}
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
try:
components = s3io.parse_s3_path(path, get_account=True)
except ValueError:
# report lineage is fail-safe
return
lineage.add('s3', *components)
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -316,3 +316,11 @@ def delete(self, paths):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
try:
components = blobstorageio.parse_azfs_path(path, get_account=True)
except ValueError:
# report lineage is fail-safe
return
lineage.add('abs', *components)
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/filebasedsink.py
@@ -280,6 +280,7 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

src_files.append(src)
dst_files.append(dst)
FileSystems.report_sink_lineage(dst)
return src_files, dst_files, delete_files, num_skipped

@check_accessible(['file_path_prefix'])
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/filebasedsource.py
@@ -168,6 +168,7 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
min_bundle_size=self._min_bundle_size,
splittable=splittable)
single_file_sources.append(single_file_source)
FileSystems.report_source_lineage(file_name)
self._concat_source = concat_source.ConcatSource(single_file_sources)
return self._concat_source

@@ -351,6 +352,7 @@ def process(self, element: Union[str, FileMetadata], *args,
match_results = FileSystems.match([element])
metadata_list = match_results[0].metadata_list
for metadata in metadata_list:
FileSystems.report_source_lineage(metadata.path)
splittable = (
self._splittable and _determine_splittability_from_compression_type(
metadata.path, self._compression_type))
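
With the call sites added in filebasedsink.py and filebasedsource.py above, file-based transforms report lineage automatically. A minimal sketch (not part of this commit; the bucket and paths are placeholders) using ReadFromText and WriteToText, which build on these modules:

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.io.ReadFromText('gs://example-bucket/input*.txt')     # source lineage per matched file
        | beam.io.WriteToText('gs://example-bucket/output/result'))  # sink lineage per final destination file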
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/filesystem.py
@@ -933,3 +933,11 @@ def delete(self, paths):
``BeamIOError``: if any of the delete operations fail
"""
raise NotImplementedError

def report_lineage(self, path, unused_lineage):
"""
Report Lineage metrics for path.
Unless overridden by a FileSystem implementation, this defaults to a no-op.
"""
pass
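
As a sketch only (the scheme name and path parser below are hypothetical), a concrete FileSystem would override this hook along the same lines as the S3, Azure, and GCS implementations in this commit:

from apache_beam.io.filesystem import FileSystem


def parse_example_path(path):
    # Hypothetical parser: 'example://bucket/obj' -> ('bucket', 'obj').
    prefix = 'example://'
    if not path.startswith(prefix):
        raise ValueError('not an example:// path: %r' % path)
    bucket, _, obj = path[len(prefix):].partition('/')
    return bucket, obj


class ExampleFileSystem(FileSystem):
    # The other FileSystem abstract methods are omitted from this sketch.

    def report_lineage(self, path, lineage):
        try:
            bucket, obj = parse_example_path(path)
        except ValueError:
            # Fail-safe: never fail the pipeline because lineage could not be reported.
            return
        lineage.add('example', bucket, obj)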
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/io/filesystems.py
@@ -26,6 +26,7 @@
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileSystem
from apache_beam.metrics.metric import Lineage
from apache_beam.options.value_provider import RuntimeValueProvider

_LOGGER = logging.getLogger(__name__)
@@ -388,3 +389,15 @@ def get_chunk_size(path):
"""
filesystem = FileSystems.get_filesystem(path)
return filesystem.CHUNK_SIZE

@staticmethod
def report_source_lineage(path):
"""Report source :class:`~apache_beam.metrics.metric.Lineage`."""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sources())

@staticmethod
def report_sink_lineage(path):
"""Report sink :class:`~apache_beam.metrics.metric.Lineage`."""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sinks())
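
These helpers are what the filebasedsource.py and filebasedsink.py changes above call into; a small sketch (paths are placeholders) of how the dispatch behaves:

from apache_beam.io.filesystems import FileSystems

# Resolves the FileSystem for the path's scheme and forwards to its report_lineage hook;
# e.g. the GCS filesystem records Lineage.sources().add('gcs', bucket, blob).
FileSystems.report_source_lineage('gs://example-bucket/data/part-00000')

# Filesystems that do not override the hook in this commit (e.g. the local filesystem)
# inherit the no-op default, so nothing is recorded for this path.
FileSystems.report_sink_lineage('/tmp/example-output')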
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -47,6 +47,7 @@
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import Metrics
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.metric import Lineage
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.external import BeamJarExpansionService
@@ -162,6 +163,12 @@ def finish_bundle(self):
if self.batcher:
self.batcher.close()
self.batcher = None
# Report Lineage metrics on write
Lineage.sinks().add(
'bigtable',
self.beam_options['project_id'],
self.beam_options['instance_id'],
self.beam_options['table_id'])

def display_data(self):
return {
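
Illustrative usage only (project, instance, and table IDs are placeholders): a write pipeline now also records a sink lineage entry of the form "bigtable:<project>.<instance>.<table>", as asserted by the test added below.

import apache_beam as beam
from apache_beam.io.gcp import bigtableio
from google.cloud.bigtable.row import DirectRow

row = DirectRow(row_key=b'key-1')
row.set_cell('family', b'column', b'value')

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([row])
        | bigtableio.WriteToBigTable('example-project', 'example-instance', 'example-table'))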
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_test.py
@@ -35,12 +35,15 @@
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import Lineage
from apache_beam.testing.test_pipeline import TestPipeline

_LOGGER = logging.getLogger(__name__)

# Protect against environments where bigtable library is not available.
try:
from google.cloud.bigtable import client
from google.cloud.bigtable.batcher import MutationsBatcher
from google.cloud.bigtable.row_filters import TimestampRange
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.row import DirectRow, PartialRowData, Cell
@@ -266,6 +269,18 @@ def setUp(self):
instance = Instance(self._INSTANCE_ID, client)
self.table = Table(self._TABLE_ID, instance)

def test_write(self):
direct_rows = [self.generate_row(i) for i in range(5)]
with patch.object(MutationsBatcher, 'mutate'), \
patch.object(MutationsBatcher, 'close'), TestPipeline() as p:
_ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable(
self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set([
f"bigtable:{self._PROJECT_ID}.{self._INSTANCE_ID}.{self._TABLE_ID}"
]))

def test_write_metrics(self):
MetricsEnvironment.process_wide_container().reset()
write_fn = bigtableio._BigTableWriteFn(
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -365,3 +365,11 @@ def delete(self, paths):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
try:
bucket, blob = gcsio.parse_gcs_path(path)
except ValueError:
# report lineage is fail-safe
return
lineage.add('gcs', bucket, blob)
67 changes: 65 additions & 2 deletions sdks/python/apache_beam/io/gcp/pubsub.py
@@ -43,8 +43,11 @@
from apache_beam.io import iobase
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.metrics.metric import Lineage
from apache_beam.transforms import DoFn
from apache_beam.transforms import Flatten
from apache_beam.transforms import Map
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.annotations import deprecated
@@ -257,7 +260,16 @@ def __init__(
def expand(self, pvalue):
# TODO(BEAM-27443): Apply a proper transform rather than Read.
pcoll = pvalue.pipeline | Read(self._source)
# explicit element_type required after native read, otherwise coder error
pcoll.element_type = bytes
return self.expand_continued(pcoll)

def expand_continued(self, pcoll):
pcoll = pcoll | ParDo(
_AddMetricsPassThrough(
project=self._source.project,
topic=self._source.topic_name,
sub=self._source.subscription_name)).with_output_types(bytes)
if self.with_attributes:
pcoll = pcoll | Map(PubsubMessage._from_proto_str)
pcoll.element_type = PubsubMessage
@@ -269,6 +281,31 @@ def to_runner_api_parameter(self, context):
return self.to_runner_api_pickled(context)


class _AddMetricsPassThrough(DoFn):
def __init__(self, project, topic=None, sub=None):
self.project = project
self.topic = topic
self.sub = sub
self.reported_lineage = False

def setup(self):
self.reported_lineage = False

def process(self, element: bytes):
self.report_lineage_once()
yield element

def report_lineage_once(self):
if not self.reported_lineage:
self.reported_lineage = True
if self.topic is not None:
Lineage.sources().add(
'pubsub', self.project, self.topic, subtype='topic')
elif self.sub is not None:
Lineage.sources().add(
'pubsub', self.project, self.sub, subtype='subscription')
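
Illustrative usage (project and subscription names are placeholders): ReadFromPubSub now reports source lineage once per DoFn instance; per the tests added below, the FQN takes the form "pubsub:topic:<project>.<topic>" or "pubsub:subscription:<project>.<sub>".

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    _ = p | ReadFromPubSub(
        subscription='projects/example-project/subscriptions/example-sub')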


@deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')
def ReadStringsFromPubSub(topic=None, subscription=None, id_label=None):
return _ReadStringsFromPubSub(topic, subscription, id_label)
@@ -314,6 +351,26 @@ def expand(self, pcoll):
return pcoll | WriteToPubSub(self.topic)


class _AddMetricsAndMap(DoFn):
def __init__(self, fn, project, topic=None):
self.project = project
self.topic = topic
self.fn = fn
self.reported_lineage = False

def setup(self):
self.reported_lineage = False

def process(self, element):
self.report_lineage_once()
yield self.fn(element)

def report_lineage_once(self):
if not self.reported_lineage:
self.reported_lineage = True
Lineage.sinks().add('pubsub', self.project, self.topic, subtype='topic')


class WriteToPubSub(PTransform):
"""A ``PTransform`` for writing messages to Cloud Pub/Sub."""

@@ -364,9 +421,15 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:

def expand(self, pcoll):
if self.with_attributes:
pcoll = pcoll | 'ToProtobufX' >> Map(self.message_to_proto_str)
pcoll = pcoll | 'ToProtobufX' >> ParDo(
_AddMetricsAndMap(
self.message_to_proto_str, self.project,
self.topic_name)).with_input_types(PubsubMessage)
else:
pcoll = pcoll | 'ToProtobufY' >> Map(self.bytes_to_proto_str)
pcoll = pcoll | 'ToProtobufY' >> ParDo(
_AddMetricsAndMap(
self.bytes_to_proto_str, self.project,
self.topic_name)).with_input_types(Union[bytes, str])
pcoll.element_type = bytes
return pcoll | Write(self._sink)

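
Similarly, an illustrative write (the topic name is a placeholder): WriteToPubSub now reports a sink lineage entry of the form "pubsub:topic:<project>.<topic>", matching the expectations in the tests below.

import apache_beam as beam
from apache_beam.io.gcp.pubsub import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    _ = (
        p
        | beam.Create([b'payload'])
        | WriteToPubSub('projects/example-project/topics/example-topic', with_attributes=False))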
57 changes: 57 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -38,6 +38,7 @@
from apache_beam.io.gcp.pubsub import WriteToPubSub
from apache_beam.io.gcp.pubsub import _PubSubSink
from apache_beam.io.gcp.pubsub import _PubSubSource
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.portability import common_urns
@@ -819,6 +820,30 @@ def test_runner_api_transformation_with_subscription(
'projects/fakeprj/subscriptions/a_subscription',
transform_from_proto.source.full_subscription)

def test_read_from_pubsub_no_overwrite(self, unused_mock):
expected_elements = [
TestWindowedValue(
b'apache',
timestamp.Timestamp(1520861826.234567), [window.GlobalWindow()]),
TestWindowedValue(
b'beam',
timestamp.Timestamp(1520861824.234567), [window.GlobalWindow()])
]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
for test_case in ('topic', 'subscription'):
with TestPipeline(options=options) as p:
# The direct runner currently overwrites the whole ReadFromPubSub transform,
# so this exercises the part of the composite transform that is not overwritten.
pcoll = p | beam.Create([b'apache', b'beam']) | beam.Map(
lambda x: window.TimestampedValue(x, 1520861820.234567 + len(x)))
args = {test_case: f'projects/fakeprj/{test_case}s/topic_or_sub'}
pcoll = ReadFromPubSub(**args).expand_continued(pcoll)
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set([f"pubsub:{test_case}:fakeprj.topic_or_sub"]))


@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('google.cloud.pubsub.PublisherClient')
@@ -974,6 +999,38 @@ def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
self.assertIsNone(transform_from_proto.sink.id_label)
self.assertIsNone(transform_from_proto.sink.timestamp_attribute)

def test_write_to_pubsub_no_overwrite(self, unused_mock):
data = 'data'
payloads = [data]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = p | Create(payloads)
WriteToPubSub(
'projects/fakeprj/topics/a_topic',
with_attributes=False).expand(pcoll)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))

def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = p | Create(payloads)
# Avoid the direct runner overwriting WriteToPubSub.
WriteToPubSub(
'projects/fakeprj/topics/a_topic',
with_attributes=True).expand(pcoll)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)