diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl new file mode 100644 index 000000000..0e9f9477a --- /dev/null +++ b/apps/emqx/include/asserts.hrl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(assertWaitEvent(Code, EventMatch, Timeout), + ?check_trace( + ?wait_async_action( + Code, + EventMatch, + Timeout + ), + fun(Trace) -> + ?assert( + lists:any( + fun + (EventMatch) -> true; + (_) -> false + end, + Trace + ) + ) + end + ) +). diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl index f58569a27..8417053f6 100644 --- a/apps/emqx_ft/src/emqx_ft_responder.erl +++ b/apps/emqx_ft/src/emqx_ft_responder.erl @@ -21,6 +21,8 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([start_link/0]). -export([ @@ -87,10 +89,13 @@ handle_call({unregister, Key}, _From, State) -> _ = erlang:cancel_timer(TRef), true = ets:delete(?TAB, Key), {reply, ok, State} - end. + end; +handle_call(Msg, _From, State) -> + ?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}), + {reply, {error, unknown_call}, State}. handle_cast(Msg, State) -> - ?SLOG(warning, #{msg => "unknown cast", cast_msg => Msg}), + ?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}), {noreply, State}. handle_info({timeout, TRef, {timeout, Key}}, State) -> @@ -100,12 +105,12 @@ handle_info({timeout, TRef, {timeout, Key}}, State) -> [{_, Action, TRef}] -> _ = erlang:cancel_timer(TRef), true = ets:delete(?TAB, Key), - %% TODO: safe apply - _ = Action(Key), + ok = safe_apply(Action, [Key]), + ?tp(ft_timeout_action_applied, #{key => Key}), {noreply, State} end; handle_info(Msg, State) -> - ?SLOG(warning, #{msg => "unknown message", info_msg => Msg}), + ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}), {noreply, State}. code_change(_OldVsn, State, _Extra) -> @@ -113,3 +118,20 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, _State) -> ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +safe_apply(Fun, Args) -> + try apply(Fun, Args) of + _ -> ok + catch + Class:Reason:Stacktrace -> + ?SLOG(error, #{ + msg => "safe_apply_failed", + class => Class, + reason => Reason, + stacktrace => Stacktrace + }) + end. diff --git a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl new file mode 100644 index 000000000..c08986120 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_responder_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_ft]), + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_ft]), + ok. + +init_per_testcase(_Case, Config) -> + Config. + +end_per_testcase(_Case, _Config) -> + ok. + +t_register_unregister(_Config) -> + Key = <<"test">>, + DefaultAction = fun(_) -> ok end, + ?assertEqual( + ok, + emqx_ft_responder:register(Key, DefaultAction, 1000) + ), + ?assertEqual( + {error, already_registered}, + emqx_ft_responder:register(Key, DefaultAction, 1000) + ), + ?assertEqual( + ok, + emqx_ft_responder:unregister(Key) + ), + ?assertEqual( + {error, not_found}, + emqx_ft_responder:unregister(Key) + ). + +t_timeout(_Config) -> + Key = <<"test">>, + Self = self(), + DefaultAction = fun(K) -> Self ! {timeout, K} end, + ok = emqx_ft_responder:register(Key, DefaultAction, 20), + receive + {timeout, Key} -> + ok + after 100 -> + ct:fail("emqx_ft_responder not called") + end, + ?assertEqual( + {error, not_found}, + emqx_ft_responder:unregister(Key) + ). + +t_action_exception(_Config) -> + Key = <<"test">>, + DefaultAction = fun(K) -> error({oops, K}) end, + + ?assertWaitEvent( + emqx_ft_responder:register(Key, DefaultAction, 10), + #{?snk_kind := ft_timeout_action_applied, key := <<"test">>}, + 1000 + ), + ?assertEqual( + {error, not_found}, + emqx_ft_responder:unregister(Key) + ). + +t_unknown_msgs(_Config) -> + Pid = whereis(emqx_ft_responder), + Pid ! {unknown_msg, <<"test">>}, + ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}), + ?assertEqual( + {error, unknown_call}, + gen_server:call(Pid, {unknown_call, <<"test">>}) + ).