Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[autothrottle] optionally skip auto-delete throttle #434

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 48 additions & 42 deletions cmd/autothrottle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,30 @@ var (

// Config holds configuration parameters.
Config struct {
KafkaNativeMode bool
KafkaAPIRequestTimeout int
APIKey string
AppKey string
NetworkTXQuery string
NetworkRXQuery string
BrokerIDTag string
InstanceTypeTag string
MetricsWindow int
BootstrapServers string
ZKAddr string
ZKPrefix string
Interval int
APIListen string
ConfigZKPrefix string
DDEventTags string
MinRate float64
SourceMaxRate float64
DestinationMaxRate float64
ChangeThreshold float64
FailureThreshold int
CapMap map[string]float64
CleanupAfter int64
KafkaNativeMode bool
KafkaAPIRequestTimeout int
APIKey string
AppKey string
NetworkTXQuery string
NetworkRXQuery string
BrokerIDTag string
InstanceTypeTag string
MetricsWindow int
BootstrapServers string
ZKAddr string
ZKPrefix string
Interval int
APIListen string
ConfigZKPrefix string
DDEventTags string
MinRate float64
SourceMaxRate float64
DestinationMaxRate float64
ChangeThreshold float64
FailureThreshold int
CapMap map[string]float64
CleanupAfter int64
SkipAutoDeleteThrottles bool
}
)

Expand Down Expand Up @@ -78,6 +79,7 @@ func main() {
flag.IntVar(&Config.FailureThreshold, "failure-threshold", 1, "Number of iterations that throttle determinations can fail before reverting to the min-rate")
m := flag.String("cap-map", "", "JSON map of instance types to network capacity in MB/s")
flag.Int64Var(&Config.CleanupAfter, "cleanup-after", 60, "Number of intervals after which to issue a global throttle unset if no replication is running")
flag.BoolVar(&Config.SkipAutoDeleteThrottles, "skip-auto-delete-throttles", false, "Skip automatic throttle removal")

envy.Parse("AUTOTHROTTLE")
flag.Parse()
Expand Down Expand Up @@ -422,31 +424,35 @@ func main() {

// If there's previously set throttles but no topics reassigning nor
// broker overrides set, we can issue a global throttle removal.
if throttlesToClear && !topicsReassigning && !brokerOverridesSet {
if !topicsReassigning && throttlesToClear && !brokerOverridesSet {
// Reset the interval count.
interval = 0

// Remove all the broker + topic throttle configs.
err := throttleManager.RemoveAllThrottles()
if err != nil {
log.Printf("Error removing throttles: %s\n", err.Error())
if Config.SkipAutoDeleteThrottles {
log.Println("There may be throttles eligible for removal, but skipping automatic removal since skip-auto-delete-throttles is set")
} else {
// Only set knownThrottles to false if we've removed all
// without error.
knownThrottles = false
}

// Ensure topic throttle updates are re-enabled.
throttleManager.EnableTopicUpdates()
throttleManager.EnableOverrideTopicUpdates()

// Remove any configured throttle overrides if AutoRemove is true.
if overrideCfg.AutoRemove {
err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{})
// Remove all the broker + topic throttle configs.
err := throttleManager.RemoveAllThrottles()
if err != nil {
log.Println(err)
log.Printf("Error removing throttles: %s\n", err.Error())
} else {
log.Println("Global throttle override removed")
// Only set knownThrottles to false if we've removed all
// without error.
knownThrottles = false
}

// Ensure topic throttle updates are re-enabled.
throttleManager.EnableTopicUpdates()
throttleManager.EnableOverrideTopicUpdates()

// Remove any configured throttle overrides if AutoRemove is true.
if overrideCfg.AutoRemove {
err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{})
if err != nil {
log.Println(err)
} else {
log.Println("Global throttle override removed")
}
}
}
}
Expand Down