Skip to content

Commit

Permalink
feat: download and process files
Browse files Browse the repository at this point in the history
  • Loading branch information
oshinongit committed Oct 4, 2023
1 parent 9bf7b61 commit dc15a82
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 28 deletions.
54 changes: 54 additions & 0 deletions src/models/encoreJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
export type EncoreJobs = {
_embedded:{
encoreJobs: EncoreJob[]
},
_links:{
self:any,
profile:any,
search:any
},
page:any
}

export type EncoreJob = {
id:string,
externalId:string,
profile:string,
outputFolder:string,
baseName:string,
createdDate:Date,
progressCallbackUri:string,
priority:number,
message:string,
progress:number,
speed:string,
startedDate:Date,
completedDate:Date,
debugOverlay:boolean,
logContext:any,
seekTo:string,
duration:number,
thumbnailTime:string,
inputs:any[],
output:Output[],
status:string,
_links:{
self:{
href:string
},
encoreJob:{
href:string
}
}
}

export type Output = {
file : string,
fileSize : number,
format : string,
overallBitrate : number,
duration : number,
videoStreams : any[],
audioStreams : any[],
type : string
}
204 changes: 176 additions & 28 deletions src/pipelines/encore/encore-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { Resolution } from '../../models/resolution';
import { Pipeline } from '../pipeline';
import AWSPipeline from '../aws/aws-pipeline';
import { AWSPipelineConfiguration } from '../aws/aws-pipeline-configuration';
import { EncorePipelineConfiguration } from './encore-pipeline-configuration';
import { QualityAnalysisModel } from '../../models/quality-analysis-model';
import { EncoreInstance } from '../../models/encoreInstance';
import { EncoreJobs, EncoreJob } from '../../models/encoreJobs';
import logger from '../../logger';
import fs from 'fs';
const { Readable } = require('stream'); //Typescript import library contains different functions. If someone knows how to achieve the same functionality let me know.
const { finished } = require('stream/promises'); //Same as above.

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export default class EncorePipeline implements Pipeline {

Expand All @@ -28,7 +38,7 @@ export default class EncorePipeline implements Pipeline {
* @param instanceId The Encore Instance that will enqueue the transcode job.
* @param profile The transcode profile.
*/
async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise<void> {
async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise<any> {

const url = `${apiAddress}/encoreinstance`;
const headerObj = {
Expand All @@ -48,21 +58,14 @@ export default class EncorePipeline implements Pipeline {
body: data,
});

fetch(request).then(response => {
return fetch(request).then(response => {
logger.info(`Create Encore Instance Status: ${response.status}`);
if (response.status == 200) {
return response.json()
return response.json();
}
else {
throw new Error('Instance creation failed');
}
}).then(data => {
const encoreInstance: EncoreInstance = data;
return encoreInstance;
}).then(encoreInstance => {
setTimeout(() => {
this.createEncoreJob(encoreInstance);
}, 5000);
}).catch(error => {
logger.error(error);
});
Expand Down Expand Up @@ -96,10 +99,10 @@ export default class EncorePipeline implements Pipeline {

/**
* Attempts to enqueue a transcode job to an Encore Instance.
* If job creation fails the first time a second attempt will be made after 5000ms.
* @param encoreInstance The Encore Instance were the job should be enqueued.
* @param mediaFileAddress The https address of the media file to be enqueued.
*/
async createEncoreJob(encoreInstance: EncoreInstance): Promise<void> {
async createEncoreJob(encoreInstance: EncoreInstance, mediaFileAddress: string): Promise<any> {

const url = encoreInstance.resources.enqueueJob.url;
const headerObj = {
Expand All @@ -114,7 +117,7 @@ export default class EncorePipeline implements Pipeline {
"baseName": this.configuration.baseName,
"inputs": [
{
"uri": this.configuration.inputs[0],
"uri": mediaFileAddress,
"type": "AudioVideo"
}
],
Expand All @@ -128,25 +131,170 @@ export default class EncorePipeline implements Pipeline {
body: data,
});

fetch(request).then(response => {
return fetch(request).then(response => {
logger.info(`Create Encore Job Status: ${response.status}`);
return response.status;
}).then(responseStatus => {
if (responseStatus != 201) {
const newRequest = new Request(url, {
method: "POST",
headers: headers,
body: data,
});
setTimeout(() => {
fetch(newRequest).then(response => {
logger.info(`Attempt 2, Create Encore Job Status: ${response.status}`);
});
}, 5000);
if (response.status == 201) {
return response.json();
}
else {
throw new Error('Job creation failed');
}
})
.catch(error => {
logger.error(error);
});
}

async pollUntilAllJobsCompleted(encoreInstance: EncoreInstance, enqueuedJobIds: string[]): Promise<any> {
logger.info("polling until successful job")
while (true) {
logger.info(`Enqueued jobs: ${enqueuedJobIds}`);
if (enqueuedJobIds.length < 1 || enqueuedJobIds === undefined) {
break
}
const jobsResponse: EncoreJobs = await this.getEncoreJobs(encoreInstance);
const jobs: EncoreJob[] = jobsResponse._embedded.encoreJobs;
if (jobs.length < 1 || jobs === undefined) {
logger.info("getEncoreJobs returned empty list");
break
}
for (let job of jobs) {
logger.info(`Encore Job Transcoding Status: ${job.status}`);
const jobShouldBeProcessed: number = enqueuedJobIds.indexOf(job.id);
if (job.status === "SUCCESSFUL" && jobShouldBeProcessed != -1) {
for (let outputObj of job.output) {
const trimmedInstanceUrl: string = encoreInstance.url.substring(0, encoreInstance.url.length - 1);
const url: string = `${trimmedInstanceUrl}${outputObj.file}`;
const splitFilename = outputObj.file.split("/");
await this.downloadFile(url, splitFilename[splitFilename.length - 1]);
}
enqueuedJobIds.splice(jobShouldBeProcessed, 1);
}
else if (job.status === "FAILED") {
enqueuedJobIds.splice(jobShouldBeProcessed, 1);
}
}
await delay(30000);
}
}

/**
* Attempts to get all jobs for an Encore Instance.
* @param encoreInstance The Encore Instance to get jobs from.
*/
async getEncoreJobs(encoreInstance: EncoreInstance): Promise<any> {
const url = encoreInstance.resources.listJobs.url;
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
};
const headers = new Headers(headerObj);

const request = new Request(url, {
method: "GET",
headers: headers,
});

return fetch(request).then(response => {
logger.info(`GET Encore Jobs Status: ${response.status}`);
if (response.status == 200) {
return response.json();
}
})
.catch(error => {
logger.error(error);
});
}

async getEncoreJob(encoreJobUrl: string): Promise<any> {
const url = encoreJobUrl;
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
};
const headers = new Headers(headerObj);

const request = new Request(url, {
method: "GET",
headers: headers,
});

return fetch(request).then(response => {
logger.info(`GET Encore Job Status: ${response.status}`);
if (response.status == 200) {
return response.json();
}
})
.catch(error => {
logger.error(error);
});
}
}

async downloadFile(url: string, filename: string): Promise<any> {
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
'Content-Type': 'application/json',
};
const headers = new Headers(headerObj);

const request = new Request(url, {
headers: headers,
});

logger.info(`File to download: ${filename}`);
const destinationPath: string = `./mediaTransferDir/${filename}`;
const stream = fs.createWriteStream(destinationPath);
const { body } = await fetch(request);
await finished(Readable.fromWeb(body).pipe(stream));
}
}

async function dummymain() {

const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-encore.stage.osaas.io",
token: "",
instanceId: "dummy2",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo",
inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4", "https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"],
duration: 10,
priority: 0,
};

const AWSConf: AWSPipelineConfiguration = {
inputBucket: "string",
outputBucket: "string",
mediaConvertRole: "string",
mediaConvertEndpoint: "string",
mediaConvertSettings: "any",
ecsSubnet: "string",
ecsSecurityGroup: "string",
ecsCluster: "string",
ecsContainerName: "string",
ecsTaskDefinition: "string",
};;



const awsPipe = new AWSPipeline(AWSConf);
const pipeline: EncorePipeline = new EncorePipeline(configuration);
const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile);
let jobIds: string[] = [];
await delay(10000);
for (let input of configuration.inputs) {
const job: EncoreJob = await pipeline.createEncoreJob(instance, input);
jobIds.push(job.id);
}
await pipeline.pollUntilAllJobsCompleted(instance, jobIds);
//awsPipe.uploadToS3()


//const fileUrl:string = "https://olivertest-newtest.encore.stage.osaas.io/usercontent/demo/_demo_x264_324.mp4"
//pipeline.downloadFile(fileUrl, './newvid.mp4');

}

dummymain();

0 comments on commit dc15a82

Please sign in to comment.