Skip to content

Commit

Permalink
IGNITE-23681 Disallow IgniteCache#removeAll() within transaction (#11713
Browse files Browse the repository at this point in the history
)
  • Loading branch information
J-Bakuli authored Dec 25, 2024
1 parent d6d8b06 commit 319250b
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ public void testIgniteClient() throws Exception {
checkEvents(cli, k -> cache.removeAll(of(k)), true, EVT_CACHE_OBJECT_REMOVED);
checkEvents(cli, k -> cache.removeAllAsync(of(k)).get(), true, EVT_CACHE_OBJECT_REMOVED);

checkEvents(cli, k -> cache.removeAll(), true, EVT_CACHE_OBJECT_REMOVED);
checkEvents(cli, k -> cache.removeAllAsync().get(), true, EVT_CACHE_OBJECT_REMOVED);

checkEvents(cli, k -> cache.putIfAbsent(k, "val"), false, EVT_CACHE_OBJECT_PUT);
checkEvents(cli, k -> cache.putIfAbsentAsync(k, "val").get(), false, EVT_CACHE_OBJECT_PUT);

Expand Down Expand Up @@ -342,9 +339,6 @@ private void testNode(boolean isClient) throws Exception {
checkEvents(ignite, k -> cache.removeAll(of(k)), true, EVT_CACHE_OBJECT_REMOVED);
checkEvents(ignite, k -> cache.removeAllAsync(of(k)).get(), true, EVT_CACHE_OBJECT_REMOVED);

checkEvents(ignite, k -> cache.removeAll(), true, EVT_CACHE_OBJECT_REMOVED);
checkEvents(ignite, k -> cache.removeAllAsync().get(), true, EVT_CACHE_OBJECT_REMOVED);

checkEvents(ignite, k -> cache.putIfAbsent(k, "val"), false, EVT_CACHE_OBJECT_PUT);
checkEvents(ignite, k -> cache.putIfAbsentAsync(k, "val").get(), false, EVT_CACHE_OBJECT_PUT);

Expand Down
2 changes: 2 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/IgniteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,8 @@ public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
* </ul>
* If the cache is empty, the {@link CacheWriter} is not called.
* <p>
* This operation is not transactional. It calls broadcast closure that deletes all primary keys from remote nodes.
* <p>
* This is potentially an expensive operation as listeners are invoked.
* Use {@link #clearAsync()} to avoid this.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
/** JCache adapter. */
private final Cache<K, V> jCacheAdapter;

/** Exception thrown when a non-transactional ClientCache clear operation is invoked within a transaction. */
public static final String NON_TRANSACTIONAL_CLIENT_CACHE_CLEAR_IN_TX_ERROR_MESSAGE = "Failed to invoke a " +
"non-transactional ClientCache clear operation within a transaction.";
/** Exception thrown when a non-transactional ClientCache operation is invoked within a transaction. */
public static final String NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE = "Failed to invoke a " +
"non-transactional ClientCache %s operation within a transaction.";

/** Constructor. */
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
Expand Down Expand Up @@ -555,11 +555,17 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {

/** {@inheritDoc} */
@Override public void removeAll() throws ClientException {
if (transactions.tx() != null)
throw new CacheException(String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE, "removeAll"));

ch.request(ClientOperation.CACHE_REMOVE_ALL, this::writeCacheInfo);
}

/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> removeAllAsync() throws ClientException {
if (transactions.tx() != null)
throw new CacheException(String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE, "removeAllAsync"));

return ch.requestAsync(ClientOperation.CACHE_REMOVE_ALL, this::writeCacheInfo);
}

Expand Down Expand Up @@ -728,7 +734,7 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
*/
@Override public void clear() throws ClientException {
if (transactions.tx() != null)
throw new CacheException(NON_TRANSACTIONAL_CLIENT_CACHE_CLEAR_IN_TX_ERROR_MESSAGE);
throw new CacheException(String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE, "clear"));

ch.request(ClientOperation.CACHE_CLEAR, this::writeCacheInfo);
}
Expand All @@ -745,7 +751,7 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
*/
@Override public IgniteClientFuture<Void> clearAsync() throws ClientException {
if (transactions.tx() != null)
throw new CacheException(NON_TRANSACTIONAL_CLIENT_CACHE_CLEAR_IN_TX_ERROR_MESSAGE);
throw new CacheException(String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE, "clearAsync"));

return ch.requestAsync(ClientOperation.CACHE_CLEAR, this::writeCacheInfo);
}
Expand Down Expand Up @@ -1366,7 +1372,7 @@ private <T> T txAwareService(
// Transactional operation cannot be executed on affinity node, it should be executed on node started
// the transaction.
if (tx != null) {
checkTxClearOperation(op);
checkTxClearOperation(op, false);

try {
return tx.clientChannel().service(op, payloadWriter, payloadReader);
Expand Down Expand Up @@ -1395,7 +1401,7 @@ private <T> IgniteClientFuture<T> txAwareServiceAsync(
// Transactional operation cannot be executed on affinity node, it should be executed on node started
// the transaction.
if (tx != null) {
checkTxClearOperation(op);
checkTxClearOperation(op, true);

CompletableFuture<T> fut = new CompletableFuture<>();

Expand All @@ -1420,9 +1426,10 @@ else if (affKey != null)
}

/** */
private void checkTxClearOperation(ClientOperation op) {
private void checkTxClearOperation(ClientOperation op, boolean async) {
if (op == ClientOperation.CACHE_CLEAR_KEY || op == ClientOperation.CACHE_CLEAR_KEYS)
throw new CacheException(NON_TRANSACTIONAL_CLIENT_CACHE_CLEAR_IN_TX_ERROR_MESSAGE);
throw new CacheException(String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE,
async ? "clearAsync" : "clear"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ private IgniteInternalFuture<?> clearAsync(@Nullable final Set<? extends K> keys
* @return Future.
*/
private IgniteInternalFuture<?> executeClearTask(@Nullable Set<? extends K> keys, boolean near) {
if (ctx.transactional() && ctx.grid().transactions().tx() != null)
if (ctx.grid().transactions().tx() != null)
throw new CacheException(NON_TRANSACTIONAL_IGNITE_CACHE_CLEAR_IN_TX_ERROR_MESSAGE);

Collection<ClusterNode> srvNodes = ctx.grid().cluster().forCacheNodes(name(), !near, near, false).nodes();
Expand Down Expand Up @@ -4016,7 +4016,7 @@ protected Object readResolve() throws ObjectStreamException {
* @param readers Whether to clear readers.
*/
private boolean clearLocally0(K key, boolean readers) {
if (ctx.transactional() && ctx.grid().transactions().tx() != null)
if (ctx.grid().transactions().tx() != null)
throw new CacheException(NON_TRANSACTIONAL_IGNITE_CACHE_CLEAR_IN_TX_ERROR_MESSAGE);

ctx.shared().cache().checkReadOnlyState("clear", ctx.config());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
Expand Down Expand Up @@ -71,6 +72,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** */
private static final long serialVersionUID = 0L;

/** Exception thrown when a non-transactional IgniteCache operation is invoked within a transaction. */
public static final String NON_TRANSACTIONAL_IGNITE_CACHE_IN_TX_ERROR_MESSAGE = "Failed to invoke a " +
"non-transactional IgniteCache %s operation within a transaction.";

/**
* Empty constructor required by {@link Externalizable}.
*/
Expand Down Expand Up @@ -164,6 +169,9 @@ public void removeVersionedEntry(KeyCacheObject key, GridCacheVersion ver) {

/** {@inheritDoc} */
@Override public void removeAll() throws IgniteCheckedException {
if (ctx.grid().transactions().tx() != null)
throw new CacheException(String.format(NON_TRANSACTIONAL_IGNITE_CACHE_IN_TX_ERROR_MESSAGE, "removeAll"));

try {
AffinityTopologyVersion topVer;

Expand Down Expand Up @@ -201,6 +209,9 @@ public void removeVersionedEntry(KeyCacheObject key, GridCacheVersion ver) {

/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
if (ctx.grid().transactions().tx() != null)
throw new CacheException(String.format(NON_TRANSACTIONAL_IGNITE_CACHE_IN_TX_ERROR_MESSAGE, "removeAllAsync"));

GridFutureAdapter<Void> opFut = new GridFutureAdapter<>();

AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,6 @@ public void testBlockingOps() throws Exception {
() -> assertEquals(0, cache.get(0))
);

// Remove all operation.
checkOpMultithreaded(client,
() -> cache.putAll(F.asMap(0, 0, 1, 1)),
() -> cache.removeAll(),
() -> assertEquals(0, cache.size())
);

// Remove if equals operation.
checkOpMultithreaded(client,
() -> cache.put(0, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.junit.Test;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.internal.client.thin.TcpClientCache.NON_TRANSACTIONAL_CLIENT_CACHE_CLEAR_IN_TX_ERROR_MESSAGE;
import static org.apache.ignite.internal.client.thin.TcpClientCache.NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

Expand All @@ -55,29 +55,32 @@ public class ThinClientNonTransactionalOperationsInTxTest extends GridCommonAbst
public void testThinClientCacheClear() throws Exception {
startGrid(0);

IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER));
try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))) {

checkThinClientCacheOperation(client, cache -> cache.clear());
checkThinClientCacheClearOperation(client, false, cache -> cache.clear());

checkThinClientCacheOperation(client, cache -> cache.clear(2));
checkThinClientCacheClearOperation(client, false, cache -> cache.clear(2));

checkThinClientCacheOperation(client, cache -> cache.clear(Collections.singleton(2)));
checkThinClientCacheClearOperation(client, false, cache -> cache.clear(Collections.singleton(2)));

checkThinClientCacheOperation(client, cache -> cache.clearAll(Collections.singleton(2)));
checkThinClientCacheClearOperation(client, false, cache -> cache.clearAll(Collections.singleton(2)));

checkThinClientCacheOperation(client, cache -> cache.clearAsync());
checkThinClientCacheClearOperation(client, true, cache -> cache.clearAsync());

checkThinClientCacheOperation(client, cache -> cache.clearAsync(2));
checkThinClientCacheClearOperation(client, true, cache -> cache.clearAsync(2));

checkThinClientCacheOperation(client, cache -> cache.clearAllAsync(Collections.singleton(2)));
checkThinClientCacheClearOperation(client, true, cache -> cache.clearAllAsync(Collections.singleton(2)));
}
}

/**
* It should throw exception.
*
* @param client IgniteClient.
* @param async Async flag.
* @param op Operation.
*/
private void checkThinClientCacheOperation(IgniteClient client, Consumer<ClientCache<Object, Object>> op) {
private void checkThinClientCacheClearOperation(IgniteClient client, boolean async, Consumer<ClientCache<Object, Object>> op) {
ClientCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);

cache.put(1, 1);
Expand All @@ -90,7 +93,48 @@ private void checkThinClientCacheOperation(IgniteClient client, Consumer<ClientC
}

return null;
}, CacheException.class, NON_TRANSACTIONAL_CLIENT_CACHE_CLEAR_IN_TX_ERROR_MESSAGE);
}, CacheException.class, String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE,
async ? "clearAsync" : "clear"));

assertTrue(cache.containsKey(1));
assertFalse(cache.containsKey(2));
}

/** */
@Test
public void testThinClientCacheRemove() throws Exception {
startGrid(0);

try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))) {

checkThinClientCacheRemoveOperation(client, false, cache -> cache.removeAll());

checkThinClientCacheRemoveOperation(client, true, cache -> cache.removeAllAsync());
}
}

/**
* It should throw exception.
*
* @param client IgniteClient.
* @param async Async flag.
* @param op Operation.
*/
private void checkThinClientCacheRemoveOperation(IgniteClient client, boolean async, Consumer<ClientCache<Object, Object>> op) {
ClientCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);

cache.put(1, 1);

GridTestUtils.assertThrows(log, (Callable<Void>)() -> {
try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
cache.put(2, 2);

op.accept(cache);
}

return null;
}, CacheException.class, String.format(NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE,
async ? "removeAllAsync" : "removeAll"));

assertTrue(cache.containsKey(1));
assertFalse(cache.containsKey(2));
Expand Down
Loading

0 comments on commit 319250b

Please sign in to comment.