diff --git a/CHANGELOG.md b/CHANGELOG.md index c9a150375..3a312d891 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.9.3] +### Fixed +- Reduced `start-execution-worker` concurrency to address AWS Batch `Too Many Requests` errors. Fixes [#1676](https://github.com/ASFHyP3/hyp3/issues/1676). + ## [3.9.2] ### Fixed - Reverted `asf_search` to v6.0.2. Fixes [#1673](https://github.com/ASFHyP3/hyp3/issues/1673). diff --git a/apps/start-execution-manager/src/start_execution_manager.py b/apps/start-execution-manager/src/start_execution_manager.py index 143810b2f..ef13b6f8f 100644 --- a/apps/start-execution-manager/src/start_execution_manager.py +++ b/apps/start-execution-manager/src/start_execution_manager.py @@ -28,7 +28,7 @@ def lambda_handler(event, context) -> None: pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=900) logger.info(f'Got {len(pending_jobs)} pending jobs') - batch_size = 300 + batch_size = 450 for i in range(0, len(pending_jobs), batch_size): jobs = pending_jobs[i:i + batch_size] logger.info(f'Invoking worker for {len(jobs)} jobs') diff --git a/tests/test_start_execution_manager.py b/tests/test_start_execution_manager.py index b61d10ebc..ed6ddadff 100644 --- a/tests/test_start_execution_manager.py +++ b/tests/test_start_execution_manager.py @@ -67,17 +67,16 @@ def test_lambda_handler_900_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=900) assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs[0:300]), - call('test-worker-function-arn', mock_jobs[300:600]), - call('test-worker-function-arn', mock_jobs[600:900]), + call('test-worker-function-arn', mock_jobs[0:450]), + call('test-worker-function-arn', mock_jobs[450:900]), ] -def test_lambda_handler_400_jobs(): +def test_lambda_handler_500_jobs(): with patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, \ patch('start_execution_manager.invoke_worker') as mock_invoke_worker, \ patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True): - mock_jobs = list(range(400)) + mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs mock_invoke_worker.return_value = {'StatusCode': None} @@ -87,8 +86,8 @@ def test_lambda_handler_400_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=900) assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs[0:300]), - call('test-worker-function-arn', mock_jobs[300:400]), + call('test-worker-function-arn', mock_jobs[0:450]), + call('test-worker-function-arn', mock_jobs[450:500]), ]