From 59b94788bf7c5815f1fefcd7258e6746730e1845 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 11 Sep 2023 10:35:22 -0300 Subject: [PATCH] refactor: address review comments --- apps/emqx/include/emqx.hrl | 2 +- apps/emqx/integration_test/emqx_ds_SUITE.erl | 40 +++++++++---------- .../emqx_ps_ds_int.hrl | 6 +-- .../src/emqx_persistent_session_ds_router.erl | 4 ++ .../test/emqx_persistent_messages_SUITE.erl | 35 ++++++++-------- ...mqx_persistent_session_ds_router_SUITE.erl | 2 +- 6 files changed, 46 insertions(+), 43 deletions(-) diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 7c3eed70b..664ec5803 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,7 +23,7 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). --define(PS_ROUTER_SHARD, ps_router_shard). +-define(PS_ROUTER_SHARD, persistent_session_router_shard). %% Banner %%-------------------------------------------------------------------- diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index b4b03226e..264cbde14 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -144,13 +144,13 @@ t_non_persistent_session_subscription(_Config) -> SubTopicFilter = <<"t/#">>, ?check_trace( begin - ct:pal("starting"), + ?tp(notice, "starting", #{}), Client = start_client(#{ clientid => ClientId, properties => #{'Session-Expiry-Interval' => 0} }), {ok, _} = emqtt:connect(Client), - ct:pal("subscribing"), + ?tp(notice, "subscribing", #{}), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), IteratorRefs = get_all_iterator_refs(node()), IteratorIds = get_all_iterator_ids(node()), @@ -198,7 +198,7 @@ t_session_subscription_idempotency(Config) -> spawn_link(fun() -> ?tp(will_restart_node, #{}), - ct:pal("restarting node ~p", [Node1]), + ?tp(notice, "restarting node", #{node => Node1}), true = monitor_node(Node1, true), ok = erpc:call(Node1, init, restart, []), receive @@ -207,10 +207,10 @@ t_session_subscription_idempotency(Config) -> after 10_000 -> ct:fail("node ~p didn't stop", [Node1]) end, - ct:pal("waiting for nodeup ~p", [Node1]), + ?tp(notice, "waiting for nodeup", #{node => Node1}), wait_nodeup(Node1), wait_gen_rpc_down(Node1Spec), - ct:pal("restarting apps on ~p", [Node1]), + ?tp(notice, "restarting apps", #{node => Node1}), Apps = maps:get(apps, Node1Spec), ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), @@ -218,15 +218,15 @@ t_session_subscription_idempotency(Config) -> %% end.... ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), ok = snabbkaffe:forward_trace(Node1), - ct:pal("node ~p restarted", [Node1]), + ?tp(notice, "node restarted", #{node => Node1}), ?tp(restarted_node, #{}), ok end), - ct:pal("starting 1"), + ?tp(notice, "starting 1", #{}), Client0 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client0), - ct:pal("subscribing 1"), + ?tp(notice, "subscribing 1", #{}), process_flag(trap_exit, true), catch emqtt:subscribe(Client0, SubTopicFilter, qos2), receive @@ -237,10 +237,10 @@ t_session_subscription_idempotency(Config) -> process_flag(trap_exit, false), {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), - ct:pal("starting 2"), + ?tp(notice, "starting 2", #{}), Client1 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client1), - ct:pal("subscribing 2"), + ?tp(notice, "subscribing 2", #{}), {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), ok = emqtt:stop(Client1), @@ -286,7 +286,7 @@ t_session_unsubscription_idempotency(Config) -> spawn_link(fun() -> ?tp(will_restart_node, #{}), - ct:pal("restarting node ~p", [Node1]), + ?tp(notice, "restarting node", #{node => Node1}), true = monitor_node(Node1, true), ok = erpc:call(Node1, init, restart, []), receive @@ -295,10 +295,10 @@ t_session_unsubscription_idempotency(Config) -> after 10_000 -> ct:fail("node ~p didn't stop", [Node1]) end, - ct:pal("waiting for nodeup ~p", [Node1]), + ?tp(notice, "waiting for nodeup", #{node => Node1}), wait_nodeup(Node1), wait_gen_rpc_down(Node1Spec), - ct:pal("restarting apps on ~p", [Node1]), + ?tp(notice, "restarting apps", #{node => Node1}), Apps = maps:get(apps, Node1Spec), ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), @@ -306,17 +306,17 @@ t_session_unsubscription_idempotency(Config) -> %% end.... ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), ok = snabbkaffe:forward_trace(Node1), - ct:pal("node ~p restarted", [Node1]), + ?tp(notice, "node restarted", #{node => Node1}), ?tp(restarted_node, #{}), ok end), - ct:pal("starting 1"), + ?tp(notice, "starting 1", #{}), Client0 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client0), - ct:pal("subscribing 1"), + ?tp(notice, "subscribing 1", #{}), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2), - ct:pal("unsubscribing 1"), + ?tp(notice, "unsubscribing 1", #{}), process_flag(trap_exit, true), catch emqtt:unsubscribe(Client0, SubTopicFilter), receive @@ -327,12 +327,12 @@ t_session_unsubscription_idempotency(Config) -> process_flag(trap_exit, false), {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), - ct:pal("starting 2"), + ?tp(notice, "starting 2", #{}), Client1 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client1), - ct:pal("subscribing 2"), + ?tp(notice, "subscribing 2", #{}), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), - ct:pal("unsubscribing 2"), + ?tp(notice, "unsubscribing 2", #{}), {{ok, _, [?RC_SUCCESS]}, {ok, _}} = ?wait_async_action( emqtt:unsubscribe(Client1, SubTopicFilter), diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl index 157be320e..7815e25f5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,14 +19,12 @@ -define(PS_ROUTER_TAB, emqx_ds_ps_router). -define(PS_FILTERS_TAB, emqx_ds_ps_filters). --type dest() :: emqx_ds:session_id(). - -record(ps_route, { topic :: binary(), dest :: emqx_ds:session_id() }). -record(ps_routeidx, { - entry :: emqx_topic_index:key(dest()), + entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()), unused = [] :: nil() }). diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl index 2bcaad0ea..4400b23ff 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -39,6 +39,10 @@ -export([has_route/2]). -endif. +-type dest() :: emqx_ds:session_id(). + +-export_type([dest/0]). + %%-------------------------------------------------------------------- %% Table Initialization %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index c4c689704..a04a0d4de 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -24,6 +24,8 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(DS_SHARD, <<"local">>). all() -> @@ -34,33 +36,31 @@ init_per_suite(Config) -> %% TODO: remove after other suites start to use `emx_cth_suite' application:stop(emqx), application:stop(emqx_durable_storage), - TCApps = emqx_cth_suite:start( - app_specs(), - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - [{tc_apps, TCApps} | Config]. + Config. -end_per_suite(Config) -> - TCApps = ?config(tc_apps, Config), - emqx_cth_suite:stop(TCApps), +end_per_suite(_Config) -> ok. -init_per_testcase(t_session_subscription_iterators, Config) -> +init_per_testcase(t_session_subscription_iterators = TestCase, Config) -> Cluster = cluster(), - Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), [{nodes, Nodes} | Config]; -init_per_testcase(_TestCase, Config) -> - Config. +init_per_testcase(TestCase, Config) -> + Apps = emqx_cth_suite:start( + app_specs(), + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + [{apps, Apps} | Config]. end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), + emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), - ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>), - delete_all_messages(), ok; -end_per_testcase(_TestCase, _Config) -> - ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>), - delete_all_messages(), +end_per_testcase(_TestCase, Config) -> + Apps = ?config(apps, Config), + emqx_common_test_helpers:call_janitor(60_000), + emqx_cth_suite:stop(Apps), ok. t_messages_persisted(_Config) -> @@ -256,6 +256,7 @@ connect(Opts0 = #{}) -> Defaults = #{proto_ver => v5}, Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), {ok, Client} = emqtt:start_link(Opts), + on_exit(fun() -> catch emqtt:stop(Client) end), {ok, _} = emqtt:connect(Client), Client. diff --git a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl index 445ccd9a4..742dc5b41 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.