fix(shared-sub): fix dead loop if all subscribers are disconected

In `broker.shared_dispatch_ack_enabled=true`, if all subscribers seesion are
kept and but connnection gone. the subscribers will reply NACKs if someone shared
delivers reached. However, the shared subscription always pick a subscriber to retry,
even if it has already replied with NACKs.

After this PR, the subscriber replies with an ACK and stores it into mqueue,
instead of replying with a NACK
This commit is contained in:
JianBo He 2022-09-07 17:03:41 +08:00
parent ad31dfff35
commit 8ec432481d
2 changed files with 35 additions and 11 deletions

View File

@ -735,8 +735,8 @@ handle_deliver(Delivers, Channel = #channel{
%% NOTE: Order is important here. While the takeover is in %% NOTE: Order is important here. While the takeover is in
%% progress, the session cannot enqueue messages, since it already %% progress, the session cannot enqueue messages, since it already
%% passed on the queue to the new connection in the session state. %% passed on the queue to the new connection in the session state.
NPendings = lists:append(Pendings, NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)), NPendings = lists:append(Pendings, NDelivers),
{ok, Channel#channel{pendings = NPendings}}; {ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{ handle_deliver(Delivers, Channel = #channel{
@ -744,8 +744,8 @@ handle_deliver(Delivers, Channel = #channel{
takeover = false, takeover = false,
session = Session, session = Session,
clientinfo = #{clientid := ClientId} = ClientInfo}) -> clientinfo = #{clientid := ClientId} = ClientInfo}) ->
NSession = emqx_session:enqueue(ClientInfo, NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session), NSession = emqx_session:enqueue(ClientInfo, NDelivers, Session),
{ok, Channel#channel{session = NSession}}; {ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel = #channel{ handle_deliver(Delivers, Channel = #channel{
@ -776,12 +776,23 @@ ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
end, Delivers). end, Delivers).
%% Nack delivers from shared subscription %% Nack delivers from shared subscription
maybe_nack(Delivers) -> maybe_discard_shared_delivers(Delivers) ->
lists:filter(fun not_nacked/1, Delivers). lists:filtermap(
fun({deliver, Topic, Msg}) ->
not_nacked({deliver, _Topic, Msg}) -> case emqx_shared_sub:is_ack_required(Msg) of
not (emqx_shared_sub:is_ack_required(Msg) false ->
andalso (ok == emqx_shared_sub:nack_no_connection(Msg))). true;
true ->
case emqx_shared_sub:is_retry_dispatch(Msg) of
true ->
%% force enqueue the retried shared deliver
{true, {deliver, Topic, emqx_shared_sub:maybe_ack(Msg)}};
false ->
ok = emqx_shared_sub:nack_no_connection(Msg),
false
end
end
end, Delivers).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packet %% Handle outgoing packet

View File

@ -46,6 +46,7 @@
, maybe_nack_dropped/1 , maybe_nack_dropped/1
, nack_no_connection/1 , nack_no_connection/1
, is_ack_required/1 , is_ack_required/1
, is_retry_dispatch/1
, get_group/1 , get_group/1
]). ]).
@ -239,6 +240,13 @@ get_group(Msg) ->
-spec(is_ack_required(emqx_types:message()) -> boolean()). -spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
-spec(is_retry_dispatch(emqx_types:message()) -> boolean()).
is_retry_dispatch(Msg) ->
case get_group_ack(Msg) of
{_Sender, {retry, _Group, _Ref}} -> true;
_ -> false
end.
%% @doc Negative ack dropped message due to inflight window or message queue being full. %% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) -> maybe_nack_dropped(Msg) ->
@ -280,10 +288,15 @@ maybe_ack(Msg) ->
Msg; Msg;
Ack -> Ack ->
{Sender, Ref} = fetch_sender_ref(Ack), {Sender, Ref} = fetch_sender_ref(Ack),
Sender ! {Ref, ?ACK}, ack(Sender, Ref),
without_group_ack(Msg) without_group_ack(Msg)
end. end.
-spec(ack(pid(), reference()) -> ok).
ack(Sender, Ref) ->
Sender ! {Ref, ?ACK},
ok.
fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref}; fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
%% These clauses are for backward compatibility %% These clauses are for backward compatibility
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.