
Merge pull request #4136 from esl/cets-for-cluster-id-v3
CETS backend for mongoose_cluster_id (minimal logic)
chrzaszcz authored Oct 6, 2023
2 parents 7d05a65 + 48f1203 commit 9bbbbe2
Showing 8 changed files with 149 additions and 47 deletions.
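
Before the per-file diffs, a note on what "minimal logic" means here: the persistent backend stays RDBMS whenever an RDBMS pool is configured, and only the volatile cache becomes pluggable (Mnesia or CETS, picked from the `internal_databases` option); when no RDBMS pool exists, the cache itself acts as the backend. The reconciliation in `mongoose_cluster_id:start/0` keeps the same shape. Below is a simplified, self-contained model of that decision, not the literal code; error handling is omitted and the branch where the cached and backend IDs both exist but differ is left abstract, since its body is collapsed in the diff further down.

```erlang
-module(cluster_id_flow_sketch).
-export([resolve/2]).

%% Simplified model of the reconciliation done by mongoose_cluster_id:start/0.
%% CachedRes comes from the volatile cache (Mnesia or CETS), BackendRes from
%% the persistent backend (RDBMS, or the cache itself when no RDBMS pool is
%% configured). The function and return atoms here are illustrative only.
resolve({ok, ID}, {ok, ID}) ->
    {use, ID};                          %% cache and backend agree
resolve({ok, ID}, {error, _}) ->
    {write_to_backend, ID};             %% repopulate the persistent backend
resolve({error, _}, {ok, ID}) ->
    {write_to_cache, ID};               %% repopulate the volatile cache
resolve({error, _}, {error, _}) ->
    make_new_id;                        %% first start anywhere: generate a fresh ID
resolve({ok, Cached}, {ok, Backend}) ->
    {conflict, Cached, Backend}.        %% handled in the module; its body is not shown in this diff
```

Under a CETS-only configuration, `which_volatile_backend_available/0` (shown in the `mongoose_cluster_id.erl` diff below) matches a map with a `cets` key returned by `mongoose_config:get_opt(internal_databases)`, so the cache ends up in the `cets_cluster_id` table rather than a Mnesia table.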
27 changes: 23 additions & 4 deletions big_tests/tests/distributed_helper.erl
@@ -24,20 +24,39 @@ add_node_to_cluster(Config) ->
Node2 = mim2(),
add_node_to_cluster(Node2, Config).

has_mnesia(Node) ->
%% TODO We should check that Mnesia is configured here instead of checking is_running.
%% But that would require fixing this issue first:
%% "MIM-2067 Actually disable mnesia from starting in tests in pgsql_cets"
rpc(Node, mnesia, system_info, [is_running]) =:= yes.

add_node_to_cluster(Node, Config) ->
case has_mnesia(Node) of
true ->
add_node_to_mnesia_cluster(Node, Config);
false ->
ok
end,
Config.

add_node_to_mnesia_cluster(Node, Config) ->
ClusterMemberNode = maps:get(node, mim()),
ok = rpc(Node#{timeout => cluster_op_timeout()},
mongoose_cluster, join, [ClusterMemberNode]),
verify_result(Node, add),
Config.
verify_result(Node, add).

remove_node_from_cluster(_Config) ->
Node = mim2(),
remove_node_from_cluster(Node, _Config).

remove_node_from_cluster(Node, _Config) ->
ok = rpc(Node#{timeout => cluster_op_timeout()}, mongoose_cluster, leave, []),
verify_result(Node, remove),
case has_mnesia(Node) of
true ->
ok = rpc(Node#{timeout => cluster_op_timeout()}, mongoose_cluster, leave, []),
verify_result(Node, remove);
false ->
ok
end,
ok.

ctl_path(Node, Config) ->
9 changes: 7 additions & 2 deletions big_tests/tests/metrics_api_SUITE.erl
@@ -71,8 +71,13 @@ end_per_group(GroupName, Config) ->
metrics_helper:finalise_by_all_metrics_are_global(Config, GroupName =:= all_metrics_are_global).

init_per_testcase(cluster_size = CN, Config) ->
Config1 = ensure_nodes_not_clustered(Config),
escalus:init_per_testcase(CN, Config1);
case distributed_helper:has_mnesia(mim()) of
true ->
Config1 = ensure_nodes_not_clustered(Config),
escalus:init_per_testcase(CN, Config1);
false ->
{skip, "Requires Mnesia"}
end;
init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

31 changes: 28 additions & 3 deletions big_tests/tests/persistent_cluster_id_SUITE.erl
@@ -20,7 +20,8 @@
id_persists_after_restart/1,
same_cluster_id_in_backend_and_mnesia/1,
backed_up_id_if_rdbms_is_added/1,
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost/1
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost/1,
clean_start_and_two_nodes/1
]).

-import(distributed_helper, [mim/0, mim2/0]).
@@ -39,7 +40,8 @@ tests() ->
id_persists_after_restart,
same_cluster_id_in_backend_and_mnesia,
backed_up_id_if_rdbms_is_added,
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost,
clean_start_and_two_nodes
].

groups() ->
@@ -52,7 +54,7 @@ groups() ->
%%% Overall setup/teardown
%%%===================================================================
init_per_suite(Config) ->
Config.
distributed_helper:require_rpc_nodes([mim]) ++ Config.

end_per_suite(_Config) ->
ok.
@@ -144,3 +146,26 @@ cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost(_Config) ->
{ok, SecondID} = mongoose_helper:successful_rpc(
Node, mongoose_cluster_id, get_cached_cluster_id, []),
?assertEqual(FirstID, SecondID).

clean_start_and_two_nodes(_Config) ->
{ok, MnesiaID} = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, get_cached_cluster_id, []),
{ok, MnesiaID2} = mongoose_helper:successful_rpc(
mim2(), mongoose_cluster_id, get_cached_cluster_id, []),
%% Sanity check: IDs are in sync
?assertEqual(MnesiaID, MnesiaID2),
%% Remove an old ID from anywhere
ok = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, clean_table, []),
ok = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, clean_cache, []),
ok = mongoose_helper:successful_rpc(
mim2(), mongoose_cluster_id, clean_cache, []),
{ok, AfterRestartID} = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, start, []),
{ok, AfterRestartID2} = mongoose_helper:successful_rpc(
mim2(), mongoose_cluster_id, start, []),
%% We've created a new ID
?assertNotEqual(AfterRestartID, MnesiaID),
%% Both nodes have the same ID
?assertEqual(AfterRestartID, AfterRestartID2).
2 changes: 1 addition & 1 deletion doc/operation-and-maintenance/MongooseIM-metrics.md
@@ -153,7 +153,7 @@ Metrics specific to an extension, e.g. Message Archive Management, are described
| `[global, uniqueSessionCount]` | value | A number of unique users connected to a MongooseIM cluster (e.g. 3 sessions of the same user will be counted as 1 in this metric). |
| `[global, cache, unique_sessions_number]` | gauge | A cached value of `uniqueSessionCount`. It is automatically updated when a unique session count is calculated. |
| `[global, nodeUpTime]` | value | Node uptime. |
| `[global, clusterSize]` | value | A number of nodes in a MongooseIM cluster seen by a given MongooseIM node. |
| `[global, clusterSize]` | value | A number of nodes in a MongooseIM cluster seen by a given MongooseIM node (based on Mnesia). For CETS use `global.cets.system.joined_nodes` instead. |
| `[global, tcpPortsUsed]` | value | A number of open tcp connections. This should relate to the number of connected sessions and databases, as well as federations and http requests, in order to detect connection leaks. |
| `[global, processQueueLengths]` | probe | The number of queued messages in the internal message queue of every erlang process, and the internal queue of every fsm (ejabberd\_c2s). This is sampled every 30 seconds asynchronously. It is a good indicator of an overloaded system: if too many messages are queued at the same time, the system is not able to process the data at the rate it was designed for. |

2 changes: 1 addition & 1 deletion rebar.lock
@@ -8,7 +8,7 @@
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"def7da28917fe7e21ad2b50d1b9939b1da5046cf"}},
{ref,"e3ad43f3836ea457bcb54e2f8266e9d7c32b014f"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
24 changes: 1 addition & 23 deletions src/ejabberd_app.erl
@@ -58,7 +58,7 @@ do_start() ->
update_status_file(starting),
mongoose_config:start(),
mongoose_metrics:init(),
db_init(),
mongoose_internal_databases:init(),
mongoose_graphql:init(),
translate:start(),
ejabberd_commands:init(),
@@ -111,31 +111,9 @@ stop(_State) ->
%% That is why we call mnesia:stop() inside of db_init_mnesia() instead.
ok.


%%%
%%% Internal functions
%%%
db_init() ->
case mongoose_config:lookup_opt([internal_databases, mnesia]) of
{ok, _} ->
db_init_mnesia(),
mongoose_node_num_mnesia:init();
{error, _} ->
ok
end.

db_init_mnesia() ->
%% Mnesia should not be running at this point, unless it is started by tests.
%% Ensure Mnesia is stopped
mnesia:stop(),
case mnesia:system_info(extra_db_nodes) of
[] ->
mnesia:create_schema([node()]);
_ ->
ok
end,
application:start(mnesia, permanent),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).

-spec broadcast_c2s_shutdown_listeners() -> ok.
broadcast_c2s_shutdown_listeners() ->
72 changes: 59 additions & 13 deletions src/mongoose_cluster_id.erl
@@ -9,30 +9,35 @@
]).

% For testing purposes only
-export([clean_table/0]).
-export([clean_table/0, clean_cache/0]).

-ignore_xref([clean_table/0, get_backend_cluster_id/0]).
-ignore_xref([clean_table/0, clean_cache/0, get_backend_cluster_id/0]).

-record(mongoose_cluster_id, {key :: atom(), value :: cluster_id()}).
-type cluster_id() :: binary().
-type maybe_cluster_id() :: {ok, cluster_id()} | {error, any()}.
-type mongoose_backend() :: rdbms
| mnesia.
| mnesia
| cets.

-spec start() -> maybe_cluster_id().
start() ->
init_mnesia_cache(),
%% Consider rewriting this logic so it does not block the starting process.
%% Currently, we have to do an SQL query each time we restart the MongooseIM
%% application in the tests.
init_cache(),
Backend = which_backend_available(),
IntBackend = which_volatile_backend_available(),
maybe_prepare_queries(Backend),
CachedRes = get_cached_cluster_id(),
CachedRes = get_cached_cluster_id(IntBackend),
BackendRes = get_backend_cluster_id(),
case {CachedRes, BackendRes} of
{{ok, ID}, {ok, ID}} ->
{ok, ID};
{{ok, ID}, {error, _}} ->
set_new_cluster_id(ID, Backend);
{{error, _}, {ok, ID}} ->
set_new_cluster_id(ID, mnesia);
set_new_cluster_id(ID, IntBackend);
{{error, _}, {error, _}} ->
make_and_set_new_cluster_id();
{{ok, CachedID}, {ok, BackendID}} ->
@@ -45,14 +50,25 @@ start() ->
%% Get cached version
-spec get_cached_cluster_id() -> maybe_cluster_id().
get_cached_cluster_id() ->
IntBackend = which_volatile_backend_available(),
get_cached_cluster_id(IntBackend).

get_cached_cluster_id(mnesia) ->
T = fun() -> mnesia:read(mongoose_cluster_id, cluster_id) end,
case mnesia:transaction(T) of
{atomic, [#mongoose_cluster_id{value = ClusterID}]} ->
{ok, ClusterID};
{atomic, []} ->
{error, cluster_id_not_in_mnesia};
{error, cluster_id_not_in_cache};
{aborted, Reason} ->
{error, Reason}
end;
get_cached_cluster_id(cets) ->
case ets:lookup(cets_cluster_id, cluster_id) of
[{cluster_id, ClusterID}] ->
{ok, ClusterID};
[] ->
{error, cluster_id_not_in_cache}
end.

%% ====================================================================
@@ -74,14 +90,23 @@ make_and_set_new_cluster_id() ->
%% ====================================================================
%% Internal functions
%% ====================================================================
init_mnesia_cache() ->
init_cache() ->
init_cache(which_volatile_backend_available()).

init_cache(mnesia) ->
mnesia:create_table(mongoose_cluster_id,
[{type, set},
{record_name, mongoose_cluster_id},
{attributes, record_info(fields, mongoose_cluster_id)},
{ram_copies, [node()]}
]),
mnesia:add_table_copy(mongoose_cluster_id, node(), ram_copies).
mnesia:add_table_copy(mongoose_cluster_id, node(), ram_copies);
init_cache(cets) ->
cets:start(cets_cluster_id, #{}),
cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id),
%% We have to do this because we want to read from across the cluster
%% in the start/0 function.
ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity).

-spec maybe_prepare_queries(mongoose_backend()) -> ok.
maybe_prepare_queries(mnesia) -> ok;
@@ -104,15 +129,23 @@ make_cluster_id() ->
-spec which_backend_available() -> mongoose_backend().
which_backend_available() ->
case mongoose_wpool:get_pool_settings(rdbms, global, default) of
undefined -> mnesia;
undefined -> which_volatile_backend_available();
_ -> rdbms
end.

which_volatile_backend_available() ->
case mongoose_config:get_opt(internal_databases) of
#{cets := _} ->
cets;
#{mnesia := _} ->
mnesia
end.

-spec set_new_cluster_id(cluster_id(), mongoose_backend()) -> ok | {error, any()}.
set_new_cluster_id(ID, rdbms) ->
try execute_cluster_insert_new(ID) of
{updated, 1} ->
set_new_cluster_id(ID, mnesia),
set_new_cluster_id(ID, which_volatile_backend_available()),
{ok, ID}
catch
Class:Reason:Stacktrace ->
@@ -129,7 +162,10 @@ set_new_cluster_id(ID, mnesia) ->
{ok, ID};
{aborted, Reason} ->
{error, Reason}
end.
end;
set_new_cluster_id(ID, cets) ->
cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
{ok, ID}.

%% Get cluster ID
-spec get_backend_cluster_id(mongoose_backend()) -> maybe_cluster_id().
@@ -145,7 +181,9 @@ get_backend_cluster_id(rdbms) ->
{error, {Class, Reason}}
end;
get_backend_cluster_id(mnesia) ->
get_cached_cluster_id().
get_cached_cluster_id(mnesia);
get_backend_cluster_id(cets) ->
get_cached_cluster_id(cets).

clean_table() ->
clean_table(which_backend_available()).
@@ -166,3 +204,11 @@ clean_table(rdbms) ->
{error, {Class, Reason}}
end;
clean_table(_) -> ok.

clean_cache() ->
clean_cache(which_volatile_backend_available()).

clean_cache(mnesia) ->
mnesia:dirty_delete(mongoose_cluster_id, cluster_id);
clean_cache(cets) ->
cets:delete(cets_cluster_id, cluster_id).
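
The new `cets` clauses above rely on a handful of CETS calls, all visible in this diff: `cets:start/2` and `cets_discovery:add_table/2` to create and replicate the cache table, `cets_discovery:wait_for_ready/2` so that `start/0` can see values written by other nodes, `cets:insert_serial/2` for writes, a plain `ets:lookup/2` for reads, and `cets:delete/2` for the test-only cleanup. A rough, self-contained sketch of that lifecycle follows; it assumes a running `mongoose_cets_discovery` server (started elsewhere in MongooseIM, not in this PR), and return values are left unmatched wherever the diff does not match them either.

```erlang
-module(cets_cluster_id_sketch).
-export([lifecycle/1]).

%% Rough sketch of the CETS calls used by the cets clauses above.
%% Assumption: the mongoose_cets_discovery process is already running.
lifecycle(ID) when is_binary(ID) ->
    %% init_cache(cets): create the replicated table, register it for discovery,
    %% then wait until the table has joined the other cluster nodes.
    cets:start(cets_cluster_id, #{}),
    cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id),
    ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity),
    %% set_new_cluster_id(ID, cets): insert_serial (rather than insert) is used,
    %% presumably so that concurrent writes from several starting nodes are
    %% applied in one agreed order.
    cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
    %% get_cached_cluster_id(cets): reads are local ETS lookups.
    Cached = case ets:lookup(cets_cluster_id, cluster_id) of
                 [{cluster_id, Found}] -> {ok, Found};
                 [] -> {error, cluster_id_not_in_cache}
             end,
    %% clean_cache(cets), used only by the tests, removes the key again.
    cets:delete(cets_cluster_id, cluster_id),
    Cached.
```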
29 changes: 29 additions & 0 deletions src/mongoose_internal_databases.erl
@@ -0,0 +1,29 @@
-module(mongoose_internal_databases).
-export([init/0]).

init() ->
case mongoose_config:lookup_opt([internal_databases, mnesia]) of
{ok, _} ->
init_mnesia(),
mongoose_node_num_mnesia:init();
{error, _} ->
%% Ensure Mnesia is stopped when applying the test presets from the big tests,
%% so that we do not accidentally test with Mnesia enabled when starting the
%% test cases from a clean test build.
%% TODO Stopping here would break a lot of tests; stop here once the tests are fixed.
% mnesia:stop(),
ok
end.

init_mnesia() ->
%% Mnesia should not be running at this point, unless it is started by tests.
%% Ensure Mnesia is stopped
mnesia:stop(),
case mnesia:system_info(extra_db_nodes) of
[] ->
mnesia:create_schema([node()]);
_ ->
ok
end,
application:start(mnesia, permanent),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
