The AWS SQS Source is a user-defined source for Numaflow that lets you use Amazon Simple Queue Service (SQS) as a source in your Numaflow pipelines.
- Quick Start
- Using AWS SQS Source in Your Numaflow Pipeline
- JSON Configuration
- Environment Variables Configuration
- Debugging AWS SQS Source
This quick start guide will walk you through setting up an AWS SQS source in a Numaflow pipeline.
- Install Numaflow on your Kubernetes cluster
- AWS CLI configured with access to AWS SQS
Using AWS CLI or the AWS Management Console, create a new SQS queue.
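The pipeline in this guide reads its AWS credentials from a Kubernetes Secret named aws-credentials, with the keys accessKeyId and secretAccessKey. If that Secret does not exist yet, a minimal sketch could look like the following. The values are placeholders, and the Secret is assumed to live in the same namespace as the pipeline:

```yaml
# Sketch of the Secret referenced by the secretKeyRef entries in the pipeline manifest.
# Replace the placeholder values and avoid committing real credentials to version control.
apiVersion: v1
kind: Secret
metadata:
  name: aws-credentials          # must match secretKeyRef.name in the pipeline
type: Opaque
stringData:                      # plain-text input; Kubernetes stores it base64-encoded
  accessKeyId: "YOUR_AWS_ACCESS_KEY_ID"
  secretAccessKey: "YOUR_AWS_SECRET_ACCESS_KEY"
```

Apply it with kubectl apply -f before deploying the pipeline, so the source container can resolve both keys at startup.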
Save the following Kubernetes manifest to a file (e.g., sqs-source-pipeline.yaml), modifying the AWS region, queue name, and AWS credentials accordingly:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: sqs-source-pipeline
spec:
  vertices:
    - name: sqs-source
      source:
        udsource:
          container:
            image: your-repo/aws-sqs-source-go:v1.0.0
            env:
              - name: AWS_REGION
                value: "us-east-1"
              - name: AWS_QUEUE
                value: "your-sqs-queue-name"
              - name: AWS_ACCESS_KEY
                valueFrom:
                  secretKeyRef:
                    name: aws-credentials
                    key: accessKeyId
              - name: AWS_SECRET
                valueFrom:
                  secretKeyRef:
                    name: aws-credentials
                    key: secretAccessKey
    - name: log-sink
      sink:
        log: {}
  edges:
    - from: sqs-source
      to: log-sink
Then apply it to your cluster:
kubectl apply -f sqs-source-pipeline.yaml
Check if the pipeline is running:
kubectl get pipeline sqs-source-pipeline
Using the AWS CLI, send a message to your SQS queue:
aws sqs send-message --queue-url <YourQueueUrl> --message-body "Hello from SQS"
Check the logs of the log-sink vertex to see if it received the message from SQS:
kubectl logs <log-sink-pod-name>
You should see output similar to:
Message received: Hello from SQS
To delete the Numaflow pipeline:
kubectl delete pipeline sqs-source-pipeline
To delete the SQS queue:
aws sqs delete-queue --queue-url <YourQueueUrl>
Congratulations! You have successfully set up an AWS SQS source in a Numaflow pipeline.
Prerequisites:
- Ensure Numaflow is installed on your Kubernetes cluster.
- Your AWS CLI should be configured to access AWS SQS.
Step-by-step Guide:
- Create an AWS SQS Queue:
  - Use the AWS CLI or log in to the AWS Management Console to create a new SQS queue.
- Deploy a Numaflow Pipeline with AWS SQS Source:
  - Prepare a Kubernetes manifest file, let’s say sqs-source-pipeline.yaml. Here's a template to get you started:

        apiVersion: numaflow.numaproj.io/v1alpha1
        kind: Pipeline
        metadata:
          name: sqs-source-pipeline
        spec:
          vertices:
            - name: sqs-source
              source:
                udsource:
                  container:
                    image: your-repo/aws-sqs-source-go:v1.0.0
                    env:
                      - name: AWS_REGION
                        value: "us-east-1"
                      - name: AWS_QUEUE
                        value: "your-sqs-queue-name"
                      - name: AWS_ACCESS_KEY
                        valueFrom:
                          secretKeyRef:
                            name: aws-credentials
                            key: accessKeyId
                      - name: AWS_SECRET
                        valueFrom:
                          secretKeyRef:
                            name: aws-credentials
                            key: secretAccessKey
            - name: log-sink
              sink:
                log: {}
          edges:
            - from: sqs-source
              to: log-sink

  - Modify the image path, AWS region, queue name, and AWS credentials as per your configuration.
  - Apply the configuration to your cluster by running:

        kubectl apply -f sqs-source-pipeline.yaml

By following these steps, you'll have an AWS SQS queue as a source for your Numaflow pipeline running in Kubernetes. Messages from the SQS queue will be fetched by the pipeline and passed to the log sink, where they can be processed as per your pipeline logic.
While YAML is the default configuration format, the AWS SQS source also supports JSON. See the section Using JSON format to specify the AWS SQS source configuration for examples and guidance.
To configure the AWS SQS source in your Numaflow pipeline using environment variables, you can follow this process:
Using Environment Variables for AWS SQS Source Configuration:
- Set up Environment Variables:
  - Define environment variables on your Kubernetes deployment to hold the necessary AWS SQS configuration such as the region, queue name, and credentials.
  - For example, you can set AWS_REGION, AWS_QUEUE, AWS_ACCESS_KEY, and AWS_SECRET as environment variables in your pipeline deployment.
- Modify Your Pipeline Configuration:
  - In your Numaflow pipeline definition (sqs-source-pipeline.yaml), reference these environment variables. Ensure the source container in the pipeline specification is configured to use them.
Here's an example snippet for your Kubernetes manifest file:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: sqs-source-pipeline
spec:
  vertices:
    - name: sqs-source
      source:
        udsource:
          container:
            image: your-repo/aws-sqs-source-go:v1.0.0
            env:
              - name: AWS_REGION
                valueFrom:
                  secretKeyRef:
                    name: aws-env-variables
                    key: region
              - name: AWS_QUEUE
                valueFrom:
                  secretKeyRef:
                    name: aws-env-variables
                    key: queueName
              - name: AWS_ACCESS_KEY
                valueFrom:
                  secretKeyRef:
                    name: aws-env-variables
                    key: accessKeyID
              - name: AWS_SECRET
                valueFrom:
                  secretKeyRef:
                    name: aws-env-variables
                    key: secretAccessKey
- Deploy the Configuration:
  - Apply the updated pipeline configuration to your Kubernetes cluster using the command:

        kubectl apply -f sqs-source-pipeline.yaml

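The env-variable manifest above resolves all four values from a single Secret named aws-env-variables. As a rough sketch, assuming placeholder values and the same namespace as the pipeline, that Secret might be defined like this:

```yaml
# Sketch of the Secret behind the four secretKeyRef entries above.
# Key names must match the manifest exactly, including the capital "ID" in accessKeyID.
apiVersion: v1
kind: Secret
metadata:
  name: aws-env-variables
type: Opaque
stringData:
  region: "us-east-1"
  queueName: "your-sqs-queue-name"
  accessKeyID: "YOUR_AWS_ACCESS_KEY_ID"
  secretAccessKey: "YOUR_AWS_SECRET_ACCESS_KEY"
```

Keeping the region and queue name in the same Secret means a queue change only requires updating the Secret and restarting the source pod, not editing the pipeline spec.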
For enhanced security, configure your Kubernetes pods to assume an IAM role using AWS Security Token Service (STS). This avoids the need to store or manage static AWS credentials.
- Set up an IAM role with the necessary SQS permissions and a trust relationship to your Kubernetes service account.
- Use the AWS IAM Roles for Service Accounts (IRSA) feature if you are on EKS to bind the IAM role to a service account used by your pods.
- The AWS SDK within your application will automatically use these credentials when accessing AWS SQS.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sqs-source-sa
  annotations:
    eks.amazonaws.com/role-arn: "arn:aws:iam::YOUR_ACCOUNT_ID:role/YOUR_IAM_ROLE"
---
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: sqs-source-pipeline
spec:
  vertices:
    - name: sqs-source
      source:
        udsource:
          container:
            image: your-repo/aws-sqs-source-go:v1.0.0
      serviceAccountName: sqs-source-sa
A resource-based policy on the queue allows specific AWS roles or accounts to access the SQS queue without embedding credentials in the application:
- Create a resource-based policy in AWS SQS that grants the necessary permissions to the IAM role used by your Kubernetes application.
- Ensure that the policy includes actions such as sqs:SendMessage and sqs:ReceiveMessage.
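One way to express such a policy is sketched below. It assumes the queue policy is managed with AWS CloudFormation, which this guide does not otherwise require. The logical ID SqsSourceQueuePolicy, the account ID, the role name, and the queue name are all placeholders:

```yaml
# Sketch only: a CloudFormation resource attaching a resource-based policy to the queue.
Resources:
  SqsSourceQueuePolicy:                  # hypothetical logical ID
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:                            # queue URLs the policy is attached to
        - "https://sqs.us-east-1.amazonaws.com/YOUR_ACCOUNT_ID/your-sqs-queue-name"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              AWS: "arn:aws:iam::YOUR_ACCOUNT_ID:role/YOUR_IAM_ROLE"
            Action:                      # actions named in this guide
              - "sqs:SendMessage"
              - "sqs:ReceiveMessage"
            Resource: "arn:aws:sqs:us-east-1:YOUR_ACCOUNT_ID:your-sqs-queue-name"
```

If the source deletes messages after they are acknowledged, the role would also need sqs:DeleteMessage; check the source's behavior before relying on this action list.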
Configuring your pipeline in this manner lets the AWS SQS source read its configuration from environment variables at startup, which makes credentials and configuration changes easier to manage.
To enable debugging, set the DEBUG environment variable to true within the AWS SQS source container. This will output additional log messages for troubleshooting.
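In the pipeline manifest, that setting could look like the fragment below. It shows only the source container and assumes the same placeholder image used elsewhere in this guide:

```yaml
# Fragment of the sqs-source vertex; other env entries omitted for brevity.
source:
  udsource:
    container:
      image: your-repo/aws-sqs-source-go:v1.0.0
      env:
        - name: DEBUG
          value: "true"    # quoted: Kubernetes env values must be strings, not YAML booleans
```

Remove the variable or set it to "false" once troubleshooting is done to keep log volume down.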
See also: Debugging
For more information on Numaflow and how to use it to process data in a Kubernetes-native way, visit the Numaflow Documentation. For AWS SQS specific configuration, refer to the AWS SQS Documentation.