diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 90d78d5..15c407b 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -27,29 +27,30 @@ var ( // Config holds configuration parameters. Config struct { - KafkaNativeMode bool - KafkaAPIRequestTimeout int - APIKey string - AppKey string - NetworkTXQuery string - NetworkRXQuery string - BrokerIDTag string - InstanceTypeTag string - MetricsWindow int - BootstrapServers string - ZKAddr string - ZKPrefix string - Interval int - APIListen string - ConfigZKPrefix string - DDEventTags string - MinRate float64 - SourceMaxRate float64 - DestinationMaxRate float64 - ChangeThreshold float64 - FailureThreshold int - CapMap map[string]float64 - CleanupAfter int64 + KafkaNativeMode bool + KafkaAPIRequestTimeout int + APIKey string + AppKey string + NetworkTXQuery string + NetworkRXQuery string + BrokerIDTag string + InstanceTypeTag string + MetricsWindow int + BootstrapServers string + ZKAddr string + ZKPrefix string + Interval int + APIListen string + ConfigZKPrefix string + DDEventTags string + MinRate float64 + SourceMaxRate float64 + DestinationMaxRate float64 + ChangeThreshold float64 + FailureThreshold int + CapMap map[string]float64 + CleanupAfter int64 + SkipAutoDeleteThrottles bool } ) @@ -78,6 +79,7 @@ func main() { flag.IntVar(&Config.FailureThreshold, "failure-threshold", 1, "Number of iterations that throttle determinations can fail before reverting to the min-rate") m := flag.String("cap-map", "", "JSON map of instance types to network capacity in MB/s") flag.Int64Var(&Config.CleanupAfter, "cleanup-after", 60, "Number of intervals after which to issue a global throttle unset if no replication is running") + flag.BoolVar(&Config.SkipAutoDeleteThrottles, "skip-auto-delete-throttles", false, "Skip automatic throttle removal") envy.Parse("AUTOTHROTTLE") flag.Parse() @@ -422,31 +424,35 @@ func main() { // If there's previously set throttles but no topics reassigning nor // broker overrides set, we can issue a global throttle removal. - if throttlesToClear && !topicsReassigning && !brokerOverridesSet { + if !topicsReassigning && throttlesToClear && !brokerOverridesSet { // Reset the interval count. interval = 0 - // Remove all the broker + topic throttle configs. - err := throttleManager.RemoveAllThrottles() - if err != nil { - log.Printf("Error removing throttles: %s\n", err.Error()) + if Config.SkipAutoDeleteThrottles { + log.Println("There may be throttles eligible for removal, but skipping automatic removal since skip-auto-delete-throttles is set") } else { - // Only set knownThrottles to false if we've removed all - // without error. - knownThrottles = false - } - - // Ensure topic throttle updates are re-enabled. - throttleManager.EnableTopicUpdates() - throttleManager.EnableOverrideTopicUpdates() - - // Remove any configured throttle overrides if AutoRemove is true. - if overrideCfg.AutoRemove { - err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{}) + // Remove all the broker + topic throttle configs. + err := throttleManager.RemoveAllThrottles() if err != nil { - log.Println(err) + log.Printf("Error removing throttles: %s\n", err.Error()) } else { - log.Println("Global throttle override removed") + // Only set knownThrottles to false if we've removed all + // without error. + knownThrottles = false + } + + // Ensure topic throttle updates are re-enabled. + throttleManager.EnableTopicUpdates() + throttleManager.EnableOverrideTopicUpdates() + + // Remove any configured throttle overrides if AutoRemove is true. + if overrideCfg.AutoRemove { + err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{}) + if err != nil { + log.Println(err) + } else { + log.Println("Global throttle override removed") + } } } }