Skip to content

Commit

Permalink
Use previous time references (#65)
Browse files Browse the repository at this point in the history
* use previous time references

* remarks

* remarks
  • Loading branch information
mensfeld authored Sep 11, 2023
1 parent 1fc03c4 commit 72b0f2e
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Karafka core changelog

### 2.2.2 (2023-09-11)
- [Fix] Reuse previous frozen duration as a base for incoming computation.

## 2.2.1 (2023-09-10)
- Optimize statistics decorator by minimizing number of new objects created.
- Expand the decoration to include new value `_fd` providing freeze duration in milliseconds. This value informs us for how many consecutive ms the given value did not change. It can be useful for detecting values that should change once in a while but are stale.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
karafka-core (2.2.1)
karafka-core (2.2.2)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.1, < 0.14.0)

Expand Down
13 changes: 9 additions & 4 deletions lib/karafka/core/monitoring/statistics_decorator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ def call(emited_stats)
# @return [Object] the diff if the values were numerics or the current scope
def diff(previous, current)
if current.is_a?(Hash)
filled_previous = previous || EMPTY_HASH
filled_current = current || EMPTY_HASH

# @note We cannot use #each_key as we modify the content of the current scope
# in place (in case it's a hash)
current.keys.each do |key|
append(
current,
filled_previous,
filled_current,
key,
diff((previous || EMPTY_HASH)[key], (current || EMPTY_HASH)[key])
diff(filled_previous[key], filled_current[key])
)
end
end
Expand All @@ -84,17 +88,18 @@ def diff(previous, current)

# Appends the result of the diff to a given key as long as the result is numeric
#
# @param previous [Hash] previous scope
# @param current [Hash] current scope
# @param key [Symbol] key based on which we were diffing
# @param result [Object] diff result
def append(current, key, result)
def append(previous, current, key, result)
return unless result.is_a?(Numeric)
return if current.frozen?

freeze_duration_key = "#{key}_fd"

if result.zero?
current[freeze_duration_key] ||= 0
current[freeze_duration_key] = previous[freeze_duration_key] || 0
current[freeze_duration_key] += @change_d
else
current[freeze_duration_key] = 0
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/core/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ module Karafka
module Core
# Current Karafka::Core version
# We follow the versioning schema of given Karafka version
VERSION = '2.2.1'
VERSION = '2.2.2'
end
end
25 changes: 25 additions & 0 deletions spec/lib/karafka/core/monitoring/statistics_decorator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,29 @@
it { expect(decorated).to be_frozen }
it { expect(decorated.key?('float_d_d')).to eq(false) }
end

context 'when value remains unchanged over multiple occurrences and time' do
subject(:decorated) do
# First one will set initial state
decorator.call(deep_copy.call)
# Second one will build first deltas with freeze duration of zero
decorator.call(deep_copy.call)
sleep(0.01)
# Third one will allow for proper freeze duration computation
decorator.call(deep_copy.call)
sleep(0.01)
decorator.call(deep_copy.call)
end

let(:deep_copy) { -> { Marshal.load(Marshal.dump(emited_stats1)) } }

it { expect(decorated.key?('string_d')).to eq(false) }
it { expect(decorated.key?('string_fd')).to eq(false) }
it { expect(decorated['float_d']).to eq(0) }
it { expect(decorated['float_fd']).to be_within(5).of(20) }
it { expect(decorated['int_d']).to eq(0) }
it { expect(decorated['int_fd']).to be_within(5).of(20) }
it { expect(decorated).to be_frozen }
it { expect(decorated.key?('float_d_d')).to eq(false) }
end
end

0 comments on commit 72b0f2e

Please sign in to comment.