fix(shared_sub): Use gen_rpc to send shared_sub payloads
This commit is contained in:
parent
1fff0ced2c
commit
8e418cdc6b
|
@ -164,27 +164,24 @@ ack_enabled() ->
|
||||||
|
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% Deadlock otherwise
|
||||||
SubPid ! {deliver, Topic, Msg},
|
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||||
ok;
|
|
||||||
%% return either 'ok' (when everything is fine) or 'error'
|
%% return either 'ok' (when everything is fine) or 'error'
|
||||||
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||||
%% For QoS 0 message, send it as regular dispatch
|
%% For QoS 0 message, send it as regular dispatch
|
||||||
SubPid ! {deliver, Topic, Msg},
|
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||||
ok;
|
|
||||||
do_dispatch(SubPid, Group, Topic, Msg, Type) ->
|
do_dispatch(SubPid, Group, Topic, Msg, Type) ->
|
||||||
case ack_enabled() of
|
case ack_enabled() of
|
||||||
true ->
|
true ->
|
||||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
|
dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
|
||||||
false ->
|
false ->
|
||||||
SubPid ! {deliver, Topic, Msg},
|
send(SubPid, Topic, {deliver, Topic, Msg})
|
||||||
ok
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||||
%% For QoS 1/2 message, expect an ack
|
%% For QoS 1/2 message, expect an ack
|
||||||
Ref = erlang:monitor(process, SubPid),
|
Ref = erlang:monitor(process, SubPid),
|
||||||
Sender = self(),
|
Sender = self(),
|
||||||
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
|
send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}),
|
||||||
Timeout = case Msg#message.qos of
|
Timeout = case Msg#message.qos of
|
||||||
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
|
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
|
||||||
?QOS_2 -> infinity
|
?QOS_2 -> infinity
|
||||||
|
@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||||
_ = erlang:demonitor(Ref, [flush])
|
_ = erlang:demonitor(Ref, [flush])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
send(Pid, Topic, Msg) ->
|
||||||
|
Node = node(Pid),
|
||||||
|
if Node =:= node() ->
|
||||||
|
Pid ! Msg;
|
||||||
|
true ->
|
||||||
|
emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg])
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
with_group_ack(Msg, Group, Type, Sender, Ref) ->
|
with_group_ack(Msg, Group, Type, Sender, Ref) ->
|
||||||
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
|
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue