-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add portable Mqtt source and sink transforms #32385
base: master
Are you sure you want to change the base?
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
CC: @chamikaramj You can test this out locally with the following code. @gdiazdelrio can you check this PR out and give it a shot? Let me know if something doesn't work import apache_beam as beam
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external import BeamJarExpansionService
provider = ExternalTransformProvider(BeamJarExpansionService("sdks:java:io:expansion-service:shadowJar"))
MqttRead = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_read:v1")
MqttWrite = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_write:v1")
with beam.Pipeline() as p:
connection_configuration = {
"server_uri": "tcp://localhost:58494",
"topic": "WRITE_TOPIC",
"client_id": "READ_PIPELINE"
}
# read
p | MqttRead(connection_configuration, max_read_time_seconds=10) | beam.Map(print)
# write
# p | beam.Create([beam.Row(bytes=bytes([1, 2, 3, 4, 5]))]) | MqttWrite(
# connection_configuration=connection_configuration) |
Hi, what is the status of this PR? @ahmedabu98 @chamikaramj |
@ahmedabu98 I’ve only worked with the Java SDK so far, so this will be my first time dealing with portable development. It might take me some time, but I’ll do my best to work through it. If I have any questions along the way, would it be alright to leave a comment here on the PR? |
Of course! You'd be doing me a favor :) Let me know how I can help To start, you can git checkout this PR and first run |
@ahmedabu98 I'll add feature for
and also Should I start by creating my own branch and cherry-picking your commit? |
Fixes #21060