Skip to content

Commit

Permalink
refactors filtering getServers API call
Browse files Browse the repository at this point in the history
Changes filtering in the getServers API call so that it can prune
branches while walking the tree of servers in zookeeper.
  • Loading branch information
keith-turner committed Oct 8, 2024
1 parent 16b71cd commit cd90b8b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -241,10 +242,15 @@ Map<String,String> modifyProperties(Consumer<Map<String,String>> mapMutator)
/**
* Returns the servers of a given type that match the given criteria
*
* @param resourceGroupPredicate only returns servers where the resource group matches this
* predicate. For the manager it does not have a resoruce group and this parameters is not
* used.
* @param hostPortPredicate only returns servers where its host and port match this predicate.
* @return set of servers of the supplied type matching the supplied test
* @since 4.0.0
*/
Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test);
Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate,
BiPredicate<String,Integer> hostPortPredicate);

/**
* List the active scans on a tablet server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
Expand Down Expand Up @@ -489,7 +489,7 @@ public ServerId getServer(ServerId.Type type, String resourceGroup, String host,
throw new IllegalStateException("Multiple servers matching provided address");
}
case MANAGER:
Set<ServerId> managers = getServers(type, null);
Set<ServerId> managers = getServers(type, rg2 -> true, hp);
if (managers.isEmpty()) {
return null;
} else {
Expand Down Expand Up @@ -520,43 +520,64 @@ public ServerId getServer(ServerId.Type type, String resourceGroup, String host,

@Override
public Set<ServerId> getServers(ServerId.Type type) {
return getServers(type, null);
AddressPredicate addressPredicate = addr -> true;
return getServers(type, rg -> true, addressPredicate);
}

@Override
public Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test) {
public Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate,
BiPredicate<String,Integer> hostPortPredicate) {
Objects.requireNonNull(type, "Server type was null");
Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate was null");
Objects.requireNonNull(hostPortPredicate, "Host port predicate was null");

AddressPredicate addressPredicate = addr -> {
var hp = HostAndPort.fromString(addr);
return hostPortPredicate.test(hp.getHost(), hp.getPort());
};

return getServers(type, resourceGroupPredicate, addressPredicate);
}

private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate,
AddressPredicate addressPredicate) {

final Set<ServerId> results = new HashSet<>();

switch (type) {
case COMPACTOR:
context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
context.getServerPaths().getCompactor(resourceGroupPredicate::test, addressPredicate, true)
.forEach(c -> results.add(createServerId(type, c)));
break;
case MANAGER:
ServiceLockPath m = context.getServerPaths().getManager(true);
Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
hp.getPort()));
if (m != null) {
Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
if (addressPredicate.test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
hp.getPort()));
}
}
}
break;
case SCAN_SERVER:
context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressPredicate, true)
.forEach(s -> results.add(createServerId(type, s)));
break;
case TABLET_SERVER:
context.getServerPaths().getTabletServer(rg -> true, addr -> true, true)
context.getServerPaths()
.getTabletServer(resourceGroupPredicate::test, addressPredicate, true)
.forEach(t -> results.add(createServerId(type, t)));
break;
default:
break;
}
if (test == null) {
return Collections.unmodifiableSet(results);
}
return results.stream().filter(test).collect(Collectors.toUnmodifiableSet());

return Collections.unmodifiableSet(results);
}

private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
Expand Down Expand Up @@ -396,14 +397,14 @@ private Set<ServiceLockPath> get(final String serverType,
if (resourceGroupPredicate.test(group)) {
final List<String> servers = cache.getChildren(typePath + "/" + group);
for (final String server : servers) {
final ZcStat stat = new ZcStat();
final ServiceLockPath slp =
parse(Optional.of(serverType), typePath + "/" + group + "/" + server);
if (addressPredicate.test(server)) {
final ServiceLockPath slp =
parse(Optional.of(serverType), typePath + "/" + group + "/" + server);
if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
// Dead TServers don't have lock data
results.add(slp);
} else {
final ZcStat stat = new ZcStat();
Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat);
if (!sld.isEmpty()) {
results.add(slp);
Expand Down

0 comments on commit cd90b8b

Please sign in to comment.