diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 939be4270..c49299c58 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -735,8 +735,8 @@ handle_deliver(Delivers, Channel = #channel{ %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. - NPendings = lists:append(Pendings, - ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)), + NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session), + NPendings = lists:append(Pendings, NDelivers), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{ @@ -744,8 +744,8 @@ handle_deliver(Delivers, Channel = #channel{ takeover = false, session = Session, clientinfo = #{clientid := ClientId} = ClientInfo}) -> - NSession = emqx_session:enqueue(ClientInfo, - ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session), + NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session), + NSession = emqx_session:enqueue(ClientInfo, NDelivers, Session), {ok, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel = #channel{ @@ -776,12 +776,23 @@ ignore_local(ClientInfo, Delivers, Subscriber, Session) -> end, Delivers). %% Nack delivers from shared subscription -maybe_nack(Delivers) -> - lists:filter(fun not_nacked/1, Delivers). - -not_nacked({deliver, _Topic, Msg}) -> - not (emqx_shared_sub:is_ack_required(Msg) - andalso (ok == emqx_shared_sub:nack_no_connection(Msg))). +maybe_discard_shared_delivers(Delivers) -> + lists:filtermap( + fun({deliver, Topic, Msg}) -> + case emqx_shared_sub:is_ack_required(Msg) of + false -> + true; + true -> + case emqx_shared_sub:is_retry_dispatch(Msg) of + true -> + %% force enqueue the retried shared deliver + {true, {deliver, Topic, emqx_shared_sub:maybe_ack(Msg)}}; + false -> + ok = emqx_shared_sub:nack_no_connection(Msg), + false + end + end + end, Delivers). %%-------------------------------------------------------------------- %% Handle outgoing packet diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 337d63d15..3b533d9d3 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -46,6 +46,7 @@ , maybe_nack_dropped/1 , nack_no_connection/1 , is_ack_required/1 + , is_retry_dispatch/1 , get_group/1 ]). @@ -239,6 +240,13 @@ get_group(Msg) -> -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). +-spec(is_retry_dispatch(emqx_types:message()) -> boolean()). +is_retry_dispatch(Msg) -> + case get_group_ack(Msg) of + {_Sender, {retry, _Group, _Ref}} -> true; + _ -> false + end. + %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). maybe_nack_dropped(Msg) -> @@ -280,10 +288,15 @@ maybe_ack(Msg) -> Msg; Ack -> {Sender, Ref} = fetch_sender_ref(Ack), - Sender ! {Ref, ?ACK}, + ack(Sender, Ref), without_group_ack(Msg) end. +-spec(ack(pid(), reference()) -> ok). +ack(Sender, Ref) -> + Sender ! {Ref, ?ACK}, + ok. + fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref}; %% These clauses are for backward compatibility fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.