Skip to content

Commit

Permalink
Adds needsReassignment to balancer
Browse files Browse the repository at this point in the history
Adds a new method to the balancer that can be used to efficiently detect
when a tablet needs reassignment based on configuration changes in
balancer.

fixes apache#3590
  • Loading branch information
keith-turner committed Jul 22, 2023
1 parent 6e745d1 commit 170aeea
Show file tree
Hide file tree
Showing 17 changed files with 312 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,85 @@
*/
package org.apache.accumulo.core.metadata;

import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public enum TabletState {
UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, ASSIGNED_TO_WRONG_GROUP
UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, NEEDS_REASSIGNMENT;

private static Logger log = LoggerFactory.getLogger(TabletState.class);

public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers) {
return compute(tm, liveTServers, null, null);
}

public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers,
TabletBalancer balancer, Map<TabletServerId,String> tserverGroups) {
TabletMetadata.Location current = null;
TabletMetadata.Location future = null;
if (tm.hasCurrent()) {
current = tm.getLocation();
} else {
future = tm.getLocation();
}
if (future != null) {
return liveTServers.contains(future.getServerInstance()) ? TabletState.ASSIGNED
: TabletState.ASSIGNED_TO_DEAD_SERVER;
} else if (current != null) {
if (liveTServers.contains(current.getServerInstance())) {
if (balancer != null) {
var tsii = new TabletServerIdImpl(current.getServerInstance());
var resourceGroup = tserverGroups.get(tsii);

if (resourceGroup != null) {
var reassign = balancer.needsReassignment(new TabletBalancer.CurrentAssignment() {
@Override
public TabletId getTablet() {
return new TabletIdImpl(tm.getExtent());
}

@Override
public TabletServerId getTabletServer() {
return tsii;
}

@Override
public String getResourceGroup() {
return resourceGroup;
}
});

if (reassign) {
return TabletState.NEEDS_REASSIGNMENT;
}
} else {
// A tablet server should always have a resource group, however there is a race
// conditions where the resource group map was read before a tablet server came into
// existence. Another possible cause for an absent resource group is a bug in accumulo.
// In either case do not call the balancer for now with the assumption that the resource
// group will be available later. Log a message in case it is a bug.
log.trace(
"Could not find resource group for tserver {}, so did not consult balancer. Assuming this is a temporary race condition.",
current.getServerInstance());
}
}
return TabletState.HOSTED;
} else {
return TabletState.ASSIGNED_TO_DEAD_SERVER;
}
} else if (tm.getSuspend() != null) {
return TabletState.SUSPENDED;
} else {
return TabletState.UNASSIGNED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,15 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
Expand All @@ -79,8 +76,6 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand Down Expand Up @@ -430,47 +425,6 @@ public SortedMap<Key,Value> getKeyValues() {
return keyValues;
}

public TabletState getTabletState(Set<TServerInstance> liveTServers) {
return getTabletState(liveTServers, null, null);
}

public TabletState getTabletState(Set<TServerInstance> liveTServers, TabletBalancer balancer,
Map<String,Set<TabletServerId>> currentTServerGrouping) {
ensureFetched(ColumnType.LOCATION);
ensureFetched(ColumnType.LAST);
ensureFetched(ColumnType.SUSPEND);
Location current = null;
Location future = null;
if (hasCurrent()) {
current = location;
} else {
future = location;
}
if (future != null) {
return liveTServers.contains(future.getServerInstance()) ? TabletState.ASSIGNED
: TabletState.ASSIGNED_TO_DEAD_SERVER;
} else if (current != null) {
if (liveTServers.contains(current.getServerInstance())) {
if (balancer != null) {
String resourceGroup = balancer.getResourceGroup(new TabletIdImpl(extent));
log.trace("Resource Group for extent {} is {}", extent, resourceGroup);
Set<TabletServerId> tservers = currentTServerGrouping.get(resourceGroup);
if (tservers == null
|| !tservers.contains(new TabletServerIdImpl(current.getServerInstance()))) {
return TabletState.ASSIGNED_TO_WRONG_GROUP;
}
}
return TabletState.HOSTED;
} else {
return TabletState.ASSIGNED_TO_DEAD_SERVER;
}
} else if (getSuspend() != null) {
return TabletState.SUSPENDED;
} else {
return TabletState.UNASSIGNED;
}
}

public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() {
ensureFetched(ColumnType.ECOMP);
return extCompactions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -561,4 +562,26 @@ private static String limitTen(Collection<?> iterable) {
.collect(Collectors.joining(", ", "[", "]"));
}

@Override
public boolean needsReassignment(CurrentAssignment currentAssignment) {
String tableName;
try {
tableName = environment.getTableName(currentAssignment.getTablet().getTable());
} catch (TableNotFoundException e) {
LOG.trace("Table name not found for {}, assuming table was deleted",
currentAssignment.getTablet().getTable(), e);
// if the table was deleted, then other parts of Accumulo can sort that out
return false;
}

var hostPools = getPoolNamesForHost(currentAssignment.getTabletServer());
var poolForTable = getPoolNameForTable(tableName);

if (!hostPools.contains(poolForTable)) {
return true;
}

return getBalancerForTable(currentAssignment.getTablet().getTable())
.needsReassignment(currentAssignment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* <p>
* Note that in versions prior to 4.0 this class would pass all known TabletServers to the Table
* load balancers. In version 4.0 this changed with the introduction of the
* {@link TABLE_ASSIGNMENT_GROUP_PROPERTY} table property. If defined, this balancer passes the
* {@value #TABLE_ASSIGNMENT_GROUP_PROPERTY} table property. If defined, this balancer passes the
* TabletServers that have the corresponding {@link Property#TSERV_GROUP_NAME} property to the Table
* load balancer.
*
Expand Down Expand Up @@ -188,6 +188,22 @@ public void getAssignments(AssignmentParameters params) {
}
}

@Override
public boolean needsReassignment(CurrentAssignment currentAssignment) {
var tableId = currentAssignment.getTablet().getTable();
String value = environment.getConfiguration(tableId).get(TABLE_ASSIGNMENT_GROUP_PROPERTY);
String expectedGroup = (value == null || StringUtils.isEmpty(value))
? Constants.DEFAULT_RESOURCE_GROUP_NAME : value;

if (!expectedGroup.equals(currentAssignment.getResourceGroup())) {
// The tablet is not in the expected resource group, so it needs to be reassigned
return true;
}

// defer to the per table balancer
return getBalancerForTable(tableId).needsReassignment(currentAssignment);
}

@Override
public long balance(BalanceParameters params) {
long minBalanceTime = 5_000;
Expand All @@ -212,13 +228,4 @@ public long balance(BalanceParameters params) {
}
return minBalanceTime;
}

@Override
public String getResourceGroup(TabletId tabletId) {
String value =
environment.getConfiguration(tabletId.getTable()).get(TABLE_ASSIGNMENT_GROUP_PROPERTY);
return (value == null || StringUtils.isEmpty(value)) ? Constants.DEFAULT_RESOURCE_GROUP_NAME
: value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Set;
import java.util.SortedMap;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
Expand Down Expand Up @@ -135,13 +134,52 @@ interface BalanceParameters {
long balance(BalanceParameters params);

/**
* Get the ResourceGroup name for this tablet
* Provides access to information related to a tablet that is currently assigned to a tablet
* server.
*
* @param tabletId id of tablet
* @return resource group name
* @since 4.0.0
*/
default String getResourceGroup(TabletId tabletId) {
return Constants.DEFAULT_RESOURCE_GROUP_NAME;
interface CurrentAssignment {
TabletId getTablet();

TabletServerId getTabletServer();

String getResourceGroup();
}

/**
* <p>
* The manager periodically scans all tablets looking for tablets that are assigned to dead tablet
* servers or unassigned. During the scan this method is also called for tablets that are
* currently assigned to a live tserver to see if they should be unassigned and reassigned. If
* this method returns true the tablet will be unloaded from the tablet sever and then later the
* tablet will be passed to {@link #getAssignments(AssignmentParameters)}.
* </p>
*
* <p>
* One example use case for this method is a balancer that partitions tablet servers into groups.
* If the balancers config is changed such that a table that was assigned to tablet server group A
* should now be assigned to tablet server B, then this method can return true for the tablets in
* that table assigned to tablet server group A. After those tablets are unloaded and passed to
* the {@link #getAssignments(AssignmentParameters)} method it can reassign them to tablet server
* group B.
* </p>
*
* <p>
* Accumulo may instantiate this plugin in different processes and call this method. When the
* manager looks for tablets that needs reassignment it currently uses an Accumulo iterator to
* scan the metadata table and filter tablets. That iterator may run on multiple tablets servers
* and call this plugin. Keep this in mind when implementing this plugin and considering keeping
* state between calls to this method.
* </p>
*
* <p>
* This new method may be used instead of or in addition to {@link #balance(BalanceParameters)}
* </p>
*
* @since 4.0.0
*/
default boolean needsReassignment(CurrentAssignment currentAssignment) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testLocationStates() {

TabletMetadata tm =
TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false);
TabletState state = tm.getTabletState(tservers);
TabletState state = TabletState.compute(tm, tservers);

assertEquals(TabletState.ASSIGNED, state);
assertEquals(ser1, tm.getLocation().getServerInstance());
Expand All @@ -254,7 +254,7 @@ public void testLocationStates() {

tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false);

assertEquals(TabletState.HOSTED, tm.getTabletState(tservers));
assertEquals(TabletState.HOSTED, TabletState.compute(tm, tservers));
assertEquals(ser2, tm.getLocation().getServerInstance());
assertEquals(ser2.getSession(), tm.getLocation().getSession());
assertEquals(LocationType.CURRENT, tm.getLocation().getType());
Expand All @@ -268,7 +268,7 @@ public void testLocationStates() {

tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false);

assertEquals(TabletState.ASSIGNED_TO_DEAD_SERVER, tm.getTabletState(tservers));
assertEquals(TabletState.ASSIGNED_TO_DEAD_SERVER, TabletState.compute(tm, tservers));
assertEquals(deadSer, tm.getLocation().getServerInstance());
assertEquals(deadSer.getSession(), tm.getLocation().getSession());
assertEquals(LocationType.CURRENT, tm.getLocation().getType());
Expand All @@ -280,7 +280,7 @@ public void testLocationStates() {

tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false);

assertEquals(TabletState.UNASSIGNED, tm.getTabletState(tservers));
assertEquals(TabletState.UNASSIGNED, TabletState.compute(tm, tservers));
assertNull(tm.getLocation());
assertFalse(tm.hasCurrent());

Expand All @@ -293,7 +293,7 @@ public void testLocationStates() {

tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false);

assertEquals(TabletState.SUSPENDED, tm.getTabletState(tservers));
assertEquals(TabletState.SUSPENDED, TabletState.compute(tm, tservers));
assertEquals(1000L, tm.getSuspend().suspensionTime);
assertEquals(ser2.getHostAndPort(), tm.getSuspend().server);
assertNull(tm.getLocation());
Expand Down
Loading

0 comments on commit 170aeea

Please sign in to comment.