fix(shared): ACK header compatability

This commit is contained in:
Georgy Sychev 2022-05-24 21:25:15 +04:00
parent e175b3a1d6
commit dd6c7efc84
1 changed files with 13 additions and 3 deletions

View File

@ -230,9 +230,13 @@ is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()).
maybe_nack_dropped(Msg) ->
case get_group_ack(Msg) of
?NO_ACK -> false;
?NO_ACK -> false;
{fresh, _Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped);
{retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false
{retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false;
%% This clause is for backward compatability
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
ok == nack(Sender, Ref, dropped)
end.
%% @doc Negative ack message due to connection down.
@ -253,11 +257,17 @@ maybe_ack(Msg) ->
case get_group_ack(Msg) of
?NO_ACK ->
Msg;
{_Type, _Group, Sender, Ref} ->
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
Sender ! {Ref, ?ACK},
without_group_ack(Msg)
end.
fetch_sender_ref({_Type, _Group, Sender, Ref}) -> {Sender, Ref};
%% These clauses are for backward compatability
fetch_sender_ref({_Group, Sender, Ref}) -> {Sender, Ref};
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
case is_active_sub(Sub0, FailedSubs) of