diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5cbcbaa0..29c94d5c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
## 0.10.3 (Unreleased)
- **[Feature]** Introduce ability to brand Web UI with environment (Pro).
+- [Enhancement] Provide assignment status in the routing (Pro).
- [Enhancement] Support schedule cancellation via Web UI.
- [Enhancement] Rename "probing" to "tracing" to better reflect what this commanding option does.
- [Fix] Fix not working primary and secondary alert styles.
diff --git a/lib/karafka/web/pro/ui/controllers/routing_controller.rb b/lib/karafka/web/pro/ui/controllers/routing_controller.rb
index d1ebd523..ac491cc7 100644
--- a/lib/karafka/web/pro/ui/controllers/routing_controller.rb
+++ b/lib/karafka/web/pro/ui/controllers/routing_controller.rb
@@ -28,11 +28,28 @@ def index
detect_patterns_routes
@routes = Karafka::App.routes
-
@routes.each do |consumer_group|
refine(consumer_group.topics)
end
+ current_state = Models::ConsumersState.current
+ @assigned = Hash.new { |h, k| h[k] = Set.new }
+
+ # If there are active processes, we can use their data to mark certain topics as
+ # assigned. This does not cover the full scope as some partitions may be assigned
+ # and some not, but provides general overview
+ if current_state
+ Models::Processes.active(current_state).each do |process|
+ process.consumer_groups.each do |consumer_group|
+ consumer_group.subscription_groups.each do |subscription_group|
+ subscription_group.topics.each do |topic|
+ @assigned[consumer_group.id.to_s] << topic.name
+ end
+ end
+ end
+ end
+ end
+
render
end
diff --git a/lib/karafka/web/pro/ui/views/routing/_consumer_group.erb b/lib/karafka/web/pro/ui/views/routing/_consumer_group.erb
index 0945dbe7..bb2da582 100644
--- a/lib/karafka/web/pro/ui/views/routing/_consumer_group.erb
+++ b/lib/karafka/web/pro/ui/views/routing/_consumer_group.erb
@@ -10,6 +10,7 @@
Subscription group |
<%== sort_link('Topic', :name) %> |
Type |
+ Assigned |
<%== sort_link(:active?) %> |
|
@@ -21,6 +22,7 @@
'routing/topic',
locals: {
subscription_group: topic.subscription_group,
+ consumer_group: consumer_group.id,
topic: topic
}
)
diff --git a/lib/karafka/web/pro/ui/views/routing/_topic.erb b/lib/karafka/web/pro/ui/views/routing/_topic.erb
index 41b0dcc5..4d9141ce 100644
--- a/lib/karafka/web/pro/ui/views/routing/_topic.erb
+++ b/lib/karafka/web/pro/ui/views/routing/_topic.erb
@@ -20,6 +20,16 @@
<% end %>
+
+ <% assigned = @assigned[consumer_group].include?(topic.name) %>
+
+ <% if assigned %>
+ <%== badge_success assigned %>
+ <% else %>
+ <%== badge_secondary assigned %>
+ <% end %>
+ |
+
<% if topic.active? %>
<%== badge_success topic.active? %>
diff --git a/spec/lib/karafka/web/pro/ui/controllers/routing_controller_spec.rb b/spec/lib/karafka/web/pro/ui/controllers/routing_controller_spec.rb
index 9c0d8a23..62c53dd0 100644
--- a/spec/lib/karafka/web/pro/ui/controllers/routing_controller_spec.rb
+++ b/spec/lib/karafka/web/pro/ui/controllers/routing_controller_spec.rb
@@ -4,17 +4,67 @@
subject(:app) { Karafka::Web::Pro::Ui::App }
describe '#index' do
- before { get 'routing' }
+ context 'when running against defaults' do
+ before { get 'routing' }
- it do
- expect(response).to be_ok
- expect(body).to include(topics_config.consumers.states)
- expect(body).to include(topics_config.consumers.metrics)
- expect(body).to include(topics_config.consumers.reports)
- expect(body).to include(topics_config.errors)
- expect(body).to include('karafka_web')
- expect(body).to include(breadcrumbs)
- expect(body).not_to include(support_message)
+ it do
+ expect(response).to be_ok
+ expect(body).to include(topics_config.consumers.states)
+ expect(body).to include(topics_config.consumers.metrics)
+ expect(body).to include(topics_config.consumers.reports)
+ expect(body).to include(topics_config.errors)
+ expect(body).to include('karafka_web')
+ expect(body).to include(breadcrumbs)
+ expect(body).not_to include(support_message)
+ end
+ end
+
+ context 'when there is no consumers state' do
+ before do
+ allow(Karafka::Web::Ui::Models::ConsumersState).to receive(:current).and_return(false)
+
+ get 'routing'
+ end
+
+ it do
+ expect(response).to be_ok
+ expect(body).to include(topics_config.consumers.states)
+ expect(body).to include(topics_config.consumers.metrics)
+ expect(body).to include(topics_config.consumers.reports)
+ expect(body).to include(topics_config.errors)
+ expect(body).to include('karafka_web')
+ expect(body).to include(breadcrumbs)
+ expect(body).not_to include(support_message)
+ end
+ end
+
+ context 'when there are states and reports' do
+ let(:states_topic) { create_topic }
+ let(:reports_topic) { create_topic }
+
+ before do
+ topics_config.consumers.states = states_topic
+ topics_config.consumers.reports = reports_topic
+
+ report = Fixtures.consumers_reports_json
+ scope = report[:consumer_groups][:example_app6_app][:subscription_groups][:c4ca4238a0b9_0]
+ base = scope[:topics][:default][:partitions]
+
+ 5.times { |i| base[i + 1] = base[:'0'].dup.merge(id: i + 1) }
+
+ produce(states_topic, Fixtures.consumers_states_file)
+ produce(reports_topic, report.to_json)
+
+ get 'routing'
+ end
+
+ it do
+ expect(response).to be_ok
+ expect(body).to include(topics_config.errors)
+ expect(body).to include('karafka_web')
+ expect(body).to include(breadcrumbs)
+ expect(body).not_to include(support_message)
+ end
end
end
|