Skip to content

Commit

Permalink
added a simple pub/sub topic for testing (#27610)
Browse files Browse the repository at this point in the history
* added a simple pub/sub topic for testing

* update the code based on comments

---------

Co-authored-by: xqhu <[email protected]>
  • Loading branch information
liferoad and liferoad authored Jul 25, 2023
1 parent 1273d22 commit 891349c
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
37 changes: 37 additions & 0 deletions .test-infra/pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Overview

This folder contains Python scripts to create a Pub/Sub topic under
the GCP project `apache-beam-testing` and test the topic.
The created topic is `projects/apache-beam-testing/topics/Imagenet_openimage_50k_benchmark`.

# Create the topic `Imagenet_openimage_50k_benchmark`

- Create one VM to run `gcs_image_looper.py`.
The VM `pubsub-test-do-not-delete` was already created under `apache-beam-testing`.
Keep the script running to continuously publish data.
- You might run `gcloud auth application-default login` to get the auth.
- You might run `pip install google-cloud-core google-cloud-pubsub google-cloud-storage`.
- Must make `Imagenet_openimage_50k_benchmark` public by adding `allAuthenticatedUsers` to the Pub/Sub Subscriber role.

# Tes the topic by subscribing it

- Run `test_image_looper.py` to check whether you could get any data.
69 changes: 69 additions & 0 deletions .test-infra/pubsub/gcs_image_looper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This executable loops image filepaths from a gcs bucket file."""
import random
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
from google.cloud import storage

# use the eou project and gcs to run the word looper
project_id = "apache-beam-testing"
gcs_bucket = "apache-beam-ml"
num_images_per_second = 5

publisher = pubsub_v1.PublisherClient()
image_file_path = "testing/inputs/openimage_50k_benchmark.txt"
topic_name = "Imagenet_openimage_50k_benchmark"
topic_path = publisher.topic_path(project_id, topic_name)


class ImageLooper(object):
"""Loop the images in a gcs bucket file and publish them to a pubsub topic.
"""
content = ""
cursor = 0

def __init__(self, filename):
self._read_gcs_file(filename)

def get_next_image(self):
"""Returns the next image randomly."""
next_image = ""
while not next_image:
image_id = random.randint(0, len(self.content) - 1)
next_image = self.content[image_id]
return next_image

def _read_gcs_file(self, filename):
client = storage.Client()
bucket = client.get_bucket(gcs_bucket)
blob = bucket.get_blob(filename)
self.content = blob.download_as_string().decode("utf-8").split('\n')


try:
publisher.create_topic(request={"name": topic_path})
except AlreadyExists:
pass

looper = ImageLooper(image_file_path)
while True:
image = looper.get_next_image()
publisher.publish(topic_path, data=image.encode("utf-8"))
time.sleep(1 / num_images_per_second)
72 changes: 72 additions & 0 deletions .test-infra/pubsub/test_image_looper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This executable test the pub/sub topic created by gcs_image_looper.py"""

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists

project_id = "apache-beam-testing"
subscription_id = "test-image-looper"
topic_id = "Imagenet_openimage_50k_benchmark"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

try:
subscription = subscriber.create_subscription(request={
"name": subscription_path,
"topic": topic_path
})
print(f"Subscription created: {subscription}")
except AlreadyExists:
subscriber.delete_subscription(request={"subscription": subscription_path})
subscription = subscriber.create_subscription(request={
"name": subscription_path,
"topic": topic_path
})
print(f"Subscription recreated: {subscription}")

timeout = 3.0

total_images = []


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
total_images.append(message.data.decode())
message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path,
callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
print("Results: \n", total_images)

subscriber.delete_subscription(request={"subscription": subscription_path})

print(f"Subscription deleted: {subscription_path}.")

0 comments on commit 891349c

Please sign in to comment.