-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.js
147 lines (115 loc) · 5.22 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
'use strict'
const IpfsClient = require('ipfs-http-client')
const { StreamID } = require('@ceramicnetwork/streamid')
const PeerID = require('peer-id')
const multihashes = require('multihashes')
const {CID} = require('multiformats/cid')
const { multiaddr } = require('multiaddr')
// const { EventTypes } = require('ipfs-core-types') // TODO: why does this fail?
//const KadDht = require('libp2p-kad-dht')
const Message = require('libp2p-kad-dht/src/message')
const {pipe} = require('it-pipe')
const drain = require('it-drain')
const lp = require('it-length-prefixed')
const createLibp2p = require('./libp2p-node')
// URL of the IPFS node's HTTP API that main() connects to via IpfsClient.create().
const IPFS_ENDPOINT = 'http://localhost:5001'
// StreamID (parsed from its string form) that this script tries to register DHT providers for.
const STREAM_ID = StreamID.fromString('kjzl6cwe1jw148j1183ue1j9l5fbt3fbfru08e54387qo91t4tusnsecp5db2ws')
/**
 * Script entry point.
 *
 * Workflow:
 *   1. Connect to the IPFS node at IPFS_ENDPOINT and start a local libp2p node.
 *   2. Wrap the StreamID bytes in a multihash and derive a CIDv1 from it.
 *   3. Ask the IPFS node for the current providers of that CID and for the
 *      peers closest to it.
 *   4. Look up each close peer's multiaddrs and add them to the libp2p peer store.
 *   5. Send each close peer an ADD_PROVIDER message naming the libp2p node.
 *   6. Announce the libp2p node as a provider via content routing, then
 *      re-query the providers to show the result.
 *
 * The libp2p node is stopped even when one of the steps throws; the error is
 * then propagated to the caller.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const ipfs = await IpfsClient.create({ url: IPFS_ENDPOINT })
  console.log(`connected to go-ipfs node with PeerID: ${(await ipfs.id()).id}`)

  const libp2p = await createLibp2p()
  await libp2p.start()

  try {
    console.log(`libp2p node running with PeerID: ${libp2p.peerId.toB58String()}`)
    console.log(`libp2p node has multiaddrs: ${libp2p.multiaddrs}`)

    // Encode StreamID
    console.log(`streamid: ${STREAM_ID.toString()}`)
    // NOTE(review): multihashes.encode presumably only wraps the given bytes
    // with a 'sha2-256' header rather than hashing them — confirm.
    const streamidMultihash = multihashes.encode(STREAM_ID.bytes, 'sha2-256')
    console.log(`streamid sha256 multihash: ${streamidMultihash}`)
    const streamidAsCidV0 = CID.decode(streamidMultihash)
    const streamidAsCid = streamidAsCidV0.toV1()
    console.log(`streamid sha256 as CID ${streamidAsCid.toString()}`)

    console.log(`=======================================================`)
    const initialProviders = await findProviders(ipfs, streamidAsCid)
    console.log(`Initial providers: `)
    console.log(initialProviders)

    console.log(`=======================================================`)
    const closestPeers = await findClosestPeers(ipfs, streamidAsCid)
    console.log(`Closest peers: `)
    console.log(closestPeers.map((peerid) => peerid.toB58String()))

    console.log(`=======================================================`)
    // The peers must be dialable before provideToPeer can reach them, so
    // resolve their multiaddrs into the peer store first.
    for (const peerid of closestPeers) {
      await findMultiaddrAndAddToPeerStore(ipfs, libp2p, peerid)
    }

    console.log(`=======================================================`)
    for (const peerid of closestPeers) {
      await provideToPeer(libp2p, streamidAsCid, peerid)
    }

    console.log(`=======================================================`)
    console.log('adding self as provider')
    await libp2p.contentRouting.provide(streamidAsCid)

    const finalProviders = await findProviders(ipfs, streamidAsCid)
    console.log(`Final providers: `)
    console.log(finalProviders)

    console.log(`complete, shutting down`)
  } finally {
    // Always release the libp2p node, including when a step above failed.
    await libp2p.stop()
  }

  // Only reached on success, matching the original shutdown order.
  // NOTE(review): with ipfs-http-client, stop() presumably asks the remote
  // daemon itself to shut down — confirm that this is intended.
  await ipfs.stop()
}
/**
 * Looks up a peer through the IPFS node's DHT and records the multiaddrs that
 * come back in the libp2p node's address book.
 *
 * Only query events whose `type` equals 2 are used; all other events are
 * ignored. (Presumably 2 is the FINAL_PEER event type — confirm against
 * ipfs-core-types.)
 *
 * @param ipfs   client exposing `dht.findPeer(peerid, options)`
 * @param libp2p node exposing `peerStore.addressBook.add(peerid, multiaddrs)`
 * @param peerid peer to look up; must provide `toB58String()`
 * @returns {Promise<null>} always resolves to null
 */
async function findMultiaddrAndAddToPeerStore(ipfs, libp2p, peerid) {
  const WANTED_EVENT_TYPE = 2
  const lookupOptions = { timeout: 30000 }

  console.log(`looking up multiaddr for peerid ${peerid.toB58String()}`)

  const queryEvents = await ipfs.dht.findPeer(peerid, lookupOptions)
  for await (const queryEvent of queryEvents) {
    // Loose comparison kept on purpose to match the existing filter exactly.
    if (queryEvent.type == WANTED_EVENT_TYPE) {
      const book = libp2p.peerStore.addressBook
      await book.add(peerid, queryEvent.peer.multiaddrs)
    }
  }

  return null
}
/**
 * Runs a DHT query on the IPFS node for the given CID (converted to a PeerID
 * so it can be used as the query key) and collects the peers it reports.
 *
 * Only query events whose `type` equals 2 contribute a peer. (The original
 * author intended to compare against EventTypes from ipfs-core-types, but
 * that import was failing — the literal is kept until that is resolved.)
 *
 * @param ipfs        client exposing `dht.query(peerId)`
 * @param streamAsCID CID to search around
 * @returns {Promise<Array>} PeerID instances, ordered by Array.prototype.sort()
 *   with no comparator (i.e. by their string form)
 */
async function findClosestPeers(ipfs, streamAsCID) {
  const WANTED_EVENT_TYPE = 2

  const targetKey = PeerID.createFromCID(streamAsCID)
  const queryEvents = await ipfs.dht.query(targetKey)

  const found = []
  for await (const queryEvent of queryEvents) {
    // Loose comparison kept on purpose to match the existing filter exactly.
    if (queryEvent.type == WANTED_EVENT_TYPE) {
      found.push(PeerID.createFromB58String(queryEvent.peer.id))
    }
  }

  return found.sort()
}
/**
 * Sends a Kademlia ADD_PROVIDER message to a single remote peer, naming the
 * local libp2p node (its peer id and multiaddrs) as a provider for `keyCID`.
 *
 * This is best-effort: any failure while dialing or writing is reported with
 * console.warn and otherwise ignored, so one unreachable peer does not abort
 * the caller.
 *
 * @param libp2p  started libp2p node used to dial the peer
 * @param keyCID  CID whose `bytes` are used as the message key
 * @param peerid  remote peer to send the provider record to
 * @returns {Promise<void>}
 */
async function provideToPeer(libp2p, keyCID, peerid) {
  const KAD_PROTOCOL = '/ipfs/kad/1.0.0'

  console.log(`attempting to emplace our peerid (${libp2p.peerId.toB58String()}) as a stream provider on peer ${peerid.toB58String()}`)

  // Third constructor argument is passed as 0 — presumably the cluster level; confirm.
  const addProviderMsg = new Message.Message(Message.MESSAGE_TYPE.ADD_PROVIDER, keyCID.bytes, 0)
  const self = { id: libp2p.peerId, multiaddrs: libp2p.multiaddrs }
  addProviderMsg.providerPeers = [self]

  try {
    const { stream: outbound } = await libp2p.dialProtocol(peerid, KAD_PROTOCOL)
    // Serialize only after the dial succeeded, then write it length-prefixed
    // and drain whatever the remote side sends back.
    const payload = [addProviderMsg.serialize()]
    await pipe(payload, lp.encode(), outbound, drain)
  } catch (failure) {
    console.warn(failure.message)
  }
}
/**
 * Queries the IPFS node's DHT for providers of a CID and returns the matching
 * query events.
 *
 * Only events whose `type` equals 2 are kept.
 * NOTE(review): other functions in this file use 2 for peer lookups; verify
 * that it is also the right event type for provider results.
 *
 * @param ipfs        client exposing `dht.findProvs(cid)`
 * @param streamidCID CID to look up
 * @returns {Promise<Array>} the retained event objects, passed through
 *   Array.prototype.sort() with no comparator
 *   (NOTE(review): for plain objects that likely leaves the order unchanged)
 */
async function findProviders(ipfs, streamidCID) {
  const WANTED_EVENT_TYPE = 2

  console.log(`Looking up providers for CID: ${streamidCID}`)

  const queryEvents = await ipfs.dht.findProvs(streamidCID)
  const matches = []
  for await (const queryEvent of queryEvents) {
    // Loose comparison kept on purpose to match the existing filter exactly.
    if (queryEvent.type == WANTED_EVENT_TYPE) {
      matches.push(queryEvent)
    }
  }

  return matches.sort()
}
// Run the script. The promise returned by main() must not be left floating:
// report any failure and exit with a non-zero status so the process neither
// hangs on open network handles nor ends with an unhandled rejection.
main().catch((err) => {
  console.error(err)
  process.exit(1)
})