forked from danielwestendorf/pdf-printer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.js
99 lines (88 loc) · 3.48 KB
/
process.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
const fs = require('fs');
const path = require('path');
const debug = require('debug')('pdf:processor');
const clone = require('lodash.clonedeep');
const createPdfGenerator = require('pdf-bot/src/pdfGenerator')
const error = require('pdf-bot/src/error');
const createQueue = require('pdf-bot/src/queue');
const execSync = require('child_process').execSync;
const configPath = path.join(process.cwd(), 'pdf-bot.config.js');
const configuration = require(configPath);
const queue = initiateQueue();
const maxTries = configuration.queue.generationMaxTries;
const retryStrategy = configuration.queue.generationRetryStrategy;
const parallelism = configuration.queue.parallelism;
const POLL_INTERVAL = parseInt(process.env.POLL_INTERVAL);
// Entry point
// Assumes only one worker. Queue's busy is set to false, in case of prior
// crash or interrupt
queue.setIsBusy(false).then(findAndProcessNext);
function findAndProcessNext() {
function timeout(duration) {
duration = duration || POLL_INTERVAL;
setTimeout(findAndProcessNext, duration);
}
queue.isBusy()
.then((isBusy) => {
if (!isBusy) {
queue.getAllUnfinished(retryStrategy, maxTries)
.then((jobs) => {
if (jobs.length === 0) {
debug('No jobs in the queue');
timeout();
} else {
queue.setIsBusy(true).then(() => {
const promises = [];
// Process X of the Jobs available
for (let i = 0; i < parallelism; i++) {
if (jobs[i] != undefined) {
promises.push(processJob(jobs[i], clone(configuration)));
}
}
console.log(" Found %s jobs, processing %s of them", jobs.length, promises.length);
Promise.all(promises)
.then(() => {
if (jobs.length > promises.length) {
// There are more jobs to process, make haste!
queue.setIsBusy(false).then(() => {
timeout(0);
});
} else {
queue.setIsBusy(false).then(timeout);
}
})
.catch(() => {
return queue.setIsBusy(false).then(() => {
// invoke timeout
console.log('An error was caught')
timeout();
})
})
});
}
});
} else {
debug('Queue is busy, why was this called?')
}
});
}
function initiateQueue() {
const db = configuration.db(configuration);
const queueOptions = configuration.queue;
return createQueue(db, queueOptions);
}
function processJob(job, configuration) {
const generatorOptions = configuration.generator;
const storagePlugins = configuration.storage;
const generator = createPdfGenerator(configuration.storagePath, generatorOptions, storagePlugins);
return queue.processJob(generator, job, configuration.webhook).then((response) => {
if (error.isError(response)) {
console.error('Job ID ' + job.id + ' failed to process.')
debug(response);
return Promise.reject(response);
} else {
console.log('Job ID ' + job.id + ' was processed.')
return Promise.resolve(true);
}
});
}