Merge remote-tracking branch 'origin/release-v43' into release-v44

This commit is contained in:
Zaiming (Stone) Shi 2022-10-19 19:25:43 +02:00
commit fd7a8064f4
3 changed files with 146 additions and 14 deletions

View File

@ -39,22 +39,24 @@ File format:
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
to prevent sessions from buffering messages, however this acknowledgement comes with a cost. to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
- Prior to this fix, some of the time stamps were taken from the `os` module (system call),
while majority of other places are using `erlang` module (from Erlang virtual machine).
This inconsistent behaviour has caused some trouble for the Delayed Publish feature when OS time changes.
Now all time stamps are from `erlang` module. [#8908](https://github.com/emqx/emqx/pull/8908)
### Bug fixes ### Bug fixes
- Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145) - Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145)
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) - Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
In this change, it also included more changes in redis client: In this change, it also included more changes in redis client:
- Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19). - Improve redis connection error logging [eredis #19](https://github.com/emqx/eredis/pull/19).
Also added support for eredis to accept an anonymous function as password instead of Also added support for eredis to accept an anonymous function as password instead of
passing around plaintext args which may get dumpped to crash logs (hard to predict where). passing around plaintext args which may get dumpped to crash logs (hard to predict where).
This change also added `format_status` callback for `gen_server` states which hold plaintext This change also added `format_status` callback for `gen_server` states which hold plaintext
password so the process termination log and `sys:get_status` will print '******' instead of password so the process termination log and `sys:get_status` will print '******' instead of
the password to console. the password to console.
- Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) - Avoid pool name clashing [eredis_cluster #22](https://github.com/emqx/eredis_cluster/pull/22)
Same `format_status` callback is added here too for `gen_server`s which hold password in Same `format_status` callback is added here too for `gen_server`s which hold password in
their state. their state.
@ -76,7 +78,7 @@ File format:
subscriber from another node in the cluster. subscriber from another node in the cluster.
Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) Fixed in [#9122](https://github.com/emqx/emqx/pull/9122)
- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125) - Fix rule engine fallback actions metrics reset. [#9125](https://github.com/emqx/emqx/pull/9125)
## v4.3.20 ## v4.3.20

View File

@ -670,10 +670,11 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
end. end.
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
terminate(ClientInfo, {shutdown, Reason}, Session) ->
terminate(ClientInfo, Reason, Session);
terminate(ClientInfo, Reason, Session) -> terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session), run_terminate_hooks(ClientInfo, Reason, Session),
Reason =/= takeovered andalso maybe_redispatch_shared_messages(Reason, Session),
redispatch_shared_messages(Session),
ok. ok.
run_terminate_hooks(ClientInfo, discarded, Session) -> run_terminate_hooks(ClientInfo, discarded, Session) ->
@ -683,6 +684,13 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
maybe_redispatch_shared_messages(takeovered, _Session) ->
ok;
maybe_redispatch_shared_messages(kicked, _Session) ->
ok;
maybe_redispatch_shared_messages(_Reason, Session) ->
redispatch_shared_messages(Session).
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
F = fun({_, {Msg, _Ts}}) -> F = fun({_, {Msg, _Ts}}) ->

View File

@ -290,7 +290,7 @@ last_message(ExpectedPayload, Pids) ->
last_message(ExpectedPayload, Pids, Timeout) -> last_message(ExpectedPayload, Pids, Timeout) ->
receive receive
{publish, #{client_pid := Pid, payload := ExpectedPayload}} -> {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), ?assert(lists:member(Pid, Pids)),
{true, Pid} {true, Pid}
after Timeout -> after Timeout ->
ct:pal("not yet"), ct:pal("not yet"),
@ -587,11 +587,16 @@ t_dispatch_qos2(Config) when is_list(Config) ->
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3 case MsgRec2 of
?assert(MsgRec2 > MsgRec1), <<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
<<"hello4">> ->
?assertEqual(<<"hello2">>, MsgRec1)
end,
sys:resume(ConnPid1), sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send %% so it will never send PUBCOMP, hence EMQX should not attempt to send
@ -604,8 +609,14 @@ t_dispatch_qos2(Config) when is_list(Config) ->
kill_process(ConnPid1), kill_process(ConnPid1),
%% client 2 should receive the message %% client 2 should receive the message
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4), MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
%% assert hello2 > hello1 or hello4 > hello3 case MsgRec2 of
?assert(MsgRec4 > MsgRec3), <<"hello3">> ->
?assertEqual(<<"hello2">>, MsgRec3),
?assertEqual(<<"hello4">>, MsgRec4);
<<"hello4">> ->
?assertEqual(<<"hello1">>, MsgRec3),
?assertEqual(<<"hello3">>, MsgRec4)
end,
emqtt:stop(ConnPid2), emqtt:stop(ConnPid2),
ok. ok.
@ -654,17 +665,128 @@ t_dispatch_qos0(Config) when is_list(Config) ->
emqtt:stop(ConnPid2), emqtt:stop(ConnPid2),
ok. ok.
t_session_takeover({init, Config}) when is_list(Config) ->
Config;
t_session_takeover({'end', Config}) when is_list(Config) ->
ok;
t_session_takeover(Config) when is_list(Config) ->
Topic = <<"t1/a">>,
ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
Opts = [{clientid, ClientId},
{auto_ack, true},
{proto_ver, v5},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 60}}
],
{ok, ConnPid1} = emqtt:start_link(Opts),
%% with the same client ID, start another client
{ok, ConnPid2} = emqtt:start_link(Opts),
{ok, _} = emqtt:connect(ConnPid1),
emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>),
%% Make sure client1 is functioning
?assertMatch([_], emqx:publish(Message1)),
{true, _} = last_message(<<"hello1">>, [ConnPid1]),
%% Kill client1
emqtt:stop(ConnPid1),
%% publish another message (should end up in client1's session)
?assertMatch([_], emqx:publish(Message2)),
%% connect client2 (with the same clientid)
{ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
{true, _} = last_message(<<"hello2">>, [ConnPid2]),
{true, _} = last_message(<<"hello3">>, [ConnPid2]),
{true, _} = last_message(<<"hello4">>, [ConnPid2]),
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
t_session_kicked({init, Config}) when is_list(Config) ->
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_session_kicked({'end', Config}) when is_list(Config) ->
meck:unload(emqx_zone);
t_session_kicked(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
%% One message is inflight
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
<<"hello4">> ->
?assertEqual(<<"hello2">>, MsgRec1)
end,
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello2">>, MsgRec3);
<<"hello4">> ->
?assertEqual(<<"hello1">>, MsgRec3)
end,
%% no message expected
?assertEqual([], collect_msgs(0)),
%% now kick client 1
kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
%% client 2 should NOT receive the message
?assertEqual([], collect_msgs(1000)),
emqtt:stop(ConnPid2),
?assertEqual([], collect_msgs(0)),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% help functions %% help functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
kill_process(Pid) -> kill_process(Pid) ->
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
kill_process(Pid, WithFun) ->
_ = unlink(Pid), _ = unlink(Pid),
_ = monitor(process, Pid), _ = monitor(process, Pid),
erlang:exit(Pid, kill), _ = WithFun(Pid),
receive receive
{'DOWN', _, process, Pid, _} -> {'DOWN', _, process, Pid, _} ->
ok ok
after 10_000 ->
error(timeout)
end. end.
collect_msgs(Timeout) -> collect_msgs(Timeout) ->