Skip to content

Commit

Permalink
specs and remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Jul 21, 2023
1 parent aaf52fb commit f3c184a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 26 deletions.
49 changes: 31 additions & 18 deletions lib/karafka/web/installer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,36 @@ module Karafka
module Web
# Responsible for setup of the Web UI and Karafka Web-UI related components initialization.
class Installer
# Defaults stats state that we create in Kafka
DEFAULT_STATS = {
batches: 0,
messages: 0,
retries: 0,
dead: 0,
busy: 0,
enqueued: 0,
threads_count: 0,
processes: 0,
rss: 0,
listeners_count: 0,
utilization: 0,
lag_stored: 0
}.freeze

# Default empty historicals for first record in Kafka
DEFAULT_HISTORICALS = Processing::Consumers::Historicals::TIME_RANGES
.keys
.map { |range| [range, []] }
.to_h
.freeze

# WHole default empty state (aside from dispatch time)
DEFAULT_STATE = {
processes: {},
historicals: DEFAULT_HISTORICALS,
stats: DEFAULT_STATS
}.freeze

# Creates needed topics and the initial zero state, so even if no `karafka server` processes
# are running, we can still display the empty UI
#
Expand Down Expand Up @@ -146,24 +176,7 @@ def bootstrap_consumers_state!
::Karafka.producer.produce_sync(
topic: Karafka::Web.config.topics.consumers.states,
key: Karafka::Web.config.topics.consumers.states,
payload: {
processes: {},
historicals: {},
stats: {
batches: 0,
messages: 0,
retries: 0,
dead: 0,
busy: 0,
enqueued: 0,
threads_count: 0,
processes: 0,
rss: 0,
listeners_count: 0,
utilization: 0,
lag_stored: 0
}
}.to_json
payload: DEFAULT_STATE.to_json
)
end
end
Expand Down
2 changes: 0 additions & 2 deletions lib/karafka/web/processing/consumers/historicals.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class Historicals
}.freeze
}.freeze

private_constant :TIME_RANGES

# @param aggregated_state [Hash] full aggregated state without historicals
def initialize(aggregated_state)
# Builds an empty structure for potential time ranges we are interested in
Expand Down
9 changes: 4 additions & 5 deletions lib/karafka/web/ui/models/historicals.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def initialize(state)
def reject_drifters(historicals)
initial = nil

historicals[:seconds].delete_if do |sample|
historicals.fetch(:seconds).delete_if do |sample|
unless initial
initial = sample.first

Expand All @@ -79,10 +79,9 @@ def reject_drifters(historicals)
end
end


# In case of a positive drift, we may have gaps bigger than few seconds in reporting.
# This can create a false sense of spikes that do not reflect the reality. We compensate
# this by extrapolating the values.
# this by extrapolating the delta values and using the rest as they are.
#
# This problems only affects our near real-time metrics with seconds precision
#
Expand All @@ -91,7 +90,7 @@ def fill_gaps(historicals)
filled = []
previous = nil

historicals[:seconds].each do |sample|
historicals.fetch(:seconds).each do |sample|
unless previous
filled << sample
previous = sample
Expand Down Expand Up @@ -128,7 +127,7 @@ def fill_gaps(historicals)
# on the counter we keep.
def inject_current_stats(historicals, stats, dispatched_at)
historicals.each_value do |time_samples|
errors = time_samples.last.last[:errors]
errors = (time_samples.last || [{ errors: 0 }]).last[:errors]

time_samples << [dispatched_at.to_i, stats.merge(errors: errors)]
end
Expand Down
24 changes: 23 additions & 1 deletion spec/lib/karafka/web/ui/models/historicals_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
RSpec.describe_current do
subject(:historicals) { described_class.new(state) }

let(:dispatched_at) { Time.now.to_f.to_i }

# Deep dup same way as we would get from Kafka
let(:default_state) do
state = Karafka::Web::Installer::DEFAULT_STATE.merge(dispatched_at: dispatched_at)
Karafka::Web::Deserializer.new.call(OpenStruct.new(raw_payload: state.to_json))
end

context 'when stats are missing' do
let(:state) { {} }

Expand All @@ -24,6 +32,20 @@
context 'when historicals and stats are empty' do
let(:state) { { stats: {}, dispatched_at: Time.now.to_f, historicals: {} } }

it { expect(historicals.to_h).to eq({}) }
it { expect { historicals }.to raise_error(KeyError) }
end

# This one makes sure we can work with the default empty bootstrapped state
context 'when historicals and stats are present but without any values' do
let(:state) { default_state }

it { expect(historicals.days).to eq([]) }
it { expect(historicals.hours).to eq([]) }
it { expect(historicals.minutes).to eq([]) }
it { expect(historicals.seconds).to eq([]) }
end

context 'when we had previous historicals in the same recent window' do
pending
end
end

0 comments on commit f3c184a

Please sign in to comment.