Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tests with new pipeline library #212

Closed
wants to merge 18 commits into from
Closed
49 changes: 33 additions & 16 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ const ShellScriptWorker = require('./shell/shellscript');
const actionWrapper = require('./webaction');
const { AssetComputeWorkerPipeline } = require('./worker-pipeline');

/**
* Checks if at least rendition needs the pipeline.
* If at least one rendition needs pipeline, all will go through pipeline
* @param {*} params Params
* @returns True is pipeline is needed, false otherwise
*/
function hasAtLeastOnePipelineRendtion(params){
return params.renditions.some(rendition => {
return rendition.pipeline === true;
});
}

/**
* Worker where the renditionCallback is called for each rendition.
Expand All @@ -28,22 +39,16 @@ const { AssetComputeWorkerPipeline } = require('./worker-pipeline');
* @param {*} options optional options
*/
function worker(renditionCallback, options = {}) {

if (typeof renditionCallback !== "function") {
throw new Error("renditionCallback must be a function");
}

return actionWrapper(async function (params) {

// if any rendition needs pipeline, all will go through pipeline
const usePipeline = params.renditions.some(rendition => {
return rendition.pipeline === true;
});

const usePipeline = hasAtLeastOnePipelineRendtion(params);
if(usePipeline){
// here the pipeline only wraps the worker callback
// and other transformers potentially already available in a transformer catalog
console.log("Using pipeline (`worker#AssetComputeWorkerPipeline # WorkerCallbackTransformer`)");
console.log("Using pipeline (`worker#AssetComputeWorkerPipeline #WorkerCallbackTransformer`)");
return new AssetComputeWorkerPipeline(renditionCallback, options).compute(params);
} else {
console.log("Using worker callback (`worker#AssetComputeWorker`)");
Expand All @@ -67,15 +72,11 @@ function batchWorker(renditionsCallback, options = {}) {
}

return actionWrapper(async function (params) {
// if any rendition needs pipeline, all will go through pipeline
const usePipeline = params.renditions.some(rendition => {
return rendition.pipeline === true;
});

const usePipeline = hasAtLeastOnePipelineRendtion(params);
if(usePipeline){
// here the pipeline only wraps the worker callback
// and other transformers potentially already available in a transformer catalog
console.log("Using pipeline (`AssetComputeWorkerPipeline # BatchWorkerCallbackTransformer`)");
console.log("Using pipeline (`AssetComputeWorkerPipeline #BatchWorkerCallbackTransformer`)");
options.isBatchWorker = true;
return new AssetComputeWorkerPipeline(renditionsCallback, options).compute(params);
} else {
Expand All @@ -88,11 +89,27 @@ function batchWorker(renditionsCallback, options = {}) {

function shellScriptWorker(script = "worker.sh", options = {}) {
ShellScriptWorker.validate(script);

options.script = script;

return actionWrapper(function (params) {
return new ShellScriptWorker(params, options).compute();
const usePipeline = hasAtLeastOnePipelineRendtion(params);
if(usePipeline){
console.log("Using pipeline (`AssetComputeWorkerPipeline #ShellscriptCallbackTransformer`)");
if(!options.supportsPipeline) {
throw new Error("This shellscript worker does not support running as part of pipelines");
}

options.isShellscriptWorker = true;
options.params = params;
if(process.env.ASSET_COMPUTE_SENSEI_CATALOG) {
options.transformerCatalog = process.env.ASSET_COMPUTE_SENSEI_CATALOG;
}

return new AssetComputeWorkerPipeline(options.script, options).compute(params);
} else {
console.log("Using shellscript worker callback (`ShellScriptWorker`)");
return new ShellScriptWorker(params, options).compute();
}
});
}

Expand Down
12 changes: 4 additions & 8 deletions lib/shell/shellscript.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
'use strict';

const AssetComputeWorker = require('../worker');


const errors = require('@adobe/asset-compute-commons');

const fs = require('fs-extra');
const path = require('path');
const { spawn } = require('child_process');

const stripAnsi = require('strip-ansi'); // escaping ansi content
const contentType = require('content-type');
const { Utils, Action } = require('@adobe/asset-compute-pipeline');
Expand Down Expand Up @@ -58,15 +54,15 @@ class ShellScriptWorker {
// use of spawn() with separate arguments prevents shell/command injection
await spawnProcess("/usr/bin/env", ["bash", "-x", this.script], { env });

// process metadata, if any
// process metadata, if any
if(await fs.pathExists(preparedFiles.typeFile)){ // use the path containing the file with mime information
console.log('Reading content type information from worker generated file');

let mimeInfoContent = await fs.readFile(preparedFiles.typeFile);
mimeInfoContent = mimeInfoContent.toString();
mimeInfoContent = mimeInfoContent.trim();

try{
try {
const contenttype = contentType.parse(mimeInfoContent);
rendition.setContentType(contenttype.type, contenttype.parameters.charset);
} catch(ex){
Expand Down Expand Up @@ -103,13 +99,13 @@ async function prepareMetadata(directory) {
// Folder structure has activationid as root, so concurrent executions should not collide
const typeFile = path.resolve(errDir, "type.txt");

// rendition options file for optiosn like postProcess
// rendition options file for options like postProcess
const optionsFile = path.resolve(directory, 'options.json');

return {
errorFile: errFile,
typeFile: typeFile,
optionsFile: optionsFile
optionsFile: optionsFile,
};
}

Expand Down
39 changes: 39 additions & 0 deletions lib/shellscript-worker-transformer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

'use strict';

const { Transformer } = require("@adobe/asset-compute-pipeline");
const ShellScriptWorker = require('./shell/shellscript');

class ShellscriptCallbackTransformer extends Transformer {
/**
* Construct a transformer for shellscript workers
* @param {*} callback
* @param {*} manifest worker manifest (only one, not a list)
*/
constructor(callback, manifest, params) {
super(`shellscript-workerCallback-${manifest.name}`, manifest);
this._params = params;
this._callback = callback;
}

async compute(input, output) {
const executionOptions = {
script: this._callback
};
const shellscriptWorker = new ShellScriptWorker(this._params, executionOptions);
return shellscriptWorker.processWithScript(input, output, executionOptions);
}
}

module.exports = ShellscriptCallbackTransformer;
23 changes: 12 additions & 11 deletions lib/worker-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const debug = console.log;
const { Engine, Plan } = require('@adobe/asset-compute-pipeline');
const WorkerCallbackTransformer = require('./worker-transformer');
const BatchWorkerCallbackTransformer = require('./batch-worker-transformer');
const ShellscriptCallbackTransformer = require('./shellscript-worker-transformer');
const { AssetComputeLogUtils } = require('@adobe/asset-compute-commons');

const fs = require('fs-extra');
Expand All @@ -39,7 +40,8 @@ class AssetComputeWorkerPipeline {

// set by compatibility layer
this.options.hasBatchModeWorker = options.isBatchWorker || false;

this.options.hasShellscriptWorker = options.isShellscriptWorker || false;

this.transformers = [];
this.buildTransformersFromManifests(renditionCallback);

Expand All @@ -48,9 +50,6 @@ class AssetComputeWorkerPipeline {
this.transformers = this.transformers.concat(options.transformerCatalog);
}
debug("Built worker transformer using rendition callback 'renditionCallback'");

// set by compatibility layer
this.options.hasBatchModeWorker = options.isBatchWorker || false;
}

/**
Expand All @@ -65,17 +64,20 @@ class AssetComputeWorkerPipeline {
return this.transformers;
}

//const transformer = new WorkerCallbackTransformer(renditionCallback, this.options.manifest);
let transformer;
if (this.options.hasBatchModeWorker === true) {
if (this.options.hasShellscriptWorker === true) {
debug("Creating a shellscript worker");
// renditionCallback is the script in this case
transformer = new ShellscriptCallbackTransformer(renditionCallback, this.options.manifest, this.options.params);
} else if (this.options.hasBatchModeWorker === true) {
debug("Creating a batch worker");
transformer = new BatchWorkerCallbackTransformer(renditionCallback, this.options.manifest);
} else {
debug("Creating a normal worker");
transformer = new WorkerCallbackTransformer(renditionCallback, this.options.manifest);
}
this.transformers.push(transformer);

debug(`Adding ${transformer.name} WorkerCallbackTransformer to the pipeline`);
}

Expand All @@ -92,7 +94,7 @@ class AssetComputeWorkerPipeline {
}

normalizeInputOuput(input, output) {
// TODO: we will have special cases for beta-worker-creative
// TODO: we will have special cases for beta-worker-creative
// special case for sensei
if(output.fmt === 'machine-metadata-json') {
output.type = 'machine-metadata-json';
Expand All @@ -102,7 +104,6 @@ class AssetComputeWorkerPipeline {
output.type = mimetype && mimetype.toLowerCase();
}


// if source.mimetype does not exist, or it does not match the extension,
// try to find mimetype by mapping the extension
// this can happen if the client (for example the devtool) does not define the content-type correctly
Expand Down Expand Up @@ -138,7 +139,7 @@ class AssetComputeWorkerPipeline {

let input = params.source;

// WORKER_TEST_MODE:
// WORKER_TEST_MODE:
// for test-worker framework, input and output are mounted at /in and /out
// random access reading and writing from that mount can be problematic on Docker for Mac at least,
// so we are copying all files over into the container
Expand Down Expand Up @@ -178,7 +179,7 @@ class AssetComputeWorkerPipeline {
AssetComputeLogUtils.log(output,'Output for refinePlan:');
// TODO we will need to add support for multiple renditions
// TODO we have to integrate options into our plan and support everything we support now

debug("Preparing plan for rendition creation...");
await engine.refinePlan(plan, input, output);
debug("Refined plan to create rendition");
Expand Down
3 changes: 1 addition & 2 deletions lib/worker-transformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ const mime = require('mime-types');
class WorkerCallbackTransformer extends Transformer {

/**
*
* Constructor
* @param {*} callback
* @param {*} manifest worker manifest (only one, not a list)
*/
constructor(callback, manifest) {
super(`workerCallback-${manifest.name}`, manifest);

this._callback = callback;
}

Expand Down
3 changes: 0 additions & 3 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class AssetComputeWorker {
new Date(this.params.times.gateway) :
new Date(this.params.times.process);


this.metrics.add({
startWorkerDuration: Utils.durationSec(this.processingStartTime, this.workerStartTime),
gatewayToProcessDuration: Utils.durationSec(this.params.times.gateway, this.params.times.process),
Expand Down Expand Up @@ -149,7 +148,6 @@ class AssetComputeWorker {
const baseDirectory = "";
const usePipeline = false;
this.directories = await Prepare.createDirectories(folderName, baseDirectory, usePipeline);

this.renditions = Rendition.forEach(this.params.renditions, this.directories.out, usePipeline);

this.timers.download.start();
Expand Down Expand Up @@ -185,7 +183,6 @@ class AssetComputeWorker {
console.log(`no rendition found after worker() callback processing at: ${rendition.path}`);
throw new GenericError(`No rendition generated for ${rendition.id()}`, `${this.actionName}_process_norendition`);
}

} catch (err) {
this.timers.processingCallback.stop();
console.log(`worker() callback processing failed with error after ${this.timers.processingCallback} seconds: ${err.message || err}`);
Expand Down
Loading
Loading