From 163c2c2ac4199a303e8fa4543031f6020922019a Mon Sep 17 00:00:00 2001 From: oshinongit Date: Mon, 30 Sep 2024 13:58:35 +0200 Subject: [PATCH] feat: check and log mediaconvert job status --- src/pipelines/aws/aws-pipeline.ts | 56 ++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/src/pipelines/aws/aws-pipeline.ts b/src/pipelines/aws/aws-pipeline.ts index 5b7c9c9..f6cc0e1 100644 --- a/src/pipelines/aws/aws-pipeline.ts +++ b/src/pipelines/aws/aws-pipeline.ts @@ -1,6 +1,8 @@ import { ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs'; import { CreateJobCommand, + CreateJobCommandOutput, + GetJobCommand, MediaConvertClient } from '@aws-sdk/client-mediaconvert'; import { @@ -40,6 +42,7 @@ export default class AWSPipeline implements Pipeline { private mediaConvert: MediaConvertClient; private ecs: ECSClient; private static readonly MAX_WAIT_TIME = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours). + private static readonly MEDIACONVERT_CHECK_JOB_INTERVAL_S = 15; //Check status of mediaconvert job interval in seconds. constructor(configuration: AWSPipelineConfiguration) { this.configuration = configuration; @@ -84,8 +87,52 @@ export default class AWSPipeline implements Pipeline { } } - async waitForObjectInS3(S3Bucket: string, S3Key: string): Promise { + async mediaConvertJobStatus( + mediaConvert: MediaConvertClient, + jobId: string + ): Promise { try { + const command = new GetJobCommand({ Id: jobId }); + const response = await mediaConvert.send(command); + return response.Job?.Status || undefined; + } catch (error) { + logger.error(`Error getting job ${jobId}: \n Error: ${error}`); + return undefined; + } + } + + checkJobStatusWithInterval( + mediaConvert: MediaConvertClient, + jobId: string, + interval: number + ) { + const intervalId = setInterval(async () => { + const status: string | undefined = await this.mediaConvertJobStatus( + mediaConvert, + jobId + ); + + if (status === 'COMPLETED' || 'ERROR' || 'CANCELED') { + clearInterval(intervalId); + } else { + logger.debug(`Job ${jobId} status: ${status}. Waiting...`); + } + }, interval); + } + + async waitForObjectInS3( + S3Bucket: string, + S3Key: string, + jobId?: string + ): Promise { + try { + if (jobId) { + this.checkJobStatusWithInterval( + this.mediaConvert, + jobId, + AWSPipeline.MEDIACONVERT_CHECK_JOB_INTERVAL_S + ); + } await waitUntilObjectExists( { client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME }, { Bucket: S3Bucket, Key: S3Key } @@ -230,13 +277,14 @@ export default class AWSPipeline implements Pipeline { // Transcode logger.info('Transcoding ' + inputFilename + ' to ' + outputURI + '...'); + let createJobResponse: CreateJobCommandOutput; try { const accelerationSettings = this.configuration.accelerationMode ? { AccelerationSettings: { Mode: this.configuration.accelerationMode } } : {}; - await this.mediaConvert.send( + createJobResponse = await this.mediaConvert.send( new CreateJobCommand({ Role: this.configuration.mediaConvertRole, Settings: settings, @@ -249,10 +297,10 @@ export default class AWSPipeline implements Pipeline { ); throw error; } - const s3Status = await this.waitForObjectInS3( outputBucket, - `${outputFolder}/${outputObject}` + `${outputFolder}/${outputObject}`, + createJobResponse.Job?.Id ); if (!s3Status) return ''; await this.probeMetadata(outputBucket, outputFolder, outputObject);