Skip to content

Commit

Permalink
Stops waiting on scans during tablet close.
Browse files Browse the repository at this point in the history
Before this change when a tablet was closing it would prevent new scans from starting
and wait for running scans to finish.  This could cause the tablet to become unavailable
for long periods of time.  After this change tablets will no longer wait on running
scans to complete.  They will set something to interrupt the thread and then disable the
client scan session which prevents the client from ever seeing any data after the tablet
is closed.  Once all scan sessions are disabled the tablet will proceed to close.  Disabling
the scan session will cause the client side scanner to eventually switch to the new tablet
server where the tablet is loaded.

It's possible that threads may be left running on the tablet server; therefore, it will be
important to also implement apache#4756

fixes apache#4757
  • Loading branch information
keith-turner committed Aug 22, 2024
1 parent fd111a2 commit 388b291
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, KeyExtent ex

long sid = server.getSessionManager().createSession(scanSession, true);

scanParams.setSessionDisabler(() -> server.getSessionManager().disableReservations(sid));

ScanResult scanResult;
try {
scanResult = continueScan(tinfo, sid, scanSession, busyTimeout);
Expand Down Expand Up @@ -440,6 +442,8 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,

long sid = server.getSessionManager().createSession(mss, true);

scanParams.setSessionDisabler(() -> server.getSessionManager().disableReservations(sid));

MultiScanResult result;
try {
result = continueMultiScan(sid, mss, busyTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;

import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.data.Column;
Expand All @@ -43,6 +44,7 @@ public final class ScanParameters {
private final SamplerConfiguration samplerConfig;
private final long batchTimeOut;
private final String classLoaderContext;
private volatile BooleanSupplier sessionDisabler = () -> false;
private volatile ScanDispatch dispatch;

public ScanParameters(int maxEntries, Authorizations authorizations, Set<Column> columnSet,
Expand Down Expand Up @@ -106,6 +108,14 @@ public ScanDispatch getScanDispatch() {
return dispatch;
}

/**
 * Installs the callback that is invoked to disable the client session associated with these scan
 * parameters. The callback's boolean result is handed back unchanged to whoever invokes it.
 *
 * @param sessionDisabler callback to run when the session should be disabled; must not be null
 * @throws NullPointerException if {@code sessionDisabler} is null
 */
public void setSessionDisabler(BooleanSupplier sessionDisabler) {
  // The field starts out as a non-null no-op supplier and is invoked without a null check, so
  // reject null here instead of failing later at the point where the callback is finally used.
  this.sessionDisabler = java.util.Objects.requireNonNull(sessionDisabler, "sessionDisabler");
}

/**
 * Returns the callback most recently installed via {@link #setSessionDisabler(BooleanSupplier)},
 * or the initial supplier that always yields {@code false} when none has been installed.
 *
 * @return the current session disabler
 */
public BooleanSupplier getSessionDisabler() {
  return this.sessionDisabler;
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum State {
public long lastAccessTime;
public long startTime;
State state = State.NEW;
boolean allowReservation = true;
private final TCredentials credentials;

Session(TCredentials credentials) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ public Session reserveSession(long sessionId) {
synchronized (session) {
if (session.state == State.RESERVED) {
throw new IllegalStateException(
"Attempted to reserved session that is already reserved " + sessionId);
"Attempted to reserve session that is already reserved " + sessionId);
}
if (session.state == State.REMOVED) {
if (session.state == State.REMOVED || !session.allowReservation) {
return null;
}

session.state = State.RESERVED;
}
}
Expand All @@ -134,7 +135,7 @@ public Session reserveSession(long sessionId, boolean wait) {
if (session != null) {
synchronized (session) {

if (session.state == State.REMOVED) {
if (session.state == State.REMOVED || !session.allowReservation) {
return null;
}

Expand All @@ -150,7 +151,7 @@ public Session reserveSession(long sessionId, boolean wait) {
throw new IllegalStateException(
"Attempted to reserved session that is already reserved " + sessionId);
}
if (session.state == State.REMOVED) {
if (session.state == State.REMOVED || !session.allowReservation) {
return null;
}
session.state = State.RESERVED;
Expand All @@ -169,6 +170,7 @@ public void unreserveSession(Session session) {
if (session.state != State.RESERVED) {
throw new IllegalStateException("Cannon unreserve, state: " + session.state);
}

session.notifyAll();
session.state = State.UNRESERVED;
session.lastAccessTime = System.currentTimeMillis();
Expand Down Expand Up @@ -249,6 +251,33 @@ public boolean removeIfNotReserved(long sessionId) {
return removed;
}

/**
 * Permanently blocks any future reservation of the given session. It is safe to call this while
 * another thread holds the session reserved: that thread is unaffected and can unreserve as
 * usual, after which the session can never be reserved again. Because it can no longer be
 * reserved, the session is expected to eventually age off and be cleaned up.
 *
 * @param sessionId id of the session to disable
 * @return true if the session is not currently reserved (or could not be found), false if it is
 *         still reserved
 */
public boolean disableReservations(long sessionId) {
  var target = getSession(sessionId);

  if (target != null) {
    synchronized (target) {
      // Flip the flag only once so the debug message is emitted a single time per session.
      if (target.allowReservation) {
        target.allowReservation = false;
        log.debug("disabled session {}", sessionId);
      }

      // With new reservations blocked, the session counts as disabled as soon as no one holds it.
      return State.RESERVED != target.state;
    }
  }

  // Nothing to disable when the session lookup comes back empty.
  return true;
}

static void cleanup(BlockingQueue<Session> deferredCleanupQueue, Session session) {
if (!session.cleanup()) {
var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS)
Expand Down Expand Up @@ -296,6 +325,8 @@ private void sweep(final long maxIdle, final long maxUpdateIdle) {
sessionsToCleanup.add(session);
session.state = State.REMOVED;
}
} else if (session.state == State.REMOVED) {
iter.remove();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,8 @@ public String toString() {
.append("expectedDeletionCount", expectedDeletionCount).append("scanParams", scanParams)
.toString();
}

/**
 * Runs the session disabler configured on this data source's scan parameters.
 *
 * @return whatever the configured session disabler reports
 */
public boolean disableClientSession() {
  final boolean disabled = scanParams.getSessionDisabler().getAsBoolean();
  return disabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ public void checkConditions(ConditionChecker checker, Authorizations authorizati
ScanParameters scanParams = new ScanParameters(-1, authorizations, Collections.emptySet(), null,
null, false, null, -1, null);
scanParams.setScanDispatch(ScanDispatch.builder().build());
scanParams.setSessionDisabler(() -> false);

ScanDataSource dataSource = createDataSource(scanParams, false, iFlag);

Expand Down Expand Up @@ -1038,13 +1039,31 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws
activeScan.interrupt();
}

// create a copy so that it can be whittled down as client sessions are disabled
List<ScanDataSource> runningScans = new ArrayList<>(this.activeScans);

runningScans.removeIf(scanDataSource -> {
boolean disabled = scanDataSource.disableClientSession();
if (disabled) {
log.debug("Disabled scan session in tablet close {} {}", extent, scanDataSource);
}
return disabled;
});

long lastLogTime = System.nanoTime();

// wait for reads and writes to complete
while (writesInProgress > 0 || !activeScans.isEmpty()) {
while (writesInProgress > 0 || !runningScans.isEmpty()) {
runningScans.removeIf(scanDataSource -> {
boolean disabled = scanDataSource.disableClientSession();
if (disabled) {
log.debug("Disabled scan session in tablet close {} {}", extent, scanDataSource);
}
return disabled;
});

if (log.isDebugEnabled() && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) {
for (ScanDataSource activeScan : activeScans) {
for (ScanDataSource activeScan : runningScans) {
log.debug("Waiting on scan in completeClose {} {}", extent, activeScan);
}

Expand All @@ -1053,13 +1072,19 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws

try {
log.debug("Waiting to completeClose for {}. {} writes {} scans", extent, writesInProgress,
activeScans.size());
runningScans.size());
this.wait(50);
} catch (InterruptedException e) {
log.error("Interrupted waiting to completeClose for extent {}", extent, e);
}
}

// It is assumed that nothing new would have been added to activeScans since it was copied, so
// check that assumption. At this point activeScans should be empty or everything in it should
// be disabled.
Preconditions.checkState(
activeScans.stream().allMatch(scanDataSource -> scanDataSource.disableClientSession()));

getTabletMemory().waitForMinC();

if (saveState && getTabletMemory().getMemTable().getNumEntries() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
*/
package org.apache.accumulo.tserver.session;

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.server.ServerContext;
import org.junit.jupiter.api.Test;

public class SessionManagerTest {
Expand Down Expand Up @@ -106,4 +114,38 @@ public void testDeferNotNeeded() {
assertEquals(2,
deferredCleanupQeue.stream().filter(s -> ((TestSession) s).cleanupCount == 2).count());
}

@Test
public void testDisableReservations() {
  SessionManager manager = createSessionManager();

  long sessionId = manager.createSession(new TestSession(2), true);

  // The session is still reserved at this point, so disabling blocks future reservations but
  // reports false.
  assertFalse(manager.disableReservations(sessionId));

  // Un-reserving must keep working after reservations were disabled.
  manager.unreserveSession(sessionId);

  // Both reserve variants must now refuse the session.
  assertNull(manager.reserveSession(sessionId));
  assertNull(manager.reserveSession(sessionId, false));

  // No longer reserved, so disabling reports true.
  assertTrue(manager.disableReservations(sessionId));

  manager.removeSession(sessionId);

  // A session that no longer exists is also reported as true.
  assertTrue(manager.disableReservations(sessionId));
}

/**
 * Builds a SessionManager on top of a mocked ServerContext that serves the default configuration
 * and a single-threaded scheduled executor.
 */
private SessionManager createSessionManager() {
  final ScheduledThreadPoolExecutor scheduler =
      (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
  final ServerContext context = createMock(ServerContext.class);

  // Both lookups may be made any number of times by the manager under test.
  expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
  expect(context.getScheduledExecutor()).andReturn(scheduler).anyTimes();
  replay(context);

  return new SessionManager(context);
}
}
Loading

0 comments on commit 388b291

Please sign in to comment.