Skip to content

Commit

Permalink
feat: check and log mediaconvert job status
Browse files Browse the repository at this point in the history
  • Loading branch information
oshinongit committed Oct 1, 2024
1 parent df4ce7f commit 895f912
Showing 1 changed file with 60 additions and 8 deletions.
68 changes: 60 additions & 8 deletions src/pipelines/aws/aws-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs';
import {
CreateJobCommand,
CreateJobCommandOutput,
GetJobCommand,
MediaConvertClient
} from '@aws-sdk/client-mediaconvert';
import {
Expand Down Expand Up @@ -40,6 +42,7 @@ export default class AWSPipeline implements Pipeline {
private mediaConvert: MediaConvertClient;
private ecs: ECSClient;
private static readonly MAX_WAIT_TIME = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours).
private static readonly MEDIACONVERT_CHECK_JOB_INTERVAL_MS = 15000; //Check status of mediaconvert job interval in seconds.

constructor(configuration: AWSPipelineConfiguration) {
this.configuration = configuration;
Expand Down Expand Up @@ -84,12 +87,60 @@ export default class AWSPipeline implements Pipeline {
}
}

async waitForObjectInS3(S3Bucket: string, S3Key: string): Promise<boolean> {
async mediaConvertJobStatus(
mediaConvert: MediaConvertClient,
jobId: string
): Promise<string | undefined> {
try {
await waitUntilObjectExists(
{ client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME },
{ Bucket: S3Bucket, Key: S3Key }
);
const command = new GetJobCommand({ Id: jobId });
const response = await mediaConvert.send(command);
if (response.Job?.Status === 'ERROR') {
logger.error(`Job ${jobId} error ${response.Job?.ErrorMessage}`);
}
return response.Job?.Status || undefined;
} catch (error) {
logger.error(`Error getting job ${jobId}: ${error}`);
return undefined;
}
}

async repeatGetJobStatus(
mediaConvert: MediaConvertClient,
jobId: string,
interval_ms: number
): Promise<void> {
const status: string | undefined = await this.mediaConvertJobStatus(
mediaConvert,
jobId
);

if (status === 'COMPLETED' || status === 'ERROR' || status === 'CANCELED') {
return;
} else {
logger.debug(`Job ${jobId} status: ${status}. Waiting...`);
await new Promise((resolve) => setTimeout(resolve, interval_ms));
return this.repeatGetJobStatus(mediaConvert, jobId, interval_ms);
}
}

async waitForObjectInS3(
S3Bucket: string,
S3Key: string,
jobId?: string
): Promise<boolean> {
try {
if (jobId) {
await this.repeatGetJobStatus(
this.mediaConvert,
jobId,
AWSPipeline.MEDIACONVERT_CHECK_JOB_INTERVAL_MS
);
} else {
await waitUntilObjectExists(
{ client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME },
{ Bucket: S3Bucket, Key: S3Key }
);
}
return true;
} catch (error) {
logger.error(
Expand Down Expand Up @@ -230,13 +281,14 @@ export default class AWSPipeline implements Pipeline {

// Transcode
logger.info('Transcoding ' + inputFilename + ' to ' + outputURI + '...');
let createJobResponse: CreateJobCommandOutput;
try {
const accelerationSettings = this.configuration.accelerationMode
? {
AccelerationSettings: { Mode: this.configuration.accelerationMode }
}
: {};
await this.mediaConvert.send(
createJobResponse = await this.mediaConvert.send(
new CreateJobCommand({
Role: this.configuration.mediaConvertRole,
Settings: settings,
Expand All @@ -249,10 +301,10 @@ export default class AWSPipeline implements Pipeline {
);
throw error;
}

const s3Status = await this.waitForObjectInS3(
outputBucket,
`${outputFolder}/${outputObject}`
`${outputFolder}/${outputObject}`,
createJobResponse.Job?.Id
);
if (!s3Status) return '';
await this.probeMetadata(outputBucket, outputFolder, outputObject);
Expand Down

0 comments on commit 895f912

Please sign in to comment.