Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the broker stats with different environments #570

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions front-end/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"mockjs": "1.0.1-beta3",
"normalize.css": "7.0.0",
"nprogress": "0.2.0",
"qs": "^6.12.1",
"showdown": "1.9.1",
"sortablejs": "1.7.0",
"vue": "2.6.0",
Expand Down
6 changes: 5 additions & 1 deletion front-end/src/utils/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import { getEnvironment } from '@/utils/environment'
import { getTenant } from '@/utils/tenant'
import router from '../router'
import { getCsrfToken } from '@/utils/csrfToken'
import qs from "qs";

// create an axios instance
const service = axios.create({
baseURL: process.env.BASE_API, // api 的 base_url
timeout: 60000 // request timeout
timeout: 60000, // request timeout
paramsSerializer: function(params) {
return qs.stringify(params, { arrayFormat: 'repeat' })
}
})

// request interceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.pulsar.client.admin.Topics;

public interface PulsarAdminService {
PulsarAdmin getPulsarAdmin(String url);
PulsarAdmin getPulsarAdmin(String url, String env, String token);
BrokerStats brokerStats(String url, String env);
BrokerStats brokerStats(String url);
Clusters clusters(String url);
Clusters clusters(String url, String token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

@Service
Expand Down Expand Up @@ -122,24 +123,24 @@ private void scheduleCollectStats() {
List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments();
Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>();
for (EnvironmentEntity env : environmentEntities) {
String serviceUrl = checkServiceUrl(null, env.getBroker());
String brokerUrl = env.getBroker();
Map<String, Object> clusterObject =
clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl);
clustersService.getClustersList(0, 0, brokerUrl, (c) -> brokerUrl);
List<HashMap<String, Object>> clusterLists = (List<HashMap<String, Object>>) clusterObject.get("data");
clusterLists.forEach((clusterMap) -> {
String cluster = (String) clusterMap.get("cluster");
Pair<String, String> envCluster = Pair.of(env.getName(), cluster);

log.debug(envCluster.toString());

String serviceUrlTls = (String) clusterMap.get("serviceUrlTls");
tlsEnabled = tlsEnabled && StringUtils.isNotBlank(serviceUrlTls);
String webServiceUrl = tlsEnabled ? serviceUrlTls : (String) clusterMap.get("serviceUrl");
String serviceUrl = (String) clusterMap.get("serviceUrl");

String webServiceUrl = StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl;
if (webServiceUrl.contains(",")) {
String[] webServiceUrlList = webServiceUrl.split(",");
for (String url : webServiceUrlList) {
// making sure the protocol is appended in case the env was added without the protocol
if (!tlsEnabled && !url.contains("http://")) {
url = (tlsEnabled ? "https://" : "http://") + url;
}

try {
Brokers brokers = pulsarAdminService.brokers(url);
brokers.healthcheck();
Expand All @@ -150,14 +151,10 @@ private void scheduleCollectStats() {
}
}
}
collectStatsServiceUrls.put(envCluster, webServiceUrl);
log.info("Start collecting stats from env {} / cluster {} @ {}", envCluster.getLeft(), envCluster.getRight(), serviceUrl);
collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), webServiceUrl);
});
}
collectStatsServiceUrls.forEach((envCluster, serviceUrl) -> {
log.info("Start collecting stats from env {} / cluster {} @ {}",
envCluster.getLeft(), envCluster.getRight(), serviceUrl);
collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), serviceUrl);
});

log.info("Start clearing stats from broker");
clearStats(unixTime, clearStatsInterval / 1000);
Expand All @@ -168,18 +165,21 @@ public void collectStatsToDB(long unixTime, String env, String cluster, String s
List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data");
brokerLists.forEach((brokerMap) -> {
// returns [Broker Hostname]:[Broker non Tls port]
String tempBroker = (String) brokerMap.get("broker");
//default to http
String broker = "http://" + tempBroker;
// if tls enabled the protocol and port is extracted from service url
if (tlsEnabled && tempBroker.contains(":")) {
String brokerHost = tempBroker.substring(0, tempBroker.indexOf(":"));
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(serviceUrl);
broker = builder.host(brokerHost).toUriString();
}
String broker = (String) brokerMap.get("broker");
log.info("processing broker: {}", broker);

// use web service url scheme to replace host part with broker
UriComponents serviceURI = UriComponentsBuilder.fromHttpUrl(serviceUrl).build();
UriComponentsBuilder builder = UriComponentsBuilder.newInstance()
.scheme(serviceURI.getScheme())
.host(broker.split(":")[0])
.port(serviceURI.getPort());
String finalBroker = builder.toUriString();

JsonObject result;
try {
result = pulsarAdminService.brokerStats(broker).getTopics();
log.info("Start collecting stats from broker {}", finalBroker);
result = pulsarAdminService.brokerStats(finalBroker, env).getTopics();
} catch(PulsarAdminException e) {
log.error("Failed to get broker metrics.", e);
return;
Expand All @@ -197,7 +197,7 @@ public void collectStatsToDB(long unixTime, String env, String cluster, String s
String[] topicPath = this.parseTopic(topic);
topicStatsEntity.setEnvironment(env);
topicStatsEntity.setCluster(cluster);
topicStatsEntity.setBroker(tempBroker);
topicStatsEntity.setBroker(finalBroker);
topicStatsEntity.setTenant(topicPath[0]);
topicStatsEntity.setNamespace(topicPath[1]);
topicStatsEntity.setBundle(bundle);
Expand Down Expand Up @@ -309,10 +309,6 @@ public static String checkServiceUrl(String serviceUrl, String requestHost) {
if (serviceUrl == null || serviceUrl.length() <= 0) {
serviceUrl = requestHost;
}

if (!serviceUrl.startsWith("http")) {
serviceUrl = "http://" + serviceUrl;
}
return serviceUrl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ private String pickOneServiceUrl(String webServiceUrl) {
String[] webServiceUrlList = webServiceUrl.split(",");
int index = ThreadLocalRandom.current().nextInt(0, webServiceUrlList.length);
String url = webServiceUrlList[index];
if (!url.contains("http://")) {
url = "http://" + url;
}
log.info("pick web url:{}", url);
return url;
}
Expand All @@ -141,7 +138,8 @@ private String getServiceUrl(String environment, String cluster, int numReloads)
throw new RuntimeException(
"No cluster '" + cluster + "' found in environment '" + environment + "'");
}
return tlsEnabled && StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? clusterData.getServiceUrlTls() : clusterData.getServiceUrl();

return StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? clusterData.getServiceUrlTls() : clusterData.getServiceUrl();
}

@Scheduled(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,25 @@ public void destroy() {
pulsarAdmins.values().forEach(value -> value.close());
}

public synchronized PulsarAdmin getPulsarAdmin(String url) {
if (!pulsarAdmins.containsKey(url)) {
pulsarAdmins.put(url, this.createPulsarAdmin(url, null));
}
return pulsarAdmins.get(url);

public PulsarAdmin getPulsarAdmin(String url) {
return this.createPulsarAdmin(url, null, null);
}

public PulsarAdmin getPulsarAdmin(String url, String token) {
return this.createPulsarAdmin(url, token);
return this.createPulsarAdmin(url, null, token);
}

public PulsarAdmin getPulsarAdmin(String url, String env, String token) {
return this.createPulsarAdmin(url, env, token);
}

public BrokerStats brokerStats(String url) {
return getPulsarAdmin(url).brokerStats();
return getPulsarAdmin(url, null, null).brokerStats();
}

public BrokerStats brokerStats(String url, String env) {
return getPulsarAdmin(url, env, null).brokerStats();
}

public Clusters clusters(String url) {
Expand Down Expand Up @@ -149,24 +155,23 @@ public Map<String, String> getAuthHeader(String url) {
return result;
}

private String getEnvironmentToken(String url) {
private String getEnvironmentToken(String url, String env) {
Optional<EnvironmentEntity> optionalEnvironmentEntity = environmentsRepository.findByBroker(url);
if (optionalEnvironmentEntity.isPresent()) {
return optionalEnvironmentEntity.get().getToken();
}
String environment = environmentCacheService.getEnvironment(url);
Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(environment);
Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(env);
return environmentEntityOptional.map(EnvironmentEntity::getToken).orElse(null);
}

private PulsarAdmin createPulsarAdmin(String url, String token) {
private PulsarAdmin createPulsarAdmin(String url, String env, String token) {
try {
log.info("Create Pulsar Admin instance. url={}, authPlugin={}, authParams={}, tlsAllowInsecureConnection={}, tlsTrustCertsFilePath={}, tlsEnableHostnameVerification={}",
url, authPlugin, authParams, tlsAllowInsecureConnection, tlsTrustCertsFilePath, tlsEnableHostnameVerification);
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
pulsarAdminBuilder.serviceHttpUrl(url);
if (null == token) {
token = getEnvironmentToken(url);
token = getEnvironmentToken(url, env);
}
if (StringUtils.isNotBlank(token)) {
pulsarAdminBuilder.authentication(AuthenticationFactory.token(token));
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ spring.datasource.initialization-mode=always
#spring.datasource.username=postgres
#spring.datasource.password=postgres

# hikari configuration
spring.datasource.hikari.connectionTimeout=10000
spring.datasource.hikari.idleTimeout=60000
spring.datasource.hikari.maxLifetime=300000

# zuul config
# https://cloud.spring.io/spring-cloud-static/Dalston.SR5/multi/multi__router_and_filter_zuul.html
# By Default Zuul adds Authorization to be dropped headers list. Below we are manually setting it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void convertStatsToDbTest() throws Exception {
brokersMap.put("data", brokersArray);
Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
.thenReturn(brokersMap);
Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
Mockito.when(pulsarAdminService.brokerStats(serviceUrl, environment)).thenReturn(stats);
JsonObject data = new Gson().fromJson(testData, JsonObject.class);
Mockito.when(stats.getTopics())
.thenReturn(data);
Expand Down Expand Up @@ -310,7 +310,7 @@ public void findByMultiTenantOrMultiNamespace() throws Exception {
brokersMap.put("data", brokersArray);
Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
.thenReturn(brokersMap);
Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
Mockito.when(pulsarAdminService.brokerStats(serviceUrl, environment)).thenReturn(stats);
JsonObject data = new Gson().fromJson(testData, JsonObject.class);
Mockito.when(stats.getTopics())
.thenReturn(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void teardown() {

@Test
public void getPulsarAdminTest() {
String serviceUrl = pulsarAdminService.getPulsarAdmin("http://localhost:8080").getServiceUrl();
String serviceUrl = pulsarAdminService.getPulsarAdmin("http://localhost:8080", null, null).getServiceUrl();
Assert.assertEquals("http://localhost:8080", serviceUrl);
}

Expand Down
Loading