diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index 8a99b2315e4..d0897755728 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -214,6 +214,8 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, KeyExtent ex long sid = server.getSessionManager().createSession(scanSession, true); + scanParams.setSessionDisabler(() -> server.getSessionManager().disableReservations(sid)); + ScanResult scanResult; try { scanResult = continueScan(tinfo, sid, scanSession, busyTimeout); @@ -440,6 +442,8 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, long sid = server.getSessionManager().createSession(mss, true); + scanParams.setSessionDisabler(() -> server.getSessionManager().disableReservations(sid)); + MultiScanResult result; try { result = continueMultiScan(sid, mss, busyTimeout); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java index 3ce3b4a24f2..57e0920ed57 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BooleanSupplier; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.data.Column; @@ -43,6 +44,7 @@ public final class ScanParameters { private final SamplerConfiguration samplerConfig; private final long batchTimeOut; private final String classLoaderContext; + private volatile BooleanSupplier sessionDisabler = () -> false; private 
volatile ScanDispatch dispatch; public ScanParameters(int maxEntries, Authorizations authorizations, Set columnSet, @@ -106,6 +108,14 @@ public ScanDispatch getScanDispatch() { return dispatch; } + public void setSessionDisabler(BooleanSupplier sessionDisabler) { + this.sessionDisabler = sessionDisabler; + } + + public BooleanSupplier getSessionDisabler() { + return sessionDisabler; + } + @Override public String toString() { StringBuilder buf = new StringBuilder(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index 6e498337293..95fe463e6b4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -31,6 +31,7 @@ enum State { public long lastAccessTime; public long startTime; State state = State.NEW; + boolean allowReservation = true; private final TCredentials credentials; Session(TCredentials credentials) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 92bbcbc8b8d..1f13492b7dc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -116,11 +116,12 @@ public Session reserveSession(long sessionId) { synchronized (session) { if (session.state == State.RESERVED) { throw new IllegalStateException( - "Attempted to reserved session that is already reserved " + sessionId); + "Attempted to reserve session that is already reserved " + sessionId); } - if (session.state == State.REMOVED) { + if (session.state == State.REMOVED || !session.allowReservation) { return null; } + session.state = State.RESERVED; } } @@ -134,7 +135,7 @@ public Session 
reserveSession(long sessionId, boolean wait) { if (session != null) { synchronized (session) { - if (session.state == State.REMOVED) { + if (session.state == State.REMOVED || !session.allowReservation) { return null; } @@ -150,7 +151,7 @@ public Session reserveSession(long sessionId, boolean wait) { throw new IllegalStateException( "Attempted to reserved session that is already reserved " + sessionId); } - if (session.state == State.REMOVED) { + if (session.state == State.REMOVED || !session.allowReservation) { return null; } session.state = State.RESERVED; @@ -169,6 +170,7 @@ public void unreserveSession(Session session) { if (session.state != State.RESERVED) { throw new IllegalStateException("Cannon unreserve, state: " + session.state); } + session.notifyAll(); session.state = State.UNRESERVED; session.lastAccessTime = System.currentTimeMillis(); @@ -249,6 +251,33 @@ public boolean removeIfNotReserved(long sessionId) { return removed; } + /** + * Prevents a session from ever being reserved in the future. This method can be called + * concurrently when another thread has the session reserved w/o impacting the other thread. When + * the session is currently reserved by another thread that thread can unreserve as normal and + * after that this session can never be reserved again. Since the session can never be reserved + * after this call it will eventually age off and be cleaned up. + * + * @return true if the session does not exist or is not currently reserved, false otherwise + */ + public boolean disableReservations(long sessionId) { + var session = getSession(sessionId); + if (session == null) { + return true; + } + synchronized (session) { + if (session.allowReservation) { + // Prevent future reservations of this session. + session.allowReservation = false; + log.debug("disabled session {}", sessionId); + } + + // If nothing can reserve the session and it is not currently reserved then the session is + // disabled and will eventually be cleaned up. 
+ return session.state != State.RESERVED; + } + } + static void cleanup(BlockingQueue deferredCleanupQueue, Session session) { if (!session.cleanup()) { var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS) @@ -296,6 +325,8 @@ private void sweep(final long maxIdle, final long maxUpdateIdle) { sessionsToCleanup.add(session); session.state = State.REMOVED; } + } else if (session.state == State.REMOVED) { + iter.remove(); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index f17d9fa57eb..f06b8fdf043 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -321,4 +321,8 @@ public String toString() { .append("expectedDeletionCount", expectedDeletionCount).append("scanParams", scanParams) .toString(); } + + public boolean disableClientSession() { + return scanParams.getSessionDisabler().getAsBoolean(); + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index b6ad6150cbc..484d1bc0c1e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -458,6 +458,7 @@ public void checkConditions(ConditionChecker checker, Authorizations authorizati ScanParameters scanParams = new ScanParameters(-1, authorizations, Collections.emptySet(), null, null, false, null, -1, null); scanParams.setScanDispatch(ScanDispatch.builder().build()); + scanParams.setSessionDisabler(() -> false); ScanDataSource dataSource = createDataSource(scanParams, false, iFlag); @@ -1038,13 +1039,31 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws 
activeScan.interrupt(); } + // create a copy so that it can be whittled down as client sessions are disabled + List runningScans = new ArrayList<>(this.activeScans); + + runningScans.removeIf(scanDataSource -> { + boolean disabled = scanDataSource.disableClientSession(); + if (disabled) { + log.debug("Disabled scan session in tablet close {} {}", extent, scanDataSource); + } + return disabled; + }); + long lastLogTime = System.nanoTime(); // wait for reads and writes to complete - while (writesInProgress > 0 || !activeScans.isEmpty()) { + while (writesInProgress > 0 || !runningScans.isEmpty()) { + runningScans.removeIf(scanDataSource -> { + boolean disabled = scanDataSource.disableClientSession(); + if (disabled) { + log.debug("Disabled scan session in tablet close {} {}", extent, scanDataSource); + } + return disabled; + }); if (log.isDebugEnabled() && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) { - for (ScanDataSource activeScan : activeScans) { + for (ScanDataSource activeScan : runningScans) { log.debug("Waiting on scan in completeClose {} {}", extent, activeScan); } @@ -1053,13 +1072,19 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws try { log.debug("Waiting to completeClose for {}. {} writes {} scans", extent, writesInProgress, - activeScans.size()); + runningScans.size()); this.wait(50); } catch (InterruptedException e) { log.error("Interrupted waiting to completeClose for extent {}", extent, e); } } + // It is assumed that nothing new would have been added to activeScans since it was copied, so + // check that assumption. At this point activeScans should be empty or everything in it should + // be disabled. 
+ Preconditions.checkState( + activeScans.stream().allMatch(scanDataSource -> scanDataSource.disableClientSession())); + getTabletMemory().waitForMinC(); if (saveState && getTabletMemory().getMemTable().getNumEntries() > 0) { diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java index be3626ea712..c1b522c04be 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java @@ -18,12 +18,20 @@ */ package org.apache.accumulo.tserver.session; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.server.ServerContext; import org.junit.jupiter.api.Test; public class SessionManagerTest { @@ -106,4 +114,38 @@ public void testDeferNotNeeded() { assertEquals(2, deferredCleanupQeue.stream().filter(s -> ((TestSession) s).cleanupCount == 2).count()); } + + @Test + public void testDisableReservations() { + var sessionManager = createSessionManager(); + + var sid = sessionManager.createSession(new TestSession(2), true); + + // this should prevent future reservation and return false because it's currently reserved + assertFalse(sessionManager.disableReservations(sid)); + + // should not have a problem un-reserving + sessionManager.unreserveSession(sid); + + // should 
not be able to reserve the session because reservations were disabled + assertNull(sessionManager.reserveSession(sid)); + assertNull(sessionManager.reserveSession(sid, false)); + + // should return true now that it's not reserved + assertTrue(sessionManager.disableReservations(sid)); + + sessionManager.removeSession(sid); + + // should return true for nonexistent session + assertTrue(sessionManager.disableReservations(sid)); + } + + private SessionManager createSessionManager() { + ServerContext ctx = createMock(ServerContext.class); + expect(ctx.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + var executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); + expect(ctx.getScheduledExecutor()).andReturn(executor).anyTimes(); + replay(ctx); + return new SessionManager(ctx); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java new file mode 100644 index 00000000000..ee8beaa95b8 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class ZombieScanIT extends ConfigurableMacBase { + + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + super.configure(cfg, hadoopCoreSite); + + cfg.setNumTservers(1); + } + + /** + * An iterator that should get stuck forever when used + */ + public static class ZombieIterator extends WrappingIterator { + @Override + public boolean hasTop() { + // must call super.hasTop() before blocking as that will run accumulo code to setup iterator + boolean ht = super.hasTop(); + Semaphore semaphore = new Semaphore(10); + semaphore.acquireUninterruptibly(5); + // this should block forever + 
+ semaphore.acquireUninterruptibly(6); + return ht; + } + } + + /** + * This test ensures that scan threads that run forever do not prevent tablets from unloading. + */ + @Test + public void testZombieScan() throws Exception { + + String table = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + + var splits = new TreeSet(); + splits.add(new Text("3")); + splits.add(new Text("5")); + splits.add(new Text("7")); + var ntc = new NewTableConfiguration().withSplits(splits); + c.tableOperations().create(table, ntc); + + try (var writer = c.createBatchWriter(table)) { + for (var row : List.of("2", "4", "6", "8")) { + Mutation m = new Mutation(row); + m.put("f", "q", "v"); + writer.addMutation(m); + } + } + + // Flush the data otherwise when the tablet attempts to close with an active scan reading from + // the in memory map it will wait for 15 seconds for the scan + c.tableOperations().flush(table, null, null, true); + + var executor = Executors.newCachedThreadPool(); + + // start two zombie scans that should never return using a normal scanner + List> futures = new ArrayList<>(); + for (var row : List.of("2", "4")) { + var future = executor.submit(() -> { + try (var scanner = c.createScanner(table)) { + IteratorSetting iter = new IteratorSetting(100, "Z", ZombieIterator.class); + scanner.addScanIterator(iter); + scanner.setRange(new Range(row)); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + futures.add(future); + } + + // start two zombie scans that should never return using a batch scanner + for (var row : List.of("6", "8")) { + var future = executor.submit(() -> { + try (var scanner = c.createBatchScanner(table)) { + IteratorSetting iter = new IteratorSetting(100, "Z", ZombieIterator.class); + scanner.addScanIterator(iter); + scanner.setRanges(List.of(new Range(row))); + return scanner.stream().findFirst().map(e -> 
e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + futures.add(future); + } + + // should eventually see the four zombie scans running against four tablets + Wait.waitFor(() -> countDistinctTabletsScans(table, c) == 4); + + assertEquals(1, c.instanceOperations().getTabletServers().size()); + + // Start 3 new tablet servers, this should cause the table to balance and the tablets with + // zombie scans to unload. The zombie scans should not prevent the tablets from unloading. The + // scan threads will still be running on the old tablet servers. + getCluster().getConfig().setNumTservers(4); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + + // Wait for all tablet servers + Wait.waitFor(() -> c.instanceOperations().getTabletServers().size() == 4); + + // The table should eventually balance across the 4 tablet servers + Wait.waitFor(() -> countLocations(table, c) == 4); + + // The zombie scans should still be running + assertTrue(futures.stream().noneMatch(Future::isDone)); + + // Should be able to scan all the tablets at the new locations. + try (var scanner = c.createScanner(table)) { + var rows = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("2", "4", "6", "8"), rows); + } + + try (var scanner = c.createBatchScanner(table)) { + scanner.setRanges(List.of(new Range())); + var rows = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("2", "4", "6", "8"), rows); + } + + // The zombie scans should migrate with the tablets, taking up more scan threads in the + // system. 
+ Set tabletSeversWithZombieScans = new HashSet<>(); + for (String tserver : c.instanceOperations().getTabletServers()) { + if (c.instanceOperations().getActiveScans(tserver).stream() + .flatMap(activeScan -> activeScan.getSsiList().stream()) + .anyMatch(scanIters -> scanIters.contains(ZombieIterator.class.getName()))) { + tabletSeversWithZombieScans.add(tserver); + } + } + assertEquals(4, tabletSeversWithZombieScans.size()); + + executor.shutdownNow(); + } + + } + + private static long countLocations(String table, AccumuloClient client) throws Exception { + var ctx = (ClientContext) client; + var tableId = ctx.getTableId(table); + return ctx.getAmple().readTablets().forTable(tableId).build().stream() + .map(TabletMetadata::getLocation).filter(Objects::nonNull).distinct().count(); + } + + private static long countDistinctTabletsScans(String table, AccumuloClient client) + throws Exception { + var tservers = client.instanceOperations().getTabletServers(); + long count = 0; + for (String tserver : tservers) { + count += client.instanceOperations().getActiveScans(tserver).stream() + .filter(activeScan -> activeScan.getTable().equals(table)) + .map(activeScan -> activeScan.getTablet()).distinct().count(); + } + return count; + } + +}