Merge remote-tracking branch 'origin/release-v44' into sync-v44-a

This commit is contained in:
Thales Macedo Garitezi 2022-10-24 16:26:18 -03:00
commit ba422532c7
3 changed files with 146 additions and 14 deletions

View File

@ -39,22 +39,24 @@ File format:
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
- Prior to this fix, some of the time stamps were taken from the `os` module (system call),
while majority of other places are using `erlang` module (from Erlang virtual machine).
This inconsistent behaviour has caused some trouble for the Delayed Publish feature when OS time changes.
Now all time stamps are from `erlang` module. [#8908](https://github.com/emqx/emqx/pull/8908)
### Bug fixes
- Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145)
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
In this change, it also included more changes in redis client:
- Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19).
- Improve redis connection error logging [eredis #19](https://github.com/emqx/eredis/pull/19).
Also added support for eredis to accept an anonymous function as password instead of
passing around plaintext args which may get dumpped to crash logs (hard to predict where).
This change also added `format_status` callback for `gen_server` states which hold plaintext
password so the process termination log and `sys:get_status` will print '******' instead of
the password to console.
- Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22)
- Avoid pool name clashing [eredis_cluster #22](https://github.com/emqx/eredis_cluster/pull/22)
Same `format_status` callback is added here too for `gen_server`s which hold password in
their state.
@ -76,7 +78,7 @@ File format:
subscriber from another node in the cluster.
Fixed in [#9122](https://github.com/emqx/emqx/pull/9122)
- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125)
- Fix rule engine fallback actions metrics reset. [#9125](https://github.com/emqx/emqx/pull/9125)
## v4.3.20

View File

@ -670,10 +670,11 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
end.
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
terminate(ClientInfo, {shutdown, Reason}, Session) ->
terminate(ClientInfo, Reason, Session);
terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
Reason =/= takeovered andalso
redispatch_shared_messages(Session),
maybe_redispatch_shared_messages(Reason, Session),
ok.
run_terminate_hooks(ClientInfo, discarded, Session) ->
@ -683,6 +684,13 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
maybe_redispatch_shared_messages(takeovered, _Session) ->
ok;
maybe_redispatch_shared_messages(kicked, _Session) ->
ok;
maybe_redispatch_shared_messages(_Reason, Session) ->
redispatch_shared_messages(Session).
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
F = fun({_, {Msg, _Ts}}) ->

View File

@ -290,7 +290,7 @@ last_message(ExpectedPayload, Pids) ->
last_message(ExpectedPayload, Pids, Timeout) ->
receive
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
?assert(lists:member(Pid, Pids)),
{true, Pid}
after Timeout ->
ct:pal("not yet"),
@ -587,11 +587,16 @@ t_dispatch_qos2(Config) when is_list(Config) ->
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
<<"hello4">> ->
?assertEqual(<<"hello2">>, MsgRec1)
end,
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
@ -604,8 +609,14 @@ t_dispatch_qos2(Config) when is_list(Config) ->
kill_process(ConnPid1),
%% client 2 should receive the message
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec4 > MsgRec3),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello2">>, MsgRec3),
?assertEqual(<<"hello4">>, MsgRec4);
<<"hello4">> ->
?assertEqual(<<"hello1">>, MsgRec3),
?assertEqual(<<"hello3">>, MsgRec4)
end,
emqtt:stop(ConnPid2),
ok.
@ -654,17 +665,128 @@ t_dispatch_qos0(Config) when is_list(Config) ->
emqtt:stop(ConnPid2),
ok.
t_session_takeover({init, Config}) when is_list(Config) ->
Config;
t_session_takeover({'end', Config}) when is_list(Config) ->
ok;
t_session_takeover(Config) when is_list(Config) ->
Topic = <<"t1/a">>,
ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
Opts = [{clientid, ClientId},
{auto_ack, true},
{proto_ver, v5},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 60}}
],
{ok, ConnPid1} = emqtt:start_link(Opts),
%% with the same client ID, start another client
{ok, ConnPid2} = emqtt:start_link(Opts),
{ok, _} = emqtt:connect(ConnPid1),
emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>),
%% Make sure client1 is functioning
?assertMatch([_], emqx:publish(Message1)),
{true, _} = last_message(<<"hello1">>, [ConnPid1]),
%% Kill client1
emqtt:stop(ConnPid1),
%% publish another message (should end up in client1's session)
?assertMatch([_], emqx:publish(Message2)),
%% connect client2 (with the same clientid)
{ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
{true, _} = last_message(<<"hello2">>, [ConnPid2]),
{true, _} = last_message(<<"hello3">>, [ConnPid2]),
{true, _} = last_message(<<"hello4">>, [ConnPid2]),
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
t_session_kicked({init, Config}) when is_list(Config) ->
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_session_kicked({'end', Config}) when is_list(Config) ->
meck:unload(emqx_zone);
t_session_kicked(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
%% One message is inflight
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
<<"hello4">> ->
?assertEqual(<<"hello2">>, MsgRec1)
end,
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello2">>, MsgRec3);
<<"hello4">> ->
?assertEqual(<<"hello1">>, MsgRec3)
end,
%% no message expected
?assertEqual([], collect_msgs(0)),
%% now kick client 1
kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
%% client 2 should NOT receive the message
?assertEqual([], collect_msgs(1000)),
emqtt:stop(ConnPid2),
?assertEqual([], collect_msgs(0)),
ok.
%%--------------------------------------------------------------------
%% help functions
%%--------------------------------------------------------------------
kill_process(Pid) ->
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
kill_process(Pid, WithFun) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
erlang:exit(Pid, kill),
_ = WithFun(Pid),
receive
{'DOWN', _, process, Pid, _} ->
ok
after 10_000 ->
error(timeout)
end.
collect_msgs(Timeout) ->