diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 310d21c92..71ffeb24e 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -561,10 +561,13 @@ deliver_msg( end, {ok, Session1}; false -> + %% Note that we publish message without shared ack header + %% But add to inflight with ack headers + %% This ack header is required for redispatch-on-terminate feature to work Publish = {PacketId, maybe_ack(Msg)}, - Msg2 = mark_begin_deliver(Msg), - Session1 = await(PacketId, Msg2, Session), - {ok, [Publish], next_pkt_id(Session1)} + MarkedMsg = mark_begin_deliver(Msg), + Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight), + {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})} end. -spec enqueue( @@ -625,14 +628,10 @@ enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> - case emqx_shared_sub:is_ack_required(Msg) of - true -> emqx_shared_sub:maybe_ack(Msg); - false -> Msg - end. + emqx_shared_sub:maybe_ack(Msg). maybe_nack(Msg) -> - emqx_shared_sub:is_ack_required(Msg) andalso - (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). + emqx_shared_sub:maybe_nack_dropped(Msg). get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of @@ -673,14 +672,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) -> Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). -%%-------------------------------------------------------------------- -%% Awaiting ACK for QoS1/QoS2 Messages -%%-------------------------------------------------------------------- - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), - Session#session{inflight = Inflight1}. - %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- @@ -808,13 +799,43 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. -terminate(ClientInfo, discarded, Session) -> - run_hook('session.discarded', [ClientInfo, info(Session)]); -terminate(ClientInfo, takenover, Session) -> - run_hook('session.takenover', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> + run_terminate_hooks(ClientInfo, Reason, Session), + redispatch_shared_messages(Session), + ok. + +run_terminate_hooks(ClientInfo, discarded, Session) -> + run_hook('session.discarded', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, takenover, Session) -> + run_hook('session.takenover', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). +redispatch_shared_messages(#session{inflight = Inflight}) -> + InflightList = emqx_inflight:to_list(Inflight), + lists:foreach( + fun + %% Only QoS1 messages get redispatched, because QoS2 messages + %% must be sent to the same client, once they're in flight + ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) -> + ?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg}); + ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) -> + case emqx_shared_sub:get_group(Msg) of + {ok, Group} -> + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); + _ -> + false + end; + (_) -> + ok + end, + InflightList + ). + +-compile({inline, [run_hook/2]}). run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 5539c4427..bdd6d22ea 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -36,13 +36,17 @@ unsubscribe/3 ]). --export([dispatch/3]). +-export([ + dispatch/3, + dispatch/4 +]). -export([ maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, - is_ack_required/1 + is_ack_required/1, + get_group/1 ]). %% for testing @@ -132,7 +136,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> false -> {error, no_subscribers}; {Type, SubPid} -> - case do_dispatch(SubPid, Topic, Msg, Type) of + case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, _Reason} -> @@ -152,36 +156,33 @@ strategy(Group) -> ack_enabled() -> emqx:get_config([broker, shared_dispatch_ack_enabled]). -do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> +do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, Topic, Msg, Type) -> - dispatch_per_qos(SubPid, Topic, Msg, Type). - %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> +do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, retry) -> +do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, fresh) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg); false -> - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); @@ -203,24 +204,32 @@ dispatch_with_ack(SubPid, Topic, Msg) -> ok = emqx_pmon:demonitor(Ref) end. -with_ack_ref(Msg, SenderRef) -> - emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). +with_group_ack(Msg, Group, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). -without_ack_ref(Msg) -> +-spec without_group_ack(emqx_types:message()) -> emqx_types:message(). +without_group_ack(Msg) -> emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). -get_ack_ref(Msg) -> +get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). -spec is_ack_required(emqx_types:message()) -> boolean(). -is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). +is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). + +-spec get_group(emqx_types:message()) -> {ok, any()} | error. +get_group(Msg) -> + case get_group_ack(Msg) of + ?NO_ACK -> error; + {Group, _Sender, _Ref} -> {ok, Group} + end. %% @doc Negative ack dropped message due to inflight window or message queue being full. --spec maybe_nack_dropped(emqx_types:message()) -> ok. +-spec maybe_nack_dropped(emqx_types:message()) -> boolean(). maybe_nack_dropped(Msg) -> - case get_ack_ref(Msg) of - ?NO_ACK -> ok; - {Sender, Ref} -> nack(Sender, Ref, dropped) + case get_group_ack(Msg) of + ?NO_ACK -> false; + {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -228,22 +237,22 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec nack_no_connection(emqx_types:message()) -> ok. nack_no_connection(Msg) -> - {Sender, Ref} = get_ack_ref(Msg), + {_Group, Sender, Ref} = get_group_ack(Msg), nack(Sender, Ref, no_connection). -spec nack(pid(), reference(), dropped | no_connection) -> ok. nack(Sender, Ref, Reason) -> - erlang:send(Sender, {Ref, ?NACK(Reason)}), + Sender ! {Ref, ?NACK(Reason)}, ok. -spec maybe_ack(emqx_types:message()) -> emqx_types:message(). maybe_ack(Msg) -> - case get_ack_ref(Msg) of + case get_group_ack(Msg) of ?NO_ACK -> Msg; - {Sender, Ref} -> - erlang:send(Sender, {Ref, ?ACK}), - without_ack_ref(Msg) + {_Group, Sender, Ref} -> + Sender ! {Ref, ?ACK}, + without_group_ack(Msg) end. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 19aa96942..21ffd385e 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -49,9 +49,9 @@ t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). t_maybe_nack_dropped(_) -> - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), + ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual( ok, receive @@ -61,7 +61,7 @@ t_maybe_nack_dropped(_) -> ). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual( ok, @@ -73,7 +73,7 @@ t_nack_no_connection(_) -> t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual( #message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg) @@ -313,11 +313,15 @@ test_two_messages(Strategy, Group) -> ok. last_message(ExpectedPayload, Pids) -> + last_message(ExpectedPayload, Pids, 100). + +last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> ct:pal("~p ====== ~p", [Pids, Pid]), {true, Pid} - after 500 -> + after Timeout -> + ct:pal("not yet"), <<"not yet?">> end. @@ -334,11 +338,6 @@ t_dispatch(_) -> emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) ). -% t_unsubscribe(_) -> -% error('TODO'). - -% t_subscribe(_) -> -% error('TODO'). t_uncovered_func(_) -> ignored = gen_server:call(emqx_shared_sub, ignored), ok = gen_server:cast(emqx_shared_sub, ignored), @@ -440,6 +439,37 @@ t_local_fallback(_) -> ?assertEqual(UsedSubPid1, UsedSubPid2), ok. +%% This one tests that broker tries to select another shared subscriber +%% If the first one doesn't return an ACK +t_redispatch(_) -> + ok = ensure_config(sticky, true), + + Group = <<"group1">>, + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%--------------------------------------------------------------------