fix(shared): ACK forward header compatability
This commit is contained in:
parent
dd6c7efc84
commit
137b85bb9a
|
@ -207,7 +207,7 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
with_group_ack(Msg, Group, Type, Sender, Ref) ->
|
with_group_ack(Msg, Group, Type, Sender, Ref) ->
|
||||||
emqx_message:set_headers(#{shared_dispatch_ack => {Type, Group, Sender, Ref}}, Msg).
|
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, {Type, Group, Ref}}}, Msg).
|
||||||
|
|
||||||
-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
|
-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
|
||||||
without_group_ack(Msg) ->
|
without_group_ack(Msg) ->
|
||||||
|
@ -219,7 +219,7 @@ get_group_ack(Msg) ->
|
||||||
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
|
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
|
||||||
get_group(Msg) ->
|
get_group(Msg) ->
|
||||||
case get_group_ack(Msg) of
|
case get_group_ack(Msg) of
|
||||||
{_Type, Group, _Sender, _Ref} -> {ok, Group};
|
{_Sender, {_Type, Group, _Ref}} -> {ok, Group};
|
||||||
_ -> error
|
_ -> error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -230,9 +230,9 @@ is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
||||||
-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()).
|
-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()).
|
||||||
maybe_nack_dropped(Msg) ->
|
maybe_nack_dropped(Msg) ->
|
||||||
case get_group_ack(Msg) of
|
case get_group_ack(Msg) of
|
||||||
?NO_ACK -> false;
|
?NO_ACK -> false;
|
||||||
{fresh, _Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped);
|
{Sender, {fresh, _Group, Ref}} -> ok == nack(Sender, Ref, dropped);
|
||||||
{retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false;
|
{_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), false;
|
||||||
%% This clause is for backward compatability
|
%% This clause is for backward compatability
|
||||||
Ack ->
|
Ack ->
|
||||||
{Sender, Ref} = fetch_sender_ref(Ack),
|
{Sender, Ref} = fetch_sender_ref(Ack),
|
||||||
|
@ -244,7 +244,7 @@ maybe_nack_dropped(Msg) ->
|
||||||
%% i.e is_ack_required returned true.
|
%% i.e is_ack_required returned true.
|
||||||
-spec(nack_no_connection(emqx_types:message()) -> ok).
|
-spec(nack_no_connection(emqx_types:message()) -> ok).
|
||||||
nack_no_connection(Msg) ->
|
nack_no_connection(Msg) ->
|
||||||
{_Type, _Group, Sender, Ref} = get_group_ack(Msg),
|
{Sender, Ref} = fetch_sender_ref(get_group_ack(Msg)),
|
||||||
nack(Sender, Ref, no_connection).
|
nack(Sender, Ref, no_connection).
|
||||||
|
|
||||||
-spec(nack(pid(), reference(), dropped | no_connection) -> ok).
|
-spec(nack(pid(), reference(), dropped | no_connection) -> ok).
|
||||||
|
@ -263,9 +263,9 @@ maybe_ack(Msg) ->
|
||||||
without_group_ack(Msg)
|
without_group_ack(Msg)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
fetch_sender_ref({_Type, _Group, Sender, Ref}) -> {Sender, Ref};
|
fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
|
||||||
%% These clauses are for backward compatability
|
%% These clauses are for backward compatability
|
||||||
fetch_sender_ref({_Group, Sender, Ref}) -> {Sender, Ref};
|
fetch_sender_ref({Sender, {_Group, Ref}}) -> {Sender, Ref};
|
||||||
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
|
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
|
||||||
|
|
||||||
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||||
|
|
|
@ -48,19 +48,19 @@ t_is_ack_required(_) ->
|
||||||
|
|
||||||
t_maybe_nack_dropped(_) ->
|
t_maybe_nack_dropped(_) ->
|
||||||
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||||
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
|
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
|
||||||
|
|
||||||
t_nack_no_connection(_) ->
|
t_nack_no_connection(_) ->
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||||
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
||||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
|
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
|
||||||
after 100 -> timeout end).
|
after 100 -> timeout end).
|
||||||
|
|
||||||
t_maybe_ack(_) ->
|
t_maybe_ack(_) ->
|
||||||
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||||
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
||||||
emqx_shared_sub:maybe_ack(Msg)),
|
emqx_shared_sub:maybe_ack(Msg)),
|
||||||
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
|
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
|
||||||
|
|
Loading…
Reference in New Issue