From 8e418cdc6b5ed65a769951208c33234d9e1577a0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 20 Jul 2022 13:55:12 +0200 Subject: [PATCH] fix(shared_sub): Use gen_rpc to send shared_sub payloads --- src/emqx_shared_sub.erl | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 1e77b6014..3f67c225d 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -164,27 +164,24 @@ ack_enabled() -> do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); do_dispatch(SubPid, Group, Topic, Msg, Type) -> case ack_enabled() of true -> dispatch_with_ack(SubPid, Group, Topic, Msg, Type); false -> - SubPid ! {deliver, Topic, Msg}, - ok + send(SubPid, Topic, {deliver, Topic, Msg}) end. dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, + send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}), Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity @@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> _ = erlang:demonitor(Ref, [flush]) end. +send(Pid, Topic, Msg) -> + Node = node(Pid), + if Node =:= node() -> + Pid ! Msg; + true -> + emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]) + end, + ok. + with_group_ack(Msg, Group, Type, Sender, Ref) -> emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).