diff --git a/src/models/encoreJobs.ts b/src/models/encoreJobs.ts new file mode 100644 index 0000000..2332978 --- /dev/null +++ b/src/models/encoreJobs.ts @@ -0,0 +1,54 @@ +export type EncoreJobs = { + _embedded:{ + encoreJobs: EncoreJob[] + }, + _links:{ + self:any, + profile:any, + search:any + }, + page:any +} + +export type EncoreJob = { + id:string, + externalId:string, + profile:string, + outputFolder:string, + baseName:string, + createdDate:Date, + progressCallbackUri:string, + priority:number, + message:string, + progress:number, + speed:string, + startedDate:Date, + completedDate:Date, + debugOverlay:boolean, + logContext:any, + seekTo:string, + duration:number, + thumbnailTime:string, + inputs:any[], + output:Output[], + status:string, + _links:{ + self:{ + href:string + }, + encoreJob:{ + href:string + } + } + } + +export type Output = { + file : string, + fileSize : number, + format : string, + overallBitrate : number, + duration : number, + videoStreams : any[], + audioStreams : any[], + type : string + } \ No newline at end of file diff --git a/src/pipelines/encore/encore-pipeline.ts b/src/pipelines/encore/encore-pipeline.ts index ebcb94b..d7cabb5 100644 --- a/src/pipelines/encore/encore-pipeline.ts +++ b/src/pipelines/encore/encore-pipeline.ts @@ -3,7 +3,15 @@ import { Pipeline } from '../pipeline'; import { EncorePipelineConfiguration } from './encore-pipeline-configuration'; import { QualityAnalysisModel } from '../../models/quality-analysis-model'; import { EncoreInstance } from '../../models/encoreInstance'; +import { EncoreJobs, EncoreJob } from '../../models/encoreJobs'; import logger from '../../logger'; +import fs from 'fs'; +const { Readable } = require('stream'); //Typescript import library contains different functions. If someone knows how to achieve the same functionality let me know. +const { finished } = require('stream/promises'); //Same as above. + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} export default class EncorePipeline implements Pipeline { @@ -28,7 +36,7 @@ export default class EncorePipeline implements Pipeline { * @param instanceId The Encore Instance that will enqueue the transcode job. * @param profile The transcode profile. */ - async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise { + async createEncoreInstance(apiAddress: string, token: string, instanceId: string, profile: string): Promise { const url = `${apiAddress}/encoreinstance`; const headerObj = { @@ -48,21 +56,14 @@ export default class EncorePipeline implements Pipeline { body: data, }); - fetch(request).then(response => { + return fetch(request).then(response => { logger.info(`Create Encore Instance Status: ${response.status}`); if (response.status == 200) { - return response.json() + return response.json(); } else { throw new Error('Instance creation failed'); } - }).then(data => { - const encoreInstance: EncoreInstance = data; - return encoreInstance; - }).then(encoreInstance => { - setTimeout(() => { - this.createEncoreJob(encoreInstance); - }, 5000); }).catch(error => { logger.error(error); }); @@ -98,8 +99,9 @@ export default class EncorePipeline implements Pipeline { * Attempts to enqueue a transcode job to an Encore Instance. * If job creation fails the first time a second attempt will be made after 5000ms. * @param encoreInstance The Encore Instance were the job should be enqueued. + * @param mediaFileAddress The https address of the media file to be enqueued. */ - async createEncoreJob(encoreInstance: EncoreInstance): Promise { + async createEncoreJob(encoreInstance: EncoreInstance, mediaFileAddress: string): Promise { const url = encoreInstance.resources.enqueueJob.url; const headerObj = { @@ -114,7 +116,7 @@ export default class EncorePipeline implements Pipeline { "baseName": this.configuration.baseName, "inputs": [ { - "uri": this.configuration.inputs[0], + "uri": mediaFileAddress, "type": "AudioVideo" } ], @@ -128,25 +130,151 @@ export default class EncorePipeline implements Pipeline { body: data, }); - fetch(request).then(response => { + return fetch(request).then(response => { logger.info(`Create Encore Job Status: ${response.status}`); - return response.status; - }).then(responseStatus => { - if (responseStatus != 201) { - const newRequest = new Request(url, { - method: "POST", - headers: headers, - body: data, - }); - setTimeout(() => { - fetch(newRequest).then(response => { - logger.info(`Attempt 2, Create Encore Job Status: ${response.status}`); - }); - }, 5000); + if (response.status == 201) { + return response.json(); + } + else { + throw new Error('Job creation failed'); + } + }) + .catch(error => { + logger.error(error); + }); + } + + async pollUntilSuccessfulJob(encoreInstance: EncoreInstance): Promise { + //Todo there should be some handling here of "FAILED" jobs + logger.info("polling until successful job") + while (true) { + + const jobsResponse: EncoreJobs = await this.getEncoreJobs(encoreInstance); + const jobs = jobsResponse._embedded.encoreJobs; + if (jobs.length < 1 || jobs === undefined){ + logger.info("getEncoreJobs returned empty list"); + break + } + for (let job of jobs) { + logger.info(`Encore Job Status: ${job.status}`) + if (job.status === "SUCCESSFUL") { + for (let outputObj of job.output){ + const trimmedInstanceUrl: string = encoreInstance.url.substring(0, encoreInstance.url.length - 1); + const url: string = `${trimmedInstanceUrl}${outputObj.file}`; + const splitFilename = outputObj.file.split("/"); + await this.downloadFile(url, splitFilename[splitFilename.length - 1]); + } + //await delete job + break + } + else if (job.status === "FAILED"){ + //await delete job + } + } + await delay(30000); + } + } + + /** + * Attempts to get all jobs for an Encore Instance. + * @param encoreInstance The Encore Instance to get jobs from. + */ + async getEncoreJobs(encoreInstance: EncoreInstance): Promise { + const url = encoreInstance.resources.listJobs.url; + const headerObj = { + 'accept': 'application/json', + 'x-jwt': `Bearer ${this.configuration.token}`, + }; + const headers = new Headers(headerObj); + + const request = new Request(url, { + method: "GET", + headers: headers, + }); + + return fetch(request).then(response => { + logger.info(`GET Encore Jobs Status: ${response.status}`); + if (response.status == 200) { + return response.json(); } }) .catch(error => { logger.error(error); }); } -} \ No newline at end of file + + async getEncoreJob(encoreJobUrl: string): Promise { + const url = encoreJobUrl; + const headerObj = { + 'accept': 'application/json', + 'x-jwt': `Bearer ${this.configuration.token}`, + }; + const headers = new Headers(headerObj); + + const request = new Request(url, { + method: "GET", + headers: headers, + }); + + return fetch(request).then(response => { + logger.info(`GET Encore Job Status: ${response.status}`); + if (response.status == 200) { + return response.json(); + } + }) + .catch(error => { + logger.error(error); + }); + } + + async downloadFile(url: string, filename: string): Promise { + const headerObj = { + 'accept': 'application/json', + 'x-jwt': `Bearer ${this.configuration.token}`, + 'Content-Type': 'application/json', + }; + const headers = new Headers(headerObj); + + const request = new Request(url, { + headers: headers, + }); + + logger.info(`File to download: ${filename}`); + const destinationPath: string = `./mediaTransferDir/${filename}`; + const stream = fs.createWriteStream(destinationPath); + const { body } = await fetch(request); + await finished(Readable.fromWeb(body).pipe(stream)); + } +} + +async function dummymain() { + + const configuration: EncorePipelineConfiguration = { + apiAddress: "https://api-encore.stage.osaas.io", + token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2OTUxMjkyNTAsImN1c3RvbWVyIjoib2xpdmVydGVzdCIsImVtYWlsIjoib2xpdmVydGVzdG1haWwiLCJsaW1pdCI6M30.HnFLzkl2-QQU28kNZW1SAgO6l69mslJaCwvUxITzfJY", + instanceId: "dummy", + profile: "program", + outputFolder: "/usercontent/demo", + baseName: "_demo", + inputs: ["https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4", "https://testcontent.eyevinn.technology/mp4/stswe-tvplus-promo.mp4"], + duration: 10, + priority: 0, + }; + + const pipeline: EncorePipeline = new EncorePipeline(configuration); + const instance: EncoreInstance = await pipeline.createEncoreInstance(configuration.apiAddress, configuration.token, configuration.instanceId, configuration.profile); + let jobs: Array = []; + await delay(10000); + for (let input of configuration.inputs) { + const job: EncoreJob = await pipeline.createEncoreJob(instance, input); + jobs.push(job); + } + await pipeline.pollUntilSuccessfulJob(instance); + + + //const fileUrl:string = "https://olivertest-newtest.encore.stage.osaas.io/usercontent/demo/_demo_x264_324.mp4" + //pipeline.downloadFile(fileUrl, './newvid.mp4'); + +} + +dummymain(); \ No newline at end of file