From a8eb0942dda1f08c30e19b9ed410ff6260b9e450 Mon Sep 17 00:00:00 2001 From: Alex Anderson <191496+alxndrsn@users.noreply.github.com> Date: Fri, 20 Sep 2024 00:00:23 +0300 Subject: [PATCH] s3: add audit logging (#1192) * wip * lint * Remove start event, add ms duration to event details --------- Co-authored-by: alxndrsn Co-authored-by: Kathleen Tuite --- lib/model/query/blobs.js | 43 ++++++++++++++++++++++++------------- test/integration/task/s3.js | 13 ++++++++++- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/lib/model/query/blobs.js b/lib/model/query/blobs.js index 8a77080d8..6808cce58 100644 --- a/lib/model/query/blobs.js +++ b/lib/model/query/blobs.js @@ -51,15 +51,19 @@ const s3CountByStatus = (status) => ({ oneFirst }) => { } }; -const s3SetFailedToPending = () => ({ oneFirst }) => oneFirst(sql` - WITH updated AS ( - UPDATE blobs - SET s3_status='pending' - WHERE s3_status='failed' - RETURNING 1 - ) - SELECT COUNT(*) FROM updated -`); +const s3SetFailedToPending = () => async ({ oneFirst, Audits }) => { + const updated = await oneFirst(sql` + WITH updated AS ( + UPDATE blobs + SET s3_status='pending' + WHERE s3_status='failed' + RETURNING 1 + ) + SELECT COUNT(*) FROM updated + `); + await Audits.log(null, 'blobs.s3.failed-to-pending', null, { updated }); + return updated; +}; const _markAsFailed = ({ id }) => ({ run }) => run(sql` UPDATE blobs @@ -174,12 +178,21 @@ const uploadBlobIfAvailable = async container => { }; const s3UploadPending = (limit) => async (container) => { - if (limit) { - // eslint-disable-next-line no-await-in-loop, no-param-reassign, no-plusplus - while (await uploadBlobIfAvailable(container) && --limit); - } else { - // eslint-disable-next-line no-await-in-loop - while (await uploadBlobIfAvailable(container)); + let uploaded = 0; + let failed = 0; + const start = new Date(); + try { + while (await uploadBlobIfAvailable(container)) { // eslint-disable-line no-await-in-loop + ++uploaded; // eslint-disable-line no-plusplus + if (limit && !--limit) break; // eslint-disable-line no-param-reassign, no-plusplus + } + } catch (err) { + ++failed; // eslint-disable-line no-plusplus + throw err; + } finally { + const end = new Date(); + if (uploaded > 0 || failed > 0) + await container.Audits.log(null, 'blobs.s3.upload', null, { uploaded, failed, duration: end-start }); } }; diff --git a/test/integration/task/s3.js b/test/integration/task/s3.js index 233e57ffc..2e7b8789d 100644 --- a/test/integration/task/s3.js +++ b/test/integration/task/s3.js @@ -99,16 +99,18 @@ describe('task: s3', () => { // then (await getCount('pending')).should.equal(4); (await getCount('failed')).should.equal(0); + (await container.Audits.getLatestByAction('blobs.s3.failed-to-pending')).get().details.should.deepEqual({ updated: 3 }); })); }); describe('uploadPending()', () => { - it('should not do anything if nothing to upload', testTask(async () => { + it('should not do anything if nothing to upload', testTask(async (container) => { // when await uploadPending(); // then assertUploadCount(0); + (await container.Audits.getLatestByAction('blobs.s3.upload')).isDefined().should.equal(false); })); it('should upload pending blobs, and ignore others', testTask(async (container) => { @@ -125,6 +127,7 @@ describe('task: s3', () => { // then assertUploadCount(2); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 2, failed: 0 }); })); it('should return error if uploading fails', testTask(async (container) => { @@ -137,6 +140,7 @@ describe('task: s3', () => { // and assertUploadCount(0); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 0, failed: 1 }); })); it('should not allow failure to affect previous or future uploads', testTask(async (container) => { @@ -151,6 +155,7 @@ describe('task: s3', () => { // and assertUploadCount(2); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 2, failed: 1 }); // given @@ -161,6 +166,7 @@ describe('task: s3', () => { // then assertUploadCount(3); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 1, failed: 0 }); })); it('should not attempt to upload an in-progress blob', testTaskFullTrx(async (container) => { @@ -187,6 +193,7 @@ describe('task: s3', () => { // then global.s3.uploads.attempted.should.equal(0); global.s3.uploads.successful.should.equal(0); + (await container.Audits.getLatestByAction('blobs.s3.upload')).isDefined().should.equal(false); // when resume(); @@ -195,6 +202,7 @@ describe('task: s3', () => { // then global.s3.uploads.attempted.should.equal(1); global.s3.uploads.successful.should.equal(1); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 1, failed: 0 }); })); describe('with limit', () => { @@ -233,6 +241,7 @@ describe('task: s3', () => { // then consoleLog[0].should.deepEqual('Uploading 6 blobs...'); assertUploadCount(6); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 6, failed: 0 }); })); it('should not complain if blob count is less than limit', testTask(async (container) => { @@ -245,6 +254,7 @@ describe('task: s3', () => { // then consoleLog[0].should.deepEqual('Uploading 1 blobs...'); assertUploadCount(1); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 1, failed: 0 }); })); it('should upload all blobs if limit is zero', testTask(async (container) => { @@ -266,6 +276,7 @@ describe('task: s3', () => { // then consoleLog[0].should.deepEqual('Uploading 10 blobs...'); assertUploadCount(10); + (await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 10, failed: 0 }); })); }); });