Skip to content

Commit

Permalink
feat: download and process files
Browse files Browse the repository at this point in the history
  • Loading branch information
oshinongit committed Oct 3, 2023
1 parent 9bf7b61 commit 8fb18db
Showing 1 changed file with 113 additions and 28 deletions.
141 changes: 113 additions & 28 deletions src/pipelines/encore/encore-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import { EncorePipelineConfiguration } from './encore-pipeline-configuration';
import { QualityAnalysisModel } from '../../models/quality-analysis-model';
import { EncoreInstance } from '../../models/encoreInstance';
import logger from '../../logger';
import fs from 'fs';
const { Readable } = require('stream'); //Typescript import library contains different functions. If someone knows how to achieve the same functionality let me know.
const { finished } = require('stream/promises'); //Same as above.

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export default class EncorePipeline implements Pipeline {

Expand All @@ -28,7 +35,7 @@ export default class EncorePipeline implements Pipeline {
* @param instanceId The Encore Instance that will enqueue the transcode job.
* @param profile The transcode profile.
*/
async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise<void> {
async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise<any> {

const url = `${apiAddress}/encoreinstance`;
const headerObj = {
Expand All @@ -48,24 +55,17 @@ export default class EncorePipeline implements Pipeline {
body: data,
});

fetch(request).then(response => {
return fetch(request).then(response => {
logger.info(`Create Encore Instance Status: ${response.status}`);
if (response.status == 200) {
return response.json()
}
else {
throw new Error('Instance creation failed');
}
}).then(data => {
const encoreInstance: EncoreInstance = data;
return encoreInstance;
}).then(encoreInstance => {
setTimeout(() => {
this.createEncoreJob(encoreInstance);
}, 5000);
}).catch(error => {
logger.error(error);
});
logger.error(error);
});
}

/**
Expand Down Expand Up @@ -98,8 +98,9 @@ export default class EncorePipeline implements Pipeline {
* Attempts to enqueue a transcode job to an Encore Instance.
* If job creation fails the first time a second attempt will be made after 5000ms.
* @param encoreInstance The Encore Instance were the job should be enqueued.
* @param mediaFileAddress The https address of the media file to be enqueued.
*/
async createEncoreJob(encoreInstance: EncoreInstance): Promise<void> {
async createEncoreJob(encoreInstance: EncoreInstance, mediaFileAddress: string): Promise<any> {

const url = encoreInstance.resources.enqueueJob.url;
const headerObj = {
Expand All @@ -114,7 +115,7 @@ export default class EncorePipeline implements Pipeline {
"baseName": this.configuration.baseName,
"inputs": [
{
"uri": this.configuration.inputs[0],
"uri": mediaFileAddress,
"type": "AudioVideo"
}
],
Expand All @@ -128,25 +129,109 @@ export default class EncorePipeline implements Pipeline {
body: data,
});

fetch(request).then(response => {
return fetch(request).then(response => {
logger.info(`Create Encore Job Status: ${response.status}`);
return response.status;
}).then(responseStatus => {
if (responseStatus != 201) {
const newRequest = new Request(url, {
method: "POST",
headers: headers,
body: data,
});
setTimeout(() => {
fetch(newRequest).then(response => {
logger.info(`Attempt 2, Create Encore Job Status: ${response.status}`);
});
}, 5000);
if (response.status == 201) {
return response.json()
}
else {
throw new Error('Job creation failed');
}
})
.catch(error => {
logger.error(error);
});
}
}

async pollForSuccessfulJob(encoreInstance: EncoreInstance): Promise<void> {
//Todo there should be some handling here of "FAILED" jobs
logger.info("in polling")
while (true) {
setTimeout(() => {
this.getEncoreJobs(encoreInstance).then(
jobResponse => {
logger.info(jobResponse.body)
if (jobResponse.status == "SUCCESSFUL") {
//loop through job object and download the relevant files
}
}
);
}, 30000);
}
}

/**
* Attempts to get all jobs for an Encore Instance.
* @param encoreInstance The Encore Instance to get jobs from.
*/
async getEncoreJobs(encoreInstance: EncoreInstance): Promise<any> {
const url = encoreInstance.resources.listJobs.url;
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
};
const headers = new Headers(headerObj);

const request = new Request(url, {
method: "GET",
headers: headers,
});

fetch(request).then(response => {
logger.info(`GET Encore Jobs Status: ${response.status}`);
return response;
})
.catch(error => {
logger.error(error);
});
}

async downloadFile(url: string, destinationPath: string): Promise<void> {
const headerObj = {
'accept': 'application/json',
'x-jwt': `Bearer ${this.configuration.token}`,
'Content-Type': 'application/json',
};
const headers = new Headers(headerObj);

const request = new Request(url, {
headers: headers,
});

const stream = fs.createWriteStream(destinationPath);
const { body } = await fetch(request);
await finished(Readable.fromWeb(body).pipe(stream));
}
}

async function dummymain() {

const configuration: EncorePipelineConfiguration = {
apiAddress: "https://api-encore.stage.osaas.io",
token: "",
instanceId: "dummy",
profile: "program",
outputFolder: "/usercontent/demo",
baseName: "_demo",
inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4", "https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"],
duration: 10,
priority: 0,
};

const pipeline: EncorePipeline = new EncorePipeline(configuration);
const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile);
let jobs: Array<any> = [];
await delay(10000);
for (let input of configuration.inputs){
const job = await pipeline.createEncoreJob(instance, input);
logger.info(job.url)
jobs.push(job);
}
// logger.info(jobs[0].body);

//const fileUrl:string = "https://olivertest-newtest.encore.stage.osaas.io/usercontent/demo/_demo_x264_324.mp4"
//pipeline.downloadFile(fileUrl, './newvid.mp4');

}

dummymain();

0 comments on commit 8fb18db

Please sign in to comment.