diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java index 9c7e6475..ae0f0836 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java @@ -21,6 +21,7 @@ import java.text.DecimalFormat; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Brokers; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException; @@ -53,6 +54,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; +import org.springframework.web.util.UriComponentsBuilder; @Service @Configuration @@ -67,6 +69,9 @@ public class BrokerStatsServiceImpl implements BrokerStatsService { @Value("${clear.stats.interval}") private Long clearStatsInterval; + @Value("${tls.enabled}") + private boolean tlsEnabled; + private final EnvironmentsRepository environmentsRepository; private final ClustersService clustersService; private final BrokersService brokersService; @@ -124,12 +129,16 @@ private void scheduleCollectStats() { clusterLists.forEach((clusterMap) -> { String cluster = (String) clusterMap.get("cluster"); Pair envCluster = Pair.of(env.getName(), cluster); - String webServiceUrl = (String) clusterMap.get("serviceUrl"); + + String serviceUrlTls = (String) clusterMap.get("serviceUrlTls"); + tlsEnabled = tlsEnabled && StringUtils.isNotBlank(serviceUrlTls); + String webServiceUrl = tlsEnabled ? serviceUrlTls : (String) clusterMap.get("serviceUrl"); if (webServiceUrl.contains(",")) { String[] webServiceUrlList = webServiceUrl.split(","); for (String url : webServiceUrlList) { - if (!url.contains("http://")) { - url = "http://" + url; + // making sure the protocol is appended in case the env was added without the protocol + if (!tlsEnabled && !url.contains("http://")) { + url = (tlsEnabled ? "https://" : "http://") + url; } try { Brokers brokers = pulsarAdminService.brokers(url); @@ -158,9 +167,16 @@ public void collectStatsToDB(long unixTime, String env, String cluster, String s Map brokerObject = brokersService.getBrokersList(0, 0, cluster, serviceUrl); List> brokerLists = (List>) brokerObject.get("data"); brokerLists.forEach((brokerMap) -> { + // returns [Broker Hostname]:[Broker non Tls port] String tempBroker = (String) brokerMap.get("broker"); - // TODO: handle other protocols + //default to http String broker = "http://" + tempBroker; + // if tls enabled the protocol and port is extracted from service url + if (tlsEnabled && tempBroker.contains(":")) { + String brokerHost = tempBroker.substring(0, tempBroker.indexOf(":")); + UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(serviceUrl); + broker = builder.host(brokerHost).toUriString(); + } JsonObject result; try { result = pulsarAdminService.brokerStats(broker).getTopics(); diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java index 4ea07aa6..c75e267b 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.policies.data.ClusterData; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -51,6 +52,8 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService { private final PulsarAdminService pulsarAdminService; + @Value("${tls.enabled}") + private boolean tlsEnabled; private final Map serviceUrlEnvironmentMap; @Autowired @@ -138,7 +141,7 @@ private String getServiceUrl(String environment, String cluster, int numReloads) throw new RuntimeException( "No cluster '" + cluster + "' found in environment '" + environment + "'"); } - return clusterData.getServiceUrl(); + return tlsEnabled && StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? clusterData.getServiceUrlTls() : clusterData.getServiceUrl(); } @Scheduled(