Skip to content

Commit

Permalink
interface with create-job
Browse files Browse the repository at this point in the history
  • Loading branch information
oshinongit committed Oct 18, 2023
1 parent 8e64f8b commit c66f3eb
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 102 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ To generate VMAF measurements, you will need to define a job which can be create
```typescript
const { createJob } = require('@eyevinn/autovmaf');

// AWS or Local pipeline
const vmafScores = await createJob({
name: "MyVMAFmeasurements",
pipeline: "pipeline.yml",
Expand All @@ -79,6 +80,30 @@ To generate VMAF measurements, you will need to define a job which can be create
],
method: "bruteForce" // optional
});

// Encore pipeline
const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-address.io",
token: "",
instanceId: "myEncoreInstance",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo",
inputs: ["myInputPath.mp4"],
duration: 120,
priority: 0,
encorePollingInterval_ms: 60000,
encoreInstancePostCreationDelay_ms: 10000
};

const job: JobDescription = {
name: "encorePipelineJob",
pipeline: configuration,
encodingProfile: "null", // Transcoding profile is set the pipeline configuration as seen above.
reference: "null" // Reference media file will be downloaded from the address given in inputs[] parameter.
}

createJob(job);
```

When creating a job, you can specify:
Expand Down
48 changes: 23 additions & 25 deletions examples/encoreExample.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import { EncorePipelineConfiguration } from '../src/pipelines/encore/encore-pipeline-configuration';
import { EncorePipeline, delay } from '../src/pipelines/encore/encore-pipeline';
import { EncoreInstance } from '../src/models/encoreInstance';
import { JobDescription } from '../src/create-job';
import createJob from '../src/create-job';

async function transcodeInputsAndAnalyze() {
const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-encore.stage.osaas.io",
token: "",
instanceId: "dummy",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo_job",
inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"],
duration: 5,
priority: 0,
encorePollingInterval_ms: 60000,
encoreInstancePostCreationDelay_ms: 10000
};

const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-encore.stage.osaas.io",
token: "",
instanceId: "dummy",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo",
inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"],
duration: 120,
priority: 0,
encorePollingInterval_ms: 30000,
encoreInstancePostCreationDelay_ms: 10000
};

const pipeline: EncorePipeline = new EncorePipeline(configuration);
const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile);
await delay(pipeline.configuration.encoreInstancePostCreationDelay_ms); // Delay required to allow instance to be created before calling it
await pipeline.runTranscodeThenAnalyze(instance);

}

transcodeInputsAndAnalyze();
const job: JobDescription = {
name: "oliver_testjob",
pipeline: configuration,
encodingProfile: "null",
reference: "null"
}

createJob(job);
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"@aws-sdk/lib-storage": "^3.36.0",
"@types/aws-lambda": "^8.10.86",
"aws-sdk": "^2.1123.0",
"codem-isoboxer": "^0.3.9",
"fluent-ffmpeg": "https://github.com/Eyevinn/node-fluent-ffmpeg.git#measure-cpu",
"objects-to-csv": "^1.3.6",
"papaparse": "^5.3.1",
Expand Down
24 changes: 18 additions & 6 deletions src/create-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ import logger from './logger';
import { Resolution } from './models/resolution';
import analyzeWalkTheHull from './analysis/walk-the-hull';
import { LocalPipelineConfiguration } from './pipelines/local/local-pipeline-configuration';

import { EncorePipelineConfiguration } from './pipelines/encore/encore-pipeline-configuration';
import { instanceOfEncorePipelineConfiguration, instanceOfLocalPipelineConfiguration } from './interfaceGuards';
import { EncorePipeline, delay } from './pipelines/encore/encore-pipeline';
import { EncoreInstance } from './models/encoreInstance';

/** Describes a ABR-analysis job and can be used to create jobs using the createJob()-function. */
export type JobDescription = {
/** This will name the folder in which to put the files. */
name: string;

/** Path to a YAML-file that defines the pipeline, or an inline local pipeline configuration. See `examples/pipeline.yml` for an example AWS-pipeline. */
pipeline: string | LocalPipelineConfiguration;
pipeline: string | LocalPipelineConfiguration | EncorePipelineConfiguration;

/** Path to a JSON-file that defines how the reference should be encoded. When using AWS, this is a MediaConvert configuration.
* For local pipelines, this is key-value pairs that will be passed as command line arguments to FFmpeg.
Expand Down Expand Up @@ -111,15 +114,24 @@ export type JobDescription = {
* @param description An object that describes the job to create.
*/
export default async function createJob(description: JobDescription, pipelineData?: any, encodingProfileData?: any, concurrency: boolean = true) {
logger.info(`Creating job ${description.name}.`);
logger.info(`Creating job ${description.name}`);

let pipeline: Pipeline | undefined = undefined;
let pipeline: any | undefined = undefined;
if (pipelineData && encodingProfileData) {
pipeline = await loadPipelineFromObjects(pipelineData, encodingProfileData);
} else if(typeof description.pipeline === 'object') {
} else if(instanceOfLocalPipelineConfiguration(description.pipeline)) {
pipeline = new LocalPipeline({...description.pipeline, ...description.encodingProfile as Record<string,string>} );
} else {
} else if(typeof description.pipeline === 'string'){
pipeline = await loadPipeline(description.pipeline, description.encodingProfile as string);
} else if(instanceOfEncorePipelineConfiguration(description.pipeline)){
pipeline = new EncorePipeline(description.pipeline);
const configuration: EncorePipelineConfiguration = description.pipeline;
const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile);
await delay(configuration.encoreInstancePostCreationDelay_ms); // Delay required to allow instance to be created before calling it
await pipeline.runTranscodeThenAnalyze(instance);
logger.info(`Finishing Encore Pipeline Job ${description.name}.`);
await pipeline.deleteEncoreInstance(instance, configuration.apiAddress);
return
}

if (pipeline === undefined) {
Expand Down
17 changes: 17 additions & 0 deletions src/interfaceGuards.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { EncorePipelineConfiguration } from "./pipelines/encore/encore-pipeline-configuration";
import { LocalPipelineConfiguration } from "./pipelines/local/local-pipeline-configuration";
import logger from './logger';

export function instanceOfLocalPipelineConfiguration(object: any): object is LocalPipelineConfiguration {
return (typeof object === 'object' &&
'ffmpegPath' in object && 'pythonPath' in object && 'easyVmafPath' in object &&
'ffmpegEncoder' in object && 'ffmpegOptions' in object);
}

export function instanceOfEncorePipelineConfiguration(object: any): object is EncorePipelineConfiguration {
return (typeof object === 'object' &&
'apiAddress' in object && 'token' in object && 'instanceId' in object &&
'profile' in object && 'outputFolder' in object && 'baseName' in object &&
'inputs' in object && 'duration' in object && 'priority' in object &&
'encorePollingInterval_ms' in object && 'encoreInstancePostCreationDelay_ms' in object);
}
114 changes: 43 additions & 71 deletions src/pipelines/encore/encore-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { EncoreInstance } from '../../models/encoreInstance';
import { EncoreJobs, EncoreJob } from '../../models/encoreJobs';
import logger from '../../logger';
import fs from 'fs';
const ISOBoxer = require('codem-isoboxer');
const { Readable } = require('stream'); //Typescript import library contains different functions.
//If someone knows how to achieve the same functionality in Typescript syntax let me know.
const { finished } = require('stream/promises'); //Same as above.
Expand All @@ -25,7 +26,6 @@ export class EncorePipeline implements Pipeline {
}

async transcode(input: string, targetResolution: Resolution, targetBitrate: number, output: string, variables?: Record<string, string>): Promise<string> {

return "todo"
}

Expand All @@ -45,13 +45,13 @@ export class EncorePipeline implements Pipeline {
};

const awsPipe = new AWSPipeline(AWSConf);
const result: string = await awsPipe.analyzeQuality(`./mediaTransferDir/${reference}`, `./mediaTransferDir/${distorted}`, output, model)
const result: string = await awsPipe.analyzeQuality(reference, distorted, output, model);
return result
}

/**
* Creates an Encore Instance, waits for 5000ms, then attempts to enqueue a transcode job.
* @param apiAddress The api address .
* @param apiAddress The api address.
* @param token api token.
* @param instanceId The Encore Instance that will enqueue the transcode job.
* @param profile The transcode profile.
Expand Down Expand Up @@ -93,9 +93,10 @@ export class EncorePipeline implements Pipeline {
* Attempts to delete an Encore Instance.
* @param encoreInstance The Encore Instance to be deleted.
*/
async deleteEncoreInstance(encoreInstance: EncoreInstance): Promise<void> {
async deleteEncoreInstance(encoreInstance: EncoreInstance, apiAddress: string): Promise<any> {

const url = encoreInstance.url;
const url = `${apiAddress}/encoreinstance/${encoreInstance.name}`;
logger.info(`DELETE url: ${url}`);
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
Expand All @@ -109,6 +110,7 @@ export class EncorePipeline implements Pipeline {

fetch(request).then(response => {
logger.info(`Delete Encore Instance Status: ${response.status}`);
return response;
})
.catch(error => {
logger.error(error);
Expand Down Expand Up @@ -179,8 +181,7 @@ export class EncorePipeline implements Pipeline {
const jobsResponse: EncoreJobs = await this.getEncoreJobs(encoreInstance);
const jobs: EncoreJob[] = jobsResponse._embedded.encoreJobs;
if (jobs.length < 1 || jobs === undefined) {
logger.info("getEncoreJobs returned empty list");
break
throw new Error('getEncoreJobs returned empty list');
}
for (let job of jobs) {
logger.info(`Encore Job Transcoding Status: ${job.status}`);
Expand Down Expand Up @@ -266,7 +267,8 @@ export class EncorePipeline implements Pipeline {
}

/**
* Download a finished transcode job using a url address.
* Download a finished transcode job using a url address.
* Creates a download directory from the baseName parameter in the pipeline configurations.
* @param url The file url to download.
* @param filename The filename that should be given to the downloaded file.
*/
Expand All @@ -283,88 +285,58 @@ export class EncorePipeline implements Pipeline {
});

logger.info(`File to download: ${filename}`);
const destinationPath: string = `./mediaTransferDir/${filename}`;
const dir: string = `./${this.configuration.baseName}`;
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
logger.debug('Directory created:', dir);
}
const destinationPath: string = `${dir}/${filename}`;
const stream = fs.createWriteStream(destinationPath);
const { body } = await fetch(request);
await finished(Readable.fromWeb(body).pipe(stream));
return filename;
}

/**
* Creates an Encore job for each media path in the input[] parameter for the EncoreInstance.
* Downloads the original reference file then polls the api and downloads the finished files if they exist.
* Will keep polling until all jobs queued by the pipeline have reached a SUCCESSFUL or FAILED state.
* When polling is done, runs analysis on the transcoded files.
* ONLY RUNS WITH .mp4 files.
* @param encoreInstance The Encore Instance in which to create Encore Transcode jobs.
*/
async runTranscodeThenAnalyze(instance: EncoreInstance) {

let jobIds: string[] = [];
let references: string[] = [];


for (let input of this.configuration.inputs) {
const job: EncoreJob = await this.createEncoreJob(instance, input);
jobIds.push(job.id);
const referenceFilename: string = `${job.id}_reference.mp4`;
references.push(referenceFilename);
await this.downloadFile(input, referenceFilename);
await this.pollUntilAllJobsCompleted(instance, jobIds);

fs.readdir("./mediaTransferDir", (err, files) => {
const dir: string = `./${this.configuration.baseName}`;
fs.readdir(dir, (err, files) => {
files.forEach(file => {
if(file.includes(".mp4") && !file.includes("reference")){
//this.analyzeQuality(`./mediaTransferDir/${referenceFilename}`, `./mediaTransferDir/${distorted}`, "oliverEncore/output.json", QualityAnalysisModel.HD)
if (file.includes(".mp4") && !file.includes("reference")) {
const arrayBuffer = new Uint8Array(fs.readFileSync(`${dir}/${file}`)).buffer;
const parsedFile = ISOBoxer.parseBuffer(arrayBuffer);
const hdlrs = parsedFile.fetchAll('hdlr');
let isVideo: boolean = false;
hdlrs.forEach(hldr => {
if (hldr.handler_type == "vide") {
logger.debug(`Contains video track: ${file}`);
isVideo = true;
}
})
if (isVideo) {
logger.info(`Analyzing quality using: ${file}`);
this.analyzeQuality(`./${this.configuration.baseName}/${referenceFilename}`, `${dir}/${file}`, `OSAAS_Encore_workdir/output_${file}.json`, QualityAnalysisModel.HD) // TODO Model should be configurable
}
}
});
});
}
}
}

async function dummymain() {

const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-encore.stage.osaas.io",
token: "",
instanceId: "dummy",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo",
inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"],
duration: 120,
priority: 0,
encorePollingInterval_ms: 30000,
encoreInstancePostCreationDelay_ms: 10000
};

const AWSConf: AWSPipelineConfiguration = {
inputBucket: "vmaf-files-incoming",
outputBucket: "vmaf-files",
mediaConvertRole: "",
mediaConvertSettings: "",
mediaConvertEndpoint: "",
ecsSubnet: "subnet-05d98882c13408e16",
ecsSecurityGroup: "sg-0e444b67a747bf739",
ecsContainerName: "easyvmaf-s3",
ecsCluster: "vmaf-runner",
ecsTaskDefinition: "easyvmaf-s3:3"
};



//const awsPipe = new AWSPipeline(AWSConf);
const pipeline: EncorePipeline = new EncorePipeline(configuration);
const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile);
let jobIds: string[] = [];
await delay(pipeline.configuration.encoreInstancePostCreationDelay_ms);
for (let input of configuration.inputs) {
const job: EncoreJob = await pipeline.createEncoreJob(instance, input);
jobIds.push(job.id);
}
await pipeline.downloadFile(configuration.inputs[0], "reference.mp4")
await pipeline.pollUntilAllJobsCompleted(instance, jobIds);
fs.readdir("./mediaTransferDir", (err, files) => {
files.forEach(file => {
if(file.includes(".mp4") && !file.includes("reference")){
//awsPipe.analyzeQuality("./mediaTransferDir/reference.mp4", "./mediaTransferDir/_demo_x264_324.mp4", "oliverEncore/output.json", QualityAnalysisModel.HD)
}
});
});


}

//dummymain();
}

0 comments on commit c66f3eb

Please sign in to comment.