From b345002e84043a103f8943ed037b01f22a601de3 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Wed, 27 Apr 2022 20:13:31 +0400 Subject: [PATCH 1/2] feat(shared): redispatch to another shared sub, when no ACK received --- CHANGES-4.3.md | 1 + src/emqx.appup.src | 18 ++++++--- src/emqx_session.erl | 63 ++++++++++++++++++++---------- src/emqx_shared_sub.erl | 70 +++++++++++++++++++--------------- test/emqx_shared_sub_SUITE.erl | 49 +++++++++++++++++++++--- 5 files changed, 138 insertions(+), 63 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 566022ddf..79757b1c1 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -22,6 +22,7 @@ File format: * Add more rule engine date functions: format_date/3, format_date/4, date_to_unix_ts/4 [#7894] * Add proto_name and proto_ver fields for $event/client_disconnected event. * Mnesia auth/acl http api support multiple condition queries. +* Inflight QoS1 messages for shared topics are now redispatched to another alive subscriber when the chosen subscriber's session terminates. ### Bug fixes * List subscription topic (/api/v4/subscriptions), the result do not match with multiple conditions. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index ea2c006bc..09ad2d13a 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,15 +2,18 @@ %% Unless you know what you are doing, DO NOT edit manually!! 
{VSN, [{"4.3.15", - [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, - {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -19,7 +22,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", - [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, @@ -462,12 +466,15 @@ {<<".*">>,[]}], [{"4.3.15", [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, - {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + 
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -478,7 +485,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", - [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2d4e60db8..ae4d035c3 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -454,9 +454,12 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = end, {ok, Session1}; false -> + %% Note that we publish message without shared ack header + %% But add to inflight with ack headers + %% This ack header is required for redispatch-on-terminate feature to work Publish = {PacketId, maybe_ack(Msg)}, - Session1 = await(PacketId, Msg, Session), - {ok, [Publish], next_pkt_id(Session1)} + Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), + {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})} end. -spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(), @@ -491,14 +494,10 @@ enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> - case emqx_shared_sub:is_ack_required(Msg) of - true -> emqx_shared_sub:maybe_ack(Msg); - false -> Msg - end. + emqx_shared_sub:maybe_ack(Msg). maybe_nack(Msg) -> - emqx_shared_sub:is_ack_required(Msg) - andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). + emqx_shared_sub:maybe_nack_dropped(Msg). 
get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of @@ -531,14 +530,6 @@ enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). -%%-------------------------------------------------------------------- -%% Awaiting ACK for QoS1/QoS2 Messages -%%-------------------------------------------------------------------- - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), - Session#session{inflight = Inflight1}. - %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- @@ -634,16 +625,46 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). -terminate(ClientInfo, discarded, Session) -> - run_hook('session.discarded', [ClientInfo, info(Session)]); -terminate(ClientInfo, takeovered, Session) -> - run_hook('session.takeovered', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> + run_terminate_hooks(ClientInfo, Reason, Session), + redispatch_shared_messages(Session), + ok. + +run_terminate_hooks(ClientInfo, discarded, Session) -> + run_hook('session.discarded', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, takeovered, Session) -> + run_hook('session.takeovered', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). 
+redispatch_shared_messages(#session{inflight = Inflight}) -> + InflightList = emqx_inflight:to_list(Inflight), + lists:foreach(fun + %% Only QoS1 messages get redispatched, because QoS2 messages + %% must be sent to the same client, once they're in flight + ({_, {#message{qos = ?QOS_2} = Msg, _}}) -> + ?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); + + ({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) -> + case emqx_shared_sub:get_group(Msg) of + {ok, Group} -> + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); + + _ -> + false + end; + + (_) -> + ok + end, InflightList). + -compile({inline, [run_hook/2]}). run_hook(Name, Args) -> - ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + ok = emqx_metrics:inc(Name), + emqx_hooks:run(Name, Args). %%-------------------------------------------------------------------- %% Inc message/delivery expired counter diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 12e64b59a..121d777ca 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -38,12 +38,15 @@ , unsubscribe/3 ]). --export([dispatch/3]). +-export([ dispatch/3 + , dispatch/4 + ]). -export([ maybe_ack/1 , maybe_nack_dropped/1 , nack_no_connection/1 , is_ack_required/1 + , get_group/1 ]). %% for testing @@ -131,7 +134,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> false -> {error, no_subscribers}; {Type, SubPid} -> - case do_dispatch(SubPid, Topic, Msg, Type) of + case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, _Reason} -> %% Failed to dispatch to this sub, try next. @@ -153,36 +156,33 @@ strategy(Group) -> ack_enabled() -> emqx:get_env(shared_dispatch_ack_enabled, false). 
-do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> +do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, Topic, Msg, Type) -> - dispatch_per_qos(SubPid, Topic, Msg, Type). - %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> +do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, retry) -> +do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, fresh) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg); false -> - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity @@ -204,24 +204,32 @@ dispatch_with_ack(SubPid, Topic, Msg) -> _ = erlang:demonitor(Ref, [flush]) end. -with_ack_ref(Msg, SenderRef) -> - emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). 
+with_group_ack(Msg, Group, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). -without_ack_ref(Msg) -> +-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). +without_group_ack(Msg) -> emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). -get_ack_ref(Msg) -> +get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). +-spec(get_group(emqx_types:message()) -> {ok, any()} | error). +get_group(Msg) -> + case get_group_ack(Msg) of + ?NO_ACK -> error; + {Group, _Sender, _Ref} -> {ok, Group} + end. + -spec(is_ack_required(emqx_types:message()) -> boolean()). -is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). +is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). %% @doc Negative ack dropped message due to inflight window or message queue being full. --spec(maybe_nack_dropped(emqx_types:message()) -> ok). +-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()). maybe_nack_dropped(Msg) -> - case get_ack_ref(Msg) of - ?NO_ACK -> ok; - {Sender, Ref} -> nack(Sender, Ref, dropped) + case get_group_ack(Msg) of + ?NO_ACK -> false; + {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -229,22 +237,22 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec(nack_no_connection(emqx_types:message()) -> ok). nack_no_connection(Msg) -> - {Sender, Ref} = get_ack_ref(Msg), + {_Group, Sender, Ref} = get_group_ack(Msg), nack(Sender, Ref, no_connection). -spec(nack(pid(), reference(), dropped | no_connection) -> ok). nack(Sender, Ref, Reason) -> - erlang:send(Sender, {Ref, ?NACK(Reason)}), + Sender ! {Ref, ?NACK(Reason)}, ok. -spec(maybe_ack(emqx_types:message()) -> emqx_types:message()). maybe_ack(Msg) -> - case get_ack_ref(Msg) of + case get_group_ack(Msg) of ?NO_ACK -> Msg; - {Sender, Ref} -> - erlang:send(Sender, {Ref, ?ACK}), - without_ack_ref(Msg) + {_Group, Sender, Ref} -> + Sender ! 
{Ref, ?ACK}, + without_group_ack(Msg) end. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 0a6c47d60..5c2de94d0 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -47,20 +47,20 @@ t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). t_maybe_nack_dropped(_) -> - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), + ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). @@ -284,11 +284,14 @@ test_two_messages(Strategy, Group) -> ok. last_message(ExpectedPayload, Pids) -> + last_message(ExpectedPayload, Pids, 100). 
+ +last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), {true, Pid} - after 100 -> + after Timeout -> ct:pal("not yet"), <<"not yet?">> end. @@ -410,6 +413,39 @@ t_local_fallback(_) -> ?assertEqual(UsedSubPid1, UsedSubPid2), ok. +%% This one tests that broker tries to select another shared subscriber +%% If the first one doesn't return an ACK +t_redispatch(_) -> + ok = ensure_config(sticky, true), + application:set_env(emqx, shared_dispatch_ack_enabled, true), + + Group = <<"group1">>, + + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), + ok. 
+ %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -481,6 +517,7 @@ setup_node(Node, Port) -> opts => [{zone,internal}], proto => tcp}]), application:set_env(gen_rpc, port_discovery, manual), + application:set_env(gen_rpc, tcp_server_port, Port * 2), ok; (_) -> ok From 411ee84a359b39cf0268f4f60932bb8d9d3a9b3f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 19 May 2022 09:23:58 +0800 Subject: [PATCH 2/2] chore(session): fix bad indent --- src/emqx_session.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index ae4d035c3..d01f33e6a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -644,19 +644,16 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% must be sent to the same client, once they're in flight ({_, {#message{qos = ?QOS_2} = Msg, _}}) -> ?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); - ({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) -> case emqx_shared_sub:get_group(Msg) of {ok, Group} -> - %% Note that dispatch is called with self() in failed subs + %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); - _ -> false end; - (_) -> ok end, InflightList).