From dd6c7efc84f5c8201eb14f132e8dbbaee5fcc251 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Tue, 24 May 2022 21:25:15 +0400 Subject: [PATCH] fix(shared): ACK header compatability --- src/emqx_shared_sub.erl | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 9eb2b1d5b..57c0171ce 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -230,9 +230,13 @@ is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). -spec(maybe_nack_dropped(emqx_types:message()) -> boolean()). maybe_nack_dropped(Msg) -> case get_group_ack(Msg) of - ?NO_ACK -> false; + ?NO_ACK -> false; {fresh, _Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped); - {retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false + {retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false; + %% This clause is for backward compatability + Ack -> + {Sender, Ref} = fetch_sender_ref(Ack), + ok == nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -253,11 +257,17 @@ maybe_ack(Msg) -> case get_group_ack(Msg) of ?NO_ACK -> Msg; - {_Type, _Group, Sender, Ref} -> + Ack -> + {Sender, Ref} = fetch_sender_ref(Ack), Sender ! {Ref, ?ACK}, without_group_ack(Msg) end. +fetch_sender_ref({_Type, _Group, Sender, Ref}) -> {Sender, Ref}; +%% These clauses are for backward compatability +fetch_sender_ref({_Group, Sender, Ref}) -> {Sender, Ref}; +fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. + pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), case is_active_sub(Sub0, FailedSubs) of