lambda_event - add support for maximum_batching_window_in_seconds (ansible-collections#2025)

lambda_event - add support for maximum_batching_window_in_seconds

SUMMARY

Closes ansible-collections#1995
Fix failures when batch_size is greater than 10 by enabling support for setting maximum_batching_window_in_seconds.

ISSUE TYPE

Feature Pull Request

COMPONENT NAME

lambda_event

Reviewed-by: Mandar Kulkarni <[email protected]>
Reviewed-by: Bikouo Aubin
Reviewed-by: Alina Buzachis
Reviewed-by: Mark Chappell
Reviewed-by: Helen Bailey <[email protected]>
alinabuzachis authored May 14, 2024
1 parent f7c110a commit b18ceba
Showing 4 changed files with 249 additions and 15 deletions.
11 changes: 11 additions & 0 deletions changelogs/fragments/20240325-lambda_event-bugfix.yml
@@ -0,0 +1,11 @@
breaking_changes:
  - lambda_event - |
      ``batch_size`` no longer defaults to 100. According to the boto3 API
      (https://boto3.amazonaws.com/v1/documentation/api/1.26.78/reference/services/lambda.html#Lambda.Client.create_event_source_mapping),
      ``batch_size`` defaults to 10 for SQS sources and to 100 for stream sources
      (https://github.com/ansible-collections/amazon.aws/pull/2025).

bugfixes:
  - lambda_event - Fix failures when ``batch_size`` is greater than 10 by enabling support for setting ``maximum_batching_window_in_seconds`` (https://github.com/ansible-collections/amazon.aws/pull/2025).

minor_changes:
  - lambda_event - Add support for setting the ``maximum_batching_window_in_seconds`` option (https://github.com/ansible-collections/amazon.aws/pull/2025).
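As a quick sketch of the fixed behavior (the function name and queue ARN below are illustrative, not part of this change), a standard SQS source can now pair a batch size above 10 with an explicit batching window:

- name: SQS event source mapping with batch_size greater than 10
  amazon.aws.lambda_event:
    state: present
    event_source: sqs
    function_name: my-function          # illustrative name
    source_params:
      source_arn: arn:aws:sqs:us-east-1:123456789012:my-queue   # illustrative ARN
      enabled: true
      batch_size: 100
      maximum_batching_window_in_seconds: 5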
83 changes: 70 additions & 13 deletions plugins/modules/lambda_event.py
@@ -54,22 +54,28 @@
    type: str
  source_params:
    description:
      - Sub-parameters required for event source.
    suboptions:
      source_arn:
        description:
          - The Amazon Resource Name (ARN) of the SQS queue, Kinesis stream or DynamoDB stream that is the event source.
        type: str
        required: true
      enabled:
        description:
          - Indicates whether AWS Lambda should begin polling or reading from the event source.
        default: true
        type: bool
      batch_size:
        description:
          - The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
          - Amazon Kinesis - Default V(100). Max V(10000).
          - Amazon DynamoDB Streams - Default V(100). Max V(10000).
          - Amazon Simple Queue Service - Default V(10). For standard queues the max is V(10000). For FIFO queues the max is V(10).
          - Amazon Managed Streaming for Apache Kafka - Default V(100). Max V(10000).
          - Self-managed Apache Kafka - Default V(100). Max V(10000).
          - Amazon MQ (ActiveMQ and RabbitMQ) - Default V(100). Max V(10000).
          - DocumentDB - Default V(100). Max V(10000).
        type: int
      starting_position:
        description:
@@ -84,6 +90,15 @@
        elements: str
        choices: [ReportBatchItemFailures]
        version_added: 5.5.0
      maximum_batching_window_in_seconds:
        description:
          - The maximum amount of time, in seconds, that Lambda spends gathering records before invoking the function.
          - You can configure O(source_params.maximum_batching_window_in_seconds) to any value from V(0) seconds to V(300) seconds in one-second increments.
          - For streams and Amazon SQS event sources, when O(source_params.batch_size) is set to a value greater than V(10),
            O(source_params.maximum_batching_window_in_seconds) defaults to V(1).
          - O(source_params.maximum_batching_window_in_seconds) is not supported by FIFO queues.
        type: int
        version_added: 8.0.0
    required: true
    type: dict
extends_documentation_fragment:
@@ -135,6 +150,7 @@
    type: list
"""

import copy
import re

try:
@@ -228,6 +244,37 @@ def get_qualifier(module):
# ---------------------------------------------------------------------------------------------------


def set_default_values(module, source_params):
    _source_params_cpy = copy.deepcopy(source_params)

    if module.params["event_source"].lower() == "sqs":
        # Default 10. For standard queues the max is 10,000. For FIFO queues the max is 10.
        _source_params_cpy.setdefault("batch_size", 10)

        if source_params["source_arn"].endswith(".fifo"):
            if _source_params_cpy["batch_size"] > 10:
                module.fail_json(msg="For FIFO queues the maximum batch_size is 10.")
            if _source_params_cpy.get("maximum_batching_window_in_seconds"):
                module.fail_json(
                    msg="maximum_batching_window_in_seconds is not supported by Amazon SQS FIFO event sources."
                )
        else:
            if _source_params_cpy["batch_size"] > 10000:
                module.fail_json(msg="For standard queues batch_size must be between 1 and 10000.")

    elif module.params["event_source"].lower() == "stream":
        # Default 100.
        _source_params_cpy.setdefault("batch_size", 100)

        if not (100 <= _source_params_cpy["batch_size"] <= 10000):
            module.fail_json(msg="batch_size for streams must be between 100 and 10000")

    # Per the option docs above: for stream and SQS sources with batch_size > 10,
    # maximum_batching_window_in_seconds defaults to 1.
    if _source_params_cpy["batch_size"] > 10 and not _source_params_cpy.get("maximum_batching_window_in_seconds"):
        _source_params_cpy["maximum_batching_window_in_seconds"] = 1

    return _source_params_cpy
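Given the FIFO guardrails above, a task like the following sketch (ARN and function name illustrative) is rejected by the module's own checks rather than by an AWS error:

- name: Rejected - FIFO queues do not support a batching window
  amazon.aws.lambda_event:
    state: present
    event_source: sqs
    function_name: my-function          # illustrative name
    source_params:
      source_arn: arn:aws:sqs:us-east-1:123456789012:my-queue.fifo   # illustrative ARN
      batch_size: 10
      maximum_batching_window_in_seconds: 5   # fails: not supported for FIFO queues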


def lambda_event_stream(module, client):
    """
    Adds, updates or deletes lambda stream (DynamoDB, Kinesis) event notifications.
@@ -252,13 +299,8 @@ def lambda_event_stream(module, client):
    else:
        module.fail_json(msg="Source parameter 'source_arn' is required for stream event notification.")

    if state == "present":
        source_params = set_default_values(module, source_params)

    # optional boolean value needs special treatment as not present does not imply False
    source_param_enabled = module.boolean(source_params.get("enabled", "True"))
@@ -285,6 +327,10 @@
        api_params.update(Enabled=source_param_enabled)
    if source_params.get("batch_size"):
        api_params.update(BatchSize=source_params.get("batch_size"))
    if source_params.get("maximum_batching_window_in_seconds"):
        api_params.update(
            MaximumBatchingWindowInSeconds=source_params.get("maximum_batching_window_in_seconds")
        )
    if source_params.get("function_response_types"):
        api_params.update(FunctionResponseTypes=source_params.get("function_response_types"))

@@ -347,7 +393,18 @@ def main():
        state=dict(required=False, default="present", choices=["present", "absent"]),
        lambda_function_arn=dict(required=True, aliases=["function_name", "function_arn"]),
        event_source=dict(required=False, default="stream", choices=source_choices),
        source_params=dict(
            type="dict",
            required=True,
            options=dict(
                source_arn=dict(type="str", required=True),
                enabled=dict(type="bool", default=True),
                batch_size=dict(type="int"),
                starting_position=dict(type="str", choices=["TRIM_HORIZON", "LATEST"]),
                function_response_types=dict(type="list", elements="str", choices=["ReportBatchItemFailures"]),
                maximum_batching_window_in_seconds=dict(type="int"),
            ),
        ),
        alias=dict(required=False, default=None),
        version=dict(type="int", required=False, default=0),
    )
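A side effect of declaring suboptions, assuming Ansible's standard handling of unknown keys in an options spec: misspelled keys inside source_params are now rejected at argument validation instead of being silently ignored. A hypothetical sketch:

- name: Rejected at validation - batchsize is not a recognized suboption
  amazon.aws.lambda_event:
    state: present
    event_source: sqs
    function_name: my-function          # illustrative name
    source_params:
      source_arn: arn:aws:sqs:us-east-1:123456789012:my-queue   # illustrative ARN
      batchsize: 100   # typo for batch_size: now caught by the suboption spec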
3 changes: 1 addition & 2 deletions tests/integration/targets/lambda_event/tasks/main.yml
@@ -6,11 +6,10 @@
      secret_key: "{{ aws_secret_key }}"
      session_token: "{{ security_token | default(omit) }}"
      region: "{{ aws_region }}"
  block:
    - name: Create test resources setup
      ansible.builtin.import_tasks: setup.yml

    - name: Create DynamoDB stream event mapping (trigger) - check_mode
      amazon.aws.lambda_event:
        state: present