forked from jeffcrouse/mturk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
98 lines (85 loc) · 2.97 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
module.exports = function(conf) {
// Dependencies. The project-local modules that are called with (conf) are
// factories configured with the caller-supplied settings.
var EventEmitter = require('events').EventEmitter;
var notificationReceptor = require('./notification_receptor')(conf);
var HIT = require('./model/hit')(conf);
var uri = require('./lib/uri');

// Polling period in milliseconds: conf.poller.frequency_ms when it is set to a
// truthy value, otherwise 60000.
var configuredInterval = conf.poller && conf.poller.frequency_ms;
var POLLER_INTERVAL_MS = configuredInterval ? configuredInterval : 60000;

// Base URI handed to the shared uri helper; conf.url overrides the default.
uri.setBaseURI(conf.url || "http://mechanicalturk.amazonaws.com");

// The emitter returned to the caller; `ret` is a second name for the same object.
var notification = new EventEmitter();
var ret = notification;

// True once the receptor and the poller have been started.
var started = false;
// HIT ids for which 'HITReviewable' was emitted recently (hitId -> true).
var recentlyReviewed = {};
// Handles of the timers that will expire entries of recentlyReviewed.
var clearTimeouts = [];
/**
 * Emits 'HITReviewable' for a HIT unless it was already emitted recently.
 *
 * The HIT id is remembered in `recentlyReviewed` and forgotten again after
 * POLLER_INTERVAL_MS * 10 milliseconds, so the same HIT is not announced
 * repeatedly and the bookkeeping does not grow without bound.
 *
 * @param {string} hitId   Id of the reviewable HIT.
 * @param {boolean} emitAny When truthy, an 'any' event describing the HIT is
 *                          emitted before the 'HITReviewable' event.
 * @returns {boolean} true when events were emitted, false when the HIT was
 *                    suppressed because it is still remembered.
 */
function emitHitReviewable(hitId, emitAny) {
  // Already announced and not yet expired: nothing to do.
  if (recentlyReviewed[hitId]) {
    return false;
  }

  recentlyReviewed[hitId] = true;

  if (emitAny) {
    notification.emit('any', {EventType: 'HITReviewable', HITId: hitId, eventType: 'hITReviewable'});
  }
  notification.emit('HITReviewable', hitId);

  // Schedule the expiry of this entry; the handle is tracked in clearTimeouts
  // and removes itself from that list when it fires.
  var expiry = setTimeout(function forget() {
    var index = clearTimeouts.lastIndexOf(expiry);
    if (index >= 0) {
      clearTimeouts.splice(index, 1);
    }
    delete recentlyReviewed[hitId];
  }, POLLER_INTERVAL_MS * 10);
  clearTimeouts.push(expiry);

  return true;
}
/**
 * Starts the notification receptor and relays its events to `notification`.
 *
 * Every event arrives through the receptor's 'any' event. Events whose
 * EventType is 'HITReviewable' go through emitHitReviewable (without a
 * separate 'any' emission) so they share its duplicate suppression; every
 * other event is re-emitted under its own `eventType` name with the event
 * object as payload.
 */
function startNotificationReceptor() {
  notificationReceptor.start();
  notificationReceptor.on('any', function relay(event) {
    var isReviewable = event.EventType == 'HITReviewable';
    if (! isReviewable) {
      notification.emit(event.eventType, event);
      return;
    }
    emitHitReviewable(event.HITId, false);
  });
}
// Handle of the pending "poll again" timer, if one is scheduled.
var pollerTimeout;
// Set by stopListening() so that a getReviewable() response arriving after the
// stop cannot re-arm the poller behind the caller's back.
var pollerStopped = false;

/**
 * Starts polling HIT.getReviewable for HITs in 'Reviewable' status.
 *
 * Pages of 20 HITs are requested one after another; each HIT found is passed
 * to emitHitReviewable (with an accompanying 'any' event). When a request
 * fails, or no further page is requested, the next poll is scheduled
 * POLLER_INTERVAL_MS milliseconds later, starting again at page 1.
 * Request errors are not surfaced to listeners; the poll is simply retried.
 */
function startPoller() {
  pollerStopped = false;
  (function get(pageNumber) {
    if (pollerStopped) return;
    // setTimeout invokes get() without arguments: start over at the first page.
    var requestedPage = pageNumber || 1;
    HIT.getReviewable({pageSize: 20, pageNumber: requestedPage, status: 'Reviewable'}, function(err, numResults, totalNumResults, reportedPageNumber, hits) {
      // stopListening() was called while this request was in flight.
      if (pollerStopped) return;
      if (! err) {
        (hits || []).forEach(function(hit) {
          emitHitReviewable(hit.id, true);
        });
        if (numResults > 0 && totalNumResults > numResults) {
          // Advance from the page that was requested, so paging does not
          // depend on the type or presence of the echoed page number.
          get(requestedPage + 1);
          return;
        }
      }
      pollerTimeout = setTimeout(get, POLLER_INTERVAL_MS);
    });
  })();
}

var oldNotificationOn = notification.on;
/**
 * Registers a listener and, on the first registration, starts the
 * notification receptor and the poller.
 *
 * @param {string} event      Event name.
 * @param {Function} callback Listener.
 * @returns {*} Whatever the original `on` returns (the emitter itself for a
 *              Node.js EventEmitter), so calls can be chained.
 */
notification.on = function(event, callback) {
  if (! started) {
    startNotificationReceptor();
    startPoller();
    started = true;
  }
  return oldNotificationOn.call(notification, event, callback);
};

/**
 * Stops the notification receptor and the poller and cancels every pending
 * timer, including the expiry timers tracked in clearTimeouts.
 * `started` is not reset here, so a later on() does not start listening again.
 */
notification.stopListening = function() {
  // Flag first, so the poller stays stopped even if the receptor throws.
  pollerStopped = true;
  notificationReceptor.stop();
  if (pollerTimeout) {
    clearTimeout(pollerTimeout);
    pollerTimeout = undefined;
  }
  clearTimeouts.forEach(function(timeout) {
    clearTimeout(timeout);
  });
  // The handles are dead now; do not keep them around.
  clearTimeouts.length = 0;
};
ret.HIT = HIT;
ret.HITType = require('./model/hit_type')(conf);
ret.Price = require('./model/price')(conf);
ret.Question = require('./model/question')(conf);
ret.Notification = require('./model/notification')(conf);
return ret;
};