Skip to content

Commit

Permalink
feat: download and process files
Browse files Browse the repository at this point in the history
  • Loading branch information
oshinongit committed Oct 6, 2023
1 parent 9bf7b61 commit 1be4b77
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 29 deletions.
54 changes: 54 additions & 0 deletions src/models/encoreJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
export type EncoreJobs = {
_embedded:{
encoreJobs: EncoreJob[]
},
_links:{
self:any,
profile:any,
search:any
},
page:any
}

export type EncoreJob = {
id:string,
externalId:string,
profile:string,
outputFolder:string,
baseName:string,
createdDate:Date,
progressCallbackUri:string,
priority:number,
message:string,
progress:number,
speed:string,
startedDate:Date,
completedDate:Date,
debugOverlay:boolean,
logContext:any,
seekTo:string,
duration:number,
thumbnailTime:string,
inputs:any[],
output:Output[],
status:string,
_links:{
self:{
href:string
},
encoreJob:{
href:string
}
}
}

export type Output = {
file : string,
fileSize : number,
format : string,
overallBitrate : number,
duration : number,
videoStreams : any[],
audioStreams : any[],
type : string
}
248 changes: 220 additions & 28 deletions src/pipelines/encore/encore-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { Resolution } from '../../models/resolution';
import { Pipeline } from '../pipeline';
import AWSPipeline from '../aws/aws-pipeline';
import { AWSPipelineConfiguration } from '../aws/aws-pipeline-configuration';
import { EncorePipelineConfiguration } from './encore-pipeline-configuration';
import { QualityAnalysisModel } from '../../models/quality-analysis-model';
import { EncoreInstance } from '../../models/encoreInstance';
import { EncoreJobs, EncoreJob } from '../../models/encoreJobs';
import logger from '../../logger';
import fs from 'fs';
const { Readable } = require('stream'); //Typescript import library contains different functions. If someone knows how to achieve the same functionality let me know.
const { finished } = require('stream/promises'); //Same as above.

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export default class EncorePipeline implements Pipeline {

Expand All @@ -14,10 +24,36 @@ export default class EncorePipeline implements Pipeline {
}

async transcode(input: string, targetResolution: Resolution, targetBitrate: number, output: string, variables?: Record<string, string>, encoreInstance?: EncoreInstance): Promise<string> {
const pipeline: EncorePipeline = new EncorePipeline(this.configuration);
const instance: EncoreInstance = await pipeline.createEncoreInstance(this.configuration.apiAddress, this.configuration.token, this.configuration.instanceId, this.configuration.profile);
let jobIds: string[] = [];
await delay(10000);
for (let input of this.configuration.inputs) {
const job: EncoreJob = await pipeline.createEncoreJob(instance, input);
jobIds.push(job.id);
}
await pipeline.downloadFile(this.configuration.inputs[0], "reference.mp4")
await pipeline.pollUntilAllJobsCompleted(instance, jobIds);
return "todo"
}

async analyzeQuality(reference: string, distorted: string, output: string, model: QualityAnalysisModel): Promise<string> {

const AWSConf: AWSPipelineConfiguration = {
inputBucket: "vmaf-files-incoming",
outputBucket: "vmaf-files",
mediaConvertRole: "",
mediaConvertSettings: "",
mediaConvertEndpoint: "",
ecsSubnet: "subnet-05d98882c13408e16",
ecsSecurityGroup: "sg-0e444b67a747bf739",
ecsContainerName: "easyvmaf-s3",
ecsCluster: "vmaf-runner",
ecsTaskDefinition: "easyvmaf-s3:3"
};

const awsPipe = new AWSPipeline(AWSConf);
awsPipe.analyzeQuality("./mediaTransferDir/reference.mp4", "./mediaTransferDir/_demo_x264_324.mp4", "oliverEncore/output.json", QualityAnalysisModel.HD)
return "todo"
}

Expand All @@ -28,7 +64,7 @@ export default class EncorePipeline implements Pipeline {
* @param instanceId The Encore Instance that will enqueue the transcode job.
* @param profile The transcode profile.
*/
async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise<void> {
async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise<any> {

const url = `${apiAddress}/encoreinstance`;
const headerObj = {
Expand All @@ -48,21 +84,14 @@ export default class EncorePipeline implements Pipeline {
body: data,
});

fetch(request).then(response => {
return fetch(request).then(response => {
logger.info(`Create Encore Instance Status: ${response.status}`);
if (response.status == 200) {
return response.json()
return response.json();
}
else {
throw new Error('Instance creation failed');
}
}).then(data => {
const encoreInstance: EncoreInstance = data;
return encoreInstance;
}).then(encoreInstance => {
setTimeout(() => {
this.createEncoreJob(encoreInstance);
}, 5000);
}).catch(error => {
logger.error(error);
});
Expand Down Expand Up @@ -96,10 +125,10 @@ export default class EncorePipeline implements Pipeline {

/**
* Attempts to enqueue a transcode job to an Encore Instance.
* If job creation fails the first time a second attempt will be made after 5000ms.
* @param encoreInstance The Encore Instance were the job should be enqueued.
* @param mediaFileAddress The https address of the media file to be enqueued.
*/
async createEncoreJob(encoreInstance: EncoreInstance): Promise<void> {
async createEncoreJob(encoreInstance: EncoreInstance, mediaFileAddress: string): Promise<any> {

const url = encoreInstance.resources.enqueueJob.url;
const headerObj = {
Expand All @@ -114,7 +143,7 @@ export default class EncorePipeline implements Pipeline {
"baseName": this.configuration.baseName,
"inputs": [
{
"uri": this.configuration.inputs[0],
"uri": mediaFileAddress,
"type": "AudioVideo"
}
],
Expand All @@ -128,25 +157,188 @@ export default class EncorePipeline implements Pipeline {
body: data,
});

fetch(request).then(response => {
return fetch(request).then(response => {
logger.info(`Create Encore Job Status: ${response.status}`);
return response.status;
}).then(responseStatus => {
if (responseStatus != 201) {
const newRequest = new Request(url, {
method: "POST",
headers: headers,
body: data,
});
setTimeout(() => {
fetch(newRequest).then(response => {
logger.info(`Attempt 2, Create Encore Job Status: ${response.status}`);
});
}, 5000);
if (response.status == 201) {
return response.json();
}
else {
throw new Error('Job creation failed');
}
})
.catch(error => {
logger.error(error);
});
}

/**
* Attempts to enqueue a transcode job to an Encore Instance.
* @param encoreInstance The Encore Instance were the jobs have been queued.
* @param enqueuedJobIds List of jobs that are expected to be transcoded.
*/
async pollUntilAllJobsCompleted(encoreInstance: EncoreInstance, enqueuedJobIds: string[]): Promise<any> {
logger.info("polling until successful job")
while (true) {
logger.info(`Enqueued jobs: ${enqueuedJobIds}`);
if (enqueuedJobIds.length < 1 || enqueuedJobIds === undefined) {
break
}
const jobsResponse: EncoreJobs = await this.getEncoreJobs(encoreInstance);
const jobs: EncoreJob[] = jobsResponse._embedded.encoreJobs;
if (jobs.length < 1 || jobs === undefined) {
logger.info("getEncoreJobs returned empty list");
break
}
for (let job of jobs) {
logger.info(`Encore Job Transcoding Status: ${job.status}`);
const jobShouldBeProcessed: number = enqueuedJobIds.indexOf(job.id);
if (job.status === "SUCCESSFUL" && jobShouldBeProcessed != -1) {
for (let outputObj of job.output) {
let filePath: string = outputObj.file;
let url: string = `${encoreInstance.url}${filePath}`;;
if(filePath[0] == "/"){
const trimmedInstanceUrl: string = encoreInstance.url.substring(0, encoreInstance.url.length - 1);
// Remove final '/' in url to enable combining with filename path
const url: string = `${trimmedInstanceUrl}${filePath}`;
const splitFilename = filePath.split("/");
filePath = splitFilename[splitFilename.length - 1];
}

await this.downloadFile(url, filePath);
}
enqueuedJobIds.splice(jobShouldBeProcessed, 1);
}
else if (job.status === "FAILED") {
enqueuedJobIds.splice(jobShouldBeProcessed, 1);
}
}
await delay(30000);
}
}

/**
* Attempts to get all jobs for an Encore Instance.
* @param encoreInstance The Encore Instance to get jobs from.
*/
async getEncoreJobs(encoreInstance: EncoreInstance): Promise<any> {
const url = encoreInstance.resources.listJobs.url;
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
};
const headers = new Headers(headerObj);

const request = new Request(url, {
method: "GET",
headers: headers,
});

return fetch(request).then(response => {
logger.info(`GET Encore Jobs Status: ${response.status}`);
if (response.status == 200) {
return response.json();
}
})
.catch(error => {
logger.error(error);
});
}

/**
* Attempts to GET a transcode job using a url address.
* @param encoreJobUrl The Encore job url to get.
*/
async getEncoreJob(encoreJobUrl: string): Promise<any> {
const url = encoreJobUrl;
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
};
const headers = new Headers(headerObj);

const request = new Request(url, {
method: "GET",
headers: headers,
});

return fetch(request).then(response => {
logger.info(`GET Encore Job Status: ${response.status}`);
if (response.status == 200) {
return response.json();
}
})
.catch(error => {
logger.error(error);
});
}
}

/**
* Attempts to GET a transcode job using a url address.
* @param url The file url to download.
* @param filename The filename that should be given to the downloaded file.
*/
async downloadFile(url: string, filename: string): Promise<any> {
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
'Content-Type': 'application/json',
};
const headers = new Headers(headerObj);

const request = new Request(url, {
headers: headers,
});

logger.info(`File to download: ${filename}`);
const destinationPath: string = `./mediaTransferDir/${filename}`;
const stream = fs.createWriteStream(destinationPath);
const { body } = await fetch(request);
await finished(Readable.fromWeb(body).pipe(stream));
}
}

async function dummymain() {

const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-encore.stage.osaas.io",
token: "",
instanceId: "dummy",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo",
inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"],
duration: 120,
priority: 0,
};

const AWSConf: AWSPipelineConfiguration = {
inputBucket: "vmaf-files-incoming",
outputBucket: "vmaf-files",
mediaConvertRole: "",
mediaConvertSettings: "",
mediaConvertEndpoint: "",
ecsSubnet: "subnet-05d98882c13408e16",
ecsSecurityGroup: "sg-0e444b67a747bf739",
ecsContainerName: "easyvmaf-s3",
ecsCluster: "vmaf-runner",
ecsTaskDefinition: "easyvmaf-s3:3"
};



//const awsPipe = new AWSPipeline(AWSConf);
const pipeline: EncorePipeline = new EncorePipeline(configuration);
const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile);
let jobIds: string[] = [];
await delay(10000);
for (let input of configuration.inputs) {
const job: EncoreJob = await pipeline.createEncoreJob(instance, input);
jobIds.push(job.id);
}
await pipeline.downloadFile(configuration.inputs[0], "reference.mp4")
await pipeline.pollUntilAllJobsCompleted(instance, jobIds);
//awsPipe.analyzeQuality("./mediaTransferDir/reference.mp4", "./mediaTransferDir/_demo_x264_324.mp4", "oliverEncore/output.json", QualityAnalysisModel.HD)

}

dummymain();
2 changes: 1 addition & 1 deletion tests/encore-pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ describe('createEncoreJob', () => {
const mockFetch = jest.fn().mockResolvedValue(mockResponse);
jest.spyOn(global, 'fetch').mockImplementation(mockFetch);

await pipeline.createEncoreJob(myInstance);
await pipeline.createEncoreJob(myInstance, configuration.inputs[0]);

const url = myInstance.resources.enqueueJob.url;
expect(mockResponse.then).toHaveBeenCalledTimes(1);
Expand Down

0 comments on commit 1be4b77

Please sign in to comment.