-
Notifications
You must be signed in to change notification settings - Fork 0
/
line-split-stream.js
54 lines (53 loc) · 1.45 KB
/
line-split-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import { Transform } from 'stream';
import { once } from 'events';
export default class LineSplitStream extends Transform {
constructor(downStream, offset = 0) {
super({ objectMode: true });
this.downStream = downStream;
this.buffered = '';
this.lineCount = 0;
this.offset = offset;
}
async _transform(chunk, encoding, callback) {
if (typeof chunk !== "string") {
chunk = chunk.toString();
}
let backpress = 0;
if (chunk.includes('\n')) {
const parts = chunk.split('\n');
// we have buffered history; prepend to first part
if (this.buffered.length) {
parts[0] = this.buffered + parts[0];
this.buffered = '';
}
// if we have remainder after \n, push it to this.buffered
if (parts[parts.length - 1] !== '') {
this.buffered = parts.pop();
} else {
parts.pop();
}
for (const part of parts) {
if (this.offset && this.lineCount++ < this.offset) {
continue;
}
if (!this.push(part)) {
if (this.downStream)
await once(this.downStream, 'drain');
backpress += 1;
}
}
} else {
// no newline here, buffering
this.buffered += chunk;
}
callback();
}
// end(...args) {
// console.error(...args);
// console.error('linestream ended');
// if (this.buffered.length) {
// this.push(this.buffered);
// }
// this.emit('end');
// }
}