diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 57c0171ce..db9abe18c 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -207,7 +207,7 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> end. with_group_ack(Msg, Group, Type, Sender, Ref) -> - emqx_message:set_headers(#{shared_dispatch_ack => {Type, Group, Sender, Ref}}, Msg). + emqx_message:set_headers(#{shared_dispatch_ack => {Sender, {Type, Group, Ref}}}, Msg). -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). without_group_ack(Msg) -> @@ -219,7 +219,7 @@ get_group_ack(Msg) -> -spec(get_group(emqx_types:message()) -> {ok, any()} | error). get_group(Msg) -> case get_group_ack(Msg) of - {_Type, Group, _Sender, _Ref} -> {ok, Group}; + {_Sender, {_Type, Group, _Ref}} -> {ok, Group}; _ -> error end. @@ -230,9 +230,9 @@ is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). -spec(maybe_nack_dropped(emqx_types:message()) -> boolean()). maybe_nack_dropped(Msg) -> case get_group_ack(Msg) of - ?NO_ACK -> false; - {fresh, _Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped); - {retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false; + ?NO_ACK -> false; + {Sender, {fresh, _Group, Ref}} -> ok == nack(Sender, Ref, dropped); + {_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), false; %% This clause is for backward compatability Ack -> {Sender, Ref} = fetch_sender_ref(Ack), @@ -244,7 +244,7 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec(nack_no_connection(emqx_types:message()) -> ok). nack_no_connection(Msg) -> - {_Type, _Group, Sender, Ref} = get_group_ack(Msg), + {Sender, Ref} = fetch_sender_ref(get_group_ack(Msg)), nack(Sender, Ref, no_connection). -spec(nack(pid(), reference(), dropped | no_connection) -> ok). @@ -263,9 +263,9 @@ maybe_ack(Msg) -> without_group_ack(Msg) end. -fetch_sender_ref({_Type, _Group, Sender, Ref}) -> {Sender, Ref}; +fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref}; %% These clauses are for backward compatability -fetch_sender_ref({_Group, Sender, Ref}) -> {Sender, Ref}; +fetch_sender_ref({Sender, {_Group, Ref}}) -> {Sender, Ref}; fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index cee53775b..b999657a8 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -48,19 +48,19 @@ t_is_ack_required(_) -> t_maybe_nack_dropped(_) -> ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).