Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Summary Metrics as overview for the Cluster View #588

Draft
wants to merge 8 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* [Pipedrive](https://www.pipedrive.com)
* [TVG](https://www.tvg.com)
* [Vodeno](https://www.vodeno.com/)

* [FREE NOW](https://free-now.com/)


## Credits
Expand Down
3,836 changes: 1,831 additions & 2,005 deletions client/package-lock.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ export const uriClusters = () => {
return `${apiUrl}/cluster`;
};

export const uriClusterTopicStats = (clusterId) => {
return `${apiUrl}/${clusterId}/topic/stats`
}

export const uriUIOptions = (clusterId) => {
return `${apiUrl}/${clusterId}/ui-options`;
};
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/akhq/controllers/ConnectController.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Operation;
import org.akhq.configs.Role;
import org.akhq.models.ClusterStats;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.repositories.ConnectRepository;
Expand Down Expand Up @@ -102,6 +103,12 @@ public List<ConnectDefinition.TaskDefinition> tasks(HttpRequest<?> request, Stri
return this.connectRepository.getDefinition(cluster, connectId, name).getTasks();
}

@Get("/stats")
@Operation(tags = {"connect"}, summary = "Retrieve connect cluster statistics")
public ClusterStats.ConnectStats tasks(String cluster, String connectId) {
return this.connectRepository.getConnectStats(cluster, connectId);
}

@Get("/{name}/configs")
@Operation(tags = {"connect"}, summary = "Retrieve a connect config")
public Map<String, String> configs(HttpRequest<?> request, String cluster, String connectId, String name) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/akhq/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.swagger.v3.oas.annotations.Operation;
import org.akhq.configs.Role;
import org.akhq.middlewares.SchemaComparator;
import org.akhq.models.ClusterStats;
import org.akhq.models.Schema;
import org.akhq.models.TopicSchema;
import org.akhq.repositories.SchemaRegistryRepository;
Expand Down Expand Up @@ -51,6 +52,14 @@ public ResultPagedList<Schema> list(
return ResultPagedList.of(this.schemaRepository.list(cluster, pagination, search));
}

@Get("api/{cluster}/schema/globalstats")
@Operation(tags={"schema registry"}, summary = "Count of schemas")
public ClusterStats.SchemaRegistryStats clusterSchemaStats(String cluster)
throws IOException, RestClientException
{
return this.schemaRepository.schemaCount(cluster);
}

@Get("api/{cluster}/schema/topic/{topic}")
@Operation(tags = {"schema registry"}, summary = "List all schemas prefered schemas for this topic")
public TopicSchema listSchemaForTopic(
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.CollectionUtils;
Expand All @@ -28,6 +29,7 @@
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.reactivestreams.Publisher;

import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
Expand All @@ -51,6 +53,8 @@ public class TopicController extends AbstractController {
@Inject
private ConsumerGroupRepository consumerGroupRepository;
@Inject
private ConnectRepository connectRepository;
@Inject
private Environment environment;
@Inject
private AccessControlListRepository aclRepository;
Expand Down Expand Up @@ -96,6 +100,14 @@ public ResultPagedList<Topic> list(
));
}

@Get("api/{cluster}/topic/globalstats")
@Operation(tags = {"topic"}, summary = "Summary of topics on cluster")
public ClusterStats.TopicStats clusterTopicStats(String cluster)
throws ExecutionException, InterruptedException, IOException, RestClientException
{
return this.topicRepository.getTopicStats(cluster);
}

@Get("api/{cluster}/topic/name")
@Operation(tags = {"topic"}, summary = "List all topics name")
public List<String> listTopicNames(
Expand Down
59 changes: 59 additions & 0 deletions src/main/java/org/akhq/models/ClusterStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.akhq.models;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.Map;

@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class ClusterStats
{
private String id;

@ToString
@Getter
@AllArgsConstructor
public static class TopicStats
{
private final Integer topics;
private final Integer partitions;
private final Integer replicaCount;
private final Integer inSyncReplicaCount;
}

@ToString
@Getter
@AllArgsConstructor
public static class ConsumerGroupStats
{
private final Integer consumerGroups;
private final Integer rebalancingGroups;
private final Integer emptyGroups;
}

@ToString
@Getter
@AllArgsConstructor
public static class ConnectStats
{
private final String connectId;
private final Integer connectors;
private final Integer tasks;
private final Map<String, Integer> stateCount;
}

@ToString
@Getter
@AllArgsConstructor
public static class SchemaRegistryStats
{
private final Integer schemas;
}
}
27 changes: 27 additions & 0 deletions src/main/java/org/akhq/repositories/ConnectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@
import io.micronaut.retry.annotation.Retryable;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import org.akhq.configs.SecurityProperties;
import org.akhq.models.ClusterStats;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.akhq.utils.DefaultGroupUtils;
import org.sourcelab.kafka.connect.apiclient.request.dto.*;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus.TaskStatus;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ConcurrentConfigModificationException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ResourceNotFoundException;
Expand All @@ -26,8 +30,13 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;

@Singleton
public class ConnectRepository extends AbstractRepository {
@Inject
Expand Down Expand Up @@ -228,6 +237,24 @@ public static Map<String, String> validConfigs(Map<String, String> configs, Stri
return list;
}

public ClusterStats.ConnectStats getConnectStats(String clusterId, String connectId) {
KafkaConnectClient client = this.kafkaModule.getConnectRestClient(clusterId).get(connectId);
Collection<String> connectors = client.getConnectors();
int connectorCount = connectors.size();
Map<String, Integer> collect = connectors
.stream()
.map(c -> client.getConnectorStatus(c).getTasks())
.flatMap(Collection::stream)
.map(TaskStatus::getState)
.collect(groupingBy(identity(), countingInt()));
int tasks = collect.values().stream().mapToInt(Integer::intValue).sum();
return new ClusterStats.ConnectStats(connectId, connectorCount, tasks, collect);
}

private static Collector<String, ?, Integer> countingInt() {
return reducing(0, e -> 1, Integer::sum);
}

private ConnectPlugin mapToConnectPlugin(ConnectorPlugin plugin, String clusterId, String connectId) {
return new ConnectPlugin(
plugin,
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/akhq/repositories/ConsumerGroupRepository.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.akhq.repositories;

import org.akhq.models.ClusterStats;
import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
Expand Down Expand Up @@ -186,4 +187,14 @@ private List<String> getConsumerGroupFilterRegexFromAttributes(Map<String, Objec
}
return new ArrayList<>();
}

public ClusterStats.ConsumerGroupStats getConsumerGroupStats(String clusterId)
throws ExecutionException, InterruptedException
{
// TODO: Define the rebalancing groups and empty groups
int rebalancingGroups = 0;
int emptyGroups = 0;
final Collection<ConsumerGroupListing> groupListings = kafkaWrapper.listConsumerGroups(clusterId);
return new ClusterStats.ConsumerGroupStats(groupListings.size(), rebalancingGroups, emptyGroups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import org.akhq.configs.Connection;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.models.ClusterStats;
import org.akhq.models.Schema;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.PagedList;
Expand Down Expand Up @@ -94,6 +95,12 @@ public List<String> all(String clusterId, Optional<String> search) throws IOExc
.collect(Collectors.toList());
}

public ClusterStats.SchemaRegistryStats schemaCount(String clusterId)
throws IOException, RestClientException
{
return new ClusterStats.SchemaRegistryStats(this.all(clusterId, Optional.empty()).size());
}

public boolean exist(String clusterId, String subject) throws IOException, RestClientException {
boolean found = false;

Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/akhq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.micronaut.retry.annotation.Retryable;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import org.akhq.configs.SecurityProperties;
import org.akhq.models.ClusterStats;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.akhq.models.Partition;
Expand Down Expand Up @@ -154,6 +156,25 @@ void checkIfTopicExists(String clusterId, String name) throws ExecutionException
kafkaWrapper.describeTopics(clusterId, Collections.singletonList(name));
}

public ClusterStats.TopicStats getTopicStats(String clusterId)
throws ExecutionException, InterruptedException
{
Collection<TopicListing> listTopics = kafkaWrapper.listTopics(clusterId);
int topics = 0;
int partitions = 0;
int replicaCount = 0;
int inSyncReplicaCount = 0;

for (TopicListing item : listTopics) {
Topic topic = this.findByName(clusterId, item.name());
topics += 1;
partitions += topic.getPartitions().size();
replicaCount += topic.getReplicaCount();
inSyncReplicaCount += topic.getInSyncReplicaCount();
}
return new ClusterStats.TopicStats(topics, partitions, replicaCount, inSyncReplicaCount);
}

private Optional<List<String>> getTopicFilterRegex() {

List<String> topicFilterRegex = new ArrayList<>();
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.inject.Inject;
import java.util.*;

import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -246,4 +247,33 @@ void getFilteredBySearchList() {
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1", "prefixed.Matching2");
}

@Test
public void getConnectStats() {
repository.create(KafkaTestCluster.CLUSTER_ID, "connect-1", "foo-bar", ImmutableMap.of(
"connector.class", "FileStreamSinkConnector",
"file", "/tmp/test.txt",
"topics", KafkaTestCluster.TOPIC_CONNECT
));
repository.create(KafkaTestCluster.CLUSTER_ID, "connect-1", "foo-bar-2", ImmutableMap.of(
"connector.class", "FileStreamSinkConnector",
"file", "/tmp/test.txt",
"topics", KafkaTestCluster.TOPIC_CONNECT
));
repository.create(KafkaTestCluster.CLUSTER_ID, "connect-1", "foo-bar-paused", ImmutableMap.of(
"connector.class", "FileStreamSinkConnector",
"file", "/tmp/test.txt",
"topics", KafkaTestCluster.TOPIC_CONNECT
));
repository.pause(KafkaTestCluster.CLUSTER_ID, "connect-1", "foo-bar-paused");
Map<String, Integer> assertStats = Map.of("RUNNING", 2, "PAUSED", 1);
try {
sleep(500);
}
catch (InterruptedException e) {
e.printStackTrace();
}
assertEquals(3, repository.getConnectStats(KafkaTestCluster.CLUSTER_ID, "connect-1").getConnectors());
assertEquals(assertStats, repository.getConnectStats(KafkaTestCluster.CLUSTER_ID, "connect-1").getStateCount());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,10 @@ void delete() throws IOException, RestClientException, ExecutionException, Inter
void getDefaultConfig() throws IOException, RestClientException {
assertEquals(Schema.Config.CompatibilityLevelConfig.BACKWARD, repository.getDefaultConfig(KafkaTestCluster.CLUSTER_ID).getCompatibilityLevel());
}

@Test
public void getSchemaStats() throws IOException, RestClientException
{
assertEquals(3, repository.schemaCount(KafkaTestCluster.CLUSTER_ID).getSchemas());
}
}
8 changes: 8 additions & 0 deletions src/test/java/org/akhq/repositories/TopicRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ void partition() throws ExecutionException, InterruptedException {
assertEquals(3, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COMPACTED).getPartitions().size());
}

@Test
public void stats() throws ExecutionException, InterruptedException
{
assertEquals(19, topicRepository.getTopicStats(KafkaTestCluster.CLUSTER_ID).getTopics());
assertEquals(115, topicRepository.getTopicStats(KafkaTestCluster.CLUSTER_ID).getPartitions());
assertEquals(19, topicRepository.getTopicStats(KafkaTestCluster.CLUSTER_ID).getInSyncReplicaCount());
}

private void mockApplicationContext() {
Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("topicsFilterRegexp", new ArrayList<>(Arrays.asList("rando.*"))));
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
Expand Down