diff --git a/examples/encore/encoreExample.ts b/examples/encore/encoreExample.ts new file mode 100644 index 0000000..9f54246 --- /dev/null +++ b/examples/encore/encoreExample.ts @@ -0,0 +1,43 @@ +import { EncorePipelineConfiguration } from '../../src/pipelines/encore/encore-pipeline-configuration'; +import { EncorePipeline } from '../../src/pipelines/encore/encore-pipeline'; +import createJob from '../../src/create-job'; +import fs from 'fs'; + +async function transcodeInputsAndAnalyze() { + + const configuration: EncorePipelineConfiguration = { + apiAddress: "https://api-encore.stage.osaas.io", + token: "", + instanceId: "dummy", + profilesUrl: "profilesUrl", + outputFolder: "/usercontent/demo", + baseName: "_demo", + inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"], + duration: 120, + priority: 0, + encorePollingInterval_ms: 30000, + encoreInstancePostCreationDelay_ms: 10000 + }; + + const inlineProfile = fs.readFileSync('encoreProfile.yml', 'utf8'); + + const resolutions = [{ + width: 1280, + height: 720, + range: { + min: 500000, + max: 600000 + } + }]; + + const bitrates = [ + 500000, + 600000, + 800000 + ] + + const pipeline: EncorePipeline = new EncorePipeline(configuration); + await pipeline.transcode(configuration.inputs[0], { width: 1280, height: 720}, 600000, "output", undefined, inlineProfile, resolutions, bitrates); + } + +transcodeInputsAndAnalyze(); \ No newline at end of file diff --git a/examples/encore/encoreProfile.yml b/examples/encore/encoreProfile.yml new file mode 100644 index 0000000..24f1508 --- /dev/null +++ b/examples/encore/encoreProfile.yml @@ -0,0 +1,17 @@ +name: X264Encode +description: Program profile +scaling: bicubic +encodes: + - type: X264Encode + suffix: _x264_ + twoPass: true + height: + width: + params: + b:v: + maxrate: + minrate: + r: 25 + fps_mode: cfr + pix_fmt: yuv420p + force_key_frames: expr:not(mod(n,96)) \ No newline at end of file diff --git a/examples/encore/pipeline.yml b/examples/encore/pipeline.yml new file mode 100644 index 0000000..e00fd1b --- /dev/null +++ b/examples/encore/pipeline.yml @@ -0,0 +1,12 @@ +encore: + apiAddress: https://api-encore.stage.osaas.io, + token: null, + instanceId: dummy, + profilesUrl: profilesUrl, + outputFolder: /usercontent/demo, + baseName: _demo, + inputs: ['https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4'], + duration: 120, + priority: 0, + encorePollingInterval_ms: 30000, + encoreInstancePostCreationDelay_ms: 10000, \ No newline at end of file diff --git a/examples/encoreExample.ts b/examples/encoreExample.ts deleted file mode 100644 index 3d699fd..0000000 --- a/examples/encoreExample.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { EncorePipelineConfiguration } from '../src/pipelines/encore/encore-pipeline-configuration'; -import { EncorePipeline, delay } from '../src/pipelines/encore/encore-pipeline'; -import { EncoreInstance } from '../src/models/encoreInstance'; - -async function transcodeInputsAndAnalyze() { - - const configuration: EncorePipelineConfiguration = { - apiAddress: "https://api-encore.stage.osaas.io", - token: "", - instanceId: "dummy", - profile: "program", - outputFolder: "/usercontent/demo", - baseName: "_demo", - inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"], - duration: 120, - priority: 0, - encorePollingInterval_ms: 30000, - encoreInstancePostCreationDelay_ms: 10000 - }; - - const pipeline: EncorePipeline = new EncorePipeline(configuration); - const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile); - await delay(pipeline.configuration.encoreInstancePostCreationDelay_ms); // Delay required to allow instance to be created before calling it - await pipeline.runTranscodeThenAnalyze(instance); - - } - - transcodeInputsAndAnalyze(); \ No newline at end of file diff --git a/src/analysis/brute-force.ts b/src/analysis/brute-force.ts index 0365014..5c57215 100644 --- a/src/analysis/brute-force.ts +++ b/src/analysis/brute-force.ts @@ -88,7 +88,6 @@ export default async function analyzeBruteForce(directory: string, reference: st if (options.pipelineVariables) { // Create all combinations of bitrate, resolution, and variables Object.entries(options.pipelineVariables).forEach(([variableName, values]) => { - //console.log(`variableName: ${variableName}`); pairs = pairs.flatMap(pair => values.map( value => { const variables = pair.ffmpegOptionVariables ? {...pair.ffmpegOptionVariables} : {}; diff --git a/src/encoreYamlGenerator.ts b/src/encoreYamlGenerator.ts new file mode 100644 index 0000000..ccf599c --- /dev/null +++ b/src/encoreYamlGenerator.ts @@ -0,0 +1,49 @@ +import * as fs from 'fs'; +import YAML from 'yaml'; +import { Resolution } from '../src/models/resolution'; + +export class EncoreYAMLGenerator { + + createProfile(inlineProfile: string, resolutions: Resolution[], bitRates: number[]): string { + + const inlineProfileObject = YAML.parse(inlineProfile); + let profiles: string[] = []; + + resolutions.forEach(resolution => { + bitRates.forEach(bitRate => { + const encoding = this.modifyEncoreProfileAttributes(inlineProfileObject.encodes[0], resolution, bitRate); + profiles.push(encoding) + }) + }); + + let i = 0; + profiles.forEach(encodingProfile => { + inlineProfileObject.encodes[i] = encodingProfile; + i++; + }) + + const profile = YAML.stringify(inlineProfileObject); + this.saveToFile("test_output_profile.yml", profile); + + return profile + } + + modifyEncoreProfileAttributes(profileEncodesObject: any, resolution: Resolution, bitRate: number): string { + + const data = profileEncodesObject; + + data.height = resolution.height; + data.width = resolution.width; + data.params['b:v'] = bitRate; + data.params['maxrate'] = resolution.range?.max; + data.params['minrate'] = resolution.range?.min; + + const updatedYAML = YAML.stringify(data); + + return updatedYAML; +} + + saveToFile(filePath: string, yaml: string) { + fs.writeFileSync(filePath, yaml); + } +} \ No newline at end of file diff --git a/src/load-pipeline.ts b/src/load-pipeline.ts index 45b96b5..5ad7bff 100644 --- a/src/load-pipeline.ts +++ b/src/load-pipeline.ts @@ -2,6 +2,7 @@ import fs from 'fs'; import YAML from 'yaml'; import LocalPipeline from './pipelines/local/local-pipeline'; import AWSPipeline from './pipelines/aws/aws-pipeline'; +import { EncorePipeline } from './pipelines/encore/encore-pipeline'; import { Pipeline } from './pipelines/pipeline'; import logger from './logger'; import { LocalPipelineConfiguration } from './pipelines/local/local-pipeline-configuration'; @@ -30,6 +31,20 @@ export type PipelineProfile = { pythonPath: string; ffmpegEncoder: 'libx264' | 'h264_videotoolbox'; }; + + encore?: { + apiAddress: string; + token: string; + instanceId: string; + profilesUrl: string; + outputFolder: string; + baseName: string; + inputs: Array; + duration: number; + priority: number; + encorePollingInterval_ms: number; + encoreInstancePostCreationDelay_ms: number + } }; /** @@ -53,6 +68,8 @@ async function loadPipeline(pipelineFilename: string, encodingProfile?: string): return new AWSPipeline({ ...pipelineProfile.aws, mediaConvertSettings: encodingProfileData }); } else if (pipelineProfile.local !== undefined) { return new LocalPipeline({ ...pipelineProfile.local, ffmpegOptions: encodingProfileData }); + } else if (pipelineProfile.encore !== undefined) { + return new EncorePipeline(pipelineProfile.encore); } else { throw new Error(`Invalid pipeline: ${JSON.stringify(pipelineProfile)}`); } diff --git a/src/pipelines/encore/encore-pipeline-configuration.ts b/src/pipelines/encore/encore-pipeline-configuration.ts index c8b03cc..1843e8c 100644 --- a/src/pipelines/encore/encore-pipeline-configuration.ts +++ b/src/pipelines/encore/encore-pipeline-configuration.ts @@ -2,7 +2,7 @@ export type EncorePipelineConfiguration = { apiAddress: string; token: string; instanceId: string; - profile: string; + profilesUrl: string; outputFolder: string; baseName: string; inputs: Array; diff --git a/src/pipelines/encore/encore-pipeline.ts b/src/pipelines/encore/encore-pipeline.ts index 77a577e..0e000b7 100644 --- a/src/pipelines/encore/encore-pipeline.ts +++ b/src/pipelines/encore/encore-pipeline.ts @@ -1,14 +1,16 @@ -import { Resolution } from '../../models/resolution'; -import { Pipeline } from '../pipeline'; import AWSPipeline from '../aws/aws-pipeline'; import { AWSPipelineConfiguration } from '../aws/aws-pipeline-configuration'; -import { EncorePipelineConfiguration } from './encore-pipeline-configuration'; -import { QualityAnalysisModel } from '../../models/quality-analysis-model'; import { EncoreInstance } from '../../models/encoreInstance'; import { EncoreJobs, EncoreJob } from '../../models/encoreJobs'; +import { EncorePipelineConfiguration } from './encore-pipeline-configuration'; +import { EncoreYAMLGenerator } from '../../encoreYamlGenerator'; +import { Pipeline } from '../pipeline'; +import { Resolution } from '../../models/resolution'; +import { QualityAnalysisModel } from '../../models/quality-analysis-model'; import logger from '../../logger'; import fs from 'fs'; -const ISOBoxer = require('codem-isoboxer'); +import YAML from 'yaml'; +//const ISOBoxer = require('codem-isoboxer'); const { Readable } = require('stream'); //Typescript import library contains different functions. //If someone knows how to achieve the same functionality in Typescript syntax let me know. const { finished } = require('stream/promises'); //Same as above. @@ -20,18 +22,12 @@ export function delay(ms: number): Promise { export class EncorePipeline implements Pipeline { configuration: EncorePipelineConfiguration; + private awsConf: AWSPipelineConfiguration; + private awsPipe: AWSPipeline; constructor(configuration: EncorePipelineConfiguration) { this.configuration = configuration; - } - - async transcode(input: string, targetResolution: Resolution, targetBitrate: number, output: string, variables?: Record): Promise { - return "todo" - } - - async analyzeQuality(reference: string, distorted: string, output: string, model: QualityAnalysisModel): Promise { - - const AWSConf: AWSPipelineConfiguration = { + this.awsConf = { inputBucket: "vmaf-files-incoming", outputBucket: "vmaf-files", mediaConvertRole: "", @@ -43,20 +39,40 @@ export class EncorePipeline implements Pipeline { ecsCluster: "vmaf-runner", ecsTaskDefinition: "easyvmaf-s3:3" }; + this.awsPipe = new AWSPipeline(this.awsConf); + } - const awsPipe = new AWSPipeline(AWSConf); - const result: string = await awsPipe.analyzeQuality(reference, distorted, output, model) - return result + async transcode(input: string, targetResolution: Resolution, targetBitrate: number, output: string, variables?: Record, inlineProfile?: string, resolutions?: Resolution[], bitRates?: number[]): Promise { + + if (!(inlineProfile && resolutions && bitRates)) { + throw new Error('No inline profile in encore transcode'); + } + + const instance: EncoreInstance | undefined = await this.createEncoreInstance(this.configuration.apiAddress, this.configuration.token, this.configuration.instanceId, this.configuration.profilesUrl); + await delay(this.configuration.encoreInstancePostCreationDelay_ms); // Delay required to allow instance to be created before calling it + if (!instance) { + throw new Error('undefined instance'); + } + const yamlGenerator = new EncoreYAMLGenerator(); + const transcodingProfile = yamlGenerator.createProfile(inlineProfile, resolutions, bitRates); + await this.runTranscodeThenPollUntilFinished(instance, transcodingProfile); + await this.deleteEncoreInstance(instance, this.configuration.apiAddress); + return output + } + + async analyzeQuality(reference: string, distorted: string, output: string, model: QualityAnalysisModel): Promise { + + return await this.awsPipe.analyzeQuality(reference, distorted, output, model); } /** - * Creates an Encore Instance, waits for 5000ms, then attempts to enqueue a transcode job. + * Creates an Encore Instance, waits, then attempts to enqueue a transcode job. * @param apiAddress The api address . * @param token api token. * @param instanceId The Encore Instance that will enqueue the transcode job. * @param profile The transcode profile. */ - async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise { + async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profilesUrl: string): Promise { const url = `${apiAddress}/encoreinstance`; const headerObj = { @@ -67,7 +83,7 @@ export class EncorePipeline implements Pipeline { const headers = new Headers(headerObj); const data = JSON.stringify({ "name": instanceId, - "profile": profile, + "profilesUrl": profilesUrl, }); const request = new Request(url, { @@ -85,7 +101,7 @@ export class EncorePipeline implements Pipeline { throw new Error('Instance creation failed'); } }).catch(error => { - logger.error(error); + logger.error(`Create Encore Instance ${error}`); }); } @@ -93,9 +109,9 @@ export class EncorePipeline implements Pipeline { * Attempts to delete an Encore Instance. * @param encoreInstance The Encore Instance to be deleted. */ - async deleteEncoreInstance(encoreInstance: EncoreInstance): Promise { + async deleteEncoreInstance(encoreInstance: EncoreInstance, apiAddress: string): Promise { - const url = encoreInstance.url; + const url = `${apiAddress}/encoreinstance/${encoreInstance.name}`; const headerObj = { 'accept': 'application/json', 'x-jwt': `Bearer ${this.configuration.token}`, @@ -108,19 +124,21 @@ export class EncorePipeline implements Pipeline { }); fetch(request).then(response => { - logger.info(`Delete Encore Instance Status: ${response.status}`); + if (response.status == 204) { + logger.info(`Successfully deleted encore instance`); + } }) .catch(error => { - logger.error(error); + logger.error(`Delete Encore Instance ${error}`); }); } /** * Attempts to enqueue a transcode job to an Encore Instance. * @param encoreInstance The Encore Instance were the job should be enqueued. - * @param mediaFileAddress The https address of the media file to be enqueued. + * @param input The https address of the media file to be enqueued. */ - async createEncoreJob(encoreInstance: EncoreInstance, mediaFileAddress: string): Promise { + async createEncoreJob(encoreInstance: EncoreInstance, input: string, transcodeProfile: string): Promise { const url = encoreInstance.resources.enqueueJob.url; const headerObj = { @@ -130,12 +148,12 @@ export class EncorePipeline implements Pipeline { }; const headers = new Headers(headerObj); const data = JSON.stringify({ - "profile": this.configuration.profile, + "inlineProfile": transcodeProfile, "outputFolder": this.configuration.outputFolder, "baseName": this.configuration.baseName, "inputs": [ { - "uri": mediaFileAddress, + "uri": input, "type": "AudioVideo" } ], @@ -159,18 +177,20 @@ export class EncorePipeline implements Pipeline { } }) .catch(error => { - logger.error(error); + logger.error(`Create Encore Job ${error}`); }); } /** * Polls the encore api until all enqueued jobs have reached a SUCCESSFUL state - * and been downloaded or have FAILED. + * and are downloaded or have FAILED. * @param encoreInstance The Encore Instance were the jobs have been queued. * @param enqueuedJobIds List of jobs that are expected to be transcoded. */ - async pollUntilAllJobsCompleted(encoreInstance: EncoreInstance, enqueuedJobIds: string[]): Promise { + async pollUntilAllJobsCompleted(encoreInstance: EncoreInstance, enqueuedJobIds: string[]): Promise { logger.info("polling until successful job") + let pollCounter = 0; + let pollInterval_ms = this.configuration.encorePollingInterval_ms; while (true) { logger.info(`Enqueued jobs: ${enqueuedJobIds}`); if (enqueuedJobIds.length < 1 || enqueuedJobIds === undefined) { @@ -182,30 +202,49 @@ export class EncorePipeline implements Pipeline { logger.info("getEncoreJobs returned empty list"); break } - for (let job of jobs) { - logger.info(`Encore Job Transcoding Status: ${job.status}`); - const jobShouldBeProcessed: number = enqueuedJobIds.indexOf(job.id); - if (job.status === "SUCCESSFUL" && jobShouldBeProcessed != -1) { - for (let outputObj of job.output) { - let filePath: string = outputObj.file; - let url: string = `${encoreInstance.url}${filePath}`;; - if (filePath[0] == "/") { - const trimmedInstanceUrl: string = encoreInstance.url.substring(0, encoreInstance.url.length - 1); - // Remove final '/' in url to enable combining with filename path - const url: string = `${trimmedInstanceUrl}${filePath}`; - const splitFilename = filePath.split("/"); - filePath = splitFilename[splitFilename.length - 1]; - } - - await this.downloadFile(url, filePath); + await this.downloadSuccessfulJobs(encoreInstance, enqueuedJobIds, jobs); + + //Increase poll interval if repeatedly polling, up until 20 minutes. + if (pollCounter > 2) { + pollInterval_ms += Math.round(pollInterval_ms) * 2; + pollCounter = 0; + } + if (pollInterval_ms <= 1200000) { + pollCounter++; + } + await delay(pollInterval_ms); + } + } + + /** + * Download enqueued jobs that have reached a SUCCESSFUL state. + * @param encoreInstance The Encore Instance were the jobs have been queued. + * @param enqueuedJobIds List of jobs that are expected to be transcoded. + * @param jobs List of EncoreJobs that have been successfully transcoded. + */ + async downloadSuccessfulJobs(encoreInstance: EncoreInstance, enqueuedJobIds: string[], jobs: EncoreJob[]){ + + for (let job of jobs) { + logger.info(`Encore Job Transcoding Status: ${job.status}`); + const jobShouldBeProcessed: number = enqueuedJobIds.indexOf(job.id); + if (job.status === "SUCCESSFUL" && jobShouldBeProcessed != -1) { + for (let successfulTranscoding of job.output) { + let filePath: string = successfulTranscoding.file; + let url: string = `${encoreInstance.url}${filePath}`;; + if (filePath[0] == "/") { + const trimmedInstanceUrl: string = encoreInstance.url.substring(0, encoreInstance.url.length - 1); + // Remove final '/' in url to enable combining with filename path + const url: string = `${trimmedInstanceUrl}${filePath}`; + const splitFilename = filePath.split("/"); + filePath = splitFilename[splitFilename.length - 1]; } - enqueuedJobIds.splice(jobShouldBeProcessed, 1); - } - else if (job.status === "FAILED") { - enqueuedJobIds.splice(jobShouldBeProcessed, 1); + await this.downloadFile(url, filePath); } + enqueuedJobIds.splice(jobShouldBeProcessed, 1); + } + else if (job.status === "FAILED") { + enqueuedJobIds.splice(jobShouldBeProcessed, 1); } - await delay(this.configuration.encorePollingInterval_ms); } } @@ -213,7 +252,7 @@ export class EncorePipeline implements Pipeline { * Attempts to get all jobs for an Encore Instance. * @param encoreInstance The Encore Instance to get jobs from. */ - async getEncoreJobs(encoreInstance: EncoreInstance): Promise { + async getEncoreJobs(encoreInstance: EncoreInstance): Promise { const url = encoreInstance.resources.listJobs.url; const headerObj = { 'accept': 'application/json', @@ -233,7 +272,7 @@ export class EncorePipeline implements Pipeline { } }) .catch(error => { - logger.error(error); + logger.error(`Get Encore Jobs ${error}`); }); } @@ -242,7 +281,7 @@ export class EncorePipeline implements Pipeline { * @param url The file url to download. * @param filename The filename that should be given to the downloaded file. */ - async downloadFile(url: string, filename: string): Promise { + async downloadFile(url: string, filename: string): Promise { const headerObj = { 'accept': 'application/json', 'x-jwt': `Bearer ${this.configuration.token}`, @@ -267,40 +306,49 @@ export class EncorePipeline implements Pipeline { return filename; } - async runTranscodeThenAnalyze(instance: EncoreInstance) { + async runTranscodeThenPollUntilFinished(instance: EncoreInstance, transcodeProfile: string): Promise { let jobIds: string[] = []; - let references: string[] = []; - - for (let input of this.configuration.inputs) { - const job: EncoreJob = await this.createEncoreJob(instance, input); - jobIds.push(job.id); - const referenceFilename: string = `${job.id}_reference.mp4`; - references.push(referenceFilename); - await this.downloadFile(input, referenceFilename); - await this.pollUntilAllJobsCompleted(instance, jobIds); - - const dir: string = `./${this.configuration.baseName}`; - fs.readdir(dir, (err, files) => { - files.forEach(file => { - if (file.includes(".mp4") && !file.includes("reference")) { - const arrayBuffer = new Uint8Array(fs.readFileSync(`${dir}/${file}`)).buffer; - const parsedFile = ISOBoxer.parseBuffer(arrayBuffer); - const hdlrs = parsedFile.fetchAll('hdlr'); - let isVideo: boolean = false; - hdlrs.forEach(hldr => { - if (hldr.handler_type == "vide") { - logger.debug(`Contains video track: ${file}`); - isVideo = true; - } - }) - if (isVideo) { - logger.info(`Analyzing quality using: ${file}`); - this.analyzeQuality(`./${this.configuration.baseName}/${referenceFilename}`, `${dir}/${file}`, `OSAAS_Encore_workdir/output_${file}.json`, QualityAnalysisModel.HD) - } - } - }); - }); + let input = this.configuration.inputs[0]; + + const job: EncoreJob = await this.createEncoreJob(instance, input, transcodeProfile); + if (!job) { + return } + jobIds.push(job.id); + const referenceFilename: string = `reference.mp4`; + await this.downloadFile(input, referenceFilename); + await this.pollUntilAllJobsCompleted(instance, jobIds); + + // for (let input of this.configuration.inputs) { + // const job: EncoreJob = await this.createEncoreJob(instance, input); + // jobIds.push(job.id); + // const referenceFilename: string = `${job.id}_reference.mp4`; + // references.push(referenceFilename); + // await this.downloadFile(input, referenceFilename); + // await this.pollUntilAllJobsCompleted(instance, jobIds); + + // const dir: string = `./${this.configuration.baseName}`; + // fs.readdir(dir, (err, files) => { + // files.forEach(file => { + // if (file.includes(".mp4") && !file.includes("reference")) { + // const arrayBuffer = new Uint8Array(fs.readFileSync(`${dir}/${file}`)).buffer; + // const parsedFile = ISOBoxer.parseBuffer(arrayBuffer); + // const hdlrs = parsedFile.fetchAll('hdlr'); + // let isVideo: boolean = false; + // hdlrs.forEach(hldr => { + // if (hldr.handler_type == "vide") { + // logger.debug(`Contains video track: ${file}`); + // isVideo = true; + // } + // }) + // if (isVideo) { + // logger.info(`Analyzing quality using: ${file}`); + // this.analyzeQuality(`./${this.configuration.baseName}/${referenceFilename}`, `${dir}/${file}`, `OSAAS_Encore_workdir/output_${file}.json`, QualityAnalysisModel.HD) + // } + // } + // }); + // }); + // } } } \ No newline at end of file