From 1c5ac33f16325a9b148800e6059a299fe2a8bb47 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 28 Sep 2022 09:39:14 +0200 Subject: [PATCH 1/9] chore: upgrade eredist to 1.2.9 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 19d199a6e..1c0f0b73a 100644 --- a/rebar.config +++ b/rebar.config @@ -42,7 +42,7 @@ , {redbug, "2.0.7"} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} - , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}} + , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} From 4743bcdd078f3059441a7dc10de8ddd205d5e588 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 28 Sep 2022 16:47:20 +0200 Subject: [PATCH 2/9] docs: update change log v43 --- CHANGES-4.3.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index eb5587d78..2edc931d5 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -36,6 +36,18 @@ File format: - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) +- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) + In this change, it also included more changes in redis client: + - Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19). + Also added support for eredis to accept an anonymous function as password instead of + passing around plaintext args which may get dumpped to crash logs (hard to predict where). + This change also added `format_status` callback for `gen_server` states which hold plaintext + password so the process termination log and `sys:get_status` will print '******' instead of + the password to console. + - Avoid pool name clasing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) + Same `format_status` callback is added here too for `gen_server`s which hold password in + their state. + ## v4.3.20 ### Bug fixes From 097d4ef120b7ef3f9ae0fbee04db5ef1ddb29500 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 28 Sep 2022 19:03:25 +0200 Subject: [PATCH 3/9] docs: Update CHANGES-4.3.md Co-authored-by: Thales Macedo Garitezi --- CHANGES-4.3.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 2edc931d5..713f89f37 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -44,7 +44,7 @@ File format: This change also added `format_status` callback for `gen_server` states which hold plaintext password so the process termination log and `sys:get_status` will print '******' instead of the password to console. - - Avoid pool name clasing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) + - Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) Same `format_status` callback is added here too for `gen_server`s which hold password in their state. From ebf131266a74a982d3d272e2fdfec254bc77c7c7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 20 Sep 2022 11:37:28 -0300 Subject: [PATCH 4/9] test: fix flaky shared sub test case the route replication is async, added a function to wait for it --- src/emqx_misc.erl | 112 +++++++++++++++++++++++++++++++++ test/emqx_misc_SUITE.erl | 33 ++++++++++ test/emqx_node_helpers.erl | 40 +++++++++++- test/emqx_shared_sub_SUITE.erl | 1 + 4 files changed, 185 insertions(+), 1 deletion(-) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index bfda5bb38..d256569fb 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -46,6 +46,8 @@ , maybe_parse_ip/1 , ipv6_probe/1 , ipv6_probe/2 + , pmap/2 + , pmap/3 ]). -export([ bin2hexstr_A_F/1 @@ -56,7 +58,13 @@ -export([ is_sane_id/1 ]). +-export([ + nolink_apply/1, + nolink_apply/2 +]). + -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). +-define(DEFAULT_PMAP_TIMEOUT, 5000). -spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. is_sane_id(Str) -> @@ -332,6 +340,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0; hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10; hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. +%% @doc Like lists:map/2, only the callback function is evaluated +%% concurrently. +-spec pmap(fun((A) -> B), list(A)) -> list(B). +pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> + pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT). + +-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B). +pmap(Fun, List, Timeout) when + is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 +-> + nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout). + +%% @doc Delegate a function to a worker process. +%% The function may spawn_link other processes but we do not +%% want the caller process to be linked. +%% This is done by isolating the possible link with a not-linked +%% middleman process. +nolink_apply(Fun) -> nolink_apply(Fun, infinity). + +%% @doc Same as `nolink_apply/1', with a timeout. +-spec nolink_apply(function(), timer:timeout()) -> term(). +nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> + Caller = self(), + ResRef = make_ref(), + Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)), + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after Timeout -> + exit(Middleman, kill), + exit(timeout) + end. + +-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_middleman_fn(Caller, Fun, ResRef) -> + fun() -> + process_flag(trap_exit, true), + CallerMRef = erlang:monitor(process, Caller), + Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)), + receive + {'DOWN', CallerMRef, process, _, _} -> + %% For whatever reason, if the caller is dead, + %% there is no reason to continue + exit(Worker, kill), + exit(normal); + {'EXIT', Worker, normal} -> + exit(normal); + {'EXIT', Worker, Reason} -> + %% worker exited with some reason other than 'normal' + _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + exit(normal) + end + end. + +-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_worker_fn(Caller, Fun, ResRef) -> + fun() -> + Res = + try + {normal, Fun()} + catch + C:E:S -> + {exception, {C, E, S}} + end, + _ = erlang:send(Caller, {ResRef, Res}), + exit(normal) + end. + +do_parallel_map(Fun, List) -> + Parent = self(), + PidList = lists:map( + fun(Item) -> + erlang:spawn_link( + fun() -> + Res = + try + {normal, Fun(Item)} + catch + C:E:St -> + {exception, {C, E, St}} + end, + Parent ! {self(), Res} + end + ) + end, + List + ), + lists:foldr( + fun(Pid, Acc) -> + receive + {Pid, {normal, Result}} -> + [Result | Acc]; + {Pid, {exception, {C, E, St}}} -> + erlang:raise(C, E, St) + end + end, + [], + PidList + ). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index 0eec55faa..e9dd3e132 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -146,3 +146,36 @@ t_now_to_secs(_) -> t_now_to_ms(_) -> ?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))). +t_pmap_normal(_) -> + ?assertEqual( + [5, 7, 9], + emqx_misc:pmap( + fun({A, B}) -> A + B end, + [{2, 3}, {3, 4}, {4, 5}] + ) + ). + +t_pmap_timeout(_) -> + ?assertExit( + timeout, + emqx_misc:pmap( + fun + (timeout) -> ct:sleep(1000); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, timeout], + 100 + ) + ). + +t_pmap_exception(_) -> + ?assertError( + foobar, + emqx_misc:pmap( + fun + (error) -> error(foobar); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, error] + ) + ). diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index 16d4be000..d1b7a99cf 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -22,7 +22,9 @@ -export([start_slave/1, start_slave/2, - stop_slave/1]). + stop_slave/1, + wait_for_synced_routes/3 + ]). start_slave(Name) -> start_slave(Name, #{}). @@ -81,3 +83,39 @@ setup_node(Node, #{} = Opts) -> , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) ), ok. + +%% Routes are replicated async. +%% Call this function to wait for nodes in the cluster to have the same view +%% for a given topic. +wait_for_synced_routes(Nodes, Topic, Timeout) -> + F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end, + emqx_misc:nolink_apply(F, Timeout). + +do_wait_for_synced_routes(Nodes, Topic) -> + PerNodeView0 = + lists:map( + fun(Node) -> + {rpc:call(Node, emqx_router, match_routes, [Topic]), Node} + end, Nodes), + PerNodeView = lists:keysort(1, PerNodeView0), + case check_consistent_view(PerNodeView) of + {ok, OneView} -> + io:format(user, "~p~n", [OneView]), + ok; + {error, Reason}-> + ct:pal("inconsistent_routes_view ~p", [Reason]), + timer:sleep(10), + do_wait_for_synced_routes(Nodes, Topic) + end. + +check_consistent_view(PerNodeView) -> + check_consistent_view(PerNodeView, []). + +check_consistent_view([], Acc) -> {ok, Acc}; +check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) -> + check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]); +check_consistent_view([{View, Node} | Rest], Acc) -> + check_consistent_view(Rest, [{View, Node} | Acc]). + +add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes]; +add_to_list(Node, Node1) -> [Node, Node1]. diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5e79ee983..2bf0cf048 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -407,6 +407,7 @@ t_local_fallback(_) -> Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}), + ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)), [{share, Topic, {ok, 1}}] = emqx:publish(Message1), {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]), From 402553b95ace9f2fb68eb0b5d4f0d19dae0b2003 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 09:19:26 +0200 Subject: [PATCH 5/9] test: fix false one-view check --- test/emqx_node_helpers.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index d1b7a99cf..15eaddc70 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -100,10 +100,10 @@ do_wait_for_synced_routes(Nodes, Topic) -> PerNodeView = lists:keysort(1, PerNodeView0), case check_consistent_view(PerNodeView) of {ok, OneView} -> - io:format(user, "~p~n", [OneView]), + ct:pal(user, "consistent_routes_view~n~p", [OneView]), ok; {error, Reason}-> - ct:pal("inconsistent_routes_view ~p", [Reason]), + ct:pal(user, "inconsistent_routes_view~n~p", [Reason]), timer:sleep(10), do_wait_for_synced_routes(Nodes, Topic) end. @@ -111,7 +111,8 @@ do_wait_for_synced_routes(Nodes, Topic) -> check_consistent_view(PerNodeView) -> check_consistent_view(PerNodeView, []). -check_consistent_view([], Acc) -> {ok, Acc}; +check_consistent_view([], [OneView]) -> {ok, OneView}; +check_consistent_view([], MoreThanOneView) -> {error, MoreThanOneView}; check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) -> check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]); check_consistent_view([{View, Node} | Rest], Acc) -> From c39116c7a508b256cf0f53e2e8b572a937b192c3 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 09:22:10 +0200 Subject: [PATCH 6/9] test: fix bad ct:pal call --- test/emqx_node_helpers.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index 15eaddc70..b516e3752 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -100,10 +100,10 @@ do_wait_for_synced_routes(Nodes, Topic) -> PerNodeView = lists:keysort(1, PerNodeView0), case check_consistent_view(PerNodeView) of {ok, OneView} -> - ct:pal(user, "consistent_routes_view~n~p", [OneView]), + ct:pal("consistent_routes_view~n~p", [OneView]), ok; {error, Reason}-> - ct:pal(user, "inconsistent_routes_view~n~p", [Reason]), + ct:pal("inconsistent_routes_view~n~p", [Reason]), timer:sleep(10), do_wait_for_synced_routes(Nodes, Topic) end. From 2e28d5e73ed0f8b051413aaf3bd8f0cff60865fc Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 13:00:41 +0200 Subject: [PATCH 7/9] test: fix more flaky test in share sub SUITE --- test/emqx_node_helpers.erl | 47 ++++++++++++----- test/emqx_shared_sub_SUITE.erl | 94 +++++++++++++++++++++++----------- 2 files changed, 97 insertions(+), 44 deletions(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index b516e3752..d68fee62e 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -30,23 +30,44 @@ start_slave(Name) -> start_slave(Name, #{}). start_slave(Name, Opts) -> - {ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()), - [{kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 10000}, - {startup_timeout, 10000}, - {erl_flags, ebin_path()}]), - + Node = make_node_name(Name), + case ct_slave:start(Node, [{kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()}]) of + {ok, _} -> + ok; + {error, started_not_connected, _} -> + ok + end, pong = net_adm:ping(Node), setup_node(Node, Opts), Node. -stop_slave(Node) -> - rpc:call(Node, ekka, leave, []), - ct_slave:stop(Node). +make_node_name(Name) -> + case string:tokens(atom_to_list(Name), "@") of + [_Name, _Host] -> + %% the name already has a @ + Name; + _ -> + list_to_atom(atom_to_list(Name) ++ "@" ++ host()) + end. + +stop_slave(Node0) -> + Node = make_node_name(Node0), + case rpc:call(Node, ekka, leave, []) of + ok -> ok; + {badrpc, nodedown} -> ok + end, + case ct_slave:stop(Node) of + {ok, _} -> ok; + {error, not_started, _} -> ok + end. host() -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. ebin_path() -> string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). @@ -70,10 +91,10 @@ setup_node(Node, #{} = Opts) -> end, EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), - [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_modules, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), - rpc:call(Node, ekka, join, [node()]), + ok = rpc:call(Node, ekka, join, [node()]), %% Sanity check. Assert that `gen_rpc' is set up correctly: ?assertEqual( Node diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 2bf0cf048..9ffce523d 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -40,6 +40,9 @@ init_per_suite(Config) -> PortDiscovery = application:get_env(gen_rpc, port_discovery), application:set_env(gen_rpc, port_discovery, stateless), application:ensure_all_started(gen_rpc), + %% ensure emqx_moduels' app modules are loaded + %% so the mnesia tables are created + ok = load_app(emqx_modules), emqx_ct_helpers:start_apps([]), [{port_discovery, PortDiscovery} | Config]. @@ -50,32 +53,45 @@ end_per_suite(Config) -> _ -> ok end. -t_is_ack_required(_) -> +init_per_testcase(Case, Config) -> + try + ?MODULE:Case({'init', Config}) + catch + error : function_clause -> + Config + end. + +end_per_testcase(Case, Config) -> + try + ?MODULE:Case({'end', Config}) + catch + error : function_clause -> + ok + end. + +t_is_ack_required(Config) when is_list(Config) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). -t_maybe_nack_dropped(_) -> +t_maybe_nack_dropped(Config) when is_list(Config) -> ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). -t_nack_no_connection(_) -> +t_nack_no_connection(Config) when is_list(Config) -> Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). -t_maybe_ack(_) -> +t_maybe_ack(Config) when is_list(Config) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). -% t_subscribers(_) -> -% error('TODO'). - -t_random_basic(_) -> +t_random_basic(Config) when is_list(Config) -> ok = ensure_config(random), ClientId = <<"ClientId">>, Topic = <<"foo">>, @@ -105,7 +121,7 @@ t_random_basic(_) -> %% After the connection for the 2nd session is also closed, %% i.e. when all clients are offline, the following message(s) %% should be delivered randomly. -t_no_connection_nack(_) -> +t_no_connection_nack(Config) when is_list(Config) -> ok = ensure_config(sticky), Publisher = <<"publisher">>, Subscriber1 = <<"Subscriber1">>, @@ -171,27 +187,27 @@ t_no_connection_nack(_) -> % emqx_sm:close_session(SPid2), ok. -t_random(_) -> +t_random(Config) when is_list(Config) -> ok = ensure_config(random, true), test_two_messages(random). -t_round_robin(_) -> +t_round_robin(Config) when is_list(Config) -> ok = ensure_config(round_robin, true), test_two_messages(round_robin). -t_sticky(_) -> +t_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky, true), test_two_messages(sticky). -t_hash(_) -> +t_hash(Config) when is_list(Config) -> ok = ensure_config(hash, false), test_two_messages(hash). -t_hash_clinetid(_) -> +t_hash_clinetid(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). -t_hash_topic(_) -> +t_hash_topic(Config) when is_list(Config) -> ok = ensure_config(hash_topic, false), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -230,7 +246,7 @@ t_hash_topic(_) -> ok. %% if the original subscriber dies, change to another one alive -t_not_so_sticky(_) -> +t_not_so_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) -> <<"not yet?">> end. -t_dispatch(_) -> +t_dispatch(Config) when is_list(Config) -> ok = ensure_config(random), Topic = <<"foo">>, ?assertEqual({error, no_subscribers}, @@ -312,18 +328,13 @@ t_dispatch(_) -> ?assertEqual({ok, 1}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})). -% t_unsubscribe(_) -> -% error('TODO'). - -% t_subscribe(_) -> -% error('TODO'). -t_uncovered_func(_) -> +t_uncovered_func(Config) when is_list(Config) -> ignored = gen_server:call(emqx_shared_sub, ignored), ok = gen_server:cast(emqx_shared_sub, ignored), ignored = emqx_shared_sub ! ignored, {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. -t_per_group_config(_) -> +t_per_group_config(Config) when is_list(Config) -> ok = ensure_group_config(#{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -342,8 +353,8 @@ t_per_group_config(_) -> test_two_messages(round_robin, <<"round_robin_group">>), test_two_messages(round_robin, <<"round_robin_group">>). -t_local(_) -> - Node = start_slave('local_shared_sub_test19', 21884), +t_local({'init', Config}) -> + Node = start_slave(local_shared_sub_test19, 21884), GroupConfig = #{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -352,7 +363,11 @@ t_local(_) -> }, ok = ensure_group_config(Node, GroupConfig), ok = ensure_group_config(GroupConfig), - + [{slave_node, Node} | Config]; +t_local({'end', _Config}) -> + ok = stop_slave(local_shared_sub_test19); +t_local(Config) when is_list(Config) -> + Node = proplists:get_value(slave_node, Config), Topic = <<"local_foo1/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -388,7 +403,7 @@ t_local(_) -> ?assertNotEqual(UsedSubPid1, UsedSubPid2), ok. -t_local_fallback(_) -> +t_local_fallback({'init', Config}) -> ok = ensure_group_config(#{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -396,10 +411,15 @@ t_local_fallback(_) -> <<"sticky_group">> => sticky }), + Node = start_slave(local_fallback_shared_sub_test19, 11885), + [{slave_node, Node} | Config]; +t_local_fallback({'end', _}) -> + ok = stop_slave(local_fallback_shared_sub_test19); +t_local_fallback(Config) when is_list(Config) -> Topic = <<"local_foo2/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - Node = start_slave('local_fallback_shared_sub_test19', 11885), + Node = proplists:get_value(slave_node, Config), {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, _} = emqtt:connect(ConnPid1), @@ -423,7 +443,7 @@ t_local_fallback(_) -> %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK -t_redispatch(_) -> +t_redispatch(Config) when is_list(Config) -> ok = ensure_config(sticky, true), application:set_env(emqx, shared_dispatch_ack_enabled, true), @@ -454,7 +474,7 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. -t_dispatch_when_inflights_are_full(_) -> +t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> ok = ensure_config(round_robin, true), Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, @@ -537,8 +557,20 @@ recv_msgs(Count, Msgs) -> end. start_slave(Name, Port) -> + ok = emqx_ct_helpers:start_apps([emqx_modules]), Listeners = [#{listen_on => {{127,0,0,1}, Port}, + start_apps => [emqx, emqx_modules], name => "internal", opts => [{zone,internal}], proto => tcp}], emqx_node_helpers:start_slave(Name, #{listeners => Listeners}). + +stop_slave(Name) -> + emqx_node_helpers:stop_slave(Name). + +load_app(App) -> + case application:load(App) of + ok -> ok; + {error, {already_loaded, _}} -> ok; + {error, Reason} -> error({failed_to_load_app, App, Reason}) + end. From 7423646191aefa90def937f560f9a1977c045f73 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 17:05:35 +0200 Subject: [PATCH 8/9] test: ensure emqx_modules app is started in ct-slave node --- test/emqx_node_helpers.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index d68fee62e..f7c048d51 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -18,7 +18,7 @@ -include_lib("eunit/include/eunit.hrl"). --define(SLAVE_START_APPS, [emqx]). +-define(SLAVE_START_APPS, [emqx, emqx_modules]). -export([start_slave/1, start_slave/2, @@ -91,7 +91,7 @@ setup_node(Node, #{} = Opts) -> end, EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), - [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_modules, emqx]], + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), ok = rpc:call(Node, ekka, join, [node()]), From ea6f2bd8d72e292d7337dabe6ae9bbc899d60eb9 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 22:01:52 +0200 Subject: [PATCH 9/9] test: allow starting ct-slave without join cluster This is to test/inspect states before/after join --- test/emqx_node_helpers.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index f7c048d51..ad530b3d7 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -18,6 +18,7 @@ -include_lib("eunit/include/eunit.hrl"). +%% modules is included because code is called before cluster join -define(SLAVE_START_APPS, [emqx, emqx_modules]). -export([start_slave/1, @@ -94,7 +95,12 @@ setup_node(Node, #{} = Opts) -> [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), - ok = rpc:call(Node, ekka, join, [node()]), + case maps:get(no_join, Opts, false) of + true -> + ok; + false -> + ok = rpc:call(Node, ekka, join, [node()]) + end, %% Sanity check. Assert that `gen_rpc' is set up correctly: ?assertEqual( Node