From f964989d6bb401207a7964ef46d6fa56b78ee77e Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Wed, 27 Apr 2022 20:13:31 +0400 Subject: [PATCH 1/6] feat(shared): redispatch QoS1 messages when sub dies --- apps/emqx/src/emqx_session.erl | 63 ++++++++++++++------- apps/emqx/src/emqx_shared_sub.erl | 71 +++++++++++++----------- apps/emqx/test/emqx_shared_sub_SUITE.erl | 51 +++++++++++++---- 3 files changed, 122 insertions(+), 63 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 310d21c92..1a784d301 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -561,10 +561,13 @@ deliver_msg( end, {ok, Session1}; false -> + %% Note that we publish message without shared ack header + %% But add to inflight with ack headers + %% This ack header is required for redispatch-on-terminate feature to work Publish = {PacketId, maybe_ack(Msg)}, - Msg2 = mark_begin_deliver(Msg), - Session1 = await(PacketId, Msg2, Session), - {ok, [Publish], next_pkt_id(Session1)} + MarkedMsg = mark_begin_deliver(Msg), + Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight), + {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})} end. -spec enqueue( @@ -625,14 +628,10 @@ enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> - case emqx_shared_sub:is_ack_required(Msg) of - true -> emqx_shared_sub:maybe_ack(Msg); - false -> Msg - end. + emqx_shared_sub:maybe_ack(Msg). maybe_nack(Msg) -> - emqx_shared_sub:is_ack_required(Msg) andalso - (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). + emqx_shared_sub:maybe_nack_dropped(Msg). get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of @@ -673,14 +672,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) -> Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). -%%-------------------------------------------------------------------- -%% Awaiting ACK for QoS1/QoS2 Messages -%%-------------------------------------------------------------------- - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), - Session#session{inflight = Inflight1}. - %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- @@ -808,13 +799,43 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. -terminate(ClientInfo, discarded, Session) -> - run_hook('session.discarded', [ClientInfo, info(Session)]); -terminate(ClientInfo, takenover, Session) -> - run_hook('session.takenover', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> + run_terminate_hooks(ClientInfo, Reason, Session), + redispatch_shared_messages(Session), + ok. + +run_terminate_hooks(ClientInfo, discarded, Session) -> + run_hook('session.discarded', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, takeovered, Session) -> + run_hook('session.takeovered', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). +redispatch_shared_messages(#session{inflight = Inflight}) -> + InflightList = emqx_inflight:to_list(Inflight), + lists:foreach( + fun + %% Only QoS1 messages get redispatched, because QoS2 messages + %% must be sent to the same client, once they're in flight + ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) -> + ?SLOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); + ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) -> + case emqx_shared_sub:get_group(Msg) of + {ok, Group} -> + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); + _ -> + false + end; + (_) -> + ok + end, + InflightList + ). + +-compile({inline, [run_hook/2]}). run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ee45b991d..22866bb3d 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -36,13 +36,17 @@ unsubscribe/3 ]). --export([dispatch/3]). +-export([ + dispatch/3, + dispatch/4 +]). -export([ maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, - is_ack_required/1 + is_ack_required/1, + get_group/1 ]). %% for testing @@ -132,7 +136,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> false -> {error, no_subscribers}; {Type, SubPid} -> - case do_dispatch(SubPid, Topic, Msg, Type) of + case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, _Reason} -> @@ -152,36 +156,33 @@ strategy(Group) -> ack_enabled() -> emqx:get_config([broker, shared_dispatch_ack_enabled]). -do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> +do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, Topic, Msg, Type) -> - dispatch_per_qos(SubPid, Topic, Msg, Type). - %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> +do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, retry) -> +do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, fresh) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg); false -> - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); @@ -203,24 +204,32 @@ dispatch_with_ack(SubPid, Topic, Msg) -> ok = emqx_pmon:demonitor(Ref) end. -with_ack_ref(Msg, SenderRef) -> - emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). +with_group_ack(Msg, Group, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). -without_ack_ref(Msg) -> +-spec without_group_ack(emqx_types:message()) -> emqx_types:message(). +without_group_ack(Msg) -> emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). -get_ack_ref(Msg) -> +get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). -spec is_ack_required(emqx_types:message()) -> boolean(). -is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). +is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). + +-spec get_group(emqx_types:message()) -> {ok, any()} | error. +get_group(Msg) -> + case get_group_ack(Msg) of + ?NO_ACK -> error; + {Group, _Sender, _Ref} -> {ok, Group} + end. %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec maybe_nack_dropped(emqx_types:message()) -> ok. maybe_nack_dropped(Msg) -> - case get_ack_ref(Msg) of - ?NO_ACK -> ok; - {Sender, Ref} -> nack(Sender, Ref, dropped) + case get_group_ack(Msg) of + ?NO_ACK -> false; + {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -228,22 +237,22 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec nack_no_connection(emqx_types:message()) -> ok. nack_no_connection(Msg) -> - {Sender, Ref} = get_ack_ref(Msg), + {_Group, Sender, Ref} = get_group_ack(Msg), nack(Sender, Ref, no_connection). -spec nack(pid(), reference(), dropped | no_connection) -> ok. nack(Sender, Ref, Reason) -> - erlang:send(Sender, {Ref, ?NACK(Reason)}), + Sender ! {Ref, ?NACK(Reason)}, ok. -spec maybe_ack(emqx_types:message()) -> emqx_types:message(). maybe_ack(Msg) -> - case get_ack_ref(Msg) of + case get_group_ack(Msg) of ?NO_ACK -> Msg; - {Sender, Ref} -> - erlang:send(Sender, {Ref, ?ACK}), - without_ack_ref(Msg) + {_Group, Sender, Ref} -> + Sender ! {Ref, ?ACK}, + without_group_ack(Msg) end. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 19aa96942..3b990efec 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -49,9 +49,9 @@ t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). t_maybe_nack_dropped(_) -> - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), + ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual( ok, receive @@ -61,7 +61,7 @@ t_maybe_nack_dropped(_) -> ). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual( ok, @@ -73,7 +73,7 @@ t_nack_no_connection(_) -> t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual( #message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg) @@ -313,11 +313,15 @@ test_two_messages(Strategy, Group) -> ok. last_message(ExpectedPayload, Pids) -> + last_message(ExpectedPayload, Pids, 100). + +last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> ct:pal("~p ====== ~p", [Pids, Pid]), {true, Pid} - after 500 -> + after Timeout -> + ct:pal("not yet"), <<"not yet?">> end. @@ -334,11 +338,6 @@ t_dispatch(_) -> emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) ). -% t_unsubscribe(_) -> -% error('TODO'). - -% t_subscribe(_) -> -% error('TODO'). t_uncovered_func(_) -> ignored = gen_server:call(emqx_shared_sub, ignored), ok = gen_server:cast(emqx_shared_sub, ignored), @@ -438,6 +437,36 @@ t_local_fallback(_) -> stop_slave(Node), ?assertEqual(UsedSubPid1, UsedSubPid2), + +%% This one tests that broker tries to select another shared subscriber +%% If the first one doesn't return an ACK +t_redispatch(_) -> + ok = ensure_config(sticky, true), + + Group = <<"group1">>, + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), ok. %%-------------------------------------------------------------------- From 671906dd596aefacec72915bb6bfaf26e8915b1f Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Mon, 23 May 2022 22:46:44 +0400 Subject: [PATCH 2/6] fix: typo --- apps/emqx/test/emqx_shared_sub_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 3b990efec..21ffd385e 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -437,6 +437,7 @@ t_local_fallback(_) -> stop_slave(Node), ?assertEqual(UsedSubPid1, UsedSubPid2), + ok. %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK From ea313df2cc235b5c2ee9322bcaf86e9e6cb6f951 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Tue, 24 May 2022 00:06:29 +0400 Subject: [PATCH 3/6] fix(shared): removed emqx_message:format --- apps/emqx/src/emqx_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 1a784d301..0dd116347 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -818,7 +818,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% Only QoS1 messages get redispatched, because QoS2 messages %% must be sent to the same client, once they're in flight ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) -> - ?SLOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); + ?SLOG(warning, "Not redispatching qos2 msg: ~s", [Msg]); ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) -> case emqx_shared_sub:get_group(Msg) of {ok, Group} -> From d0cf112e1798646ca660c42bfa0181df7278b24b Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Tue, 24 May 2022 11:17:34 +0400 Subject: [PATCH 4/6] fix(shared): dialyzer --- apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/src/emqx_shared_sub.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 0dd116347..e32ebc610 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -818,7 +818,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% Only QoS1 messages get redispatched, because QoS2 messages %% must be sent to the same client, once they're in flight ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) -> - ?SLOG(warning, "Not redispatching qos2 msg: ~s", [Msg]); + ?SLOG(warning, "Not redispatching qos2 msg: ~p", [Msg]); ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) -> case emqx_shared_sub:get_group(Msg) of {ok, Group} -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 22866bb3d..5163554a7 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -225,7 +225,7 @@ get_group(Msg) -> end. %% @doc Negative ack dropped message due to inflight window or message queue being full. --spec maybe_nack_dropped(emqx_types:message()) -> ok. +-spec maybe_nack_dropped(emqx_types:message()) -> boolean(). maybe_nack_dropped(Msg) -> case get_group_ack(Msg) of ?NO_ACK -> false; From d911a5a9d4b446986ea96fd83ce9d675633d80f7 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Tue, 24 May 2022 15:57:38 +0400 Subject: [PATCH 5/6] fix(shared): fixed logging --- apps/emqx/src/emqx_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index e32ebc610..e37680ddf 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -818,7 +818,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% Only QoS1 messages get redispatched, because QoS2 messages %% must be sent to the same client, once they're in flight ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) -> - ?SLOG(warning, "Not redispatching qos2 msg: ~p", [Msg]); + ?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg}); ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) -> case emqx_shared_sub:get_group(Msg) of {ok, Group} -> From db368e83b16d39d2db25192a5dd1bf8240c52ffe Mon Sep 17 00:00:00 2001 From: gsychev <79104934+gsychev@users.noreply.github.com> Date: Fri, 27 May 2022 14:54:18 +0000 Subject: [PATCH 6/6] fix(session): takenover Co-authored-by: Thales Macedo Garitezi --- apps/emqx/src/emqx_session.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index e37680ddf..71ffeb24e 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -806,8 +806,8 @@ terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, discarded, Session) -> run_hook('session.discarded', [ClientInfo, info(Session)]); -run_terminate_hooks(ClientInfo, takeovered, Session) -> - run_hook('session.takeovered', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, takenover, Session) -> + run_hook('session.takenover', [ClientInfo, info(Session)]); run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).