From 81b1ea8c559bb9adc6c85126bad05d7bd0895368 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 14 Oct 2022 21:23:14 +0200 Subject: [PATCH 1/4] docs: refine change log text --- CHANGES-4.3.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index f01b6078b..10f5078c9 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -76,7 +76,7 @@ File format: subscriber from another node in the cluster. Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) -- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125) +- Fix rule engine fallback actions metrics reset. [#9125](https://github.com/emqx/emqx/pull/9125) ## v4.3.20 From 86da0f548e12a1486de9d34d783192414348fc7d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 15 Oct 2022 14:57:31 +0200 Subject: [PATCH 2/4] docs: refine v43 changelogs --- CHANGES-4.3.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 10f5078c9..a12c279f8 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -39,22 +39,24 @@ File format: Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true to prevent sessions from buffering messages, however this acknowledgement comes with a cost. +- Prior to this fix, some of the time stamps were taken from the `os` module (system call), + while majority of other places are using `erlang` module (from Erlang virtual machine). + This inconsistent behaviour has caused some trouble for the Delayed Publish feature when OS time changes. + Now all time stamps are from `erlang` module. [#8908](https://github.com/emqx/emqx/pull/8908) ### Bug fixes - Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145) -- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) - - Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) In this change, it also included more changes in redis client: - - Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19). + - Improve redis connection error logging [eredis #19](https://github.com/emqx/eredis/pull/19). Also added support for eredis to accept an anonymous function as password instead of passing around plaintext args which may get dumpped to crash logs (hard to predict where). This change also added `format_status` callback for `gen_server` states which hold plaintext password so the process termination log and `sys:get_status` will print '******' instead of the password to console. - - Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) + - Avoid pool name clashing [eredis_cluster #22](https://github.com/emqx/eredis_cluster/pull/22) Same `format_status` callback is added here too for `gen_server`s which hold password in their state. From 73a5462cba3bec9435933573feafaf4341488c48 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 19 Oct 2022 17:09:37 +0200 Subject: [PATCH 3/4] fix(shared): do not redispatch shared messages for certain shutdown For takeover, there should be no message re-dispatch because the messages will be retried by the new session. For kick, messages should not be re-dispatched for security reason. i.e. if admin has identified that there are malicious messages stored in persisted sessions, killing the session should not cause messages to be re-dispatched --- src/emqx_session.erl | 12 ++- test/emqx_shared_sub_SUITE.erl | 134 +++++++++++++++++++++++++++++++-- 2 files changed, 138 insertions(+), 8 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d2379a3fd..41240aeaa 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -625,10 +625,11 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). +terminate(ClientInfo, {shutdown, Reason}, Session) -> + terminate(ClientInfo, Reason, Session); terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session), - Reason =/= takeovered andalso - redispatch_shared_messages(Session), + maybe_redispatch_shared_messages(Reason, Session), ok. run_terminate_hooks(ClientInfo, discarded, Session) -> @@ -638,6 +639,13 @@ run_terminate_hooks(ClientInfo, takeovered, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). +maybe_redispatch_shared_messages(takeovered, _Session) -> + ok; +maybe_redispatch_shared_messages(kicked, _Session) -> + ok; +maybe_redispatch_shared_messages(_Reason, Session) -> + redispatch_shared_messages(Session). + redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), F = fun({_, {Msg, _Ts}}) -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 2c4ecf265..0613ba37f 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -587,11 +587,16 @@ t_dispatch_qos2(Config) when is_list(Config) -> ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending + %% on if it's picked as the first one for round_robin MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), - %% assert hello2 > hello1 or hello4 > hello3 - ?assert(MsgRec2 > MsgRec1), - + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello1">>, MsgRec1); + <<"hello4">> -> + ?assertEqual(<<"hello2">>, MsgRec1) + end, sys:resume(ConnPid1), %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false %% so it will never send PUBCOMP, hence EMQX should not attempt to send @@ -604,8 +609,14 @@ t_dispatch_qos2(Config) when is_list(Config) -> kill_process(ConnPid1), %% client 2 should receive the message MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4), - %% assert hello2 > hello1 or hello4 > hello3 - ?assert(MsgRec4 > MsgRec3), + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello2">>, MsgRec3), + ?assertEqual(<<"hello4">>, MsgRec4); + <<"hello4">> -> + ?assertEqual(<<"hello1">>, MsgRec3), + ?assertEqual(<<"hello3">>, MsgRec4) + end, emqtt:stop(ConnPid2), ok. @@ -654,17 +665,128 @@ t_dispatch_qos0(Config) when is_list(Config) -> emqtt:stop(ConnPid2), ok. +t_session_takeover({init, Config}) when is_list(Config) -> + Config; +t_session_takeover({'end', Config}) when is_list(Config) -> + ok; +t_session_takeover(Config) when is_list(Config) -> + Topic = <<"t1/a">>, + ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())), + Opts = [{clientid, ClientId}, + {auto_ack, true}, + {proto_ver, v5}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 60}} + ], + {ok, ConnPid1} = emqtt:start_link(Opts), + %% with the same client ID, start another client + {ok, ConnPid2} = emqtt:start_link(Opts), + {ok, _} = emqtt:connect(ConnPid1), + emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}), + Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>), + %% Make sure client1 is functioning + ?assertMatch([_], emqx:publish(Message1)), + {true, _} = last_message(<<"hello1">>, [ConnPid1]), + %% Kill client1 + emqtt:stop(ConnPid1), + %% publish another message (should end up in client1's session) + ?assertMatch([_], emqx:publish(Message2)), + %% connect client2 (with the same clientid) + {ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over + ?assertMatch([_], emqx:publish(Message3)), + ?assertMatch([_], emqx:publish(Message4)), + {true, _} = last_message(<<"hello2">>, [ConnPid1]), + {true, _} = last_message(<<"hello3">>, [ConnPid1]), + {true, _} = last_message(<<"hello4">>, [ConnPid1]), + ?assertEqual([], collect_msgs(timer:seconds(2))), + emqtt:stop(ConnPid2), + ok. + + +t_session_kicked({init, Config}) when is_list(Config) -> + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + Config; +t_session_kicked({'end', Config}) when is_list(Config) -> + meck:unload(emqx_zone); +t_session_kicked(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + %% One message is inflight + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending + %% on if it's picked as the first one for round_robin + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello1">>, MsgRec1); + <<"hello4">> -> + ?assertEqual(<<"hello2">>, MsgRec1) + end, + sys:resume(ConnPid1), + %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false + %% so it will never send PUBCOMP, hence EMQX should not attempt to send + %% the 4th message yet since max_inflight is 1. + MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello2">>, MsgRec3); + <<"hello4">> -> + ?assertEqual(<<"hello1">>, MsgRec3) + end, + %% no message expected + ?assertEqual([], collect_msgs(0)), + %% now kick client 1 + kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end), + %% client 2 should NOT receive the message + ?assertEqual([], collect_msgs(1000)), + emqtt:stop(ConnPid2), + ?assertEqual([], collect_msgs(0)), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- kill_process(Pid) -> + kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end). + +kill_process(Pid, WithFun) -> _ = unlink(Pid), _ = monitor(process, Pid), - erlang:exit(Pid, kill), + _ = WithFun(Pid), receive {'DOWN', _, process, Pid, _} -> ok + after 10_000 -> + error(timeout) end. collect_msgs(Timeout) -> From a163ffce7ce3b899a4a931009ba480ccce666adb Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 19 Oct 2022 18:16:38 +0200 Subject: [PATCH 4/4] test: assert message receive pid is in the expected pids list --- test/emqx_shared_sub_SUITE.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 0613ba37f..2112f0b8c 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -290,7 +290,7 @@ last_message(ExpectedPayload, Pids) -> last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> - ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), + ?assert(lists:member(Pid, Pids)), {true, Pid} after Timeout -> ct:pal("not yet"), @@ -698,9 +698,9 @@ t_session_takeover(Config) when is_list(Config) -> {ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over ?assertMatch([_], emqx:publish(Message3)), ?assertMatch([_], emqx:publish(Message4)), - {true, _} = last_message(<<"hello2">>, [ConnPid1]), - {true, _} = last_message(<<"hello3">>, [ConnPid1]), - {true, _} = last_message(<<"hello4">>, [ConnPid1]), + {true, _} = last_message(<<"hello2">>, [ConnPid2]), + {true, _} = last_message(<<"hello3">>, [ConnPid2]), + {true, _} = last_message(<<"hello4">>, [ConnPid2]), ?assertEqual([], collect_msgs(timer:seconds(2))), emqtt:stop(ConnPid2), ok.