From 56093cb5be55a3206cfbeb8c188d9f4a1fb2c5fa Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Wed, 25 May 2022 13:57:45 +0400 Subject: [PATCH] fix(shared): ACK forward header compatability --- src/emqx_shared_sub.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index db9abe18c..675309480 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -185,15 +185,16 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> Ref = erlang:monitor(process, SubPid), Sender = self(), SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, + OldRef = old_ref(Type, Group, Ref), Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity end, try receive - {Ref, ?ACK} -> + {ReceivedRef, ?ACK} when ReceivedRef =:= Ref; ReceivedRef =:= OldRef -> ok; - {Ref, ?NACK(Reason)} -> + {ReceivedRef, ?NACK(Reason)} when ReceivedRef =:= Ref; ReceivedRef =:= OldRef -> %% the receive session may nack this message when its queue is full {error, Reason}; {'DOWN', Ref, process, SubPid, Reason} -> @@ -207,7 +208,10 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> end. with_group_ack(Msg, Group, Type, Sender, Ref) -> - emqx_message:set_headers(#{shared_dispatch_ack => {Sender, {Type, Group, Ref}}}, Msg). + emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg). + +old_ref(Type, Group, Ref) -> + {Type, Group, Ref}. -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). without_group_ack(Msg) ->