Skip to content

Commit

Permalink
feat: check and log mediaconvert job status
Browse files Browse the repository at this point in the history
  • Loading branch information
oshinongit committed Oct 4, 2024
1 parent df4ce7f commit ce651aa
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/models/aws-jobstatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export enum JobStatus {
CANCELED = 'CANCELED',
COMPLETE = 'COMPLETE',
ERROR = 'ERROR',
PROGRESSING = 'PROGRESSING',
SUBMITTED = 'SUBMITTED'
}
63 changes: 55 additions & 8 deletions src/pipelines/aws/aws-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs';
import {
CreateJobCommand,
CreateJobCommandOutput,
GetJobCommand,
MediaConvertClient
} from '@aws-sdk/client-mediaconvert';
import {
Expand All @@ -12,6 +14,7 @@ import {
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { Resolution } from '../../models/resolution';
import { JobStatus } from '../../models/aws-jobstatus';
import { Pipeline } from '../pipeline';
import { AWSPipelineConfiguration } from './aws-pipeline-configuration';
import fs from 'fs';
Expand All @@ -24,6 +27,7 @@ import {
} from '../../models/quality-analysis-model';
import logger from '../../logger';
import { runFfprobe } from '../../pairVmaf';
import { delay } from '../../utils';

export function isS3URI(url: string): boolean {
try {
Expand All @@ -39,7 +43,8 @@ export default class AWSPipeline implements Pipeline {
private s3: S3Client;
private mediaConvert: MediaConvertClient;
private ecs: ECSClient;
private static readonly MAX_WAIT_TIME = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours).
private static readonly MAX_WAIT_TIME_S = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours).
private static readonly MEDIACONVERT_CHECK_JOB_INTERVAL_MS = 10000; //Check status of mediaconvert job interval in seconds.

constructor(configuration: AWSPipelineConfiguration) {
this.configuration = configuration;
Expand Down Expand Up @@ -84,10 +89,43 @@ export default class AWSPipeline implements Pipeline {
}
}

async getMediaConvertJobStatus(jobId: string): Promise<string | undefined> {
try {
const command = new GetJobCommand({ Id: jobId });
const response = await this.mediaConvert.send(command);
if (response.Job?.Status === JobStatus.ERROR) {
logger.error(`Job ${jobId} error ${response.Job?.ErrorMessage}`);
} else if (response.Job?.Status === JobStatus.CANCELED) {
logger.error(`Job ${jobId} ${response.Job?.Status}`);
}
return response.Job?.Status;
} catch (error) {
logger.error(`Error getting job ${jobId}: ${error}`);
return undefined;
}
}

async loopGetJobStatusUntilFinished(
jobId: string
): Promise<string | undefined> {
let status: string | undefined = await this.getMediaConvertJobStatus(jobId);
while (
status !== JobStatus.COMPLETE &&
status !== JobStatus.ERROR &&
status !== JobStatus.CANCELED &&
status !== undefined
) {
logger.debug(`Job ${jobId} status: ${status}. Waiting...`);
status = await this.getMediaConvertJobStatus(jobId);
await delay(AWSPipeline.MEDIACONVERT_CHECK_JOB_INTERVAL_MS);
}
return status;
}

async waitForObjectInS3(S3Bucket: string, S3Key: string): Promise<boolean> {
try {
await waitUntilObjectExists(
{ client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME },
{ client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME_S },
{ Bucket: S3Bucket, Key: S3Key }
);
return true;
Expand Down Expand Up @@ -230,13 +268,14 @@ export default class AWSPipeline implements Pipeline {

// Transcode
logger.info('Transcoding ' + inputFilename + ' to ' + outputURI + '...');
let createJobResponse: CreateJobCommandOutput;
try {
const accelerationSettings = this.configuration.accelerationMode
? {
AccelerationSettings: { Mode: this.configuration.accelerationMode }
}
: {};
await this.mediaConvert.send(
createJobResponse = await this.mediaConvert.send(
new CreateJobCommand({
Role: this.configuration.mediaConvertRole,
Settings: settings,
Expand All @@ -249,12 +288,20 @@ export default class AWSPipeline implements Pipeline {
);
throw error;
}

const s3Status = await this.waitForObjectInS3(
outputBucket,
`${outputFolder}/${outputObject}`
if (!createJobResponse.Job?.Id) {
logger.error(`No Job from create response ${inputFilename}`);
return '';
}
const mediaConvertJobStatus = await this.loopGetJobStatusUntilFinished(
createJobResponse.Job.Id
);
if (!s3Status) return '';
if (
mediaConvertJobStatus === JobStatus.ERROR ||
mediaConvertJobStatus === JobStatus.CANCELED ||
mediaConvertJobStatus === undefined
) {
return '';
}
await this.probeMetadata(outputBucket, outputFolder, outputObject);

logger.info('Finished transcoding ' + inputFilename + '.');
Expand Down
3 changes: 3 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export async function delay(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}

0 comments on commit ce651aa

Please sign in to comment.