Skip to content

Commit

Permalink
Add GraalVM Reachability Metadata and corresponding nativeTest for Co…
Browse files Browse the repository at this point in the history
…nsul integration
  • Loading branch information
linghengqian committed Jan 1, 2024
1 parent d152b65 commit 6f8e68c
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
Expand All @@ -27,14 +28,20 @@
import com.ecwid.consul.v1.session.model.Session;
import com.google.common.base.Strings;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -63,23 +70,31 @@ public final class ConsulRepository implements ClusterPersistRepository {

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
ConsulRawClient rawClient = Strings.isNullOrEmpty(config.getServerLists()) ? new ConsulRawClient() : new ConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
consulProps = new ConsulProperties(config.getProps());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists(), consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS));
consulClient = new ShardingSphereConsulClient(rawClient);
distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
}

@Override
public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
return null == response ? null : response.getValue().getValue();
if (null == response) {
return null;
}
GetValue value = response.getValue();
return null == value ? null : value.getValue();
}

@Override
public List<String> getChildrenKeys(final String key) {
Response<List<String>> response = consulClient.getKVKeysOnly(key);
return null == response ? Collections.emptyList() : response.getValue();
if (null == response) {
return Collections.emptyList();
}
List<String> value = response.getValue();
return null == value ? Collections.emptyList() : value;
}

@Override
Expand All @@ -102,9 +117,15 @@ public void delete(final String key) {
consulClient.deleteKVValue(key);
}

/**
* {@link ConsulRawClient} is a wrapper of blocking HTTP client and does not have a close method.
* Using such a Client does not necessarily conform to the implementation of the relevant SPI. ShardingSphere needs to
* consider solutions similar to <a href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
*
* @see ConsulRawClient
*/
@Override
public void close() {
// TODO
}

@Override
Expand All @@ -115,6 +136,22 @@ public void persistEphemeral(final String key, final String value) {
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
generatorFlushSessionTtlTask(consulClient, sessionId);
verifyConsulAgentRunning();
}

@SuppressWarnings("HttpUrlsUsage")
@SneakyThrows(MalformedURLException.class)
private ConsulRawClient createConsulRawClient(final String serverLists, final long blockQueryTimeToSeconds) {
CloseableHttpClient httpClient = HttpClientBuilder.create().setConnectionTimeToLive(blockQueryTimeToSeconds, TimeUnit.SECONDS).build();
if (Strings.isNullOrEmpty(serverLists)) {
return new ConsulRawClient(httpClient);
}
URL serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
if (-1 == serverUrl.getPort()) {
return new ConsulRawClient(serverUrl.getHost(), httpClient);
} else {
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort(), httpClient);
}
}

private NewSession createNewSession(final String key) {
Expand Down Expand Up @@ -142,23 +179,27 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
List<GetValue> value = response.getValue();
if (null == value) {
continue;
}
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
currentIndex = index;
if (!watchKeyMap.containsKey(key)) {
watchKeyMap.put(key, new HashSet<>());
}
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : response.getValue()) {
for (GetValue each : value) {
watchKeys.add(each.getKey());
}
continue;
}
if (null != index && index > currentIndex) {
currentIndex = index;
Collection<String> newKeys = new HashSet<>(response.getValue().size(), 1F);
Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : response.getValue()) {
for (GetValue each : value) {
newKeys.add(each.getKey());
if (!watchKeys.contains(each.getKey())) {
watchKeys.add(each.getKey());
Expand Down Expand Up @@ -189,12 +230,19 @@ private void fireDataChangeEvent(final GetValue getValue, final DataChangedEvent
* Flush session by update TTL.
*
* @param consulClient consul client
* @param sessionId session id
* @param sessionId session id
*/
public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
}

private void verifyConsulAgentRunning() {
HttpResponse httpResponse = consulClient.getRawClient().makeGetRequest("/v1/status/leader");
if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
throw new RuntimeException("Unable to connect to Consul Agent and StatusCode is " + httpResponse.getStatusCode() + ".");
}
}

@Override
public String getType() {
return "Consul";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.awaitility.Awaitility;
Expand All @@ -47,6 +50,7 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -83,6 +87,12 @@ class ConsulRepositoryTest {
@Mock
private List<GetValue> getValueList;

@Mock
private ConsulRawClient consulRawClient;

@Mock
private HttpResponse httpResponse;

private long index = 123456L;

@BeforeEach
Expand Down Expand Up @@ -140,6 +150,9 @@ void assertGetChildrenKeys() {

@Test
void assertPersistEphemeral() {
when(client.getRawClient()).thenReturn(consulRawClient);
when(consulRawClient.makeGetRequest(any(String.class))).thenReturn(httpResponse);
when(httpResponse.getStatusCode()).thenReturn(HttpStatus.SC_OK);
repository.persistEphemeral("key1", "value1");
verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
Expand Down Expand Up @@ -205,4 +218,13 @@ void assertPersist() {
repository.persist("key1", "value1");
verify(client).setKVValue(any(String.class), any(String.class));
}

@Test
void assertNullResponse() {
when(response.getValue()).thenReturn(null);
assertDoesNotThrow(() -> {
repository.getDirectly("key");
repository.getChildrenKeys("key");
});
}
}
6 changes: 6 additions & 0 deletions test/native/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-cluster-mode-repository-consul</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.natived.jdbc.commons.testcontainers;

import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.Capability;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@SuppressWarnings({"FieldMayBeFinal", "FieldCanBeLocal", "resource", "DataFlowIssue", "unused"})
public class ShardingSphereConsulContainer extends GenericContainer<ShardingSphereConsulContainer> {

private static final DockerImageName DEFAULT_OLD_IMAGE_NAME = DockerImageName.parse("consul");

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("hashicorp/consul");

private static final int CONSUL_HTTP_PORT = 8500;

private static final int CONSUL_GRPC_PORT = 8502;

private List<String> initCommands = new ArrayList<>();

private String[] startConsulCmd = new String[]{"agent", "-dev", "-client", "0.0.0.0"};

/**
* Manually specify the Port for ShardingSphere's nativeTest.
* @param dockerImageName docker image name
*/
public ShardingSphereConsulContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_OLD_IMAGE_NAME, DEFAULT_IMAGE_NAME);
setWaitStrategy(Wait.forHttp("/v1/status/leader").forPort(CONSUL_HTTP_PORT).forStatusCode(200));
withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withCapAdd(Capability.IPC_LOCK);
cmd.withHostConfig(new HostConfig().withPortBindings(new PortBinding(Ports.Binding.bindPort(62391), new ExposedPort(CONSUL_HTTP_PORT))));
});
withEnv("CONSUL_ADDR", "http://0.0.0.0:" + CONSUL_HTTP_PORT);
withCommand(startConsulCmd);
}

@Override
protected void containerIsStarted(final InspectContainerResponse containerInfo) {
if (!initCommands.isEmpty()) {
String commands = initCommands.stream().map(command -> "consul " + command).collect(Collectors.joining(" && "));
try {
ExecResult execResult = this.execInContainer("/bin/sh", "-c", commands);
if (0 != execResult.getExitCode()) {
logger().error(
"Failed to execute these init commands {}. Exit code {}. Stdout {}. Stderr {}",
initCommands,
execResult.getExitCode(),
execResult.getStdout(),
execResult.getStderr());
}
} catch (IOException | InterruptedException e) {
logger().error(
"Failed to execute these init commands {}. Exception message: {}",
initCommands,
e.getMessage());
}
}
}

/**
* work with Consul Command.
* @param commands The commands to send to the consul cli
* @return this
*/
public ShardingSphereConsulContainer withConsulCommand(final String... commands) {
initCommands.addAll(Arrays.asList(commands));
return self();
}
}
Loading

0 comments on commit 6f8e68c

Please sign in to comment.