From 8fb18db3c3581ca3484ea8efcdbc0206cc4f776c Mon Sep 17 00:00:00 2001 From: oshinongit Date: Mon, 2 Oct 2023 14:25:52 +0200 Subject: [PATCH] feat: download and process files --- src/pipelines/encore/encore-pipeline.ts | 141 +++++++++++++++++++----- 1 file changed, 113 insertions(+), 28 deletions(-) diff --git a/src/pipelines/encore/encore-pipeline.ts b/src/pipelines/encore/encore-pipeline.ts index ebcb94b..f30c95a 100644 --- a/src/pipelines/encore/encore-pipeline.ts +++ b/src/pipelines/encore/encore-pipeline.ts @@ -4,6 +4,13 @@ import { EncorePipelineConfiguration } from './encore-pipeline-configuration'; import { QualityAnalysisModel } from '../../models/quality-analysis-model'; import { EncoreInstance } from '../../models/encoreInstance'; import logger from '../../logger'; +import fs from 'fs'; +const { Readable } = require('stream'); //Typescript import library contains different functions. If someone knows how to achieve the same functionality let me know. +const { finished } = require('stream/promises'); //Same as above. + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} export default class EncorePipeline implements Pipeline { @@ -28,7 +35,7 @@ export default class EncorePipeline implements Pipeline { * @param instanceId The Encore Instance that will enqueue the transcode job. * @param profile The transcode profile. */ - async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise { + async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise { const url = `${apiAddress}/encoreinstance`; const headerObj = { @@ -48,7 +55,7 @@ export default class EncorePipeline implements Pipeline { body: data, }); - fetch(request).then(response => { + return fetch(request).then(response => { logger.info(`Create Encore Instance Status: ${response.status}`); if (response.status == 200) { return response.json() @@ -56,16 +63,9 @@ export default class EncorePipeline implements Pipeline { else { throw new Error('Instance creation failed'); } - }).then(data => { - const encoreInstance: EncoreInstance = data; - return encoreInstance; - }).then(encoreInstance => { - setTimeout(() => { - this.createEncoreJob(encoreInstance); - }, 5000); }).catch(error => { - logger.error(error); - }); + logger.error(error); + }); } /** @@ -98,8 +98,9 @@ export default class EncorePipeline implements Pipeline { * Attempts to enqueue a transcode job to an Encore Instance. * If job creation fails the first time a second attempt will be made after 5000ms. * @param encoreInstance The Encore Instance were the job should be enqueued. + * @param mediaFileAddress The https address of the media file to be enqueued. */ - async createEncoreJob(encoreInstance: EncoreInstance): Promise { + async createEncoreJob(encoreInstance: EncoreInstance, mediaFileAddress: string): Promise { const url = encoreInstance.resources.enqueueJob.url; const headerObj = { @@ -114,7 +115,7 @@ export default class EncorePipeline implements Pipeline { "baseName": this.configuration.baseName, "inputs": [ { - "uri": this.configuration.inputs[0], + "uri": mediaFileAddress, "type": "AudioVideo" } ], @@ -128,25 +129,109 @@ export default class EncorePipeline implements Pipeline { body: data, }); - fetch(request).then(response => { + return fetch(request).then(response => { logger.info(`Create Encore Job Status: ${response.status}`); - return response.status; - }).then(responseStatus => { - if (responseStatus != 201) { - const newRequest = new Request(url, { - method: "POST", - headers: headers, - body: data, - }); - setTimeout(() => { - fetch(newRequest).then(response => { - logger.info(`Attempt 2, Create Encore Job Status: ${response.status}`); - }); - }, 5000); + if (response.status == 201) { + return response.json() + } + else { + throw new Error('Job creation failed'); } }) .catch(error => { logger.error(error); }); } -} \ No newline at end of file + + async pollForSuccessfulJob(encoreInstance: EncoreInstance): Promise { + //Todo there should be some handling here of "FAILED" jobs + logger.info("in polling") + while (true) { + setTimeout(() => { + this.getEncoreJobs(encoreInstance).then( + jobResponse => { + logger.info(jobResponse.body) + if (jobResponse.status == "SUCCESSFUL") { + //loop through job object and download the relevant files + } + } + ); + }, 30000); + } + } + + /** + * Attempts to get all jobs for an Encore Instance. + * @param encoreInstance The Encore Instance to get jobs from. + */ + async getEncoreJobs(encoreInstance: EncoreInstance): Promise { + const url = encoreInstance.resources.listJobs.url; + const headerObj = { + 'accept': 'application/json', + 'x-jwt': `Bearer ${this.configuration.token}`, + }; + const headers = new Headers(headerObj); + + const request = new Request(url, { + method: "GET", + headers: headers, + }); + + fetch(request).then(response => { + logger.info(`GET Encore Jobs Status: ${response.status}`); + return response; + }) + .catch(error => { + logger.error(error); + }); + } + + async downloadFile(url: string, destinationPath: string): Promise { + const headerObj = { + 'accept': 'application/json', + 'x-jwt': `Bearer ${this.configuration.token}`, + 'Content-Type': 'application/json', + }; + const headers = new Headers(headerObj); + + const request = new Request(url, { + headers: headers, + }); + + const stream = fs.createWriteStream(destinationPath); + const { body } = await fetch(request); + await finished(Readable.fromWeb(body).pipe(stream)); + } +} + +async function dummymain() { + + const configuration: EncorePipelineConfiguration = { + apiAddress: "https://api-encore.stage.osaas.io", + token: "", + instanceId: "dummy", + profile: "program", + outputFolder: "/usercontent/demo", + baseName: "_demo", + inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4", "https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"], + duration: 10, + priority: 0, + }; + + const pipeline: EncorePipeline = new EncorePipeline(configuration); + const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile); + let jobs: Array = []; + await delay(10000); + for (let input of configuration.inputs){ + const job = await pipeline.createEncoreJob(instance, input); + logger.info(job.url) + jobs.push(job); + } + // logger.info(jobs[0].body); + + //const fileUrl:string = "https://olivertest-newtest.encore.stage.osaas.io/usercontent/demo/_demo_x264_324.mp4" + //pipeline.downloadFile(fileUrl, './newvid.mp4'); + +} + +dummymain(); \ No newline at end of file