Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Jul 29, 2023
1 parent fa9f030 commit 9473e29
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
- [Improvement] Introduce states and metrics schema validation.
- [Improvement] Prevent locking in sampler for time of OS data aggregation.
- [Improvement] Collect and report number of messages in particular jobs.
- [Improvement] Limit segment size for Web topics to ensure, that Web-UI does not drain resources.
- [Fix] Fix a case where live-poll would be disabled but would still update data.
- [Fix] Fix a case where states materializing consumer would update state too often.
- [Fix] Fix a bug when rapid non-initialized shutdown could mess up the metrics.
- [Fix] Fix a case where upon multiple rebalances, part of the states materialization could be lost.
Expand Down
9 changes: 6 additions & 3 deletions lib/karafka/web/installer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,12 @@ def bootstrap_topics!(replication_factor = 1)
replication_factor,
# We care only about the most recent state, previous are irrelevant. So we can easily
# compact after one minute. We do not use this beyond the most recent collective
# state, hence it all can easily go away.
# state, hence it all can easily go away. We also limit the segment size to at most
# 100MB not to use more space ever.
{
'cleanup.policy': 'compact',
'retention.ms': 60 * 60 * 1_000
'retention.ms': 60 * 60 * 1_000,
'log.segment.bytes': 104_857_600 # 10MB
}
)
end
Expand All @@ -168,7 +170,8 @@ def bootstrap_topics!(replication_factor = 1)
replication_factor,
{
'cleanup.policy': 'compact',
'retention.ms': 60 * 60 * 1_000
'retention.ms': 60 * 60 * 1_000,
'log.segment.bytes': 104_857_600 # 10MB
}
)
end
Expand Down
7 changes: 6 additions & 1 deletion lib/karafka/web/processing/consumers/aggregators/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ def materialize_consumers_groups_current_state
.map { |p_details| p_details.fetch(:lag_stored) }
.reject(&:negative?)

offsets_hi = partitions_data
.map { |p_details| p_details.fetch(:hi_offset) }
.reject(&:negative?)

# If there is no lag that would not be negative, it means we did not mark
# any messages as consumed on this topic in any partitions, hence we cannot
# compute lag easily
Expand All @@ -124,7 +128,8 @@ def materialize_consumers_groups_current_state
cgs[group_name] ||= {}
cgs[group_name][topic_name] = {
lag_stored: lags_stored.sum,
lag: lags.sum
lag: lags.sum,
offset_hi: offsets_hi.sum
}
end
end
Expand Down
2 changes: 0 additions & 2 deletions lib/karafka/web/processing/consumers/aggregators/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ def refresh_current_stats
stats[:listeners_count] = 0
stats[:lag] = 0
stats[:lag_stored] = 0
stats[:ongoing] = 0
utilization = 0

@active_reports
Expand All @@ -141,7 +140,6 @@ def refresh_current_stats
lags_stored << partition_stats[:lag_stored]
end

stats[:ongoing] += report_stats[:ongoing]
stats[:busy] += report_stats[:busy]
stats[:enqueued] += report_stats[:enqueued]
stats[:threads_count] += report_process[:concurrency]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class TopicStats < Web::Contracts::Base

required(:lag_stored) { |val| val.is_a?(Integer) }
required(:lag) { |val| val.is_a?(Integer) }
required(:offset_hi) { |val| val.is_a?(Integer) }
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/karafka/web/tracking/consumers/contracts/partition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Partition < Web::Contracts::Base
required(:lag_stored_d) { |val| val.is_a?(Integer) }
required(:committed_offset) { |val| val.is_a?(Integer) }
required(:stored_offset) { |val| val.is_a?(Integer) }
required(:hi_offset) { |val| val.is_a?(Integer) }
end
end
end
Expand Down
1 change: 0 additions & 1 deletion lib/karafka/web/tracking/consumers/contracts/report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class Report < Web::Contracts::Base
required(:busy) { |val| val.is_a?(Integer) && val >= 0 }
required(:enqueued) { |val| val.is_a?(Integer) && val >= 0 }
required(:utilization) { |val| val.is_a?(Numeric) && val >= 0 }
required(:ongoing) { |val| val.is_a?(Integer) && val >= 0 }

nested(:total) do
required(:batches) { |val| val.is_a?(Integer) && val >= 0 }
Expand Down
9 changes: 0 additions & 9 deletions lib/karafka/web/tracking/consumers/listeners/processing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def on_consumer_consume(event)
# if error occurs, etc.
sampler.counters[:batches] += 1
sampler.counters[:messages] += messages_count
sampler.states[:ongoing] += messages_count
sampler.jobs[jid] = job_details
end
end
Expand Down Expand Up @@ -60,13 +59,6 @@ def on_error_occurred(event)
# This also refers only to consumer work that runs user operations.
return unless type

# Decrement number of ongoing messages only in case of consume because only for it
# we increment
if type == 'consume'
consumer = event.payload[:caller]
sampler.states[:ongoing] -= consumer.messages.size
end

sampler.jobs.delete(
job_id(event[:caller], type)
)
Expand All @@ -87,7 +79,6 @@ def on_consumer_consumed(event)

track do |sampler|
sampler.jobs.delete(jid)
sampler.states[:ongoing] -= messages_count
sampler.times[consumer_group_id] << [topic.name, time, messages_count]
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/karafka/web/tracking/consumers/listeners/statistics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def extract_partition_metrics(pt_stats)
'consumer_lag_stored_d',
'committed_offset',
'stored_offset',
'fetch_state'
'fetch_state',
'hi_offset'
)

# Rename as we do not need `consumer_` prefix
Expand Down
14 changes: 3 additions & 11 deletions lib/karafka/web/tracking/consumers/sampler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Consumers
class Sampler < Tracking::Sampler
include ::Karafka::Core::Helpers::Time

attr_reader :counters, :consumer_groups, :errors, :times, :pauses, :jobs, :states
attr_reader :counters, :consumer_groups, :errors, :times, :pauses, :jobs

# Current schema version
# This can be used in the future for detecting incompatible changes and writing
Expand All @@ -36,20 +36,12 @@ class Sampler < Tracking::Sampler
dead: 0
}.freeze

# Base for states that are not growing counters we can clear and count again but values
# that represent a state in time that can change in between flushes
STATES_BASE = {
# Number of messages that are under processing at the moment (not yet finished)
ongoing: 0
}.freeze

private_constant :TIMES_TTL, :TIMES_TTL_MS, :SCHEMA_VERSION, :COUNTERS_BASE, :STATES_BASE
private_constant :TIMES_TTL, :TIMES_TTL_MS, :SCHEMA_VERSION, :COUNTERS_BASE

def initialize
super

@counters = COUNTERS_BASE.dup
@states = STATES_BASE.dup
@times = TtlHash.new(TIMES_TTL_MS)
@consumer_groups = {}
@errors = []
Expand Down Expand Up @@ -100,7 +92,7 @@ def to_report

stats: jobs_queue_statistics.merge(
utilization: utilization
).merge(total: @counters).merge(@states),
).merge(total: @counters),

consumer_groups: @consumer_groups,
jobs: jobs.values
Expand Down
29 changes: 29 additions & 0 deletions lib/karafka/web/ui/models/metrics/charts/topics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,35 @@ def lags_stored

sum.to_json
end

def produced
res = @data.to_h.map do |topic, metrics|
previous = nil
[
topic,
metrics.map do |metric|
unless previous
previous = metric
next
end

current = metric.last[:offset_hi]

if previous.last[:offset_hi].nil? || current.nil?
r = [metric.first, 0]
else
r = [metric.first, current - previous.last[:offset_hi]]
end

previous = metric

r
end.compact
]
end.compact.to_h

res.to_json
end
end
end
end
Expand Down
11 changes: 5 additions & 6 deletions lib/karafka/web/ui/pro/views/dashboard/index.erb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
<ul class="nav nav-tabs" id="graphs1" role="tablist">
<%== partial 'shared/tab_nav', locals: { title: 'Messages', id: 'messages', active: true } %>
<%== partial 'shared/tab_nav', locals: { title: 'Batches', id: 'batches' } %>
<%== partial 'shared/tab_nav', locals: { title: 'In progress', id: 'in-progress' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Lags stored', id: 'lags-stored' } %>
<%== partial 'shared/tab_nav', locals: { title: 'Produced', id: 'produced' } %>
</ul>

<div class="tab-content">
Expand All @@ -26,14 +26,13 @@
<%== partial 'shared/chart', locals: { data: data, id: 'batches' } %>
</div>

<div class="tab-pane" id="in-progress" role="tabpanel">
<% data = @aggregated_charts.with(:enqueued, :busy, :ongoing) %>
<%== partial 'shared/chart', locals: { data: data, id: 'in-progress' } %>
</div>

<div class="tab-pane" id="lags-stored" role="tabpanel">
<%== partial 'shared/chart', locals: { data: @topics_charts.lags_stored, id: 'lags-stored' } %>
</div>

<div class="tab-pane" id="produced" role="tabpanel">
<%== partial 'shared/chart', locals: { data: @topics_charts.produced, id: 'produced' } %>
</div>
</div>
</div>
</div>
Expand Down
6 changes: 4 additions & 2 deletions lib/karafka/web/ui/public/javascripts/live_poll.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ function livePollCallback() {
}

function setPollingListener() {
selector = document.getElementById("live-poll");
var selector = document.getElementById("live-poll");

var polling = localStorage.karafkaLivePoll

if (localStorage.karafkaLivePoll == "disabled" || selector == null) {
if (polling == "disabled" || polling == undefined || selector == null) {
clearTimeout(livePollTimer);
livePollTimer = null;
} else {
Expand Down

0 comments on commit 9473e29

Please sign in to comment.