From 51ad27cb4bb9fe3539dc39a0f160582ee43a9fcb Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 4 Jan 2023 12:11:35 -0300 Subject: [PATCH] test(retainer): assert that retained messages are not lost when changing storage type --- .../src/emqx_retainer_mnesia.erl | 4 +- .../test/emqx_retainer_api_SUITE.erl | 59 +++++++++++++++---- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 69a6a877a..cadb9110f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -146,7 +146,9 @@ store_retained(_, Msg = #message{topic = Topic}) -> reason => table_is_full }); false -> - do_store_retained(Msg, Tokens, ExpiryTime) + do_store_retained(Msg, Tokens, ExpiryTime), + ?tp(message_retained, #{topic => Topic}), + ok end. clear_expired(_) -> diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index a64e9a7df..ba96887a2 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -31,7 +31,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), application:load(emqx_conf), ok = ekka:start(), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), @@ -104,11 +103,12 @@ t_messages(_) -> end, ?check_trace( - ?wait_async_action( - lists:foreach(Each, lists:seq(1, 5)), - #{?snk_kind := message_retained, topic := <<"retained/A">>}, - 500 - ), + {ok, {ok, _}} = + ?wait_async_action( + lists:foreach(Each, lists:seq(1, 5)), + #{?snk_kind := message_retained, topic := <<"retained/A">>}, + 500 + ), [] ), @@ -150,11 +150,12 @@ t_messages_page(_) -> end, ?check_trace( - ?wait_async_action( - lists:foreach(Each, lists:seq(1, 5)), - #{?snk_kind := message_retained, topic := <<"retained/A">>}, - 500 - ), + {ok, {ok, _}} = + ?wait_async_action( + lists:foreach(Each, lists:seq(1, 5)), + #{?snk_kind := message_retained, topic := <<"retained/A">>}, + 500 + ), [] ), Page = 4, @@ -238,6 +239,23 @@ t_change_storage_type(_Config) -> ?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)), ?assertEqual(ram_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)), ?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX, storage_type)), + %% insert some retained messages + {ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C0), + ok = snabbkaffe:start_trace(), + Topic = <<"retained">>, + Payload = <<"retained">>, + {ok, {ok, _}} = + ?wait_async_action( + emqtt:publish(C0, Topic, Payload, [{qos, 0}, {retain, true}]), + #{?snk_kind := message_retained, topic := Topic}, + 500 + ), + emqtt:stop(C0), + ok = snabbkaffe:stop(), + {ok, MsgsJson0} = request_api(get, api_path(["mqtt", "retainer", "messages"])), + #{data := Msgs0, meta := _} = decode_json(MsgsJson0), + ?assertEqual(1, length(Msgs0)), ChangedConf = emqx_map_lib:deep_merge( RawConf, @@ -267,6 +285,25 @@ t_change_storage_type(_Config) -> ?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)), ?assertEqual(disc_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)), ?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX, storage_type)), + %% keep retained messages + {ok, MsgsJson1} = request_api(get, api_path(["mqtt", "retainer", "messages"])), + #{data := Msgs1, meta := _} = decode_json(MsgsJson1), + ?assertEqual(1, length(Msgs1)), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + {ok, _, _} = emqtt:subscribe(C1, Topic), + + receive + {publish, #{topic := T, payload := P, retain := R}} -> + ?assertEqual(Payload, P), + ?assertEqual(Topic, T), + ?assert(R), + ok + after 500 -> + emqtt:stop(C1), + ct:fail("should have preserved retained messages") + end, + emqtt:stop(C1), ok.