Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filtering only autosizing related sizing configs #1591

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions samza-core/src/main/java/org/apache/samza/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,16 @@ public boolean getAutosizingEnabled() {
}

/**
* Check if a given config parameter is an internal autosizing related config, based on
* its name having the prefix "job.autosizing"
* Check if a given config parameter is an internal autosizing related sizing config, based on
* if it maps to one of the autosizing related sizing configs.

* @param configParam the config param to determine
* @return true if the config is related to autosizing, false otherwise
* @return true if the config is related to sizing via autosizing, false otherwise
*/
public static boolean isAutosizingConfig(String configParam) {
return configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX);
public static boolean isAutosizingSizingConfig(String configParam) {
return configParam.equals(JOB_AUTOSIZING_CONTAINER_COUNT) || configParam.equals(JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE)
|| configParam.equals(JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB) || configParam.equals(JOB_AUTOSIZING_CONTAINER_MEMORY_MB)
|| configParam.equals(JOB_AUTOSIZING_CONTAINER_MAX_CORES);
}

public boolean getJMXEnabled() {
Expand Down Expand Up @@ -490,4 +493,4 @@ public int getElasticityFactor() {
}
return elasticityFactor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ object CoordinatorStreamUtil extends Logging {
* Reads and returns launch config persisted in coordinator stream. Only job.auto sizing configs are currently supported.
* @param config full job config
* @param metadataStore an instance of the instantiated MetadataStore
* @return empty config if auto sizing is disabled, otherwise auto sizing related configs.
* @return empty config if auto sizing is disabled, otherwise auto sizing related sizing configs.
*/
def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: MetadataStore): Config = {
if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
new MapConfig()
} else {
val config = readConfigFromCoordinatorStream(metadataStore)
val launchConfig = config.asScala.filterKeys(key => JobConfig.isAutosizingConfig(key)).asJava
val launchConfig = config.asScala.filterKeys(key => JobConfig.isAutosizingSizingConfig(key)).asJava

new MapConfig(launchConfig)
}
Expand Down Expand Up @@ -196,7 +196,7 @@ object CoordinatorStreamUtil extends Logging {
val jobConfig = new JobConfig(config)
if (jobConfig.getAutosizingEnabled) {
// If autosizing is enabled, we retain auto-sizing related configs
keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingSizingConfig(configKey))
}

if (jobConfig.getApplicationMasterHighAvailabilityEnabled) {
Expand Down