Skip to content

Commit

Permalink
Refactor MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm (#2…
Browse files Browse the repository at this point in the history
…3862)

* Remove useless ProxyContext.getRules()

* Refactor MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm
  • Loading branch information
terrymanu authored Jan 31, 2023
1 parent 7c6314c commit 0e23858
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.dbdiscovery.mysql.type;

import com.google.common.base.Strings;
import lombok.Getter;
import org.apache.shardingsphere.dbdiscovery.mysql.exception.replica.DuplicatePrimaryDataSourceException;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
Expand All @@ -37,11 +36,11 @@
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

/**
* Normal replication database discovery provider algorithm for MySQL.
*/
@Getter
public final class MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm implements DatabaseDiscoveryProviderAlgorithm {

private static final String SHOW_SLAVE_STATUS = "SHOW SLAVE STATUS";
Expand All @@ -50,32 +49,36 @@ public final class MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm impl

private static final String SHOW_VARIABLES_READ_ONLY = "SHOW VARIABLES LIKE 'read_only'";

@Getter
private Properties props;

private long delayMillisecondsThreshold;

@Override
public void init(final Properties props) {
this.props = props;
delayMillisecondsThreshold = Long.parseLong(props.getProperty("delay-milliseconds-threshold", "0"));
}

@Override
public void checkEnvironment(final String databaseName, final Collection<DataSource> dataSources) {
ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
Collection<CompletableFuture<Boolean>> completableFutures = new LinkedList<>();
for (DataSource dataSource : dataSources) {
completableFutures.add(supplyAsyncCheckEnvironment(dataSource, executorService));
}
checkPrimaryDataSource(databaseName, dataSources.stream().map(each -> asyncCheckEnvironment(executorService, each)).collect(Collectors.toList()));
}

private void checkPrimaryDataSource(final String databaseName, final Collection<CompletableFuture<Boolean>> completableFutures) {
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
Iterator<CompletableFuture<Boolean>> primaryInstancesFuture = completableFutures.stream().iterator();
int primaryCount = 0;
int primaryInstanceCount = 0;
while (primaryInstancesFuture.hasNext()) {
if (primaryInstancesFuture.next().join()) {
primaryCount++;
primaryInstanceCount++;
}
}
ShardingSpherePreconditions.checkState(1 == primaryCount, () -> new DuplicatePrimaryDataSourceException(databaseName));
ShardingSpherePreconditions.checkState(1 == primaryInstanceCount, () -> new DuplicatePrimaryDataSourceException(databaseName));
}

private CompletableFuture<Boolean> supplyAsyncCheckEnvironment(final DataSource dataSource, final ExecutorService executorService) {
private CompletableFuture<Boolean> asyncCheckEnvironment(final ExecutorService executorService, final DataSource dataSource) {
return CompletableFuture.supplyAsync(() -> {
try {
return isPrimaryInstance(dataSource);
Expand Down Expand Up @@ -122,12 +125,11 @@ public ReplicaDataSourceStatus loadReplicaStatus(final DataSource replicaDataSou
try (
Connection connection = replicaDataSource.getConnection();
Statement statement = connection.createStatement()) {
String delayMillisecondsThreshold = getProps().getProperty("delay-milliseconds-threshold");
if (Strings.isNullOrEmpty(delayMillisecondsThreshold)) {
if (0L == delayMillisecondsThreshold) {
return new ReplicaDataSourceStatus(true, 0L);
}
long replicationDelayMilliseconds = queryReplicationDelayMilliseconds(statement);
boolean isDelay = replicationDelayMilliseconds >= Long.parseLong(delayMillisecondsThreshold);
boolean isDelay = replicationDelayMilliseconds >= delayMillisecondsThreshold;
return new ReplicaDataSourceStatus(!isDelay, replicationDelayMilliseconds);
}
}
Expand Down

0 comments on commit 0e23858

Please sign in to comment.