Skip to content

Commit

Permalink
- Handle errors from refresh() separately
Browse files Browse the repository at this point in the history
- Account for control connection count in tests
- Code refactoring in tests and code styling changes
  • Loading branch information
ashetkar committed Apr 29, 2024
1 parent 27cd3e2 commit fdb3411
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

package com.yugabyte.ysql;

import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

Expand Down
73 changes: 45 additions & 28 deletions pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

Expand All @@ -30,16 +35,6 @@ public class LoadBalanceManager {
private static boolean forceRefreshOnce = false;
private static Boolean useHostColumn = null;

/**
* FOR TEST PURPOSE ONLY
*/
public static synchronized void clear() {
forceRefreshOnce = false;
clusterInfoMap.clear();
lastRefreshTime = 0;
useHostColumn = null;
}

/**
* FOR TEST PURPOSE ONLY
*/
Expand All @@ -64,13 +59,13 @@ public static long getLastRefreshTime() {

public static boolean needsRefresh(long refreshInterval) {
if (forceRefreshOnce) {
LOGGER.finest("forceRefreshOnce is set to true");
LOGGER.fine("forceRefreshOnce is set to true");
return true;
}
long elapsed = (System.currentTimeMillis() - lastRefreshTime) / 1000;
if (elapsed > refreshInterval) {
LOGGER.fine("Needs refresh as list of servers may be stale or being fetched for " +
"the first time, refreshInterval: " + refreshInterval);
if (elapsed >= refreshInterval) {
LOGGER.fine("Needs refresh as list of servers may be stale or being fetched for "
+ "the first time, refreshInterval: " + refreshInterval);
return true;
}
LOGGER.fine("Refresh not required, refreshInterval: " + refreshInterval);
Expand Down Expand Up @@ -150,16 +145,18 @@ private static synchronized boolean refresh(Connection conn, long refreshInterva
clusterInfoMap.remove(info.host);
}
} else if (useHostColumn == null) {
LOGGER.warning("Unable to identify set of addresses to use for establishing connections. " +
"Using private addresses.");
LOGGER.warning("Unable to identify set of addresses to use for establishing connections. "
+ "Using private addresses.");
}
lastRefreshTime = System.currentTimeMillis();
return true;
}

public static void markAsFailed(String host) {
NodeInfo info = clusterInfoMap.get(host);
if (info == null) return; // unexpected
if (info == null) {
return; // unexpected
}
synchronized (info) {
String previous = info.isDown ? "DOWN" : "UP";
info.isDown = true;
Expand Down Expand Up @@ -257,6 +254,7 @@ public static Connection getConnection(LoadBalanceProperties loadBalanceProperti
String url = loadBalanceProperties.getStrippedURL();

if (!checkAndRefresh(loadBalanceProperties, lb, user, dbName)) {
LOGGER.fine("checkAndRefresh() returns false");
return null;
}

Expand Down Expand Up @@ -297,6 +295,7 @@ public static Connection getConnection(LoadBalanceProperties loadBalanceProperti
}
chosenHost = lb.getLeastLoadedServer(false, failedHosts);
}
LOGGER.fine("No host could be chosen");
return null;
}

Expand All @@ -309,37 +308,55 @@ private static synchronized boolean checkAndRefresh(LoadBalanceProperties loadBa

ArrayList<String> hosts = getAllAvailableHosts(new ArrayList<>());
while (true) {
boolean refreshFailed = false;
try {
if (controlConnection == null || controlConnection.isClosed()) {
controlConnection = new PgConnection(hspec, user, dbName, props, url);
}
refresh(controlConnection, lb.getRefreshListSeconds());
break;
try {
refresh(controlConnection, lb.getRefreshListSeconds());
break;
} catch (SQLException e) {
// May fail with "terminating connection due to unexpected postmaster exit", 57P01
refreshFailed = true;
throw e;
}
} catch (SQLException ex) {
if (refreshFailed) {
LOGGER.fine("Exception while refreshing: " + ex + ", " + ex.getSQLState());
} else {
LOGGER.fine("Exception while creating control connection to "
+ hspec[0].getHost() + ": " + ex + ", " + ex.getSQLState());
}
if (PSQLState.UNDEFINED_FUNCTION.getState().equals(ex.getSQLState())) {
LOGGER.warning("Received error UNDEFINED_FUNCTION (42883)");
return false;
}
if (PSQLState.CONNECTION_UNABLE_TO_CONNECT.getState().equals(ex.getSQLState())) {
LOGGER.warning("Received error CONNECTION_UNABLE_TO_CONNECT for " + hspec[0].getHost());
for (HostSpec h : hspec) {
markAsFailed(h.getHost());
}
}
// Retry until servers are available
for (HostSpec h : hspec) {
hosts.remove(h.getHost());
// Remove hspec only if exception not thrown from refresh() above
// Since it could be a stale connection
if (!refreshFailed) {
for (HostSpec h : hspec) {
hosts.remove(h.getHost());
}
}
// Retry until servers are available
if (hosts.isEmpty()) {
LOGGER.fine("Failed to establish control connection to available servers");
return false;
} else {
} else if (!refreshFailed) {
// Try the first host in the list (don't have to check least loaded one since it's
// just for the control connection)
HostSpec hs = new HostSpec(hosts.get(0), LoadBalanceManager.getPort(hosts.get(0)),
loadBalanceProperties.getOriginalProperties().getProperty("localSocketAddress"));
hspec = new HostSpec[]{hs};
controlConnection = null;
}
controlConnection = null;
}
}
}
Expand Down Expand Up @@ -443,9 +460,9 @@ public boolean equals(Object other) {
LOGGER.fine("equals called for this: " + this + " and other = " + other);
if (other instanceof CloudPlacement) {
CloudPlacement o = (CloudPlacement) other;
equal = this.cloud.equalsIgnoreCase(o.cloud) &&
this.region.equalsIgnoreCase(o.region) &&
this.zone.equalsIgnoreCase(o.zone);
equal = this.cloud.equalsIgnoreCase(o.cloud)
&& this.region.equalsIgnoreCase(o.region)
&& this.zone.equalsIgnoreCase(o.zone);
}
LOGGER.fine("equals returning: " + equal);
return equal;
Expand Down
4 changes: 2 additions & 2 deletions pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ public interface LoadBalancer {
* @param e The {@link com.yugabyte.ysql.LoadBalanceManager.NodeInfo} object for the host
* @return true, if a host is eligible to be considered for a connection request
*/
public boolean isHostEligible(Map.Entry<String, LoadBalanceManager.NodeInfo> e);
boolean isHostEligible(Map.Entry<String, LoadBalanceManager.NodeInfo> e);

/**
* @param newRequest whether this invocation is first for a new connection request
* @param failedHosts list of host names which have been known to be down
* @return the name of a host with the least number of connections, as per the driver's stats
*/
public String getLeastLoadedServer(boolean newRequest, List<String> failedHosts);
String getLeastLoadedServer(boolean newRequest, List<String> failedHosts);

/**
* @return the value of the property "yb-servers-refresh-interval" specified either in the url or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,19 @@

package com.yugabyte.ysql;

import static com.yugabyte.ysql.LoadBalanceProperties.*;

import java.util.*;
import static com.yugabyte.ysql.LoadBalanceProperties.DEFAULT_REFRESH_INTERVAL;
import static com.yugabyte.ysql.LoadBalanceProperties.LOCATIONS_DELIMITER;
import static com.yugabyte.ysql.LoadBalanceProperties.MAX_PREFERENCE_VALUE;
import static com.yugabyte.ysql.LoadBalanceProperties.PREFERENCE_DELIMITER;
import static com.yugabyte.ysql.LoadBalanceProperties.REFRESH_INTERVAL_KEY;
import static com.yugabyte.ysql.LoadBalanceProperties.TOPOLOGY_AWARE_PROPERTY_KEY;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

Expand All @@ -42,7 +52,6 @@ public class TopologyAwareLoadBalancer implements LoadBalancer {
private int refreshIntervalSeconds;
private boolean explicitFallbackOnly = false;


public TopologyAwareLoadBalancer(String placementValues, boolean onlyExplicitFallback) {
placements = placementValues;
explicitFallbackOnly = onlyExplicitFallback;
Expand All @@ -62,8 +71,8 @@ private void populatePlacementSet(String placement,
// Return an error so the user takes corrective action.
LOGGER.warning(
"Malformed " + TOPOLOGY_AWARE_PROPERTY_KEY + " property value: " + placement);
throw new IllegalArgumentException("Malformed " + TOPOLOGY_AWARE_PROPERTY_KEY + " property " +
"value: " + placement);
throw new IllegalArgumentException("Malformed " + TOPOLOGY_AWARE_PROPERTY_KEY
+ " property value: " + placement);
}
LoadBalanceManager.CloudPlacement cp = new LoadBalanceManager.CloudPlacement(
placementParts[0], placementParts[1], placementParts[2]);
Expand Down Expand Up @@ -110,8 +119,8 @@ public boolean isHostEligible(Map.Entry<String, LoadBalanceManager.NodeInfo> e)
|| (set != null && e.getValue().getPlacement().isContainedIn(set));
boolean isAttempted = attempted.contains(e.getKey());
boolean isDown = e.getValue().isDown();
LOGGER.fine(e.getKey() + " has required placement? " + found + ", isDown? " + isDown + ", " +
"attempted? " + isAttempted);
LOGGER.fine(e.getKey() + " has required placement? " + found + ", isDown? "
+ isDown + ", attempted? " + isAttempted);
return found
&& !isAttempted
&& !isDown;
Expand Down Expand Up @@ -159,8 +168,8 @@ public synchronized String getLeastLoadedServer(boolean newRequest, List<String>
if (chosenHost != null) {
LoadBalanceManager.incrementConnectionCount(chosenHost);
} else {
LOGGER.fine("chosenHost is null for placement level " + currentPlacementIndex + ", " +
"allowedPlacements: " + allowedPlacements);
LOGGER.fine("chosenHost is null for placement level " + currentPlacementIndex
+ ", allowedPlacements: " + allowedPlacements);
currentPlacementIndex += 1;
while (allowedPlacements.get(currentPlacementIndex) == null && currentPlacementIndex > 0) {
currentPlacementIndex += 1;
Expand Down
Loading

0 comments on commit fdb3411

Please sign in to comment.