Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist demoted and removed brokers #2115

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

morgangalpin
Copy link

This PR implements #2109.

@morgangalpin morgangalpin force-pushed the persist-demoted-and-removed-brokers-#2109 branch from 73361d4 to 63f99c5 Compare January 26, 2024 18:50
* Simplified KafkaZkClient creation for debugging.
* Added persistence for demoted and removed brokers.
* Added tests and conformed to lint tests.
@morgangalpin morgangalpin force-pushed the persist-demoted-and-removed-brokers-#2109 branch from f1de99b to 5ae4d83 Compare January 29, 2024 19:23
@mhratson
Copy link
Contributor

mhratson commented Feb 1, 2024

Hey @morgangalpin thanks for this!

Do you mind sharing your testing approach/details aside from code tests?

@morgangalpin
Copy link
Author

Sure thing. We've been using it in production for several months now. I've also tried it out on a local dockerized kafka cluster. My manual testing only involved restarting CC to ensure the previous removed/demoted broker state was restored.

@mhratson
Copy link
Contributor

mhratson commented Feb 6, 2024

Got it, let me try reproduce this in my test environment and i'll get back to you…

@morgangalpin
Copy link
Author

@mhratson any progress with trying it out?

@morgangalpin
Copy link
Author

@mhratson just following up, is there anything else needed before we can merge this change?

@morgangalpin
Copy link
Author

@CCisGG or @viktorsomogyi, I haven't heard back from @mhratson . Are you able to review and merge this PR?

@morgangalpin
Copy link
Author

@mhratson @CCisGG Is there anything left to do before merging this PR? Is this even the right medium to communicate about this?

CONFIG = CruiseControlRequestConfig.define(CruiseControlParametersConfig.define(AnomalyDetectorConfig.define(
AnalyzerConfig.define(ExecutorConfig.define(MonitorConfig.define(WebServerConfig.define(
UserTaskManagerConfig.define(new ConfigDef())))))))).withClientSslSupport().withClientSaslSupport();
ConfigDef configDef = new ConfigDef();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file look like a refactoring rather than part of the feature, can this be extracted in a separate PR?

maybeUpdateTopicConfig(adminClient, maintenanceEventTopic);
maybeIncreasePartitionCount(adminClient, maintenanceEventTopic);
}
KafkaCruiseControlUtils.maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, an unrelated (though maybe tempting) refactoring…

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is needed by the new code, but isn't available when written as a protected method. The protected method looks like it can be removed because it is only used in one place, but I left it since removing it could break any subclasses that override it. I could mark it as @deprecated instead.

Copy link
Contributor

@viktorsomogyi viktorsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@morgangalpin sorry for this, I missed your ping. I started reviewing it, will likely do multiple rounds on it in the following days.


# The configs to apply to the kafka topic used to persist Cruise Control data. Only applies if
# "persisted.data.persist.method" is set to "kafka". This "list" should be a semicolon separated
# string of 'key=value' pairs. The keys and values need to be valid Kafka Topic configs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is unnecessary to separate them with semicolon and prefixing should be enough. My problem with semicolon separation is that with many enough configs it makes the list a little bit hard to interpret.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I can switch the semicolons to commas and provide an example that demonstrates tidy formatting. In case I've misunderstood your request, the reason for the separator character is that I want to provide a mechanism for users to add any number of key=value pairs to the list rather than mandating which configs are permitted.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I can't switch the semicolons to commas because individual values can have commas. I can make sure the docs make it clear that readable formatting is easy and desirable.

private static final Class<StringDeserializer> DESERIALIZER_CLASS = StringDeserializer.class;

// The hard-coded producer config. This is overridable.
private static final Map<String, String> DEFAULT_PRODUCER_CONFIG = ImmutableMap.<String, String>builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All default configs should be part of cruisecontrol.properties and we should avoid hard-coding (similarly to goals and default.goals). This would simplify your code as well as you wouldn't need to merge configs.

config.getShort(PersistedDataConfig.KAFKA_TOPIC_REPLICATION_FACTOR_CONFIG),
config.getMap(PersistedDataConfig.KAFKA_TOPIC_ADDITIONAL_CONFIGS_MAP_CONFIG),
config.getMap(PersistedDataConfig.KAFKA_PRODUCER_ADDITIONAL_CONFIGS_MAP_CONFIG),
config.getMap(PersistedDataConfig.KAFKA_CONSUMER_ADDITIONAL_CONFIGS_MAP_CONFIG),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you only prefixed the configs, then you could get them the following way in an array and avoid duplicating functionality with getMap:

config.originalsWithPrefix("persisted.data.kafka.topic."),
config.originalsWithPrefix("persisted.data.kafka.producer."),
config.originalsWithPrefix("persisted.data.kafka.consumer."),

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, makes sense. Thanks for the examples.

* @param persistedMap The map to store {@link Executor} data in.
*/
public ExecutorPersistedData(Map<String, String> persistedMap) {
this._demotedOrRemovedBrokers = Namespace.EXECUTOR.embed(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, you do this because this way we can serialize demoted or removed brokers with their timestamp the same way in both cases (in-memory vs Kafka).
To be completely honest, I find this a bit less intuitive to understand at first. For this I'd like to propose that what if we move the type of serialization to the respective storage method? For instance we could have a class like this:

BrokerEvent implements Serializable {
  private String brokerId;
  private long timestamp;
  private EventType eventType; // demote or remove
  private Namespace namespace; // currently "executor" only
}

For memory serialization we could use the standard ByteArrayInput/OutputStream serialization while for Kafka topics we could use json that is serialized into Strings. That would make the Kafka topics easily debuggable too.
When deserialized, we can represent them in a Map<String, BrokerEvent> structure to map broker ID to the event to make it easier to query.
Let me know your thoughts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants