Skip to content

Commit

Permalink
Improve stop protocol error handling.
Browse files Browse the repository at this point in the history
fix reservation expiry units
  • Loading branch information
ianopolous committed Aug 9, 2023
1 parent 8eba2a0 commit 6d04428
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 21 deletions.
47 changes: 27 additions & 20 deletions src/main/java/org/peergos/protocol/circuit/CircuitHopProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void onMessage(@NotNull Stream stream, Circuit.HopMessage msg) {
.setType(Circuit.HopMessage.Type.STATUS)
.setStatus(Circuit.Status.OK)
.setReservation(Circuit.Reservation.newBuilder()
.setExpire(resv.expiry.toEpochSecond(ZoneOffset.UTC) * 1_000_000_000L)
.setExpire(resv.expiry.toEpochSecond(ZoneOffset.UTC))
.addAllAddrs(publicAddresses.get().stream()
.map(a -> ByteString.copyFrom(a.getBytes()))
.collect(Collectors.toList()))
Expand All @@ -237,26 +237,33 @@ public void onMessage(@NotNull Stream stream, Circuit.HopMessage msg) {
if (res.isPresent()) {
Reservation resv = res.get();
PeerId target = PeerId.fromBase58(targetPeerId.toBase58());
CircuitStopProtocol.StopController stop = this.stop.dial(us, target,
addressBook.getAddrs(target).join().toArray(new Multiaddr[0])).getController().join();
Circuit.StopMessage reply = stop.connect(initiator, resv.durationSeconds, resv.maxBytes).join();
if (reply.getStatus().equals(Circuit.Status.OK)) {
stream.writeAndFlush(Circuit.StopMessage.newBuilder()
.setType(Circuit.StopMessage.Type.STATUS)
.setStatus(Circuit.Status.OK));
Stream toTarget = stop.getStream();
Stream fromRequestor = stream;
// connect these streams with time + bytes enforcement
fromRequestor.pushHandler(new InboundTrafficLimitHandler(resv.maxBytes));
fromRequestor.pushHandler(new TotalTimeoutHandler(Duration.of(resv.durationSeconds, ChronoUnit.SECONDS)));
toTarget.pushHandler(new InboundTrafficLimitHandler(resv.maxBytes));
toTarget.pushHandler(new TotalTimeoutHandler(Duration.of(resv.durationSeconds, ChronoUnit.SECONDS)));
fromRequestor.pushHandler(new ProxyHandler(toTarget));
toTarget.pushHandler(new ProxyHandler(fromRequestor));
} else {
try {
CircuitStopProtocol.StopController stop = this.stop.dial(us, target,
addressBook.getAddrs(target).join().toArray(new Multiaddr[0])).getController()
.orTimeout(15, TimeUnit.SECONDS).join();
Circuit.StopMessage reply = stop.connect(initiator, resv.durationSeconds, resv.maxBytes).join();
if (reply.getStatus().equals(Circuit.Status.OK)) {
stream.writeAndFlush(Circuit.StopMessage.newBuilder()
.setType(Circuit.StopMessage.Type.STATUS)
.setStatus(Circuit.Status.OK));
Stream toTarget = stop.getStream();
Stream fromRequestor = stream;
// connect these streams with time + bytes enforcement
fromRequestor.pushHandler(new InboundTrafficLimitHandler(resv.maxBytes));
fromRequestor.pushHandler(new TotalTimeoutHandler(Duration.of(resv.durationSeconds, ChronoUnit.SECONDS)));
toTarget.pushHandler(new InboundTrafficLimitHandler(resv.maxBytes));
toTarget.pushHandler(new TotalTimeoutHandler(Duration.of(resv.durationSeconds, ChronoUnit.SECONDS)));
fromRequestor.pushHandler(new ProxyHandler(toTarget));
toTarget.pushHandler(new ProxyHandler(fromRequestor));
} else {
stream.writeAndFlush(Circuit.HopMessage.newBuilder()
.setType(Circuit.HopMessage.Type.STATUS)
.setStatus(reply.getStatus()));
}
} catch (Exception e) {
stream.writeAndFlush(Circuit.HopMessage.newBuilder()
.setType(Circuit.HopMessage.Type.STATUS)
.setStatus(reply.getStatus()));
.setType(Circuit.HopMessage.Type.STATUS)
.setStatus(Circuit.Status.CONNECTION_FAILED));
}
} else {
stream.writeAndFlush(Circuit.HopMessage.newBuilder()
Expand Down
41 changes: 40 additions & 1 deletion src/test/java/org/peergos/RelayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class RelayTest {

@Test
public void relay() {
public void remoteRelay() {
HostBuilder builder1 = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
Host node1 = builder1.build();
Expand Down Expand Up @@ -51,6 +51,45 @@ public void relay() {
System.out.println();
} finally {
node1.stop();
node2.stop();
}
}

@Test
public void localRelay() {
HostBuilder builder1 = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
Host node1 = builder1.build();
node1.start().join();
IdentifyBuilder.addIdentifyProtocol(node1);

HostBuilder builder2 = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
Host node2 = builder2.build();
node2.start().join();
IdentifyBuilder.addIdentifyProtocol(node2);

HostBuilder builder3 = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
Host relay = builder3.build();
relay.start().join();
IdentifyBuilder.addIdentifyProtocol(relay);

try {
// set up node 2 to listen via a relay
Multiaddr relayAddr = relay.listenAddresses().get(0).withP2P(relay.getPeerId());
CircuitHopProtocol.HopController hop = builder2.getRelayHop().get().dial(node2, relayAddr).getController().join();
CircuitHopProtocol.Reservation reservation = hop.reserve().join();

// connect to node2 from node1 via a relay
System.out.println("Using relay " + relay.getPeerId());
CircuitHopProtocol.HopController node1Hop = builder1.getRelayHop().get().dial(node1, relayAddr).getController().join();
Stream stream = node1Hop.connect(Multihash.deserialize(node2.getPeerId().getBytes())).join();
System.out.println();
} finally {
node1.stop();
node2.stop();
relay.stop();
}
}

Expand Down

0 comments on commit 6d04428

Please sign in to comment.