Skip to content

Commit

Permalink
avoids uneeded object allocation on ServiceLocksPath
Browse files Browse the repository at this point in the history
ServiceLockPaths allows filtering hosts using a predicate.  In the case
where a predicate was passed in that always returned true a HostPort
object needlessly allocated to pass to the predicate.  These changes
refactor the predicate to only allocate the HostAndPort object when
needed.  Before the changes in apache#4943 this was the behavior, that when
all host were wanted no objects were allocated.
  • Loading branch information
keith-turner committed Oct 4, 2024
1 parent a3c9bf7 commit 74b28a8
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;

import com.google.common.net.HostAndPort;
Expand All @@ -38,7 +39,7 @@ public boolean doesTabletServerLockExist(String server) {
// ServiceLockPaths only returns items that have a lock
var hostAndPort = HostAndPort.fromString(server);
Set<ServiceLockPath> tservers =
ctx.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hostAndPort), true);
ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), true);
return !tservers.isEmpty();
}

Expand All @@ -47,7 +48,7 @@ public boolean isLockHeld(String server, String session) {
// ServiceLockPaths only returns items that have a lock
var hostAndPort = HostAndPort.fromString(server);
Set<ServiceLockPath> tservers =
ctx.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hostAndPort), true);
ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), true);
for (ServiceLockPath slp : tservers) {
if (ServiceLock.getSessionId(ctx.getZooCache(), slp) == Long.parseLong(session, 16)) {
return true;
Expand All @@ -59,7 +60,7 @@ public boolean isLockHeld(String server, String session) {
@Override
public void invalidateCache(String tserver) {
var hostAndPort = HostAndPort.fromString(tserver);
ctx.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hostAndPort), false)
ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), false)
.forEach(slp -> {
ctx.getZooCache().clear(slp.toString());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public ServiceLockPath createDeadTabletServerPath(String resourceGroup,
}

public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate resourceGroupPredicate,
Predicate<HostAndPort> address, boolean withLock) {
AddressPredicate address, boolean withLock) {
return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock);
}

Expand Down Expand Up @@ -316,24 +316,33 @@ public ServiceLockPath getMonitor(boolean withLock) {
}

public Set<ServiceLockPath> getScanServer(ResourceGroupPredicate resourceGroupPredicate,
Predicate<HostAndPort> address, boolean withLock) {
AddressPredicate address, boolean withLock) {
return get(Constants.ZSSERVERS, resourceGroupPredicate, address, withLock);
}

public Set<ServiceLockPath> getTabletServer(ResourceGroupPredicate resourceGroupPredicate,
Predicate<HostAndPort> address, boolean withLock) {
AddressPredicate address, boolean withLock) {
return get(Constants.ZTSERVERS, resourceGroupPredicate, address, withLock);
}

public Set<ServiceLockPath> getDeadTabletServer(ResourceGroupPredicate resourceGroupPredicate,
Predicate<HostAndPort> address, boolean withLock) {
AddressPredicate address, boolean withLock) {
return get(Constants.ZDEADTSERVERS, resourceGroupPredicate, address, withLock);
}

public interface ResourceGroupPredicate extends Predicate<String> {

}

public interface AddressPredicate extends Predicate<String> {

static AddressPredicate exact(HostAndPort hostAndPort) {
Objects.requireNonNull(hostAndPort);
AddressPredicate predicate = addr -> hostAndPort.equals(HostAndPort.fromString(addr));
return predicate;
}
}

/**
* Find paths in ZooKeeper based on the input arguments and return a set of ServiceLockPath
* objects.
Expand All @@ -347,7 +356,7 @@ public interface ResourceGroupPredicate extends Predicate<String> {
* @return set of ServiceLockPath objects for the paths found based on the search criteria
*/
private Set<ServiceLockPath> get(final String serverType,
ResourceGroupPredicate resourceGroupPredicate, Predicate<HostAndPort> addressPredicate,
ResourceGroupPredicate resourceGroupPredicate, AddressPredicate addressPredicate,
boolean withLock) {

Objects.requireNonNull(serverType);
Expand Down Expand Up @@ -380,7 +389,7 @@ private Set<ServiceLockPath> get(final String serverType,
final ZcStat stat = new ZcStat();
final ServiceLockPath slp =
parse(Optional.of(serverType), typePath + "/" + group + "/" + server);
if (addressPredicate.test(HostAndPort.fromString(server))) {
if (addressPredicate.test(server)) {
if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
// Dead TServers don't have lock data
results.add(slp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -385,7 +386,7 @@ public void testGetCompactorsNotRunning() {
assertTrue(ctx.getServerPaths()
.getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty());
assertTrue(ctx.getServerPaths()
.getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), true)
.getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressPredicate.exact(hp), true)
.isEmpty());

EasyMock.verify(ctx, zc);
Expand Down Expand Up @@ -504,7 +505,7 @@ public void testGetCompactors() {

// query for a specific server
results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(hp), true);
AddressPredicate.exact(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
Expand All @@ -515,7 +516,7 @@ public void testGetCompactors() {

// query for a wrong server
results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true);
AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());

EasyMock.verify(ctx, zc);
Expand All @@ -542,7 +543,7 @@ public void testGetScanServersNotRunning() {
assertTrue(ctx.getServerPaths()
.getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty());
assertTrue(ctx.getServerPaths()
.getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), true)
.getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressPredicate.exact(hp), true)
.isEmpty());

EasyMock.verify(ctx, zc);
Expand Down Expand Up @@ -658,7 +659,7 @@ public void testGetScanServers() {

// query for a specific server
results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(hp), true);
AddressPredicate.exact(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
Expand All @@ -669,7 +670,7 @@ public void testGetScanServers() {

// query for a wrong server
results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true);
AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());

EasyMock.verify(ctx, zc);
Expand All @@ -696,7 +697,7 @@ public void testGetTabletServersNotRunning() {
assertTrue(ctx.getServerPaths()
.getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty());
assertTrue(ctx.getServerPaths()
.getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), true)
.getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressPredicate.exact(hp), true)
.isEmpty());

EasyMock.verify(ctx, zc);
Expand Down Expand Up @@ -812,7 +813,7 @@ public void testGetTabletServers() {

// query for a specific server
results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(hp), true);
AddressPredicate.exact(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
Expand All @@ -823,7 +824,7 @@ public void testGetTabletServers() {

// query for a wrong server
results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true);
AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());

EasyMock.verify(ctx, zc);
Expand All @@ -849,9 +850,8 @@ public void testGetDeadTabletServersNone() {
assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false).isEmpty());
assertTrue(ctx.getServerPaths()
.getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, false).isEmpty());
assertTrue(ctx.getServerPaths()
.getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), false)
.isEmpty());
assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressPredicate.exact(hp), false).isEmpty());

EasyMock.verify(ctx, zc);

Expand Down Expand Up @@ -946,7 +946,7 @@ public void testGetDeadTabletServers() {

// query for a specific server
results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(hp), false);
AddressPredicate.exact(hp), false);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
Expand All @@ -958,7 +958,7 @@ public void testGetDeadTabletServers() {

// query for a wrong server
results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
addr -> addr.equals(HostAndPort.fromString("localhost:1234")), false);
AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), false);
assertEquals(0, results.size());

EasyMock.verify(ctx, zc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
Expand All @@ -41,6 +40,7 @@
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
Expand Down Expand Up @@ -465,10 +465,7 @@ public synchronized void remove(TServerInstance server) {
ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2);
return rgp;
}).orElse(rg -> true);
Predicate<HostAndPort> addrPredicate = address.map(addr -> {
Predicate<HostAndPort> ap = addr2 -> addr.equals(addr2);
return ap;
}).orElse(addr -> true);
AddressPredicate addrPredicate = address.map(AddressPredicate::exact).orElse(addr -> true);
Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(rgPredicate, addrPredicate, false);
if (paths.isEmpty() || paths.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.TFateId;
Expand Down Expand Up @@ -679,8 +680,8 @@ static String getTServersZkPath(ClientContext context) {
static String qualifyWithZooKeeperSessionId(ClientContext context, ZooCache zooCache,
String hostAndPort) {
var hpObj = HostAndPort.fromString(hostAndPort);
Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hpObj), true);
Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rg -> true,
ServiceLockPaths.AddressPredicate.exact(hpObj), true);
if (paths.size() != 1) {
return hostAndPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.server.ServerContext;

Expand Down Expand Up @@ -64,7 +65,7 @@ public static void execute(final ServerContext context, final String lock, final
} else {
var hostAndPort = HostAndPort.fromString(lock);
Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rg -> true,
addr -> addr.equals(hostAndPort), true);
ServiceLockPaths.AddressPredicate.exact(hostAndPort), true);
Preconditions.checkArgument(paths.size() == 1,
lock + " does not match a single ZooKeeper TabletServer lock. matches=" + paths);
ServiceLockPath path = paths.iterator().next();
Expand Down

0 comments on commit 74b28a8

Please sign in to comment.