Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track send errors #227

Merged
merged 4 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 0 additions & 50 deletions core/src/main/java/org/bitcoinj/core/PeerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -2483,13 +2483,6 @@ public TransactionBroadcast broadcastTransaction(final Transaction tx, final int
}
tx.getConfidence().setPeerInfo(getConnectedPeers().size(), minConnections);

// keep track of how many times a transaction is sent
int sendCount = 0;
if(pendingTxSendCounts.containsKey(tx.getHash())) {
sendCount = pendingTxSendCounts.get(tx.getHash());
}
pendingTxSendCounts.put(tx.getHash(), ++sendCount);

final TransactionBroadcast broadcast = new TransactionBroadcast(this, tx);
broadcast.setMinConnections(minConnections);
broadcast.setDropPeersAfterBroadcast(dropPeersAfterBroadcast && tx.getConfidence().numBroadcastPeers() == 0);
Expand Down Expand Up @@ -2522,49 +2515,6 @@ public void onFailure(Throwable throwable) {
}
}, MoreExecutors.directExecutor());

// Handle the case of 0.14.0.x nodes disconnecting dashj when sending transactions
// This will resend the transaction one if it was only sent to 1 peer
// This will resend the transaction up to 9 times if any one peer was disconnected while sending
Futures.addCallback(broadcast.future(), new FutureCallback<Transaction>() {
final int MAX_ATTEMPTS = 9;
Context context = Context.get();
@Override
public void onSuccess(Transaction transaction) {
log.info("Successfully sent tx {}", transaction.getTxId());
Context.propagate(context);

if(transaction.getConfidence().numBroadcastPeers() == 0) {
// TODO: this tx was sent to a single peer, should we send it again to make sure or see if there are more connections?
int sentCount = pendingTxSendCounts.get(transaction.getTxId());

if(sentCount <= 2) {
log.info("resending tx {} since it was only sent to 1 peer", tx.getHash());
broadcastTransaction(tx);
} else pendingTxSendCounts.put(tx.getHash(), sentCount + MAX_ATTEMPTS);
}
pendingTxSendCounts.remove(tx.getHash());
}

@Override
public void onFailure(Throwable throwable) {
Context.propagate(context);
int sentCount = pendingTxSendCounts.get(tx.getHash());
if(throwable instanceof PeerException) {
log.info("Failed to send tx {} due to disconnects", tx.getHash());

if(sentCount <= MAX_ATTEMPTS) {
log.info("resending tx {} due to disconnects", tx.getHash());
broadcastTransaction(tx);
} else {
pendingTxSendCounts.remove(tx.getHash());
}
} else {
log.info("Failed to send tx {} due to rejections from peers", tx.getHash());
pendingTxSendCounts.remove(tx.getHash());
}
}
}, MoreExecutors.directExecutor());

// Keep a reference to the TransactionBroadcast object. This is important because otherwise, the entire tree
// of objects we just created would become garbage if the user doesn't hold on to the returned future, and
// eventually be collected. This in turn could result in the transaction not being committed to the wallet
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/org/bitcoinj/core/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -1755,4 +1755,32 @@ public static Sha256Hash calculateInputsHash(TransactionInput input) {
}
}

/**
* This method simulates the BIP61 Reject messages from Dash Core prior to v19.
* It is not likely that a transaction created by DashJ would have any of these issues.
*
*
* @return RejectMessage corresponding to a reason that the network may reject this transaction
*/
public RejectMessage determineRejectMessage() {
try {
verify();
} catch (VerificationException e) {
return new RejectMessage(params, RejectMessage.RejectCode.MALFORMED, getTxId(), e.getMessage(), "");
}
// do any outputs contain dust?
if (getOutputs().stream().anyMatch(TransactionOutput::isDust)) {
return new RejectMessage(params, RejectMessage.RejectCode.DUST, getTxId(), "", "");
}
// is the fee high enough
Coin fee = getFee();
if (fee != null) {
int size = bitcoinSerialize().length;
Coin minFee = Coin.valueOf(size).multiply(REFERENCE_DEFAULT_MIN_TX_FEE.value).div(1000);
if (minFee.isGreaterThan(fee)) {
return new RejectMessage(params, RejectMessage.RejectCode.INSUFFICIENTFEE, getTxId(), "", "");
}
}
return null;
}
}
39 changes: 9 additions & 30 deletions core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,32 +97,7 @@ public void setDropPeersAfterBroadcast(boolean dropPeersAfterBroadcast) {
this.dropPeersAfterBroadcast = dropPeersAfterBroadcast;
}

private PreMessageReceivedEventListener rejectionListener = new PreMessageReceivedEventListener() {
@Override
public Message onPreMessageReceived(Peer peer, Message m) {
if (m instanceof RejectMessage) {
RejectMessage rejectMessage = (RejectMessage)m;
if (tx.getTxId().equals(rejectMessage.getRejectedObjectHash())) {
rejects.put(peer, rejectMessage);
tx.getConfidence().markRejectedBy(peer.getAddress(), rejectMessage);
tx.getConfidence().queueListeners(TransactionConfidence.Listener.ChangeReason.REJECT);

int size = rejects.size();
long threshold = Math.round(numWaitingFor / 2.0);
if (size > threshold) {
log.warn("Threshold for considering broadcast rejected has been reached ({}/{})", size, threshold);
future.setException(new RejectedTransactionException(tx, rejectMessage));
peerGroup.removePreMessageReceivedEventListener(this);
peerGroup.removeDisconnectedEventListener(disconnectedListener);
}
}
}
return m;
}
};

public ListenableFuture<Transaction> broadcast() {
peerGroup.addPreMessageReceivedEventListener(Threading.SAME_THREAD, rejectionListener);
peerGroup.addDisconnectedEventListener(Threading.SAME_THREAD, disconnectedListener);
log.info("Waiting for {} peers required for broadcast, we have {} ...", minConnections, peerGroup.getConnectedPeers().size());
peerGroup.waitForPeers(minConnections).addListener(new EnoughAvailablePeers(), Threading.SAME_THREAD);
Expand Down Expand Up @@ -226,7 +201,6 @@ public void onConfidenceChanged(TransactionConfidence conf, ChangeReason reason)
// We're done! It's important that the PeerGroup lock is not held (by this thread) at this
// point to avoid triggering inversions when the Future completes.
log.info("broadcastTransaction: {} complete", tx.getTxId());
peerGroup.removePreMessageReceivedEventListener(rejectionListener);
peerGroup.removeDisconnectedEventListener(disconnectedListener);
conf.removeEventListener(this);
future.set(tx); // RE-ENTRANCY POINT
Expand Down Expand Up @@ -322,18 +296,23 @@ private int getDisconnectedPeers() {
return count;
}

private PeerDisconnectedEventListener disconnectedListener = new PeerDisconnectedEventListener() {
private final PeerDisconnectedEventListener disconnectedListener = new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
//log.info(peer + " was disconnected while sending a tx {}", tx.getHash());
if(sentToPeers.containsKey(peer.getAddress())) {
sentToPeers.put(peer.getAddress(), true);
log.info(peer + " was disconnected while sending a transaction to it. tx: {}", tx.getHash());
log.info(peer + " was disconnected while sending a transaction to it. tx: {}", tx.getTxId());
int numDisconnectedPeers = getDisconnectedPeers();

RejectMessage rm = tx.determineRejectMessage();
if (rm == null)
rm = new RejectMessage(peer.getVersionMessage().params, RejectMessage.RejectCode.OTHER, tx.getTxId(),"Peer disconnected after receiving this tx", "The transaction is invalid");
tx.getConfidence().markRejectedBy(peer.getAddress(), rm);
tx.getConfidence().queueListeners(TransactionConfidence.Listener.ChangeReason.REJECT);

if(numDisconnectedPeers >= Math.round(numToBroadcastTo / 2.0)) {
log.info(peer + " was disconnected, setting exception on this transaction broadcast");
future.setException(new PeerException(peer + " was disconnected"));
peerGroup.removePreMessageReceivedEventListener(rejectionListener);
peerGroup.removeDisconnectedEventListener(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ public synchronized String toString() {
for (Map.Entry<PeerAddress, RejectMessage> entry : rejects.entrySet()) {
builder.append(String.format(" Rejected by %s: %s - %s - %s", entry.getKey(),
entry.getValue().getReasonCode().toString(), entry.getValue().getRejectedMessage(), entry.getValue().getReasonString()));
builder.append("\n");
}
}

Expand Down
Loading