Skip to content

Commit

Permalink
Merge pull request #4153 from esl/error-on-ping-timeout
Browse files Browse the repository at this point in the history
Fix error on ping timeout with stream management
  • Loading branch information
DenysGonchar authored Nov 8, 2023
2 parents f2c1fb1 + e0f3cd4 commit ab0b37f
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 24 deletions.
24 changes: 22 additions & 2 deletions big_tests/tests/mod_ping_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ all_tests() ->
active,
active_keep_alive,
server_ping_pong,
server_ping_pang
server_ping_pang,
service_unavailable_response
].

suite() ->
Expand Down Expand Up @@ -184,6 +185,25 @@ wrong_ping(Config) ->
escalus:assert(is_iq_error, [PingReq], PingResp)
end).

%% Answers the ping request obtained from wait_for_ping_req/1 with an IQ error
%% carrying a `service-unavailable` condition, then checks the connection state
%% expected for the `timeout_action` found in Config.
service_unavailable_response(Config) ->
    Story =
        fun(Alice) ->
            %% reuse the id of the ping request so the error correlates with it
            Request = wait_for_ping_req(Alice),
            RequestId = exml_query:attr(Request, <<"id">>),

            PingEl = #xmlel{name = <<"ping">>, attrs = [{<<"xmlns">>, ?NS_PING}]},
            ConditionEl = #xmlel{name = <<"service-unavailable">>,
                                 attrs = [{<<"xmlns">>, ?NS_STANZA_ERRORS}]},
            ErrorEl = #xmlel{name = <<"error">>,
                             attrs = [{<<"type">>, <<"cancel">>}],
                             children = [ConditionEl]},
            ErrorIq = escalus_stanza:iq(domain(), <<"error">>, [PingEl, ErrorEl]),
            escalus_client:send(Alice, escalus_stanza:set_id(ErrorIq, RequestId)),

            %% the expected outcome depends on the configured timeout action
            check_connection(?config(timeout_action, Config), Alice),
            escalus_client:kill_connection(Config, Alice)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).

active(ConfigIn) ->
Domain = domain(),
HostType = domain_helper:host_type(mim),
Expand Down Expand Up @@ -265,7 +285,7 @@ wait_ping_interval(Ration) ->
ct:sleep(WaitTime).

%% Checks the client's connection state against the given timeout action.
%% For `kill` the client is expected to be disconnected: one line matches
%% escalus_connection:is_connected/1 against `false` directly, the other hands
%% the same check to mongoose_helper:wait_until/2 with `false` as the expected
%% value. For any other action the client must still be connected.
%% NOTE(review): the two `kill` lines look like the before/after forms of a
%% single change shown together — confirm which one the file should keep.
check_connection(kill, Client) ->
false = escalus_connection:is_connected(Client);
mongoose_helper:wait_until(fun() -> escalus_connection:is_connected(Client) end, false);
check_connection(_, Client) ->
true = escalus_connection:is_connected(Client).

Expand Down
74 changes: 70 additions & 4 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
-define(LONG_TIMEOUT, 3600).
-define(SHORT_TIMEOUT, 1).
-define(SMALL_SM_BUFFER, 3).
%% Ping settings passed to mod_ping via mod_ping_opts/0 and reused in the wait
%% of ping_timeout/1.
%% NOTE(review): the unit is not stated here; ping_timeout/1 adds these values
%% to timer:seconds(1) as-is — confirm they are meant to be milliseconds.
-define(PING_REQUEST_TIMEOUT, 1).
-define(PING_INTERVAL, 3).

%%--------------------------------------------------------------------
%% Suite configuration
Expand All @@ -49,7 +51,7 @@ suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

%% Lists what the suite runs: everything derived from groups() through
%% ct_helper:groups_to_all/1; the second expression additionally appends the
%% standalone ping_timeout case.
%% NOTE(review): the two expressions look like the before/after forms of a
%% single change shown together — confirm which one the file should keep.
all() ->
ct_helper:groups_to_all(groups()).
ct_helper:groups_to_all(groups()) ++ [ping_timeout].

groups() ->
[
Expand Down Expand Up @@ -166,6 +168,10 @@ init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleanin
dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
Config2 = register_some_smid_h(Config),
escalus:init_per_testcase(CN, Config2);
%% Setup for ping_timeout: installs a passthrough, unlinked meck mock of
%% mod_ping on the mim node (so its call history can be inspected later),
%% makes sure the modules required by this case are running, and then runs the
%% generic escalus per-testcase initialisation.
%% required_modules/2 is called the same way as in the sibling clauses; it
%% already adds mod_ping when the case name is ping_timeout.
init_per_testcase(ping_timeout = CN, Config) ->
    ok = rpc(mim(), meck, new, [mod_ping, [passthrough, no_link]]),
    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
    escalus:init_per_testcase(CN, Config);
init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
escalus:init_per_testcase(CN, Config);
init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
Expand All @@ -183,6 +189,9 @@ end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_
end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
unregister_handler(),
escalus:end_per_testcase(CN, Config);
%% Teardown for ping_timeout: unloads the meck mock of mod_ping on the mim node
%% (created in init_per_testcase/2) before the generic escalus cleanup.
end_per_testcase(ping_timeout = CN, Config) ->
rpc(mim(), meck, unload, [mod_ping]),
escalus:end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

Expand All @@ -194,8 +203,16 @@ required_modules(Scope, Name) ->
ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
end,
Backend = mongoose_helper:mnesia_or_rdbms_backend(),
[{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}].
BaseModules = [
{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}
],
case Name of
ping_timeout ->
BaseModules ++ [{mod_ping, config_parser_helper:mod_config(mod_ping, mod_ping_opts())}];
_ ->
BaseModules
end.

required_sm_opts(group, parallel) ->
#{ack_freq => never};
Expand All @@ -217,7 +234,10 @@ required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
#{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)};
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
#{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.
#{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)};
required_sm_opts(testcase, ping_timeout) ->
#{ack_freq => 1,
resume_timeout => ?SHORT_TIMEOUT}.

common_sm_opts() ->
Backend = ct_helper:get_internal_database(),
Expand All @@ -240,6 +260,12 @@ register_some_smid_h(Config) ->
TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)),
[{smid_test, TestSmids} | Config].

%% Options handed to mod_ping for the ping_timeout case: server pings enabled,
%% timing taken from the PING_* macros, and `kill` as the timeout action.
mod_ping_opts() ->
    Timing = #{ping_interval => ?PING_INTERVAL,
               ping_req_timeout => ?PING_REQUEST_TIMEOUT},
    Timing#{send_pings => true,
            timeout_action => kill}.

%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -591,6 +617,35 @@ resend_unacked_after_resume_timeout(Config) ->
escalus_connection:stop(Bob),
escalus_connection:stop(NewAlice).

%% Checks that, once a session has been lost and its resumption attempted,
%% exactly one stanza was dropped by mod_ping's filter_local_packet handler and
%% that this stanza is an IQ error carrying a `ping` element.
ping_timeout(Config) ->
    %% precondition: the mocked mod_ping has not dropped any stanza yet
    ?assertEqual([], get_stanzas_filtered_by_mod_ping()),

    %% open a fresh session and consume the next stanza sent to the client
    Client = connect_fresh(Config, alice, sr_presence),
    escalus_client:wait_for_stanza(Client),

    %% NOTE(review): the ping macros are added to timer:seconds(1) unconverted —
    %% confirm they are expressed in the milliseconds ct:sleep/1 expects
    WaitTime = ?PING_REQUEST_TIMEOUT + ?PING_INTERVAL + timer:seconds(1),
    ct:sleep(WaitTime),

    %% drop the connection and ask to resume the old session, without waiting for the outcome
    NewClient = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Client),

    %% wait for the server's answer to the resume attempt; after resume_timeout
    %% the old session is expected to be closed
    escalus_connection:get_stanza(NewClient, failed_resumption),

    %% continue with a brand-new session on the same connection
    Bound = escalus_session:bind(NewClient),
    escalus_session:session(Bound),
    send_initial_presence(NewClient),

    %% mod_ping must have dropped exactly one stanza: an IQ error with a ping element
    [Dropped] = get_stanzas_filtered_by_mod_ping(),
    escalus:assert(is_iq_error, Dropped),
    ?assertNotEqual(undefined,
                    exml_query:subelement_with_name_and_ns(Dropped, <<"ping">>, <<"urn:xmpp:ping">>)),

    %% close the connection
    escalus_connection:stop(NewClient).

resume_expired_session_returns_correct_h(Config) ->
%% connect bob and alice
Bob = connect_fresh(Config, bob, sr_presence),
Expand Down Expand Up @@ -1272,6 +1327,17 @@ is_presence(Type) ->
%% Returns the fixed list of three message bodies: <<"msg-1">>, <<"msg-2">>, <<"msg-3">>.
three_texts() ->
    [<<"msg-", (integer_to_binary(N))/binary>> || N <- lists:seq(1, 3)].

%% Reads the meck call history of mod_ping on the mim node and returns the
%% stanza (fourth element of the first argument) of every filter_local_packet
%% call whose result was {stop, drop}. History entries of any other shape are
%% skipped by the generator pattern.
get_stanzas_filtered_by_mod_ping() ->
    Calls = rpc(mim(), meck, history, [mod_ping]),
    [Dropped
     || {_Caller, {_Module, filter_local_packet, [{_, _, _, Dropped}, _, _]}, {stop, drop}}
            <- Calls].
%%--------------------------------------------------------------------
%% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
%%--------------------------------------------------------------------
Expand Down
79 changes: 61 additions & 18 deletions src/mod_ping.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
-export([user_send_packet/3,
user_send_iq/3,
user_ping_response/3,
filter_local_packet/3,
iq_ping/5]).

%% Record that will be stored in the c2s state when the server pings the client,
Expand All @@ -40,7 +41,8 @@
%%====================================================================

%% Returns the hook entries for HostType: this module's user_ping_response/3
%% and filter_local_packet/3 handlers, followed by whatever c2s_hooks/1 returns.
%% NOTE(review): the user_ping_response entry appears twice below (without and
%% with a trailing comma) — these look like the before/after forms of a single
%% change shown together; confirm which one the file should keep.
hooks(HostType) ->
[{user_ping_response, HostType, fun ?MODULE:user_ping_response/3, #{}, 100}
[{user_ping_response, HostType, fun ?MODULE:user_ping_response/3, #{}, 100},
{filter_local_packet, HostType, fun ?MODULE:filter_local_packet/3, #{}, 100}
| c2s_hooks(HostType)].

-spec c2s_hooks(mongooseim:host_type()) -> gen_hook:hook_list(mongoose_c2s_hooks:fn()).
Expand Down Expand Up @@ -120,28 +122,51 @@ iq_ping(Acc, _From, _To, #iq{sub_el = SubEl} = IQ, _) ->
%% Hook callbacks
%%====================================================================

%% filter_local_packet hook handler. When the stanza held in the fourth element
%% of the accumulator is recognised by is_ping_error/1, it is logged at debug
%% level and {stop, drop} is returned; otherwise the accumulator is returned
%% unchanged in {ok, _}.
-spec filter_local_packet(Acc, Params, Extra) -> {ok, Acc} | {stop, drop} when
      Acc :: mongoose_hooks:filter_packet_acc(),
      Params :: map(),
      Extra :: gen_hook:extra().
filter_local_packet({_, _, _, Packet} = FilterAcc, _Params, _Extra) ->
    case is_ping_error(Packet) of
        false ->
            {ok, FilterAcc};
        true ->
            ?LOG_DEBUG(#{what => ping_error_received, acc => FilterAcc}),
            {stop, drop}
    end.

%% user_send_iq hook handler: looks at the stanza type of the IQ in Acc together
%% with this module's state stored in the c2s data, in order to recognise the
%% reply to a ping whose handler record is stored there.
%% NOTE(review): two bodies are shown below — an inline `case` that only handles
%% <<"result">> replies, and a three-line form that delegates to handle_stanza/5.
%% They look like the before/after forms of a single change shown together;
%% confirm which one the file should keep.
-spec user_send_iq(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
user_send_iq(Acc, #{c2s_data := StateData}, #{host_type := HostType}) ->
case {mongoose_acc:stanza_type(Acc),
mongoose_c2s:get_mod_state(StateData, ?MODULE)} of
{<<"result">>, {ok, #ping_handler{id = PingId, time = T0}}} ->
IqResponse = mongoose_acc:element(Acc),
IqId = exml_query:attr(IqResponse, <<"id">>),
case {IqId, PingId} of
{Id, Id} ->
Jid = mongoose_c2s:get_jid(StateData),
TDelta = erlang:monotonic_time(millisecond) - T0,
mongoose_hooks:user_ping_response(HostType, #{}, Jid, IqResponse, TDelta),
Action = {{timeout, ping_timeout}, cancel},
{stop, mongoose_c2s_acc:to_acc(Acc, actions, Action)};
_ ->
{ok, Acc}
end;
StanzaType = mongoose_acc:stanza_type(Acc),
ModState = mongoose_c2s:get_mod_state(StateData, ?MODULE),
handle_stanza(StanzaType, ModState, Acc, StateData, HostType).

%% Dispatches on the stanza type and the module state: `result` and `error`
%% stanzas are passed to handle_ping_response/5 when the state is
%% {ok, PingHandler}; every other combination returns the accumulator untouched.
handle_stanza(<<"result">> = Type, {ok, PingHandler}, Acc, StateData, HostType) ->
    handle_ping_response(Type, PingHandler, Acc, StateData, HostType);
handle_stanza(<<"error">> = Type, {ok, PingHandler}, Acc, StateData, HostType) ->
    handle_ping_response(Type, PingHandler, Acc, StateData, HostType);
handle_stanza(_Type, _ModState, Acc, _StateData, _HostType) ->
    {ok, Acc}.

%% Compares the `id` attribute of the IQ in Acc with the id stored in the ping
%% handler. On a match it runs the user_ping_response hook with the elapsed
%% time (monotonic milliseconds since the stored `time`) and returns {stop, _}
%% with the actions chosen by determine_action/1 added to the accumulator.
%% Any other id (including a missing one) returns {ok, Acc}.
handle_ping_response(Type, #ping_handler{id = PingId, time = StartedAt}, Acc, StateData, HostType) ->
    Reply = mongoose_acc:element(Acc),
    case exml_query:attr(Reply, <<"id">>) =:= PingId of
        false ->
            {ok, Acc};
        true ->
            Jid = mongoose_c2s:get_jid(StateData),
            Elapsed = erlang:monotonic_time(millisecond) - StartedAt,
            mongoose_hooks:user_ping_response(HostType, #{}, Jid, Reply, Elapsed),
            {stop, mongoose_c2s_acc:to_acc(Acc, actions, determine_action(Type))}
    end.

%% Chooses the actions for a matched ping reply. A `result` only cancels the
%% ping_timeout timer; an `error` cancels it too and additionally sets a
%% ping_error timeout of 0 handled by ping_c2s_handler/2.
determine_action(<<"result">>) ->
    cancel_ping_timeout();
determine_action(<<"error">>) ->
    [cancel_ping_timeout(), {{timeout, ping_error}, 0, fun ping_c2s_handler/2}].

%% Action that cancels the ping_timeout timer.
cancel_ping_timeout() ->
    {{timeout, ping_timeout}, cancel}.

-spec user_send_packet(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
user_send_packet(Acc, _Params, #{host_type := HostType}) ->
Expand Down Expand Up @@ -174,8 +199,15 @@ ping_c2s_handler(ping_timeout, StateData) ->
Jid = mongoose_c2s:get_jid(StateData),
HostType = mongoose_c2s:get_host_type(StateData),
mongoose_hooks:user_ping_response(HostType, #{}, Jid, timeout, 0),
case gen_mod:get_module_opt(HostType, ?MODULE, timeout_action) of
kill -> mongoose_c2s_acc:new(#{stop => {shutdown, ping_timeout}});
handle_ping_action(HostType, ping_timeout);
ping_c2s_handler(ping_error, StateData) ->
HostType = mongoose_c2s:get_host_type(StateData),
handle_ping_action(HostType, ping_error).

%% Builds the c2s accumulator for a failed ping according to this module's
%% `timeout_action` option: `kill` requests a stop with {shutdown, Reason},
%% any other value yields an empty accumulator.
handle_ping_action(HostType, Reason) ->
    apply_timeout_action(gen_mod:get_module_opt(HostType, ?MODULE, timeout_action), Reason).

%% Maps the configured timeout action to the resulting accumulator.
apply_timeout_action(kill, Reason) ->
    mongoose_c2s_acc:new(#{stop => {shutdown, Reason}});
apply_timeout_action(_Other, _Reason) ->
    mongoose_c2s_acc:new().

Expand All @@ -200,3 +232,14 @@ ping_get(Id) ->
#xmlel{name = <<"iq">>,
attrs = [{<<"type">>, <<"get">>}, {<<"id">>, Id}],
children = [#xmlel{name = <<"ping">>, attrs = [{<<"xmlns">>, ?NS_PING}]}]}.

%% True when the stanza has type `error` and contains both a `ping` child in
%% the ping namespace and an `error` child; false otherwise.
-spec is_ping_error(exml:element()) -> boolean().
is_ping_error(Stanza) ->
    exml_query:attr(Stanza, <<"type">>) =:= <<"error">>
        andalso exml_query:subelement_with_name_and_ns(Stanza, <<"ping">>, ?NS_PING) =/= undefined
        andalso exml_query:subelement(Stanza, <<"error">>) =/= undefined.

0 comments on commit ab0b37f

Please sign in to comment.