fix(shared): ACK forward header compatability

This commit is contained in:
Georgy Sychev 2022-05-25 13:57:45 +04:00
parent 137b85bb9a
commit 56093cb5be
1 changed files with 7 additions and 3 deletions

View File

@ -185,15 +185,16 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
Ref = erlang:monitor(process, SubPid), Ref = erlang:monitor(process, SubPid),
Sender = self(), Sender = self(),
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
OldRef = old_ref(Type, Group, Ref),
Timeout = case Msg#message.qos of Timeout = case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity ?QOS_2 -> infinity
end, end,
try try
receive receive
{Ref, ?ACK} -> {ReceivedRef, ?ACK} when ReceivedRef =:= Ref; ReceivedRef =:= OldRef ->
ok; ok;
{Ref, ?NACK(Reason)} -> {ReceivedRef, ?NACK(Reason)} when ReceivedRef =:= Ref; ReceivedRef =:= OldRef ->
%% the receive session may nack this message when its queue is full %% the receive session may nack this message when its queue is full
{error, Reason}; {error, Reason};
{'DOWN', Ref, process, SubPid, Reason} -> {'DOWN', Ref, process, SubPid, Reason} ->
@ -207,7 +208,10 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
end. end.
with_group_ack(Msg, Group, Type, Sender, Ref) -> with_group_ack(Msg, Group, Type, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, {Type, Group, Ref}}}, Msg). emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
old_ref(Type, Group, Ref) ->
{Type, Group, Ref}.
-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
without_group_ack(Msg) -> without_group_ack(Msg) ->