From 9989f4df7ef450bb16928acbf13cb842f78dcd39 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Oct 2022 18:09:38 +0200 Subject: [PATCH 1/8] chore: fix shared subscription redispatch --- src/emqx_session.erl | 35 ++++++++++++-------------- src/emqx_shared_sub.erl | 54 +++++++++++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 30 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 94483bb87..73eacea8b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -638,26 +638,21 @@ run_terminate_hooks(ClientInfo, takeovered, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -redispatch_shared_messages(#session{inflight = Inflight}) -> - InflightList = emqx_inflight:to_list(Inflight), - lists:foreach(fun - %% Only QoS1 messages get redispatched, because QoS2 messages - %% must be sent to the same client, once they're in flight - ({_, {#message{qos = ?QOS_2} = Msg, _}}) -> - ?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); - ({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) -> - case emqx_shared_sub:get_group(Msg) of - {ok, Group} -> - %% Note that dispatch is called with self() in failed subs - %% This is done to avoid dispatching back to caller - Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); - _ -> - false - end; - (_) -> - ok - end, InflightList). +redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> + InflightList = lists:map(fun({_, {Msg, _Ts}}) -> Msg end, + emqx_inflight:to_list(sort_fun(), Inflight)), + MqList = mqueue_to_list(Q, []), + emqx_shared_sub:redispatch(InflightList ++ MqList). + +%% convert mqueue to a list +%% the messages at the head of the list is to be dispatched before the tail +mqueue_to_list(Q, Acc) -> + case emqx_mqueue:out(Q) of + {empty, _Q} -> + lists:reverse(Acc); + {{value, Msg}, Q1} -> + mqueue_to_list(Q1, [Msg | Acc]) + end. -compile({inline, [run_hook/2]}). run_hook(Name, Args) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index cc57e001f..4987248cf 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -39,7 +39,7 @@ ]). -export([ dispatch/3 - , dispatch_to_non_self/3 + , redispatch/1 ]). -export([ maybe_ack/1 @@ -47,7 +47,6 @@ , nack_no_connection/1 , is_ack_required/1 , is_retry_dispatch/1 - , get_group/1 ]). %% for testing @@ -84,6 +83,7 @@ -define(ACK, shared_sub_ack). -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). +-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). -record(state, {pmon}). @@ -134,11 +134,12 @@ dispatch(Group, Topic, Delivery) -> Strategy = strategy(Group), dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg, +dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg0, case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> + Msg = with_redispatch_to(Msg0, Group, Topic), case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -162,7 +163,7 @@ ack_enabled() -> emqx:get_env(shared_dispatch_ack_enabled, false). do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> - %% Deadlock otherwise + %% dispatch without ack, deadlock otherwise send(SubPid, Topic, {deliver, Topic, Msg}); %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> @@ -176,6 +177,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) -> send(SubPid, Topic, {deliver, Topic, Msg}) end. +with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg; +with_redispatch_to(Msg, Group, Topic) -> + emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). + dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), @@ -228,13 +233,22 @@ without_group_ack(Msg) -> get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). --spec(get_group(emqx_types:message()) -> {ok, any()} | error). -get_group(Msg) -> - case get_group_ack(Msg) of - {_Sender, {_Type, Group, _Ref}} -> {ok, Group}; - _ -> error +%% @hidden Redispatch is neede only for the messages with redispatch_to header added. +is_redispatch_needed(Msg) -> + case get_redispatch_to(Msg) of + ?REDISPATCH_TO(_, _) -> + true; + _ -> + false end. +%% @hidden Return the `redispatch_to` group-topic in the message header. +%% `false` is returned if the message is not a shared dispatch. +%% or when it's a QoS 0 message. +-spec(get_redispatch_to(emqx_types:message()) -> emqx_types:topic() | false). +get_redispatch_to(Msg) -> + emqx_message:get_header(redispatch_to, Msg, false). + -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). @@ -245,6 +259,26 @@ is_retry_dispatch(Msg) -> _ -> false end. +%% @doc Redispatch shared deliveries to other members in the group. +redispatch(Messages0) -> + Messages = lists:filter(fun is_redispatch_needed/1, Messages0), + case length(Messages) of + L when L > 0 -> + ?LOG(info, "Redispatching ~p shared subscription messages", [L]), + lists:foreach(fun redispatch_shared_message/1, Messages); + _ -> + ok + end. + +redispatch_shared_message(Msg) -> + %% As long as it's still a #message{} record in inflight, + %% we should try to re-dispatch + ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg), + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + dispatch_to_non_self(Group, Topic, Delivery). + %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). maybe_nack_dropped(Msg) -> From ba1c276c75433c53902475a408733c842376b8a7 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 10:04:09 +0200 Subject: [PATCH 2/8] fix(typespec): fix type spec for emqx_shared_sub:redispatch_to --- src/emqx_shared_sub.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 4987248cf..be897569a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -85,6 +85,8 @@ -define(NO_ACK, no_ack). -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). +-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()). + -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -245,7 +247,7 @@ is_redispatch_needed(Msg) -> %% @hidden Return the `redispatch_to` group-topic in the message header. %% `false` is returned if the message is not a shared dispatch. %% or when it's a QoS 0 message. --spec(get_redispatch_to(emqx_types:message()) -> emqx_types:topic() | false). +-spec(get_redispatch_to(emqx_types:message()) -> redispatch_to() | false). get_redispatch_to(Msg) -> emqx_message:get_header(redispatch_to, Msg, false). From 6769bd4edc598cf82f7b666718633ea2017ee054 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 10:30:59 +0200 Subject: [PATCH 3/8] fix(shared): drop pubrel from inflight collection before redispatch --- src/emqx_session.erl | 13 +++++++++++-- src/emqx_shared_sub.erl | 4 ++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 73eacea8b..f87fc1f23 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -639,8 +639,17 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> - InflightList = lists:map(fun({_, {Msg, _Ts}}) -> Msg end, - emqx_inflight:to_list(sort_fun(), Inflight)), + F = fun({_, {Msg, _Ts}}) -> + case Msg of + #message{} -> + {true, Msg}; + _ -> + %% QoS 2, after pubrec is received + %% the inflight record is updated to an atom + false + end + end, + InflightList = lists:filtermap(F, emqx_inflight:to_list(sort_fun(), Inflight)), MqList = mqueue_to_list(Q, []), emqx_shared_sub:redispatch(InflightList ++ MqList). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index be897569a..8667ae6c4 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -236,7 +236,7 @@ get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). %% @hidden Redispatch is neede only for the messages with redispatch_to header added. -is_redispatch_needed(Msg) -> +is_redispatch_needed(#message{} = Msg) -> case get_redispatch_to(Msg) of ?REDISPATCH_TO(_, _) -> true; @@ -272,7 +272,7 @@ redispatch(Messages0) -> ok end. -redispatch_shared_message(Msg) -> +redispatch_shared_message(#message{} = Msg) -> %% As long as it's still a #message{} record in inflight, %% we should try to re-dispatch ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg), From 3339df8b249a0acba545292beabad15dd85daf2a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 12:33:15 +0200 Subject: [PATCH 4/8] test: Add test case to cover shared sub QoS2 pubrel in inflights --- src/emqx_session.erl | 3 +- src/emqx_shared_sub.erl | 2 +- test/emqx_shared_sub_SUITE.erl | 192 +++++++++++++++++++++++++-------- 3 files changed, 149 insertions(+), 48 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index f87fc1f23..40c8eacc0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -639,6 +639,7 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> + AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), F = fun({_, {Msg, _Ts}}) -> case Msg of #message{} -> @@ -649,7 +650,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> false end end, - InflightList = lists:filtermap(F, emqx_inflight:to_list(sort_fun(), Inflight)), + InflightList = lists:filtermap(F, AllInflights), MqList = mqueue_to_list(Q, []), emqx_shared_sub:redispatch(InflightList ++ MqList). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 8667ae6c4..6130ccc0a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -266,7 +266,7 @@ redispatch(Messages0) -> Messages = lists:filter(fun is_redispatch_needed/1, Messages0), case length(Messages) of L when L > 0 -> - ?LOG(info, "Redispatching ~p shared subscription messages", [L]), + ?LOG(info, "Redispatching ~p shared subscription message(s)", [L]), lists:foreach(fun redispatch_shared_message/1, Messages); _ -> ok diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5e79ee983..d8a7d12b4 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -25,13 +25,24 @@ -define(SUITE, ?MODULE). --define(wait(For, Timeout), - emqx_ct_helpers:wait_for( - ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). - -define(ack, shared_sub_ack). -define(no_ack, no_ack). +-define(WAIT(TIMEOUT, PATTERN, Res), + (fun() -> + receive + PATTERN -> + Res; + Other -> + ct:fail(#{expected => ??PATTERN, + got => Other + }) + after + TIMEOUT -> + ct:fail({timeout, ??PATTERN}) + end + end)()). + all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> @@ -135,40 +146,7 @@ t_no_connection_nack(_) -> SendF(1), ct:sleep(200), %% This is the connection which was picked by broker to dispatch (sticky) for 1st message - ?assertMatch([#{packet_id := 1}], recv_msgs(1)), - %% Now kill the connection, expect all following messages to be delivered to the other - %% subscriber. - %emqx_mock_client:stop(ConnPid), - %% sleep then make synced calls to session processes to ensure that - %% the connection pid's 'EXIT' message is propagated to the session process - %% also to be sure sessions are still alive - % timer:sleep(2), - % _ = emqx_session:info(SPid1), - % _ = emqx_session:info(SPid2), - % %% Now we know what is the other still alive connection - % [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid], - % %% Send some more messages - % PacketIdList = lists:seq(2, 10), - % lists:foreach(fun(Id) -> - % SendF(Id), - % ?wait(Received(Id, TheOtherConnPid), 1000) - % end, PacketIdList), - % %% Now close the 2nd (last connection) - % emqx_mock_client:stop(TheOtherConnPid), - % timer:sleep(2), - % %% both sessions should have conn_pid = undefined - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), - % %% send more messages, but all should be queued in session state - % lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), - % {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), - % {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)), - % ?assertEqual(length(PacketIdList), L1 + L2), - % %% clean up - % emqx_mock_client:close_session(PubConnPid), - % emqx_sm:close_session(SPid1), - % emqx_sm:close_session(SPid2), ok. t_random(_) -> @@ -422,8 +400,14 @@ t_local_fallback(_) -> %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK -t_redispatch(_) -> - ok = ensure_config(sticky, true), +t_redispatch_with_ack(Config) -> + test_redispatch(Config, true). + +t_redispatch_no_ack(Config) -> + test_redispatch(Config, false). + +test_redispatch(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), application:set_env(emqx, shared_dispatch_ack_enabled, true), Group = <<"group1">>, @@ -453,15 +437,55 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. -t_dispatch_when_inflights_are_full(_) -> - ok = ensure_config(round_robin, true), +t_redispatch_wildcard_with_ack(Config) -> + redispatch_wildcard(Config, true). + +t_redispatch_wildcard_no_ack(Config) -> + redispatch_wildcard(Config, false). + +%% This one tests that broker tries to redispatch to another member in the group +%% if the first one disconnected before acking (auto_ack set to false) +redispatch_wildcard(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), + + Group = <<"group1">>, + + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), + ok. + +t_dispatch_when_inflights_are_full_with_ack(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = true), Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - %% Note that max_inflight is 1 - {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), - {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]), + %% make sure broker does not push more than one inflight + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), {ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid2), @@ -484,8 +508,7 @@ t_dispatch_when_inflights_are_full(_) -> ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), %% Now kill any client - erlang:exit(ConnPid1, normal), - ct:sleep(100), + ok = kill_process(ConnPid1), %% And try to send the message ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), @@ -497,13 +520,90 @@ t_dispatch_when_inflights_are_full(_) -> ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])), ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])), + meck:unload(emqx_zone), emqtt:stop(ConnPid2), ok. +%% No ack, QoS 2 subscriptions, +%% client1 receives one message, send pubrec, then suspend +%% client2 acts normal (aot_ack=true) +%% Expected behaviour: +%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down +t_dispatch_qos2(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + %% One message is inflight + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + sys:resume(ConnPid1), + %% emqtt automatically send PUBREC, but since auto_ack is set to false + %% so it will never send PUBCOMP, hence EMQX should not attempt to send + %% the 4th message yet since max_inflight is 1. + MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), + ct:sleep(100), + %% no message expected + ?assertEqual([], collect_msgs([])), + %% now kill client 1 + kill_process(ConnPid1), + %% client 2 should receive the message + MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec4 > MsgRec3), + emqtt:stop(ConnPid2), + meck:unload(emqx_zone), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- +kill_process(Pid) -> + _ = unlink(Pid), + _ = monitor(process, Pid), + erlang:exit(Pid, kill), + receive + {'DOWN', _, process, Pid, _} -> + ok + end. + +collect_msgs(Acc) -> + receive + Msg -> + collect_msgs([Msg | Acc]) + after + 0 -> + lists:reverse(Acc) + end. + ensure_config(Strategy) -> ensure_config(Strategy, _AckEnabled = true). From 8d42589bf5045742ae60732f4c1042796a30a32e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 13:32:48 +0200 Subject: [PATCH 5/8] chore: update appup --- src/emqx.appup.src | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 3db3d2c45..51576061c 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,9 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.21", - [{add_module,emqx_secret}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {add_module,emqx_secret}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -17,7 +19,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}]}, {"4.3.20", - [{add_module,emqx_secret}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {add_module,emqx_secret}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, @@ -34,7 +37,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}]}, {"4.3.19", - [{add_module,emqx_secret}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {add_module,emqx_secret}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, @@ -52,7 +56,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.18", - [{add_module,emqx_secret}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {add_module,emqx_secret}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, @@ -857,9 +862,10 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [ - {"4.3.21", - [{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + [{"4.3.21", + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -873,7 +879,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}]}, {"4.3.20", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]}, @@ -889,7 +896,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}]}, {"4.3.19", - [{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]}, @@ -906,7 +914,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.18", - [{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]}, From d23dfcca39efa778f056c4a2094cd6e900c62fcf Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 16:03:00 +0200 Subject: [PATCH 6/8] fix(shared): only re-dispatch QoS1 inflights --- CHANGES-4.3.md | 10 ++++++++++ src/emqx_session.erl | 20 ++++++++++++-------- test/emqx_shared_sub_SUITE.erl | 4 ++-- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 713f89f37..ef96ce745 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,6 +32,11 @@ File format: - Added a test to prevent a last will testament message to be published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) +- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group + when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094). + Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true + to prevent sessions from buffering messages, however this acknowledgement comes with a cost. + ### Bug fixes - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) @@ -48,6 +53,11 @@ File format: Same `format_status` callback is added here too for `gen_server`s which hold password in their state. +- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). + - When discarding QoS 2 inflight messages, there were excessive logs + - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, + but not the subscrbing topic), caused messages to be lost when dispatching. + ## v4.3.20 ### Bug fixes diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 40c8eacc0..d2379a3fd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -641,14 +641,18 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), F = fun({_, {Msg, _Ts}}) -> - case Msg of - #message{} -> - {true, Msg}; - _ -> - %% QoS 2, after pubrec is received - %% the inflight record is updated to an atom - false - end + case Msg of + #message{qos = ?QOS_1} -> + %% For QoS 2, here is what the spec says: + %% If the Client's Session terminates before the Client reconnects, + %% the Server MUST NOT send the Application Message to any other + %% subscribed Client [MQTT-4.8.2-5]. + {true, Msg}; + _ -> + %% QoS 2, after pubrec is received + %% the inflight record is updated to an atom + false + end end, InflightList = lists:filtermap(F, AllInflights), MqList = mqueue_to_list(Q, []), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index ba216abba..a16f948ff 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -550,7 +550,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> %% No ack, QoS 2 subscriptions, %% client1 receives one message, send pubrec, then suspend -%% client2 acts normal (aot_ack=true) +%% client2 acts normal (auto_ack=true) %% Expected behaviour: %% the messages sent to client1's inflight and mq are re-dispatched after client1 is down t_dispatch_qos2({init, Config}) when is_list(Config) -> @@ -593,7 +593,7 @@ t_dispatch_qos2(Config) when is_list(Config) -> ?assert(MsgRec2 > MsgRec1), sys:resume(ConnPid1), - %% emqtt automatically send PUBREC, but since auto_ack is set to false + %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false %% so it will never send PUBCOMP, hence EMQX should not attempt to send %% the 4th message yet since max_inflight is 1. MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), From a1032db4e18a4ebb5ed25657dbf4b10bfbad4869 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 17:17:51 +0200 Subject: [PATCH 7/8] test: add test case to verify QoS 0 message is never redispatched --- CHANGES-4.3.md | 2 +- test/emqx_shared_sub_SUITE.erl | 56 +++++++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index ef96ce745..26dd529a2 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -56,7 +56,7 @@ File format: - Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). - When discarding QoS 2 inflight messages, there were excessive logs - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, - but not the subscrbing topic), caused messages to be lost when dispatching. + but not the subscribing topic), caused messages to be lost when dispatching. ## v4.3.20 diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index a16f948ff..2c4ecf265 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -599,7 +599,7 @@ t_dispatch_qos2(Config) when is_list(Config) -> MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), ct:sleep(100), %% no message expected - ?assertEqual([], collect_msgs([])), + ?assertEqual([], collect_msgs(0)), %% now kill client 1 kill_process(ConnPid1), %% client 2 should receive the message @@ -609,6 +609,51 @@ t_dispatch_qos2(Config) when is_list(Config) -> emqtt:stop(ConnPid2), ok. +t_dispatch_qos0({init, Config}) when is_list(Config) -> + Config; +t_dispatch_qos0({'end', Config}) when is_list(Config) -> + ok; +t_dispatch_qos0(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + %% subscribe with QoS 0 + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}), + + %% publish with QoS 2, but should be downgraded to 0 as the subscribers + %% subscribe with QoS 0 + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + ?assertMatch([_], emqx:publish(Message1)), + ?assertMatch([_], emqx:publish(Message2)), + ?assertMatch([_], emqx:publish(Message3)), + ?assertMatch([_], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + kill_process(ConnPid1), + %% expect no redispatch + ?assertEqual([], collect_msgs(timer:seconds(2))), + emqtt:stop(ConnPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -622,12 +667,15 @@ kill_process(Pid) -> ok end. -collect_msgs(Acc) -> +collect_msgs(Timeout) -> + collect_msgs([], Timeout). + +collect_msgs(Acc, Timeout) -> receive Msg -> - collect_msgs([Msg | Acc]) + collect_msgs([Msg | Acc], Timeout) after - 0 -> + Timeout -> lists:reverse(Acc) end. From 206ab125a4b1157d7f1a17ebe6cb920980bc1f7b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 6 Oct 2022 09:21:52 +0200 Subject: [PATCH 8/8] build: support additional checks before cutting a release tag --- scripts/rel/cut4x.sh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/scripts/rel/cut4x.sh b/scripts/rel/cut4x.sh index 58131c757..df142b936 100755 --- a/scripts/rel/cut4x.sh +++ b/scripts/rel/cut4x.sh @@ -230,6 +230,16 @@ if [ "$HAS_RELUP_DB" = 'yes' ]; then ./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS" fi +## Run some additional checks (e.g. some for enterprise edition only) +CHECKS_DIR="./scripts/rel/checks" +if [ -d "${CHECKS_DIR}" ]; then + CHECKS="$(find "${CHECKS_DIR}" -name "*.sh" -print0 2>/dev/null | xargs -0)" + for c in $CHECKS; do + logmsg "Executing $c" + $c + done +fi + if [ "$DRYRUN" = 'yes' ]; then logmsg "Release tag is ready to be created with command: git tag $TAG" else