Skip to content

Commit

Permalink
f: switch to Uint8Array
Browse files Browse the repository at this point in the history
  • Loading branch information
coolaj86 committed Jul 14, 2024
1 parent ad23fd4 commit 520992d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 30 deletions.
95 changes: 67 additions & 28 deletions demo.js
Original file line number Diff line number Diff line change
Expand Up @@ -679,17 +679,18 @@ var CJDemo = ('object' === typeof module && exports) || {};

let dataCount = 0;
// conn.on('data', function (data) {
// let bytes = new Uint8Array(data);
// console.log('[DEBUG] data');
// console.log(dataCount, data.length, data.toString('hex'));
// console.log(dataCount, bytes.length, DashTx.utils.bytesToHex(bytes));
// dataCount += 1;
// });
console.log('[DEBUG] main add wsc.onmessage');
wsc.addEventListener('message', async function (wsevent) {
console.log('[DEBUG] main wsc.onmessage');
let ab = await wsevent.data.arrayBuffer();
let data = Buffer.from(ab);
let bytes = new Uint8Array(ab);
console.log('[DEBUG] data (main)');
console.log(dataCount, data.length, data.toString('hex'));
console.log(dataCount, bytes.length, DashTx.utils.bytesToHex(bytes));
dataCount += 1;
});

Expand Down Expand Up @@ -775,25 +776,25 @@ var CJDemo = ('object' === typeof module && exports) || {};
_reject(err);
}

function onReadableHeader(data) {
let size = data?.length || 0;
console.log('State: reading header', size);
function onReadableHeader(bytes) {
let size = bytes?.length || 0;
console.log('State: reading header', size, typeof bytes);
let chunk;
for (;;) {
chunk = data;
chunk = bytes;
// chunk = conn.read(); // TODO reenable
if (!chunk) {
break;
}
chunks.push(chunk);
chunksLength += chunk.byteLength;
data = null; // TODO nix
bytes = null; // TODO nix
}
if (chunksLength < HEADER_SIZE) {
return;
}
if (chunks.length > 1) {
chunk = Buffer.concat(chunks, chunksLength);
chunk = concatBytes(chunks, chunksLength);
} else {
chunk = chunks[0];
}
Expand All @@ -810,7 +811,7 @@ var CJDemo = ('object' === typeof module && exports) || {};
console.log(`[DEBUG] header`, header);
throw new Error('too big you are, handle you I cannot');
}
// console.log('DEBUG header', header);
console.log('DEBUG header', header);
console.log('[DEBUG] [onReadableHeader] remove data listener');
// conn.removeListener('readable', onReadableHeader);
// conn.removeListener('data', onReadableHeader);
Expand All @@ -829,34 +830,41 @@ var CJDemo = ('object' === typeof module && exports) || {};
wsc.addEventListener('message', onWsReadablePayload);
onReadablePayload(null);
}
async function onNodeReadableHeader(data) {
let bytes = new Uint8Array(data);
onReadableHeader(bytes);
}
async function onWsReadableHeader(wsevent) {
console.log('[DEBUG] onReadableHeader wsc.onmessage');
let ab = await wsevent.data.arrayBuffer();
let data = Buffer.from(ab);
console.log('[DEBUG] data (readable header)');
console.log(dataCount, data.length, data.toString('hex'));
onReadableHeader(data);
let bytes = new Uint8Array(ab);
console.log('[DEBUG] bytes (readable header)');
console.log(dataCount, bytes.length, DashTx.utils.bytesToHex(bytes));
onReadableHeader(bytes);
}

function onReadablePayload(data) {
let size = data?.length || 0;
/**
* @param {Uint8Array} bytes
*/
function onReadablePayload(bytes) {
let size = bytes?.length || 0;
console.log('State: reading payload', size);
let chunk;
for (;;) {
chunk = data;
chunk = bytes;
// chunk = conn.read(); // TODO revert
if (!chunk) {
break;
}
chunks.push(chunk);
chunksLength += chunk.byteLength;
data = null; // TODO nix
bytes = null; // TODO nix
}
if (chunksLength < header.payloadSize) {
return;
}
if (chunks.length > 1) {
chunk = Buffer.concat(chunks, chunksLength);
chunk = concatBytes(chunks, chunksLength);
} else if (chunks.length === 1) {
chunk = chunks[0];
} else {
Expand All @@ -879,13 +887,17 @@ var CJDemo = ('object' === typeof module && exports) || {};
wsc.removeEventListener('message', onWsReadablePayload);
resolve(header);
}
async function onNodeReadablePayload(data) {
let bytes = new Uint8Array(data);
onReadablePayload(bytes);
}
async function onWsReadablePayload(wsevent) {
console.log('[DEBUG] onReadablePayload wsc.onmessage');
let ab = await wsevent.data.arrayBuffer();
let data = Buffer.from(ab);
let bytes = new Uint8Array(ab);
console.log('[DEBUG] data (readable payload)');
console.log(dataCount, data.length, data.toString('hex'));
onReadablePayload(data);
console.log(dataCount, bytes.length, DashTx.utils.bytesToHex(bytes));
onReadablePayload(bytes);
}

errReject = reject;
Expand Down Expand Up @@ -931,17 +943,21 @@ var CJDemo = ('object' === typeof module && exports) || {};
resolve();
}

function onReadable() {
function onReadable(bytes) {
// checking an impossible condition, just in case
throw new Error('unexpected response before request');
}
async function onNodeReadable(data) {
let bytes = new Uint8Array(data);
onReadable(bytes);
}
async function onWsReadable(wsevent) {
console.log('[DEBUG] waitForConnect wsc.onmessage');
let ab = await wsevent.data.arrayBuffer();
let data = Buffer.from(ab);
let bytes = new Uint8Array(ab);
console.log('[DEBUG] data (readable)');
console.log(dataCount, data.length, data.toString('hex'));
onReadable(data);
console.log(dataCount, bytes.length, DashTx.utils.bytesToHex(bytes));
onReadable(bytes);
}

errReject = reject;
Expand Down Expand Up @@ -1093,8 +1109,9 @@ var CJDemo = ('object' === typeof module && exports) || {};
// conn.write(dsaMsg);
wsc.send(dsaMsg);

let dsaBuf = Buffer.from(dsaMsg);
console.log('[debug] dsa', dsaBuf.toString('hex'));
// let dsaBuf = Buffer.from(dsaMsg);
// console.log('[debug] dsa', dsaBuf.toString('hex'));
console.log('[debug] dsa', DashTx.utils.bytesToHex(dsaMsg));

let dsq = await dsqPromise;
for (; !dsq.ready; ) {
Expand Down Expand Up @@ -1389,6 +1406,28 @@ var CJDemo = ('object' === typeof module && exports) || {};
// return arr;
// }

/**
* @param {Array<Uint8Array>} byteArrays
* @param {Number?} [len]
* @returns {Uint8Array}
*/
function concatBytes(byteArrays, len) {
if (!len) {
for (let bytes of byteArrays) {
len += bytes.length;
}
}

let allBytes = new Uint8Array(len);
let offset = 0;
for (let bytes of byteArrays) {
allBytes.set(bytes, offset);
offset += bytes.length;
}

return allBytes;
}

function sleep(ms) {
return new Promise(function (resolve) {
setTimeout(resolve, ms);
Expand Down
6 changes: 4 additions & 2 deletions parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var CJParser = ('object' === typeof module && exports) || {};
CJParser.DSQ_SIZE = 142;
CJParser.SESSION_ID_SIZE = 4;

let textDecoder = new TextDecoder();

/**
* Parse the 24-byte P2P Message Header
* - 4 byte magic bytes (delimiter) (possibly intended for non-tcp messages?)
Expand Down Expand Up @@ -71,7 +73,7 @@ var CJParser = ('object' === typeof module && exports) || {};
throw new Error('command name longer than 12 bytes');
}
let commandBuf = bytes.slice(commandStart, commandEnd);
let command = commandBuf.toString('utf8');
let command = textDecoder.decode(commandBuf);

let payloadSize = dv.getUint32(payloadSizeStart, DV_LITTLE_ENDIAN);
let checksum = bytes.slice(checksumStart, checksumStart + 4);
Expand Down Expand Up @@ -154,7 +156,7 @@ var CJParser = ('object' === typeof module && exports) || {};

let uaStart = uaSizeStart + 1;
let uaBytes = bytes.slice(uaStart, uaStart + uaSize);
let ua = uaBytes.toString('utf8');
let ua = textDecoder.decode(uaBytes);

let startHeightStart = uaStart + uaSize;
let startHeight = dv.getUint32(startHeightStart, DV_LITTLE_ENDIAN);
Expand Down

0 comments on commit 520992d

Please sign in to comment.