Skip to content

Commit

Permalink
Ensure that AsyncReadWriteBinding is released prior to subscriber not…
Browse files Browse the repository at this point in the history
…ification (#676)

Previously the driver assumed that Mono#doOnTerminate is executed
prior to the subscriber being notified of completion.  But it turns
out that behavior is not guaranteed in the Californium release of
Project Reactor (though it is in later releases).  So instead,
now the SingleResultCallback that is used to notify the Mongo of
completion is wrapped by one that first releases the binding, and
Mono#doOnTerminate is no longer used.

JAVA-4027
  • Loading branch information
jyemin committed Mar 2, 2021
1 parent 74f836b commit 44b1e89
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
binding.release();
return Mono.error(new MongoClientException("Read preference in a transaction must be primary"));
} else {
return Mono.<T>create(sink -> operation.executeAsync(binding, sinkToCallback(sink)))
.doOnTerminate(binding::release)
.doOnError((t) -> {
labelException(session, t);
unpinServerAddressOnTransientTransactionError(session, t);
});
return Mono.<T>create(sink -> operation.executeAsync(binding, (result, t) -> {
try {
binding.release();
} finally {
sinkToCallback(sink).onResult(result, t);
}
})).doOnError((t) -> {
labelException(session, t);
unpinServerAddressOnTransientTransactionError(session, t);
});
}
});
}
Expand All @@ -86,13 +90,16 @@ public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadCon
session == null && clientSession != null))
.switchIfEmpty(Mono.fromCallable(() -> getReadWriteBinding(ReadPreference.primary(), readConcern, session, false)))
.flatMap(binding ->
Mono.<T>create(sink -> operation.executeAsync(binding, sinkToCallback(sink)))
.doOnTerminate(binding::release)
.doOnError((t) -> {
labelException(session, t);
unpinServerAddressOnTransientTransactionError(session, t);
})

Mono.<T>create(sink -> operation.executeAsync(binding, (result, t) -> {
try {
binding.release();
} finally {
sinkToCallback(sink).onResult(result, t);
}
})).doOnError((t) -> {
labelException(session, t);
unpinServerAddressOnTransientTransactionError(session, t);
})
);
}

Expand Down

0 comments on commit 44b1e89

Please sign in to comment.