From 7856fbed19b11d9f6e7401582ede857316815c09 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Thu, 27 Jun 2024 15:21:10 -0600 Subject: [PATCH] wip: websocket --- demo.js | 243 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 159 insertions(+), 84 deletions(-) diff --git a/demo.js b/demo.js index 038b72a..95548ac 100644 --- a/demo.js +++ b/demo.js @@ -7,8 +7,6 @@ void DotEnv.config({ path: '.env.secret' }); //@ts-ignore - ts can't understand JSON, still... let pkg = require('./package.json'); -let Net = require('node:net'); - let CoinJoin = require('./coinjoin.js'); let Packer = require('./packer.js'); // TODO rename packer let Parser = require('./parser.js'); @@ -38,6 +36,9 @@ let rpcConfig = { host: process.env.DASHD_RPC_HOST || '127.0.0.1', port: process.env.DASHD_RPC_PORT || '19898', // mainnet=9998, testnet=19998, regtest=19898 timeout: 10 * 1000, // bump default from 5s to 10s for up to 10k addresses + onconnected: async function () { + console.info(`[info] rpc client connected ${rpcConfig.host}`); + }, }; if (process.env.DASHD_RPC_TIMEOUT) { let rpcTimeoutSec = parseFloat(process.env.DASHD_RPC_TIMEOUT); @@ -63,13 +64,8 @@ async function main() { let network = 'regtest'; // let minimumParticipants = Packer.NETWORKS[network].minimumParticiparts; - rpcConfig.onconnected = async function () { - let rpc = this; - console.info(`[info] rpc client connected ${rpc.host}`); - }; let rpc = new DashRpc(rpcConfig); - rpc.onconnected = rpcConfig.onconnected; let height = await rpc.init(rpc); console.info(`[info] rpc server is ready. Height = ${height}`); @@ -199,8 +195,8 @@ async function main() { // addresses: unusedAddresses, }); // console.log( - // '[debug] mempooldeltas.result.length', - // mempooldeltas.result.length, + // '[debug] mempooldeltas.result.length', + // mempooldeltas.result.length, // ); // TODO check that we have a duplicate in both deltas by using txid, vin/vout for (let delta of mempooldeltas.result) { @@ -469,9 +465,9 @@ async function main() { // console.log('[debug] generatetoaddress deltas', deltas); // let results = deltas.result.concat(deltas2.result); // for (let delta of results) { - // totalBalance += delta.satoshis; - // keysMap[delta.address].used = true; - // delete unusedMap[delta.address]; + // totalBalance += delta.satoshis; + // keysMap[delta.address].used = true; + // delete unusedMap[delta.address]; // } let utxosRpc = await rpc.getAddressUtxos({ addresses: [data.address] }); @@ -585,18 +581,20 @@ async function main() { } // async function getPrivateKeys(inputs) { - // let keys = []; - // for (let input of inputs) { - // let privKeyBytes = await keyUtils.getPrivateKey(input); - // keys.push(privKeyBytes); - // } + // let keys = []; + // for (let input of inputs) { + // let privKeyBytes = await keyUtils.getPrivateKey(input); + // keys.push(privKeyBytes); + // } - // return keys; + // return keys; // } let evonodes = []; { - let resp = await rpc.masternodelist(); + //let resp = await rpc.masternodelist(); + let res = await fetch('http://127.0.0.1:8080/rpc/masternodelist'); + let resp = await res.json(); let evonodesMap = resp.result; let evonodeProTxIds = Object.keys(evonodesMap); for (let id of evonodeProTxIds) { @@ -623,13 +621,21 @@ async function main() { console.info('[info] chosen evonode:'); console.log(JSON.stringify(evonode, null, 2)); - let conn = Net.createConnection({ - host: evonode.hostname, + let query = { + access_token: 'secret', + hostname: evonode.hostname, port: evonode.port, - keepAlive: true, - keepAliveInitialDelay: 3, - //localAddress: rpc.host, - }); + }; + let searchParams = new URLSearchParams(query); + let search = searchParams.toString(); + let wsc = new WebSocket(`ws://127.0.0.1:8080/tcp?${search}`); + //let conn = Net.createConnection({ + // host: evonode.hostname, + // port: evonode.port, + // keepAlive: true, + // keepAliveInitialDelay: 3, + // //localAddress: rpc.host, + //}); /** @type {Array} */ let chunks = []; @@ -639,18 +645,31 @@ async function main() { function onError(err) { console.error('Error:'); console.error(err); - conn.removeListener('error', onError); + // conn.removeListener('error', onError); + wsc.onerror = null; errReject(err); } function onEnd() { console.info('[info] disconnected from server'); } - conn.on('error', onError); - conn.once('end', onEnd); - conn.setMaxListeners(2); + // conn.on('error', onError); + // conn.once('end', onEnd); + // conn.setMaxListeners(2); + wsc.onerror = onError; + wsc.onclose = onEnd; + let dataCount = 0; - conn.on('data', function (data) { - console.log('[DEBUG] data'); + // conn.on('data', function (data) { + // console.log('[DEBUG] data'); + // console.log(dataCount, data.length, data.toString('hex')); + // dataCount += 1; + // }); + console.log('[DEBUG] main add wsc.onmessage'); + wsc.addEventListener('message', async function (wsevent) { + console.log('[DEBUG] main wsc.onmessage'); + let ab = await wsevent.data.arrayBuffer(); + let data = Buffer.from(ab); + console.log('[DEBUG] data (main)'); console.log(dataCount, data.length, data.toString('hex')); dataCount += 1; }); @@ -672,7 +691,8 @@ async function main() { message: pongMessageBytes, nonce: msg.payload, }); - conn.write(pongMessageBytes); + // conn.write(pongMessageBytes); + wsc.send(pongMessageBytes); console.log('[debug] sent pong'); continue; } @@ -714,13 +734,16 @@ async function main() { let header; function cleanup() { + console.log('[DEBUG] [readMessage.cleanup] remove data listener'); + wsc.removeEventListener('message', onWsReadableHeader); + wsc.removeEventListener('message', onWsReadablePayload); // console.log("[debug] readMessage handlers: remove 'onReadableHeader'"); - conn.removeListener('data', onReadableHeader); - conn.removeListener('readable', onReadableHeader); + // conn.removeListener('data', onReadableHeader); + // conn.removeListener('readable', onReadableHeader); // console.log("[debug] readMessage handlers: remove 'onReadablePayload'"); - conn.removeListener('data', onReadablePayload); - conn.removeListener('readable', onReadablePayload); + // conn.removeListener('data', onReadablePayload); + // conn.removeListener('readable', onReadablePayload); } function resolve(data) { @@ -768,8 +791,11 @@ async function main() { throw new Error('too big you are, handle you I cannot'); } // console.log('DEBUG header', header); - conn.removeListener('readable', onReadableHeader); - conn.removeListener('data', onReadableHeader); + console.log('[DEBUG] [onReadableHeader] remove data listener'); + // conn.removeListener('readable', onReadableHeader); + // conn.removeListener('data', onReadableHeader); + //wsc.onmessage = null; + wsc.removeEventListener('message', onWsReadableHeader); if (header.payloadSize === 0) { resolve(header); @@ -778,9 +804,19 @@ async function main() { // console.log("[debug] readMessage handlers: add 'onReadablePayload'"); //conn.on('readable', onReadablePayload); - conn.on('data', onReadablePayload); + // conn.on('data', onReadablePayload); + console.log('[DEBUG] onReadableHeader add wsc.onmessage'); + wsc.addEventListener('message', onWsReadablePayload); onReadablePayload(null); } + async function onWsReadableHeader(wsevent) { + console.log('[DEBUG] onReadableHeader wsc.onmessage'); + let ab = await wsevent.data.arrayBuffer(); + let data = Buffer.from(ab); + console.log('[DEBUG] data (readable header)'); + console.log(dataCount, data.length, data.toString('hex')); + onReadableHeader(data); + } function onReadablePayload(data) { let size = data?.length || 0; @@ -816,16 +852,29 @@ async function main() { chunk = chunk.slice(0, header.payloadSize); } header.payload = chunk; - conn.removeListener('readable', onReadablePayload); - conn.removeListener('data', onReadablePayload); + console.log('[DEBUG] [onReadablePayload] remove data listener'); + // conn.removeListener('readable', onReadablePayload); + // conn.removeListener('data', onReadablePayload); + // wsc.onmessage = null; + wsc.removeEventListener('message', onWsReadablePayload); resolve(header); } + async function onWsReadablePayload(wsevent) { + console.log('[DEBUG] onReadablePayload wsc.onmessage'); + let ab = await wsevent.data.arrayBuffer(); + let data = Buffer.from(ab); + console.log('[DEBUG] data (readable payload)'); + console.log(dataCount, data.length, data.toString('hex')); + onReadablePayload(data); + } errReject = reject; // console.log("[debug] readMessage handlers: add 'onReadableHeader'"); //conn.on('readable', onReadableHeader); - conn.on('data', onReadableHeader); + // conn.on('data', onReadableHeader); + console.log('[DEBUG] readMessage add wsc.onmessage'); + wsc.addEventListener('message', onWsReadableHeader); if (chunks.length) { onReadableHeader(null); @@ -840,8 +889,11 @@ async function main() { // TODO setTimeout await new Promise(function (_resolve, _reject) { function cleanup() { - conn.removeListener('readable', onReadable); - conn.removeListener('data', onReadable); + console.log('[DEBUG] [waitForConnect.cleanup] remove data listener'); + // conn.removeListener('readable', onReadable); + // conn.removeListener('data', onReadable); + // wsc.onmessage = null; + wsc.removeEventListener('message', onWsReadable); } function resolve(data) { @@ -855,6 +907,7 @@ async function main() { } function onConnect() { + console.log('[DEBUG] waitForConnect wsc.onopen'); resolve(); } @@ -862,11 +915,23 @@ async function main() { // checking an impossible condition, just in case throw new Error('unexpected response before request'); } + async function onWsReadable(wsevent) { + console.log('[DEBUG] waitForConnect wsc.onmessage'); + let ab = await wsevent.data.arrayBuffer(); + let data = Buffer.from(ab); + console.log('[DEBUG] data (readable)'); + console.log(dataCount, data.length, data.toString('hex')); + onReadable(data); + } errReject = reject; - conn.once('connect', onConnect); + // conn.once('connect', onConnect); + wsc.onopen = null; + wsc.onopen = onConnect; //conn.on('readable', onReadable); - conn.on('data', onReadable); + // conn.on('data', onReadable); + console.log('[DEBUG] waitForConnect add wsc.onmessage'); + wsc.addEventListener('message', onWsReadable); }); } @@ -912,7 +977,8 @@ async function main() { }; }); await sleep(150); - conn.write(versionMsg); + // conn.write(versionMsg); + wsc.send(versionMsg); await versionP; } @@ -936,7 +1002,8 @@ async function main() { payload: null, }); await sleep(150); - conn.write(verackBytes); + // conn.write(verackBytes); + wsc.send(verackBytes); await verackP; } @@ -965,10 +1032,11 @@ async function main() { send: true, }); await sleep(150); - conn.write(sendDsqMessage); + // conn.write(sendDsqMessage); + wsc.send(sendDsqMessage); console.log("[debug] sending 'senddsq':", sendDsqMessage); - resolve(); + resolve(null); listenerMap['senddsq'] = null; delete listenerMap['senddsq']; }; @@ -991,6 +1059,8 @@ async function main() { void (await generateMinBalance()); let collateralTxInfo = await getCollateralTx(); + // let keys = await getPrivateKeys(collateralTxInfo.inputs); + // let txInfoSigned = await dashTx.hashAndSignAll(collateralTxInfo, keys); let txInfoSigned = await dashTx.hashAndSignAll(collateralTxInfo); collateralTx = DashTx.utils.hexToBytes(txInfoSigned.transaction); } @@ -1000,7 +1070,8 @@ async function main() { collateralTx, }); await sleep(150); - conn.write(dsaMsg); + // conn.write(dsaMsg); + wsc.send(dsaMsg); let dsaBuf = Buffer.from(dsaMsg); console.log('[debug] dsa', dsaBuf.toString('hex')); @@ -1133,6 +1204,8 @@ async function main() { { void (await generateMinBalance()); let collateralTxInfo = await getCollateralTx(); + // let keys = await getPrivateKeys(collateralTxInfo.inputs); + // let txInfoSigned = await dashTx.hashAndSignAll(collateralTxInfo, keys); let txInfoSigned = await dashTx.hashAndSignAll(collateralTxInfo); let collateralTx = DashTx.utils.hexToBytes(txInfoSigned.transaction); @@ -1143,7 +1216,8 @@ async function main() { outputs, }); await sleep(150); - conn.write(dsiMessageBytes); + // conn.write(dsiMessageBytes); + wsc.send(dsiMessageBytes); dsf = await dsfP; } @@ -1192,28 +1266,28 @@ async function main() { sighashInput.sigHashType = sigHashType; console.log('[debug] YES, CAN HAZ INPUTS!!!', sighashInput); // sighashInputs.push({ - // txId: input.txId || input.txid, - // txid: input.txid || input.txId, - // outputIndex: input.outputIndex, - // pubKeyHash: input.pubKeyHash, - // sigHashType: input.sigHashType, + // txId: input.txId || input.txid, + // txid: input.txid || input.txId, + // outputIndex: input.outputIndex, + // pubKeyHash: input.pubKeyHash, + // sigHashType: input.sigHashType, // }); break; } // if (sighashInputs.length !== 1) { - // let msg = - // 'expected exactly one selected input to match one tx request input'; - // throw new Error(msg); + // let msg = + // 'expected exactly one selected input to match one tx request input'; + // throw new Error(msg); // } // let anyonecanpayIndex = 0; // let txHashable = DashTx.createHashable( - // { - // version: txInfo.version, - // inputs: sighashInputs, // exactly 1 - // outputs: txInfo.outputs, - // locktime: txInfo.locktime, - // }, - // anyonecanpayIndex, + // { + // version: txInfo.version, + // inputs: sighashInputs, // exactly 1 + // outputs: txInfo.outputs, + // locktime: txInfo.locktime, + // }, + // anyonecanpayIndex, // ); // console.log('[debug] txHashable (pre-sighashbyte)', txHashable); @@ -1251,7 +1325,8 @@ async function main() { let dssHex = DashTx.utils.bytesToHex(dssMessageBytes); console.log(dssHex); await sleep(150); - conn.write(dssMessageBytes); + // conn.write(dssMessageBytes); + wsc.send(dssMessageBytes); await dscP; } @@ -1276,22 +1351,22 @@ function byId(a, b) { // http://stackoverflow.com/questions/2450954/how-to-randomize-shuffle-a-javascript-array // function shuffle(arr) { -// let currentIndex = arr.length; - -// // While there remain elements to shuffle... -// for (; currentIndex !== 0; ) { -// // Pick a remaining element... -// let randomIndexFloat = Math.random() * currentIndex; -// let randomIndex = Math.floor(randomIndexFloat); -// currentIndex -= 1; - -// // And swap it with the current element. -// let temporaryValue = arr[currentIndex]; -// arr[currentIndex] = arr[randomIndex]; -// arr[randomIndex] = temporaryValue; -// } - -// return arr; +// let currentIndex = arr.length; + +// // While there remain elements to shuffle... +// for (; currentIndex !== 0; ) { +// // Pick a remaining element... +// let randomIndexFloat = Math.random() * currentIndex; +// let randomIndex = Math.floor(randomIndexFloat); +// currentIndex -= 1; + +// // And swap it with the current element. +// let temporaryValue = arr[currentIndex]; +// arr[currentIndex] = arr[randomIndex]; +// arr[randomIndex] = temporaryValue; +// } + +// return arr; // } function sleep(ms) {