Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Oct 12, 2024
1 parent b0b9555 commit 39f0b50
Show file tree
Hide file tree
Showing 31 changed files with 254 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.schema.Ample;
Expand Down Expand Up @@ -196,8 +197,8 @@ public ServiceEnvironment getServiceEnv() {

@Override
public Supplier<Collection<ScanServerInfo>> getScanServers() {
return () -> getServerPaths().getScanServer(rg -> true, addr -> true, true).stream()
.map(entry -> new ScanServerInfo() {
return () -> getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true)
.stream().map(entry -> new ScanServerInfo() {
@Override
public String getAddress() {
return entry.getServer();
Expand Down Expand Up @@ -414,7 +415,7 @@ public ScanServerSelector getScanServerSelector() {
public Map<String,Pair<UUID,String>> getScanServers() {
Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>();
Set<ServiceLockPath> scanServerPaths =
getServerPaths().getScanServer(rg -> true, addr -> true, true);
getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true);
for (ServiceLockPath path : scanServerPaths) {
try {
ZcStat stat = new ZcStat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
Expand Down Expand Up @@ -236,7 +236,7 @@ public List<String> getManagerLocations() {
@Deprecated(since = "4.0.0")
public Set<String> getCompactors() {
Set<String> results = new HashSet<>();
context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true)
.forEach(t -> results.add(t.getServer()));
return results;
}
Expand All @@ -245,7 +245,7 @@ public Set<String> getCompactors() {
@Deprecated(since = "4.0.0")
public Set<String> getScanServers() {
Set<String> results = new HashSet<>();
context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true)
.forEach(t -> results.add(t.getServer()));
return results;
}
Expand All @@ -254,7 +254,7 @@ public Set<String> getScanServers() {
@Deprecated(since = "4.0.0")
public List<String> getTabletServers() {
List<String> results = new ArrayList<>();
context.getServerPaths().getTabletServer(rg -> true, addr -> true, true)
context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true)
.forEach(t -> results.add(t.getServer()));
return results;
}
Expand Down Expand Up @@ -476,7 +476,7 @@ public ServerId getServer(ServerId.Type type, String resourceGroup, String host,

final ResourceGroupPredicate rg =
resourceGroup == null ? rgt -> true : rgt -> rgt.equals(resourceGroup);
final AddressPredicate hp = AddressPredicate.exact(HostAndPort.fromParts(host, port));
final AddressSelector hp = AddressSelector.exact(HostAndPort.fromParts(host, port));

switch (type) {
case COMPACTOR:
Expand Down Expand Up @@ -520,8 +520,7 @@ public ServerId getServer(ServerId.Type type, String resourceGroup, String host,

@Override
public Set<ServerId> getServers(ServerId.Type type) {
AddressPredicate addressPredicate = addr -> true;
return getServers(type, rg -> true, addressPredicate);
return getServers(type, rg -> true, AddressSelector.all());
}

@Override
Expand All @@ -531,22 +530,22 @@ public Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGr
Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate was null");
Objects.requireNonNull(hostPortPredicate, "Host port predicate was null");

AddressPredicate addressPredicate = addr -> {
AddressSelector addressPredicate = AddressSelector.matching(addr -> {
var hp = HostAndPort.fromString(addr);
return hostPortPredicate.test(hp.getHost(), hp.getPort());
};
});

return getServers(type, resourceGroupPredicate, addressPredicate);
}

private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate,
AddressPredicate addressPredicate) {
AddressSelector addressSelector) {

final Set<ServerId> results = new HashSet<>();

switch (type) {
case COMPACTOR:
context.getServerPaths().getCompactor(resourceGroupPredicate::test, addressPredicate, true)
context.getServerPaths().getCompactor(resourceGroupPredicate::test, addressSelector, true)
.forEach(c -> results.add(createServerId(type, c)));
break;
case MANAGER:
Expand All @@ -556,7 +555,7 @@ private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceG
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
if (addressPredicate.test(location)) {
if (addressSelector.getPredicate().test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
hp.getPort()));
Expand All @@ -565,12 +564,12 @@ private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceG
}
break;
case SCAN_SERVER:
context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressPredicate, true)
context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressSelector, true)
.forEach(s -> results.add(createServerId(type, s)));
break;
case TABLET_SERVER:
context.getServerPaths()
.getTabletServer(resourceGroupPredicate::test, addressPredicate, true)
.getTabletServer(resourceGroupPredicate::test, addressSelector, true)
.forEach(t -> results.add(createServerId(type, t)));
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;

import com.google.common.net.HostAndPort;
Expand All @@ -39,7 +39,7 @@ public boolean doesTabletServerLockExist(String server) {
// ServiceLockPaths only returns items that have a lock
var hostAndPort = HostAndPort.fromString(server);
Set<ServiceLockPath> tservers =
ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), true);
ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hostAndPort), true);
return !tservers.isEmpty();
}

Expand All @@ -48,7 +48,7 @@ public boolean isLockHeld(String server, String session) {
// ServiceLockPaths only returns items that have a lock
var hostAndPort = HostAndPort.fromString(server);
Set<ServiceLockPath> tservers =
ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), true);
ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hostAndPort), true);
for (ServiceLockPath slp : tservers) {
if (ServiceLock.getSessionId(ctx.getZooCache(), slp) == Long.parseLong(session, 16)) {
return true;
Expand All @@ -60,7 +60,7 @@ public boolean isLockHeld(String server, String session) {
@Override
public void invalidateCache(String tserver) {
var hostAndPort = HostAndPort.fromString(tserver);
ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), false)
ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hostAndPort), false)
.forEach(slp -> {
ctx.getZooCache().clear(slp.toString());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.lock;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -285,7 +286,7 @@ public ServiceLockPath createDeadTabletServerPath(String resourceGroup,
}

public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate resourceGroupPredicate,
AddressPredicate address, boolean withLock) {
AddressSelector address, boolean withLock) {
return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock);
}

Expand All @@ -295,7 +296,8 @@ public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate resourceGroupPre
* the ZooKeeper path.
*/
public ServiceLockPath getGarbageCollector(boolean withLock) {
Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr -> true, withLock);
Set<ServiceLockPath> results =
get(Constants.ZGC_LOCK, rg -> true, AddressSelector.all(), withLock);
if (results.isEmpty()) {
return null;
} else {
Expand All @@ -309,7 +311,8 @@ public ServiceLockPath getGarbageCollector(boolean withLock) {
* InstanceOperations.getServers(ServerId.Type.MANAGER) to get the location.
*/
public ServiceLockPath getManager(boolean withLock) {
Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true, addr -> true, withLock);
Set<ServiceLockPath> results =
get(Constants.ZMANAGER_LOCK, rg -> true, AddressSelector.all(), withLock);
if (results.isEmpty()) {
return null;
} else {
Expand All @@ -318,7 +321,8 @@ public ServiceLockPath getManager(boolean withLock) {
}

public ServiceLockPath getMonitor(boolean withLock) {
Set<ServiceLockPath> results = get(Constants.ZMONITOR_LOCK, rg -> true, addr -> true, withLock);
Set<ServiceLockPath> results =
get(Constants.ZMONITOR_LOCK, rg -> true, AddressSelector.all(), withLock);
if (results.isEmpty()) {
return null;
} else {
Expand All @@ -327,29 +331,59 @@ public ServiceLockPath getMonitor(boolean withLock) {
}

public Set<ServiceLockPath> getScanServer(ResourceGroupPredicate resourceGroupPredicate,
AddressPredicate address, boolean withLock) {
AddressSelector address, boolean withLock) {
return get(Constants.ZSSERVERS, resourceGroupPredicate, address, withLock);
}

public Set<ServiceLockPath> getTabletServer(ResourceGroupPredicate resourceGroupPredicate,
AddressPredicate address, boolean withLock) {
AddressSelector address, boolean withLock) {
return get(Constants.ZTSERVERS, resourceGroupPredicate, address, withLock);
}

public Set<ServiceLockPath> getDeadTabletServer(ResourceGroupPredicate resourceGroupPredicate,
AddressPredicate address, boolean withLock) {
AddressSelector address, boolean withLock) {
return get(Constants.ZDEADTSERVERS, resourceGroupPredicate, address, withLock);
}

public interface ResourceGroupPredicate extends Predicate<String> {

}

public interface AddressPredicate extends Predicate<String> {
public static class AddressSelector {
private final Predicate<String> predicate;
private final HostAndPort exactAddress;

static AddressPredicate exact(HostAndPort hostAndPort) {
Objects.requireNonNull(hostAndPort);
AddressPredicate predicate = addr -> hostAndPort.equals(HostAndPort.fromString(addr));
private AddressSelector(Predicate<String> predicate, HostAndPort exactAddress) {
Preconditions.checkArgument((predicate == null && exactAddress != null)
|| (predicate != null && exactAddress == null));
if (predicate == null) {
String hp = exactAddress.toString();
this.predicate = addr -> addr.equals(hp);
} else {
this.predicate = predicate;
}
this.exactAddress = exactAddress;
}

public static AddressSelector exact(HostAndPort hostAndPort) {
return new AddressSelector(null, hostAndPort);
}

public static AddressSelector matching(Predicate<String> predicate) {
return new AddressSelector(predicate, null);
}

private static AddressSelector ALL = new AddressSelector(s -> true, null);

public static AddressSelector all() {
return ALL;
}

public HostAndPort getExactAddress() {
return exactAddress;
}

public Predicate<String> getPredicate() {
return predicate;
}
}
Expand All @@ -361,18 +395,18 @@ static AddressPredicate exact(HostAndPort hostAndPort) {
* @param serverType type of lock, should be something like Constants.ZTSERVERS or
* Constants.ZMANAGER_LOCK
* @param resourceGroupPredicate only returns servers in resource groups that pass this predicate
* @param addressPredicate only return servers that match this predicate
* @param addressSelector only return servers that meet this criteria
* @param withLock supply true if you only want to return servers that have an active lock. Not
* applicable for types that don't use a lock (e.g. dead tservers)
* @return set of ServiceLockPath objects for the paths found based on the search criteria
*/
private Set<ServiceLockPath> get(final String serverType,
ResourceGroupPredicate resourceGroupPredicate, AddressPredicate addressPredicate,
ResourceGroupPredicate resourceGroupPredicate, AddressSelector addressSelector,
boolean withLock) {

Objects.requireNonNull(serverType);
Objects.requireNonNull(resourceGroupPredicate);
Objects.requireNonNull(addressPredicate);
Objects.requireNonNull(addressSelector);

final Set<ServiceLockPath> results = new HashSet<>();
final String typePath = ctx.getZooKeeperRoot() + serverType;
Expand All @@ -395,7 +429,24 @@ private Set<ServiceLockPath> get(final String serverType,
final List<String> resourceGroups = cache.getChildren(typePath);
for (final String group : resourceGroups) {
if (resourceGroupPredicate.test(group)) {
final List<String> servers = cache.getChildren(typePath + "/" + group);
final Collection<String> servers;
final Predicate<String> addressPredicate;

if (addressSelector.getExactAddress() != null) {
var server = addressSelector.getExactAddress().toString();
if(withLock){
servers = List.of(server);
} else if(cache.get(typePath + "/" + group + "/" + server) != null){
servers = List.of(addressSelector.getExactAddress().toString());
}else{
servers = List.of();
}
addressPredicate = s -> true;
} else {
servers = cache.getChildren(typePath + "/" + group);
addressPredicate = addressSelector.getPredicate();
}

for (final String server : servers) {
if (addressPredicate.test(server)) {
final ServiceLockPath slp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
Expand Down Expand Up @@ -633,8 +634,8 @@ static TabletMetadata create(String id, String prevEndRow, String endRow) {
public static synchronized Set<TServerInstance> getLiveTServers(ClientContext context) {
final Set<TServerInstance> liveServers = new HashSet<>();

for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true)) {
for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true)) {

checkTabletServer(context, slp).ifPresent(liveServers::add);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec;
Expand Down Expand Up @@ -88,16 +88,19 @@ default Pair<String,C> getThriftServerConnection(Logger LOG, ThriftClientTypes<C
// correct one.
HostAndPort hp = HostAndPort.fromString(debugHost);
serverPaths.addAll(
context.getServerPaths().getCompactor(rg -> true, AddressPredicate.exact(hp), true));
context.getServerPaths().getCompactor(rg -> true, AddressSelector.exact(hp), true));
serverPaths.addAll(
context.getServerPaths().getScanServer(rg -> true, AddressPredicate.exact(hp), true));
context.getServerPaths().getScanServer(rg -> true, AddressSelector.exact(hp), true));
serverPaths.addAll(
context.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hp), true));
context.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hp), true));
} else {
serverPaths.addAll(context.getServerPaths().getTabletServer(rg -> true, addr -> true, true));
serverPaths.addAll(
context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true));
if (type == ThriftClientTypes.CLIENT) {
serverPaths.addAll(context.getServerPaths().getCompactor(rg -> true, addr -> true, true));
serverPaths.addAll(context.getServerPaths().getScanServer(rg -> true, addr -> true, true));
serverPaths
.addAll(context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true));
serverPaths.addAll(
context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true));
}
if (serverPaths.isEmpty()) {
if (warned.compareAndSet(false, true)) {
Expand Down
Loading

0 comments on commit 39f0b50

Please sign in to comment.