Merge pull request #8522 from ieQu1/dispatch-shared-sub-over-gen-rpc

fix(shared_sub): Use gen_rpc to send shared_sub payloads
This commit is contained in:
ieQu1 2022-07-20 15:06:12 +02:00 committed by GitHub
commit e36bc96028
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 7 deletions

View File

@ -164,27 +164,24 @@ ack_enabled() ->
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise
SubPid ! {deliver, Topic, Msg},
ok;
send(SubPid, Topic, {deliver, Topic, Msg});
%% return either 'ok' (when everything is fine) or 'error'
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch
SubPid ! {deliver, Topic, Msg},
ok;
send(SubPid, Topic, {deliver, Topic, Msg});
do_dispatch(SubPid, Group, Topic, Msg, Type) ->
case ack_enabled() of
true ->
dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
false ->
SubPid ! {deliver, Topic, Msg},
ok
send(SubPid, Topic, {deliver, Topic, Msg})
end.
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
%% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid),
Sender = self(),
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}),
Timeout = case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity
@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
_ = erlang:demonitor(Ref, [flush])
end.
send(Pid, Topic, Msg) ->
Node = node(Pid),
if Node =:= node() ->
Pid ! Msg;
true ->
emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg])
end,
ok.
with_group_ack(Msg, Group, Type, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).