Merge remote-tracking branch 'origin/release-v43' into release-v44

Zaiming (Stone) Shi 2022-10-05 12:55:21 +02:00
commit c8920b5499
6 changed files with 302 additions and 46 deletions

View File

@@ -36,6 +36,18 @@ File format:
- Fix inaccurate delayed publish caused by OS time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Hide Redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071).
  This change also pulls in further changes in the Redis client:
  - Improve Redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19).
    eredis now also accepts an anonymous function as the password, instead of passing
    plaintext arguments around, which may get dumped to crash logs (hard to predict where).
    A `format_status` callback is also added for `gen_server` processes whose state holds a
    plaintext password, so the process termination log and `sys:get_status` print '******'
    instead of the password to the console (a minimal sketch follows this hunk).
  - Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22).
    The same `format_status` callback is added here too, for `gen_server`s that hold a password
    in their state.
## v4.3.20
### Bug fixes
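For reference, here is a minimal sketch of the kind of `format_status/2` callback described in the entry above. It is not part of this diff; the record and field names are hypothetical and not the actual eredis/eredis_cluster state.

%% Hypothetical gen_server callback module holding a password in its state.
-record(state, {host, port, password}).

%% OTP calls format_status/2 when rendering the state for sys:get_status/1 and
%% for crash/termination reports, so '******' is logged instead of the
%% plaintext password.
format_status(_Opt, [_PDict, State = #state{}]) ->
    [{data, [{"State", State#state{password = "******"}}]}].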

View File

@@ -42,7 +42,7 @@
, {redbug, "2.0.7"}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}

View File

@@ -48,6 +48,8 @@
, maybe_parse_ip/1
, ipv6_probe/1
, ipv6_probe/2
, pmap/2
, pmap/3
]).
-export([ bin2hexstr_a_f_upper/1
@@ -58,7 +60,13 @@
-export([ is_sane_id/1
]).
-export([
nolink_apply/1,
nolink_apply/2
]).
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
-define(DEFAULT_PMAP_TIMEOUT, 5000).
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
is_sane_id(Str) ->
@@ -334,6 +342,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
%% @doc Like lists:map/2, only the callback function is evaluated
%% concurrently.
-spec pmap(fun((A) -> B), list(A)) -> list(B).
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
pmap(Fun, List, Timeout) when
is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
->
nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
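A quick usage sketch, added here for clarity and not part of this diff. Results come back in input order, and `pmap/2` falls back to the 5-second `?DEFAULT_PMAP_TIMEOUT` defined above:

%% Each element is mapped in its own spawned process; result order is preserved.
%% [2, 4, 6] = emqx_misc:pmap(fun(X) -> X * 2 end, [1, 2, 3]).
%% An explicit per-call timeout (milliseconds) can be given; on expiry the
%% caller exits with 'timeout' instead of hanging:
%% [2, 4, 6] = emqx_misc:pmap(fun(X) -> X * 2 end, [1, 2, 3], 1000).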
%% @doc Delegate a function to a worker process.
%% The function may spawn_link other processes but we do not
%% want the caller process to be linked.
%% This is done by isolating the possible link with a not-linked
%% middleman process.
nolink_apply(Fun) -> nolink_apply(Fun, infinity).
%% @doc Same as `nolink_apply/1', with a timeout.
-spec nolink_apply(function(), timer:timeout()) -> term().
nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
Caller = self(),
ResRef = make_ref(),
Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)),
receive
{ResRef, {normal, Result}} ->
Result;
{ResRef, {exception, {C, E, S}}} ->
erlang:raise(C, E, S);
{ResRef, {'EXIT', Reason}} ->
exit(Reason)
after Timeout ->
exit(Middleman, kill),
exit(timeout)
end.
-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_middleman_fn(Caller, Fun, ResRef) ->
fun() ->
process_flag(trap_exit, true),
CallerMRef = erlang:monitor(process, Caller),
Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)),
receive
{'DOWN', CallerMRef, process, _, _} ->
%% If the caller is dead, for whatever reason,
%% there is no point in continuing
exit(Worker, kill),
exit(normal);
{'EXIT', Worker, normal} ->
exit(normal);
{'EXIT', Worker, Reason} ->
%% worker exited with some reason other than 'normal'
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
exit(normal)
end
end.
-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_worker_fn(Caller, Fun, ResRef) ->
fun() ->
Res =
try
{normal, Fun()}
catch
C:E:S ->
{exception, {C, E, S}}
end,
_ = erlang:send(Caller, {ResRef, Res}),
exit(normal)
end.
do_parallel_map(Fun, List) ->
Parent = self(),
PidList = lists:map(
fun(Item) ->
erlang:spawn_link(
fun() ->
Res =
try
{normal, Fun(Item)}
catch
C:E:St ->
{exception, {C, E, St}}
end,
Parent ! {self(), Res}
end
)
end,
List
),
lists:foldr(
fun(Pid, Acc) ->
receive
{Pid, {normal, Result}} ->
[Result | Acc];
{Pid, {exception, {C, E, St}}} ->
erlang:raise(C, E, St)
end
end,
[],
PidList
).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
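To illustrate the middleman isolation described in the `nolink_apply` doc comment above, here is a sketch (not part of this diff; the function name is made up). A crash in a process that `Fun` links to is re-raised in the caller as an ordinary, catchable exit, because the only link runs to the throw-away worker, never to the caller:

%% Sketch: the caller is never linked, so the failure arrives as a synchronous
%% exit from nolink_apply/2 rather than as an 'EXIT' signal.
isolated_crash_demo() ->
    Fun = fun() ->
              _ = spawn_link(fun() -> exit(boom) end), %% links to the worker, not the caller
              timer:sleep(100)                         %% worker is killed before it returns
          end,
    try
        emqx_misc:nolink_apply(Fun, 1000)
    catch
        exit:boom -> caller_survived
    end.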

View File

@@ -146,3 +146,36 @@ t_now_to_secs(_) ->
t_now_to_ms(_) ->
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
t_pmap_normal(_) ->
?assertEqual(
[5, 7, 9],
emqx_misc:pmap(
fun({A, B}) -> A + B end,
[{2, 3}, {3, 4}, {4, 5}]
)
).
t_pmap_timeout(_) ->
?assertExit(
timeout,
emqx_misc:pmap(
fun
(timeout) -> ct:sleep(1000);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, timeout],
100
)
).
t_pmap_exception(_) ->
?assertError(
foobar,
emqx_misc:pmap(
fun
(error) -> error(foobar);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, error]
)
).

View File

@@ -18,33 +18,57 @@
-include_lib("eunit/include/eunit.hrl").
-define(SLAVE_START_APPS, [emqx]).
%% emqx_modules is included because its code is called before the cluster join
-define(SLAVE_START_APPS, [emqx, emqx_modules]).
-export([start_slave/1,
start_slave/2,
stop_slave/1]).
stop_slave/1,
wait_for_synced_routes/3
]).
start_slave(Name) ->
start_slave(Name, #{}).
start_slave(Name, Opts) ->
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]),
Node = make_node_name(Name),
case ct_slave:start(Node, [{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]) of
{ok, _} ->
ok;
{error, started_not_connected, _} ->
ok
end,
pong = net_adm:ping(Node),
setup_node(Node, Opts),
Node.
stop_slave(Node) ->
rpc:call(Node, ekka, leave, []),
ct_slave:stop(Node).
make_node_name(Name) ->
case string:tokens(atom_to_list(Name), "@") of
[_Name, _Host] ->
%% the name already has an '@'
Name;
_ ->
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
end.
stop_slave(Node0) ->
Node = make_node_name(Node0),
case rpc:call(Node, ekka, leave, []) of
ok -> ok;
{badrpc, nodedown} -> ok
end,
case ct_slave:stop(Node) of
{ok, _} -> ok;
{error, not_started, _} -> ok
end.
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
@@ -71,7 +95,12 @@ setup_node(Node, #{} = Opts) ->
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
rpc:call(Node, ekka, join, [node()]),
case maps:get(no_join, Opts, false) of
true ->
ok;
false ->
ok = rpc:call(Node, ekka, join, [node()])
end,
%% Sanity check. Assert that `gen_rpc' is set up correctly:
?assertEqual( Node
@@ -81,3 +110,40 @@ setup_node(Node, #{} = Opts) ->
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
),
ok.
%% Routes are replicated asynchronously.
%% Call this function to wait for nodes in the cluster to have the same view
%% for a given topic.
wait_for_synced_routes(Nodes, Topic, Timeout) ->
F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end,
emqx_misc:nolink_apply(F, Timeout).
do_wait_for_synced_routes(Nodes, Topic) ->
PerNodeView0 =
lists:map(
fun(Node) ->
{rpc:call(Node, emqx_router, match_routes, [Topic]), Node}
end, Nodes),
PerNodeView = lists:keysort(1, PerNodeView0),
case check_consistent_view(PerNodeView) of
{ok, OneView} ->
ct:pal("consistent_routes_view~n~p", [OneView]),
ok;
{error, Reason}->
ct:pal("inconsistent_routes_view~n~p", [Reason]),
timer:sleep(10),
do_wait_for_synced_routes(Nodes, Topic)
end.
check_consistent_view(PerNodeView) ->
check_consistent_view(PerNodeView, []).
check_consistent_view([], [OneView]) -> {ok, OneView};
check_consistent_view([], MoreThanOneView) -> {error, MoreThanOneView};
check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) ->
check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]);
check_consistent_view([{View, Node} | Rest], Acc) ->
check_consistent_view(Rest, [{View, Node} | Acc]).
add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes];
add_to_list(Node, Node1) -> [Node, Node1].
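A usage sketch, not part of this diff (`SlaveNode`, `Topic`, and `Msg` are placeholders); the shared-subscription suite below calls the helper the same way. Because the wait runs under `emqx_misc:nolink_apply/2`, a cluster that never converges makes the call exit with 'timeout' instead of hanging the test:

sync_then_publish(SlaveNode, Topic, Msg) ->
    %% Block until both nodes report the same matched routes for Topic,
    %% for at most 10 seconds.
    ok = emqx_node_helpers:wait_for_synced_routes([node(), SlaveNode], Topic, timer:seconds(10)),
    emqx:publish(Msg).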

View File

@@ -40,6 +40,9 @@ init_per_suite(Config) ->
PortDiscovery = application:get_env(gen_rpc, port_discovery),
application:set_env(gen_rpc, port_discovery, stateless),
application:ensure_all_started(gen_rpc),
%% ensure emqx_modules' app modules are loaded
%% so the mnesia tables are created
ok = load_app(emqx_modules),
emqx_ct_helpers:start_apps([]),
[{port_discovery, PortDiscovery} | Config].
@@ -50,32 +53,45 @@ end_per_suite(Config) ->
_ -> ok
end.
t_is_ack_required(_) ->
%% Dispatch per-testcase setup/teardown to the testcase itself: a testcase
%% that defines Case({'init', Config}) / Case({'end', Config}) clauses gets
%% them called here; otherwise the function_clause error is swallowed.
init_per_testcase(Case, Config) ->
try
?MODULE:Case({'init', Config})
catch
error : function_clause ->
Config
end.
end_per_testcase(Case, Config) ->
try
?MODULE:Case({'end', Config})
catch
error : function_clause ->
ok
end.
t_is_ack_required(Config) when is_list(Config) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) ->
t_maybe_nack_dropped(Config) when is_list(Config) ->
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
t_nack_no_connection(_) ->
t_nack_no_connection(Config) when is_list(Config) ->
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
after 100 -> timeout end).
t_maybe_ack(_) ->
t_maybe_ack(Config) when is_list(Config) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg)),
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
% t_subscribers(_) ->
% error('TODO').
t_random_basic(_) ->
t_random_basic(Config) when is_list(Config) ->
ok = ensure_config(random),
ClientId = <<"ClientId">>,
Topic = <<"foo">>,
@@ -105,7 +121,7 @@ t_random_basic(_) ->
%% After the connection for the 2nd session is also closed,
%% i.e. when all clients are offline, the following message(s)
%% should be delivered randomly.
t_no_connection_nack(_) ->
t_no_connection_nack(Config) when is_list(Config) ->
ok = ensure_config(sticky),
Publisher = <<"publisher">>,
Subscriber1 = <<"Subscriber1">>,
@@ -171,27 +187,27 @@ t_no_connection_nack(_) ->
% emqx_sm:close_session(SPid2),
ok.
t_random(_) ->
t_random(Config) when is_list(Config) ->
ok = ensure_config(random, true),
test_two_messages(random).
t_round_robin(_) ->
t_round_robin(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true),
test_two_messages(round_robin).
t_sticky(_) ->
t_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
test_two_messages(sticky).
t_hash(_) ->
t_hash(Config) when is_list(Config) ->
ok = ensure_config(hash, false),
test_two_messages(hash).
t_hash_clinetid(_) ->
t_hash_clinetid(Config) when is_list(Config) ->
ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid).
t_hash_topic(_) ->
t_hash_topic(Config) when is_list(Config) ->
ok = ensure_config(hash_topic, false),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@@ -230,7 +246,7 @@ t_hash_topic(_) ->
ok.
%% if the original subscriber dies, switch to another live one
t_not_so_sticky(_) ->
t_not_so_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
<<"not yet?">>
end.
t_dispatch(_) ->
t_dispatch(Config) when is_list(Config) ->
ok = ensure_config(random),
Topic = <<"foo">>,
?assertEqual({error, no_subscribers},
@@ -312,18 +328,13 @@ t_dispatch(_) ->
?assertEqual({ok, 1},
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
% t_unsubscribe(_) ->
% error('TODO').
% t_subscribe(_) ->
% error('TODO').
t_uncovered_func(_) ->
t_uncovered_func(Config) when is_list(Config) ->
ignored = gen_server:call(emqx_shared_sub, ignored),
ok = gen_server:cast(emqx_shared_sub, ignored),
ignored = emqx_shared_sub ! ignored,
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
t_per_group_config(_) ->
t_per_group_config(Config) when is_list(Config) ->
ok = ensure_group_config(#{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
@@ -342,8 +353,8 @@ t_per_group_config(_) ->
test_two_messages(round_robin, <<"round_robin_group">>),
test_two_messages(round_robin, <<"round_robin_group">>).
t_local(_) ->
Node = start_slave('local_shared_sub_test19', 21884),
t_local({'init', Config}) ->
Node = start_slave(local_shared_sub_test19, 21884),
GroupConfig = #{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
@@ -352,7 +363,11 @@ t_local(_) ->
},
ok = ensure_group_config(Node, GroupConfig),
ok = ensure_group_config(GroupConfig),
[{slave_node, Node} | Config];
t_local({'end', _Config}) ->
ok = stop_slave(local_shared_sub_test19);
t_local(Config) when is_list(Config) ->
Node = proplists:get_value(slave_node, Config),
Topic = <<"local_foo1/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@@ -388,7 +403,7 @@ t_local(_) ->
?assertNotEqual(UsedSubPid1, UsedSubPid2),
ok.
t_local_fallback(_) ->
t_local_fallback({'init', Config}) ->
ok = ensure_group_config(#{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
@@ -396,10 +411,15 @@ t_local_fallback(_) ->
<<"sticky_group">> => sticky
}),
Node = start_slave(local_fallback_shared_sub_test19, 11885),
[{slave_node, Node} | Config];
t_local_fallback({'end', _}) ->
ok = stop_slave(local_fallback_shared_sub_test19);
t_local_fallback(Config) when is_list(Config) ->
Topic = <<"local_foo2/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
Node = start_slave('local_fallback_shared_sub_test19', 11885),
Node = proplists:get_value(slave_node, Config),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1),
@@ -407,6 +427,7 @@ t_local_fallback(_) ->
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}),
ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)),
[{share, Topic, {ok, 1}}] = emqx:publish(Message1),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
@@ -422,7 +443,7 @@ t_local_fallback(_) ->
%% This one tests that the broker tries to select another shared subscriber
%% if the first one doesn't return an ACK
t_redispatch(_) ->
t_redispatch(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
application:set_env(emqx, shared_dispatch_ack_enabled, true),
@@ -453,7 +474,7 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2),
ok.
t_dispatch_when_inflights_are_full(_) ->
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true),
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
@@ -536,8 +557,20 @@ recv_msgs(Count, Msgs) ->
end.
start_slave(Name, Port) ->
ok = emqx_ct_helpers:start_apps([emqx_modules]),
Listeners = [#{listen_on => {{127,0,0,1}, Port},
start_apps => [emqx, emqx_modules],
name => "internal",
opts => [{zone,internal}],
proto => tcp}],
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).
stop_slave(Name) ->
emqx_node_helpers:stop_slave(Name).
load_app(App) ->
case application:load(App) of
ok -> ok;
{error, {already_loaded, _}} -> ok;
{error, Reason} -> error({failed_to_load_app, App, Reason})
end.