From b8cacd28336521db8f50a87898391bbd91c48b8e Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 22 Aug 2023 22:34:06 +0300 Subject: [PATCH] chore(ft): add tests for async reply registry --- apps/emqx/test/emqx_connection_SUITE.erl | 2 +- apps/emqx_ft/src/emqx_ft_app.erl | 2 +- apps/emqx_ft/src/emqx_ft_async_reply.erl | 15 +- .../test/emqx_ft_async_reply_SUITE.erl | 247 ++++++++++++++++++ 4 files changed, 257 insertions(+), 9 deletions(-) create mode 100644 apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index ea451eea5..2a96594e1 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -49,7 +49,7 @@ init_per_suite(Config) -> %% Meck Hooks ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]), ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), - ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end), + ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end), ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end), diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl index 9ef215bf9..114b4bff3 100644 --- a/apps/emqx_ft/src/emqx_ft_app.erl +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -22,7 +22,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_ft_sup:start_link(), - ok = emqx_ft_async_reply:create_table(), + ok = emqx_ft_async_reply:create_tables(), ok = emqx_ft_conf:load(), {ok, Sup}. diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl index 4eee2c544..f33558434 100644 --- a/apps/emqx_ft/src/emqx_ft_async_reply.erl +++ b/apps/emqx_ft/src/emqx_ft_async_reply.erl @@ -21,7 +21,8 @@ -include_lib("stdlib/include/ms_transform.hrl"). -export([ - create_tables/0 + create_tables/0, + info/0 ]). -export([ @@ -38,9 +39,9 @@ %% packets waiting for async workers --define(WORKER_TAB, emqx_ft_async_mons). --define(WORKER_KEY(MRef), ?WORKER_KEY(self(), MRef)). --define(WORKER_KEY(ChannelPid, MRef), {ChannelPid, MRef}). +-define(MON_TAB, emqx_ft_async_mons). +-define(MON_KEY(MRef), ?MON_KEY(self(), MRef)). +-define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}). %% async worker monitors by packet ids @@ -54,14 +55,14 @@ -spec create_tables() -> ok. create_tables() -> - _ = ets:new(?WORKER_TAB, [named_table, public, ordered_set]), + _ = ets:new(?MON_TAB, [named_table, public, ordered_set]), _ = ets:new(?PACKET_TAB, [named_table, public, ordered_set]), ok. -spec register(packet_id(), mon_ref(), timer_ref()) -> ok. register(PacketId, MRef, TRef) -> _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}), - _ = ets:insert(?WORKER_TAB, {?WORKER_KEY(MRef), PacketId, TRef}), + _ = ets:insert(?MON_TAB, {?MON_KEY(MRef), PacketId, TRef}), ok. -spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any(). @@ -73,7 +74,7 @@ with_new_packet(PacketId, Fun, Default) -> -spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found. take_by_mref(MRef) -> - case ets:take(?WORKER_TAB, ?WORKER_KEY(MRef)) of + case ets:take(?MON_TAB, ?MON_KEY(MRef)) of [{_, PacketId, TRef}] -> _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)), {ok, PacketId, TRef}; diff --git a/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl new file mode 100644 index 000000000..78a9b371c --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl @@ -0,0 +1,247 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_async_reply_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}}, + {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"} + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. + +init_per_testcase(_Case, Config) -> + ok = snabbkaffe:start_trace(), + Config. + +end_per_testcase(_Case, _Config) -> + ok = snabbkaffe:stop(), + ok. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +t_register(_Config) -> + PacketId = 1, + MRef = make_ref(), + TRef = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + + ?assertEqual( + undefined, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + ?assertEqual( + ok, + emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined) + ), + + ?assertEqual( + {ok, PacketId, TRef}, + emqx_ft_async_reply:take_by_mref(MRef) + ). + +t_process_independence(_Config) -> + PacketId = 1, + MRef = make_ref(), + TRef = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + + Self = self(), + + spawn_link(fun() -> + Self ! emqx_ft_async_reply:take_by_mref(MRef) + end), + + Res1 = + receive + Msg1 -> Msg1 + end, + + ?assertEqual( + not_found, + Res1 + ), + + spawn_link(fun() -> + Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + end), + + Res2 = + receive + Msg2 -> Msg2 + end, + + ?assertEqual( + ok, + Res2 + ). + +t_take(_Config) -> + PacketId = 1, + MRef = make_ref(), + TRef = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + + ?assertEqual( + {ok, PacketId, TRef}, + emqx_ft_async_reply:take_by_mref(MRef) + ), + + ?assertEqual( + not_found, + emqx_ft_async_reply:take_by_mref(MRef) + ), + + ?assertEqual( + ok, + emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined) + ). + +t_cleanup(_Config) -> + PacketId = 1, + MRef0 = make_ref(), + TRef0 = make_ref(), + MRef1 = make_ref(), + TRef1 = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0), + + Self = self(), + + Pid = spawn_link(fun() -> + ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1), + receive + kickoff -> + ?assertEqual( + undefined, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + ?assertEqual( + {ok, PacketId, TRef1}, + emqx_ft_async_reply:take_by_mref(MRef1) + ), + + Self ! done + end + end), + + ?assertEqual( + undefined, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + ok = emqx_ft_async_reply:deregister_all(Self), + + ?assertEqual( + ok, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + Pid ! kickoff, + + receive + done -> ok + end. + +t_reply_by_tiemout(_Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = emqx_ft_test_helpers:start_client(ClientId, node()), + + SleepForever = fun() -> + Ref = make_ref(), + receive + Ref -> ok + end + end, + + ok = meck:new(emqx_ft_storage, [passthrough]), + meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end), + + FinTopic = <<"$file/fakeid/fin/999999">>, + + ?assertMatch( + {ok, #{reason_code_name := unspecified_error}}, + emqtt:publish(C, FinTopic, <<>>, 1) + ), + + meck:unload(emqx_ft_storage), + emqtt:stop(C). + +t_cleanup_by_cm(_Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = emqx_ft_test_helpers:start_client(ClientId, node()), + + ok = meck:new(emqx_ft_storage, [passthrough]), + meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end), + + FinTopic = <<"$file/fakeid/fin/999999">>, + + [ClientPid] = emqx_cm:lookup_channels(ClientId), + + ?assertWaitEvent( + begin + emqtt:publish(C, FinTopic, <<>>, 1), + exit(ClientPid, kill) + end, + #{?snk_kind := emqx_cm_clean_down, client_id := ClientId}, + 1000 + ), + + ?assertEqual( + {0, 0}, + emqx_ft_async_reply:info() + ), + + meck:unload(emqx_ft_storage). + +t_unrelated_events(_Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = emqx_ft_test_helpers:start_client(ClientId, node()), + [ClientPid] = emqx_cm:lookup_channels(ClientId), + + erlang:monitor(process, ClientPid), + + ClientPid ! {'DOWN', make_ref(), process, self(), normal}, + ClientPid ! {timeout, make_ref(), unknown_timer_event}, + + ?assertNotReceive( + {'DOWN', _Ref, process, ClientPid, _Reason}, + 500 + ), + + emqtt:stop(C).