diff --git a/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java b/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java index d561182baf33e..e0392edc455c8 100644 --- a/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java +++ b/features/db-discovery/provider/mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.dbdiscovery.mysql.type; -import com.google.common.base.Strings; import lombok.Getter; import org.apache.shardingsphere.dbdiscovery.mysql.exception.replica.DuplicatePrimaryDataSourceException; import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm; @@ -37,11 +36,11 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; /** * Normal replication database discovery provider algorithm for MySQL. */ -@Getter public final class MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm implements DatabaseDiscoveryProviderAlgorithm { private static final String SHOW_SLAVE_STATUS = "SHOW SLAVE STATUS"; @@ -50,32 +49,36 @@ public final class MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm impl private static final String SHOW_VARIABLES_READ_ONLY = "SHOW VARIABLES LIKE 'read_only'"; + @Getter private Properties props; + private long delayMillisecondsThreshold; + @Override public void init(final Properties props) { this.props = props; + delayMillisecondsThreshold = Long.parseLong(props.getProperty("delay-milliseconds-threshold", "0")); } @Override public void checkEnvironment(final String databaseName, final Collection dataSources) { ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService(); - Collection> completableFutures = new LinkedList<>(); - for (DataSource dataSource : dataSources) { - completableFutures.add(supplyAsyncCheckEnvironment(dataSource, executorService)); - } + checkPrimaryDataSource(databaseName, dataSources.stream().map(each -> asyncCheckEnvironment(executorService, each)).collect(Collectors.toList())); + } + + private void checkPrimaryDataSource(final String databaseName, final Collection> completableFutures) { CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); Iterator> primaryInstancesFuture = completableFutures.stream().iterator(); - int primaryCount = 0; + int primaryInstanceCount = 0; while (primaryInstancesFuture.hasNext()) { if (primaryInstancesFuture.next().join()) { - primaryCount++; + primaryInstanceCount++; } } - ShardingSpherePreconditions.checkState(1 == primaryCount, () -> new DuplicatePrimaryDataSourceException(databaseName)); + ShardingSpherePreconditions.checkState(1 == primaryInstanceCount, () -> new DuplicatePrimaryDataSourceException(databaseName)); } - private CompletableFuture supplyAsyncCheckEnvironment(final DataSource dataSource, final ExecutorService executorService) { + private CompletableFuture asyncCheckEnvironment(final ExecutorService executorService, final DataSource dataSource) { return CompletableFuture.supplyAsync(() -> { try { return isPrimaryInstance(dataSource); @@ -122,12 +125,11 @@ public ReplicaDataSourceStatus loadReplicaStatus(final DataSource replicaDataSou try ( Connection connection = replicaDataSource.getConnection(); Statement statement = connection.createStatement()) { - String delayMillisecondsThreshold = getProps().getProperty("delay-milliseconds-threshold"); - if (Strings.isNullOrEmpty(delayMillisecondsThreshold)) { + if (0L == delayMillisecondsThreshold) { return new ReplicaDataSourceStatus(true, 0L); } long replicationDelayMilliseconds = queryReplicationDelayMilliseconds(statement); - boolean isDelay = replicationDelayMilliseconds >= Long.parseLong(delayMillisecondsThreshold); + boolean isDelay = replicationDelayMilliseconds >= delayMillisecondsThreshold; return new ReplicaDataSourceStatus(!isDelay, replicationDelayMilliseconds); } }