Skip to content

Commit

Permalink
s3: add audit logging (#1192)
Browse files Browse the repository at this point in the history
* wip

* lint

* Remove start event, add ms duration to event details

---------

Co-authored-by: alxndrsn <alxndrsn>
Co-authored-by: Kathleen Tuite <[email protected]>
  • Loading branch information
alxndrsn and ktuite authored Sep 19, 2024
1 parent 242f613 commit a8eb094
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
43 changes: 28 additions & 15 deletions lib/model/query/blobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,19 @@ const s3CountByStatus = (status) => ({ oneFirst }) => {
}
};

const s3SetFailedToPending = () => ({ oneFirst }) => oneFirst(sql`
WITH updated AS (
UPDATE blobs
SET s3_status='pending'
WHERE s3_status='failed'
RETURNING 1
)
SELECT COUNT(*) FROM updated
`);
const s3SetFailedToPending = () => async ({ oneFirst, Audits }) => {
const updated = await oneFirst(sql`
WITH updated AS (
UPDATE blobs
SET s3_status='pending'
WHERE s3_status='failed'
RETURNING 1
)
SELECT COUNT(*) FROM updated
`);
await Audits.log(null, 'blobs.s3.failed-to-pending', null, { updated });
return updated;
};

const _markAsFailed = ({ id }) => ({ run }) => run(sql`
UPDATE blobs
Expand Down Expand Up @@ -174,12 +178,21 @@ const uploadBlobIfAvailable = async container => {
};

const s3UploadPending = (limit) => async (container) => {
if (limit) {
// eslint-disable-next-line no-await-in-loop, no-param-reassign, no-plusplus
while (await uploadBlobIfAvailable(container) && --limit);
} else {
// eslint-disable-next-line no-await-in-loop
while (await uploadBlobIfAvailable(container));
let uploaded = 0;
let failed = 0;
const start = new Date();
try {
while (await uploadBlobIfAvailable(container)) { // eslint-disable-line no-await-in-loop
++uploaded; // eslint-disable-line no-plusplus
if (limit && !--limit) break; // eslint-disable-line no-param-reassign, no-plusplus
}
} catch (err) {
++failed; // eslint-disable-line no-plusplus
throw err;
} finally {
const end = new Date();
if (uploaded > 0 || failed > 0)
await container.Audits.log(null, 'blobs.s3.upload', null, { uploaded, failed, duration: end-start });
}
};

Expand Down
13 changes: 12 additions & 1 deletion test/integration/task/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,18 @@ describe('task: s3', () => {
// then
(await getCount('pending')).should.equal(4);
(await getCount('failed')).should.equal(0);
(await container.Audits.getLatestByAction('blobs.s3.failed-to-pending')).get().details.should.deepEqual({ updated: 3 });
}));
});

describe('uploadPending()', () => {
it('should not do anything if nothing to upload', testTask(async () => {
it('should not do anything if nothing to upload', testTask(async (container) => {
// when
await uploadPending();

// then
assertUploadCount(0);
(await container.Audits.getLatestByAction('blobs.s3.upload')).isDefined().should.equal(false);
}));

it('should upload pending blobs, and ignore others', testTask(async (container) => {
Expand All @@ -125,6 +127,7 @@ describe('task: s3', () => {

// then
assertUploadCount(2);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 2, failed: 0 });
}));

it('should return error if uploading fails', testTask(async (container) => {
Expand All @@ -137,6 +140,7 @@ describe('task: s3', () => {

// and
assertUploadCount(0);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 0, failed: 1 });
}));

it('should not allow failure to affect previous or future uploads', testTask(async (container) => {
Expand All @@ -151,6 +155,7 @@ describe('task: s3', () => {

// and
assertUploadCount(2);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 2, failed: 1 });


// given
Expand All @@ -161,6 +166,7 @@ describe('task: s3', () => {

// then
assertUploadCount(3);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 1, failed: 0 });
}));

it('should not attempt to upload an in-progress blob', testTaskFullTrx(async (container) => {
Expand All @@ -187,6 +193,7 @@ describe('task: s3', () => {
// then
global.s3.uploads.attempted.should.equal(0);
global.s3.uploads.successful.should.equal(0);
(await container.Audits.getLatestByAction('blobs.s3.upload')).isDefined().should.equal(false);

// when
resume();
Expand All @@ -195,6 +202,7 @@ describe('task: s3', () => {
// then
global.s3.uploads.attempted.should.equal(1);
global.s3.uploads.successful.should.equal(1);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 1, failed: 0 });
}));

describe('with limit', () => {
Expand Down Expand Up @@ -233,6 +241,7 @@ describe('task: s3', () => {
// then
consoleLog[0].should.deepEqual('Uploading 6 blobs...');
assertUploadCount(6);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 6, failed: 0 });
}));

it('should not complain if blob count is less than limit', testTask(async (container) => {
Expand All @@ -245,6 +254,7 @@ describe('task: s3', () => {
// then
consoleLog[0].should.deepEqual('Uploading 1 blobs...');
assertUploadCount(1);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 1, failed: 0 });
}));

it('should upload all blobs if limit is zero', testTask(async (container) => {
Expand All @@ -266,6 +276,7 @@ describe('task: s3', () => {
// then
consoleLog[0].should.deepEqual('Uploading 10 blobs...');
assertUploadCount(10);
(await container.Audits.getLatestByAction('blobs.s3.upload')).get().details.should.containEql({ uploaded: 10, failed: 0 });
}));
});
});
Expand Down

0 comments on commit a8eb094

Please sign in to comment.