Skip to content

Commit

Permalink
Code clean up and formatting changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ashetkar committed Sep 6, 2024
1 parent 9fc66db commit c5bfcef
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void main(String[] args) {

System.out.println("Setting up the connection pool having 6 connections.......");

testUsingHikariPool("uniform_load_balance", "true", "simple", // Change this 'simple'
testUsingHikariPool("uniform_load_balance", "true", "ignored",
controlHost, controlPort, numConnections, verbose, interactive);
}

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ kotlin.parallel.tasks.in.project=true
# This is version for PgJdbc itself
# Note: it should not include "-SNAPSHOT" as it is automatically added by build.gradle.kts
# Release version can be generated by using -Prelease or -Prc=<int> arguments
pgjdbc.version=42.3.5-yb-7-SNAPSHOT
pgjdbc.version=42.3.5-yb-7

# The options below configures the use of local clone (e.g. testing development versions)
# You can pass un-comment it, or pass option -PlocalReleasePlugins, or -PlocalReleasePlugins=<path>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@

package com.yugabyte.ysql;

import com.yugabyte.ysql.LoadBalanceService.LoadBalance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

import com.yugabyte.ysql.LoadBalanceService.LoadBalance;

public class ClusterAwareLoadBalancer implements LoadBalancer {
protected static final Logger LOGGER =
Logger.getLogger("org.postgresql." + ClusterAwareLoadBalancer.class.getName());
protected static final Logger LOGGER = Logger.getLogger("org.postgresql." + ClusterAwareLoadBalancer.class.getName());

private static volatile ClusterAwareLoadBalancer instance;
private List<String> attempted = new ArrayList<>();
Expand Down Expand Up @@ -51,7 +50,8 @@ public static ClusterAwareLoadBalancer getInstance(LoadBalanceService.LoadBalanc
instance.refreshListSeconds =
refreshListSeconds >= 0 && refreshListSeconds <= LoadBalanceProperties.MAX_REFRESH_INTERVAL ?
refreshListSeconds : LoadBalanceProperties.DEFAULT_REFRESH_INTERVAL;
LOGGER.fine("Created a new cluster-aware LB instance with loadbalance = " + instance.loadBalance + " and refresh interval " + instance.refreshListSeconds + " seconds");
LOGGER.fine("Created a new cluster-aware LB instance with loadbalance = " +
instance.loadBalance + " and refresh interval " + instance.refreshListSeconds + " seconds");
}
}
}
Expand All @@ -65,38 +65,8 @@ public String toString() {
@Override
public boolean isHostEligible(Map.Entry<String, LoadBalanceService.NodeInfo> e,
Byte requestFlags) {
return !attempted.contains(e.getKey()) && !e.getValue().isDown() && isRightNodeType(e.getValue().getNodeType(), requestFlags);
}

private boolean isRightNodeType(String nodeType, byte requestFlags) {
switch (loadBalance) {
case ANY:
LOGGER.fine("case ANY");
return true;
case ONLY_PRIMARY:
LOGGER.fine("case ONLY_PRIMARY, nodeType " + nodeType);
return nodeType.equalsIgnoreCase("primary");
case ONLY_RR:
LOGGER.fine("case ONLY_RR, nodeType " + nodeType);
return nodeType.equalsIgnoreCase("read_replica");
case PREFER_PRIMARY:
LOGGER.fine("case PREFER_PRIMARY, nodeType " + nodeType + " requestFlag " + requestFlags);
if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) {
return nodeType.equalsIgnoreCase("primary");
} else {
return nodeType.equalsIgnoreCase("primary") || nodeType.equalsIgnoreCase("read_replica");
}
case PREFER_RR:
LOGGER.fine("case PREFER_RR, nodeType " + nodeType + " requestFlag " + requestFlags);
if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) {
return nodeType.equalsIgnoreCase("read_replica");
} else {
return nodeType.equalsIgnoreCase("primary") || nodeType.equalsIgnoreCase("read_replica");
}
default:
LOGGER.fine("case default");
return false;
}
return !attempted.contains(e.getKey()) && !e.getValue().isDown()
&& LoadBalanceService.isRightNodeType(loadBalance, e.getValue().getNodeType(), requestFlags);
}

public synchronized String getLeastLoadedServer(boolean newRequest, List<String> failedHosts,
Expand Down Expand Up @@ -148,7 +118,7 @@ public synchronized String getLeastLoadedServer(boolean newRequest, List<String>
if (chosenHost == null && (loadBalance == LoadBalance.ONLY_PRIMARY || loadBalance == LoadBalance.ONLY_RR)) {
throw new IllegalStateException("No node available in "
+ (loadBalance == LoadBalance.ONLY_PRIMARY ? "primary" : "read-replica")
+ " cluster to connect to");
+ " cluster to connect to.");
}
return chosenHost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.logging.Logger;

public class LoadBalanceProperties {
private static final String SIMPLE_LB = "simple";
public static final String LOAD_BALANCE_PROPERTY_KEY = "load-balance";
public static final String TOPOLOGY_AWARE_PROPERTY_KEY = "topology-keys";
public static final String REFRESH_INTERVAL_KEY = "yb-servers-refresh-interval";
Expand Down Expand Up @@ -227,7 +226,7 @@ private void setLoadBalanceValue(String value) {
default:
LOGGER.warning("Invalid value for load-balance: " + value + ", ignoring it.");
}
LOGGER.info("loadbalance value set to " + this.loadBalance);
LOGGER.fine("loadbalance value set to " + this.loadBalance);
}

private int parseAndGetValue(String propValue, int defaultValue, int maxValue) {
Expand Down Expand Up @@ -283,7 +282,7 @@ public LoadBalancer getAppropriateLoadBalancer() {
// return base class conn manager.
ld = CONNECTION_MANAGER_MAP.get(this.loadBalance.name());
if (ld == null) {
LOGGER.fine(">>>>>>>>>>>>>>>>>>>>>>>>>>> No LB found for " + this.loadBalance + ", creating one ...");
LOGGER.fine("No LB found for " + this.loadBalance + ", creating one ...");
synchronized (CONNECTION_MANAGER_MAP) {
ld = CONNECTION_MANAGER_MAP.get(this.loadBalance.name());
if (ld == null) {
Expand All @@ -292,13 +291,13 @@ public LoadBalancer getAppropriateLoadBalancer() {
}
}
} else {
LOGGER.fine(">>>>>>>>>>>>>>>>>>>>>>>>>>> LB found for " + this.loadBalance + ": " + ld);
LOGGER.fine("LB found for " + this.loadBalance + ": " + ld);
}
} else {
String key = this.loadBalance.name() + "&" + placements + "&" + String.valueOf(explicitFallbackOnly).toLowerCase(Locale.ROOT);
ld = CONNECTION_MANAGER_MAP.get(key);
if (ld == null) {
LOGGER.fine(">>>>>>>>>>>>>>>>>>>>>>>>>>> No LB found for " + this.loadBalance + " and placements " + placements + " and fallback? " + explicitFallbackOnly + ", creating one ...");
LOGGER.fine("No LB found for " + this.loadBalance + " and placements " + placements + " and fallback? " + explicitFallbackOnly + ", creating one ...");
synchronized (CONNECTION_MANAGER_MAP) {
ld = CONNECTION_MANAGER_MAP.get(key);
if (ld == null) {
Expand All @@ -307,7 +306,7 @@ public LoadBalancer getAppropriateLoadBalancer() {
}
}
} else {
LOGGER.fine(">>>>>>>>>>>>>>>>>>>>>>>>>>> LB found for " + this.loadBalance + " and placements " + placements + ": " + ld);
LOGGER.fine("LB found for " + this.loadBalance + " and placements " + placements + ": " + ld);
}
}
return ld;
Expand Down
32 changes: 9 additions & 23 deletions pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalanceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static com.yugabyte.ysql.LoadBalanceProperties.FAILED_HOST_RECONNECT_DELAY_SECS_KEY;
import static org.postgresql.Driver.hostSpecs;

import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
Expand All @@ -28,13 +27,11 @@

public class LoadBalanceService {

private static final ConcurrentHashMap<String, NodeInfo> clusterInfoMap =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, NodeInfo> clusterInfoMap = new ConcurrentHashMap<>();
static final byte STRICT_PREFERENCE = 0b00000001;
private static Connection controlConnection = null;
protected static final String GET_SERVERS_QUERY = "select * from yb_servers()";
protected static final Logger LOGGER =
Logger.getLogger("org.postgresql." + LoadBalanceService.class.getName());
protected static final Logger LOGGER = Logger.getLogger("org.postgresql." + LoadBalanceService.class.getName());
private static long lastRefreshTime;
private static boolean forceRefreshOnce = false;
private static Boolean useHostColumn = null;
Expand Down Expand Up @@ -102,8 +99,7 @@ private static synchronized boolean refresh(Connection conn, long refreshInterva
String region = rs.getString("region");
String zone = rs.getString("zone");
String nodeType = rs.getString("node_type");
NodeInfo nodeInfo = clusterInfoMap.containsKey(host) ? clusterInfoMap.get(host) :
new NodeInfo();
NodeInfo nodeInfo = clusterInfoMap.containsKey(host) ? clusterInfoMap.get(host) : new NodeInfo();
synchronized (nodeInfo) {
nodeInfo.host = host;
nodeInfo.publicIP = publicHost;
Expand All @@ -114,20 +110,16 @@ private static synchronized boolean refresh(Connection conn, long refreshInterva
try {
nodeInfo.port = Integer.valueOf(port);
} catch (NumberFormatException nfe) {
LOGGER.warning("Could not parse port " + port + " for host " + host + ", using 5433 " +
"instead.");
LOGGER.warning("Could not parse port " + port + " for host " + host + ", using 5433 instead.");
nodeInfo.port = 5433;
}
long failedHostTTL = Long.getLong(FAILED_HOST_RECONNECT_DELAY_SECS_KEY,
DEFAULT_FAILED_HOST_TTL_SECONDS);
long failedHostTTL = Long.getLong(FAILED_HOST_RECONNECT_DELAY_SECS_KEY, DEFAULT_FAILED_HOST_TTL_SECONDS);
if (nodeInfo.isDown) {
if (System.currentTimeMillis() - nodeInfo.isDownSince > (failedHostTTL * 1000)) {
LOGGER.fine("Marking " + nodeInfo.host + " as UP since failed-host-reconnect-delay" +
"-secs (" + failedHostTTL + "s) has elapsed");
LOGGER.fine("Marking " + nodeInfo.host + " as UP since failed-host-reconnect-delay-secs (" + failedHostTTL + "s) has elapsed");
nodeInfo.isDown = false;
} else {
LOGGER.fine("Keeping " + nodeInfo.host + " as DOWN since failed-host-reconnect-delay" +
"-secs (" + failedHostTTL + "s) has not elapsed");
LOGGER.fine("Keeping " + nodeInfo.host + " as DOWN since failed-host-reconnect-delay-secs (" + failedHostTTL + "s) has not elapsed");
}
}
}
Expand Down Expand Up @@ -355,8 +347,7 @@ private static synchronized boolean checkAndRefresh(LoadBalanceProperties loadBa
} catch (SQLException ex) {
if (refreshFailed) {
LOGGER.fine("Exception while refreshing: " + ex + ", " + ex.getSQLState());
String failed =
((PgConnection) controlConnection).getQueryExecutor().getHostSpec().getHost();
String failed = ((PgConnection) controlConnection).getQueryExecutor().getHostSpec().getHost();
markAsFailed(failed);
} else {
String msg = hspec.length > 1 ? " and others" : "";
Expand Down Expand Up @@ -410,32 +401,27 @@ private static InetAddress getConnectedInetAddress(Connection conn) throws SQLEx
}

static boolean isRightNodeType(LoadBalance loadBalance, String nodeType, byte requestFlags) {
LOGGER.fine("loadBalance " + loadBalance + ", nodeType: " + nodeType + ", requestFlags: " + requestFlags);
switch (loadBalance) {
case ANY:
LOGGER.fine("case ANY");
return true;
case ONLY_PRIMARY:
LOGGER.fine("case ONLY_PRIMARY, nodeType " + nodeType);
return nodeType.equalsIgnoreCase("primary");
case ONLY_RR:
LOGGER.fine("case ONLY_RR, nodeType " + nodeType);
return nodeType.equalsIgnoreCase("read_replica");
case PREFER_PRIMARY:
LOGGER.fine("case PREFER_PRIMARY, nodeType " + nodeType + " requestFlag " + requestFlags);
if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) {
return nodeType.equalsIgnoreCase("primary");
} else {
return nodeType.equalsIgnoreCase("primary") || nodeType.equalsIgnoreCase("read_replica");
}
case PREFER_RR:
LOGGER.fine("case PREFER_RR, nodeType " + nodeType + " requestFlag " + requestFlags);
if (requestFlags == LoadBalanceService.STRICT_PREFERENCE) {
return nodeType.equalsIgnoreCase("read_replica");
} else {
return nodeType.equalsIgnoreCase("primary") || nodeType.equalsIgnoreCase("read_replica");
}
default:
LOGGER.fine("case default");
return false;
}
}
Expand Down
3 changes: 1 addition & 2 deletions pgjdbc/src/main/java/com/yugabyte/ysql/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public interface LoadBalancer {
* @param timedOutHosts list of host names where connections were attempted but timed out
* @return the name of a host with the least number of connections, as per the driver's stats
*/
String getLeastLoadedServer(boolean newRequest, List<String> failedHosts,
ArrayList<String> timedOutHosts);
String getLeastLoadedServer(boolean newRequest, List<String> failedHosts, ArrayList<String> timedOutHosts);

/**
* @return the value of the property "yb-servers-refresh-interval" specified either in the url or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static com.yugabyte.ysql.LoadBalanceProperties.REFRESH_INTERVAL_KEY;
import static com.yugabyte.ysql.LoadBalanceProperties.TOPOLOGY_AWARE_PROPERTY_KEY;

import com.yugabyte.ysql.LoadBalanceService.LoadBalance;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,11 +31,8 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

import com.yugabyte.ysql.LoadBalanceService.LoadBalance;

public class TopologyAwareLoadBalancer implements LoadBalancer {
protected static final Logger LOGGER =
Logger.getLogger("org.postgresql." + TopologyAwareLoadBalancer.class.getName());
protected static final Logger LOGGER = Logger.getLogger("org.postgresql." + TopologyAwareLoadBalancer.class.getName());
/**
* Holds the value of topology-keys specified.
*/
Expand All @@ -44,8 +43,7 @@ public class TopologyAwareLoadBalancer implements LoadBalancer {
/**
* Derived from the placements value above.
*/
private final Map<Integer, Set<LoadBalanceService.CloudPlacement>> allowedPlacements =
new HashMap<>();
private final Map<Integer, Set<LoadBalanceService.CloudPlacement>> allowedPlacements = new HashMap<>();
private final int PRIMARY_PLACEMENTS_INDEX = 1;
private final int REST_OF_CLUSTER_INDEX = -1;
/**
Expand Down Expand Up @@ -101,8 +99,7 @@ private void parseGeoLocations() {
} else {
int pref = Integer.parseInt(v[1]);
if (pref > 0 && pref <= MAX_PREFERENCE_VALUE) {
Set<LoadBalanceService.CloudPlacement> cpSet = allowedPlacements.computeIfAbsent(pref,
k -> new HashSet<>());
Set<LoadBalanceService.CloudPlacement> cpSet = allowedPlacements.computeIfAbsent(pref, k -> new HashSet<>());
populatePlacementSet(v[0], cpSet);
} else {
throw new IllegalArgumentException("Invalid preference value for property " + TOPOLOGY_AWARE_PROPERTY_KEY + ": " + value);
Expand Down Expand Up @@ -139,8 +136,7 @@ public boolean isHostEligible(Map.Entry<String, LoadBalanceService.NodeInfo> e,
&& isRightNode;
}

public synchronized String getLeastLoadedServer(boolean newRequest, List<String> failedHosts,
ArrayList<String> timedOutHosts) {
public synchronized String getLeastLoadedServer(boolean newRequest, List<String> failedHosts, ArrayList<String> timedOutHosts) {
LOGGER.fine("newRequest: " + newRequest + ", failedHosts: " + failedHosts);
// Reset currentPlacementIndex if it's a new request AND refresh() happened after the
// last request was processed
Expand Down
2 changes: 1 addition & 1 deletion pgjdbc/src/main/java/org/postgresql/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.postgresql.util.SharedTimer;
import org.postgresql.util.URLCoder;

import com.yugabyte.ysql.LoadBalanceService;
import com.yugabyte.ysql.LoadBalanceProperties;
import com.yugabyte.ysql.LoadBalanceService;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.IOException;
Expand Down

0 comments on commit c5bfcef

Please sign in to comment.