diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 13efa6369..c6ed9ae15 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -39,22 +39,24 @@ File format: Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true to prevent sessions from buffering messages, however this acknowledgement comes with a cost. +- Prior to this fix, some of the time stamps were taken from the `os` module (system call), + while majority of other places are using `erlang` module (from Erlang virtual machine). + This inconsistent behaviour has caused some trouble for the Delayed Publish feature when OS time changes. + Now all time stamps are from `erlang` module. [#8908](https://github.com/emqx/emqx/pull/8908) ### Bug fixes - Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145) -- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) - - Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) In this change, it also included more changes in redis client: - - Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19). + - Improve redis connection error logging [eredis #19](https://github.com/emqx/eredis/pull/19). Also added support for eredis to accept an anonymous function as password instead of passing around plaintext args which may get dumpped to crash logs (hard to predict where). This change also added `format_status` callback for `gen_server` states which hold plaintext password so the process termination log and `sys:get_status` will print '******' instead of the password to console. - - Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) + - Avoid pool name clashing [eredis_cluster #22](https://github.com/emqx/eredis_cluster/pull/22) Same `format_status` callback is added here too for `gen_server`s which hold password in their state. @@ -76,7 +78,7 @@ File format: subscriber from another node in the cluster. Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) -- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125) +- Fix rule engine fallback actions metrics reset. [#9125](https://github.com/emqx/emqx/pull/9125) ## v4.3.20 diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 7765f7d7a..b4e15cefb 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -670,10 +670,11 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). +terminate(ClientInfo, {shutdown, Reason}, Session) -> + terminate(ClientInfo, Reason, Session); terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session), - Reason =/= takeovered andalso - redispatch_shared_messages(Session), + maybe_redispatch_shared_messages(Reason, Session), ok. run_terminate_hooks(ClientInfo, discarded, Session) -> @@ -683,6 +684,13 @@ run_terminate_hooks(ClientInfo, takeovered, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). +maybe_redispatch_shared_messages(takeovered, _Session) -> + ok; +maybe_redispatch_shared_messages(kicked, _Session) -> + ok; +maybe_redispatch_shared_messages(_Reason, Session) -> + redispatch_shared_messages(Session). + redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), F = fun({_, {Msg, _Ts}}) -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 2c4ecf265..2112f0b8c 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -290,7 +290,7 @@ last_message(ExpectedPayload, Pids) -> last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> - ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), + ?assert(lists:member(Pid, Pids)), {true, Pid} after Timeout -> ct:pal("not yet"), @@ -587,11 +587,16 @@ t_dispatch_qos2(Config) when is_list(Config) -> ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending + %% on if it's picked as the first one for round_robin MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), - %% assert hello2 > hello1 or hello4 > hello3 - ?assert(MsgRec2 > MsgRec1), - + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello1">>, MsgRec1); + <<"hello4">> -> + ?assertEqual(<<"hello2">>, MsgRec1) + end, sys:resume(ConnPid1), %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false %% so it will never send PUBCOMP, hence EMQX should not attempt to send @@ -604,8 +609,14 @@ t_dispatch_qos2(Config) when is_list(Config) -> kill_process(ConnPid1), %% client 2 should receive the message MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4), - %% assert hello2 > hello1 or hello4 > hello3 - ?assert(MsgRec4 > MsgRec3), + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello2">>, MsgRec3), + ?assertEqual(<<"hello4">>, MsgRec4); + <<"hello4">> -> + ?assertEqual(<<"hello1">>, MsgRec3), + ?assertEqual(<<"hello3">>, MsgRec4) + end, emqtt:stop(ConnPid2), ok. @@ -654,17 +665,128 @@ t_dispatch_qos0(Config) when is_list(Config) -> emqtt:stop(ConnPid2), ok. +t_session_takeover({init, Config}) when is_list(Config) -> + Config; +t_session_takeover({'end', Config}) when is_list(Config) -> + ok; +t_session_takeover(Config) when is_list(Config) -> + Topic = <<"t1/a">>, + ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())), + Opts = [{clientid, ClientId}, + {auto_ack, true}, + {proto_ver, v5}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 60}} + ], + {ok, ConnPid1} = emqtt:start_link(Opts), + %% with the same client ID, start another client + {ok, ConnPid2} = emqtt:start_link(Opts), + {ok, _} = emqtt:connect(ConnPid1), + emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}), + Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>), + %% Make sure client1 is functioning + ?assertMatch([_], emqx:publish(Message1)), + {true, _} = last_message(<<"hello1">>, [ConnPid1]), + %% Kill client1 + emqtt:stop(ConnPid1), + %% publish another message (should end up in client1's session) + ?assertMatch([_], emqx:publish(Message2)), + %% connect client2 (with the same clientid) + {ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over + ?assertMatch([_], emqx:publish(Message3)), + ?assertMatch([_], emqx:publish(Message4)), + {true, _} = last_message(<<"hello2">>, [ConnPid2]), + {true, _} = last_message(<<"hello3">>, [ConnPid2]), + {true, _} = last_message(<<"hello4">>, [ConnPid2]), + ?assertEqual([], collect_msgs(timer:seconds(2))), + emqtt:stop(ConnPid2), + ok. + + +t_session_kicked({init, Config}) when is_list(Config) -> + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + Config; +t_session_kicked({'end', Config}) when is_list(Config) -> + meck:unload(emqx_zone); +t_session_kicked(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + %% One message is inflight + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending + %% on if it's picked as the first one for round_robin + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello1">>, MsgRec1); + <<"hello4">> -> + ?assertEqual(<<"hello2">>, MsgRec1) + end, + sys:resume(ConnPid1), + %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false + %% so it will never send PUBCOMP, hence EMQX should not attempt to send + %% the 4th message yet since max_inflight is 1. + MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), + case MsgRec2 of + <<"hello3">> -> + ?assertEqual(<<"hello2">>, MsgRec3); + <<"hello4">> -> + ?assertEqual(<<"hello1">>, MsgRec3) + end, + %% no message expected + ?assertEqual([], collect_msgs(0)), + %% now kick client 1 + kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end), + %% client 2 should NOT receive the message + ?assertEqual([], collect_msgs(1000)), + emqtt:stop(ConnPid2), + ?assertEqual([], collect_msgs(0)), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- kill_process(Pid) -> + kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end). + +kill_process(Pid, WithFun) -> _ = unlink(Pid), _ = monitor(process, Pid), - erlang:exit(Pid, kill), + _ = WithFun(Pid), receive {'DOWN', _, process, Pid, _} -> ok + after 10_000 -> + error(timeout) end. collect_msgs(Timeout) ->