Merge pull request #8893 from ieQu1/master

Dispatch shared messages via gen_rpc
This commit is contained in:
ieQu1 2022-09-05 15:13:42 +02:00 committed by GitHub
commit bf87889b30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 142 additions and 15 deletions

View File

@ -26,6 +26,7 @@
{emqx_resource,1}.
{emqx_retainer,1}.
{emqx_rule_engine,1}.
{emqx_shared_sub,1}.
{emqx_slow_subs,1}.
{emqx_statsd,1}.
{emqx_telemetry,1}.

View File

@ -69,9 +69,12 @@ start() ->
announce(emqx).
%% @doc Get maximum version of the backplane API supported by the node
-spec supported_version(node(), api()) -> api_version().
-spec supported_version(node(), api()) -> api_version() | undefined.
supported_version(Node, API) ->
ets:lookup_element(?TAB, {Node, API}, #?TAB.version).
case ets:lookup(?TAB, {Node, API}) of
[#?TAB{version = V}] -> V;
[] -> undefined
end.
%% @doc Get maximum version of the backplane API supported by the
%% entire cluster

View File

@ -153,6 +153,8 @@ extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) ->
{call_or_cast(CallOrCast), M, F, A};
extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) ->
{call_or_cast(CallOrCast), M, F, A};
extract_mfa(?BACKEND(emqx_rpc, call), [_Tag, _Node, M, F, A, _Timeout]) ->
{call_or_cast(call), M, F, A};
%% (e)rpc:
extract_mfa(?BACKEND(rpc, multicall), [M, F, A]) ->
{call_or_cast(multicall), M, F, A};

View File

@ -22,6 +22,7 @@
-export([
call/4,
call/5,
call/6,
cast/4,
cast/5,
multicall/4,
@ -78,6 +79,10 @@ call(Node, Mod, Fun, Args) ->
call(Key, Node, Mod, Fun, Args) ->
filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)).
-spec call(term(), node(), module(), atom(), list(), timeout()) -> call_result().
call(Key, Node, Mod, Fun, Args, Timeout) ->
filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args, Timeout)).
-spec multicall([node()], module(), atom(), list()) -> multicall_result().
multicall(Nodes, Mod, Fun, Args) ->
gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args).

View File

@ -38,7 +38,8 @@
-export([
dispatch/3,
dispatch/4
dispatch/4,
do_dispatch_with_ack/4
]).
-export([
@ -170,30 +171,31 @@ do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% return either 'ok' (when everything is fine) or 'error'
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch
SubPid ! {deliver, Topic, Msg},
ok;
send(SubPid, Topic, {deliver, Topic, Msg});
do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
%% Retry implies all subscribers nack:ed, send again without ack
SubPid ! {deliver, Topic, Msg},
ok;
send(SubPid, Topic, {deliver, Topic, Msg});
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
case ack_enabled() of
true ->
dispatch_with_ack(SubPid, Group, Topic, Msg);
%% FIXME: replace with `emqx_shared_sub_proto:dispatch_with_ack' in 5.2
do_dispatch_with_ack(SubPid, Group, Topic, Msg);
false ->
SubPid ! {deliver, Topic, Msg},
ok
send(SubPid, Topic, {deliver, Topic, Msg})
end.
dispatch_with_ack(SubPid, Group, Topic, Msg) ->
-spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) ->
ok | {error, _}.
do_dispatch_with_ack(SubPid, Group, Topic, Msg) ->
%% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid),
Sender = self(),
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
%% FIXME: replace with regular send in 5.2
send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}),
Timeout =
case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity
?QOS_2 -> infinity;
_ -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS)
end,
try
receive
@ -412,6 +414,17 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
send(Pid, Topic, Msg) ->
Node = node(Pid),
_ =
case Node =:= node() of
true ->
Pid ! Msg;
false ->
emqx_shared_sub_proto_v1:send(Node, Pid, Topic, Msg)
end,
ok.
maybe_insert_round_robin_count({Group, _Topic} = GroupTopic) ->
strategy(Group) =:= round_robin_per_group andalso
ets:insert(?SHARED_SUBS_ROUND_ROBIN_COUNTER, {GroupTopic, 0}),

View File

@ -0,0 +1,50 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_shared_sub_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
send/4,
dispatch_with_ack/5
]).
-include("bpapi.hrl").
%%================================================================================
%% behavior callbacks
%%================================================================================
introduced_in() ->
"5.0.8".
%%================================================================================
%% API funcions
%%================================================================================
-spec send(node(), pid(), emqx_types:topic(), term()) -> true.
send(Node, Pid, Topic, Msg) ->
emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]).
-spec dispatch_with_ack(
pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message(), timeout()
) ->
ok | {error, _}.
dispatch_with_ack(Pid, Group, Topic, Msg, Timeout) ->
emqx_rpc:call(
Topic, node(Pid), emqx_shared_sub, do_dispatch_with_ack, [Pid, Group, Topic, Msg], Timeout
).

View File

@ -40,7 +40,7 @@ end_per_suite(_Config) ->
t_max_supported_version(_Config) ->
?assertMatch(3, emqx_bpapi:supported_version('fake-node2@localhost', api2)),
?assertMatch(2, emqx_bpapi:supported_version(api2)),
?assertError(_, emqx_bpapi:supported_version('fake-node2@localhost', nonexistent_api)),
?assertMatch(undefined, emqx_bpapi:supported_version('fake-node2@localhost', nonexistent_api)),
?assertError(_, emqx_bpapi:supported_version(nonexistent_api)).
t_announce(Config) ->

View File

@ -567,6 +567,59 @@ t_local(_) ->
?assertNotEqual(UsedSubPid1, UsedSubPid2),
ok.
t_remote(_) ->
%% This testcase verifies dispatching of shared messages to the remote nodes via backplane API.
%%
%% In this testcase we start two EMQX nodes: local and remote.
%% A subscriber connects to the remote node.
%% A publisher connects to the local node and sends three messages with different QoS.
%% The test verifies that the remote side received all three messages.
ok = ensure_config(sticky, true),
GroupConfig = #{
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
<<"sticky_group">> => sticky
},
Node = start_slave('remote_shared_sub_testtesttest', 21999),
ok = ensure_group_config(GroupConfig),
ok = ensure_group_config(Node, GroupConfig),
Topic = <<"foo/bar">>,
ClientIdLocal = <<"ClientId1">>,
ClientIdRemote = <<"ClientId2">>,
{ok, ConnPidLocal} = emqtt:start_link([{clientid, ClientIdLocal}]),
{ok, ConnPidRemote} = emqtt:start_link([{clientid, ClientIdRemote}, {port, 21999}]),
try
{ok, ClientPidLocal} = emqtt:connect(ConnPidLocal),
{ok, ClientPidRemote} = emqtt:connect(ConnPidRemote),
emqtt:subscribe(ConnPidRemote, {<<"$share/remote_group/", Topic/binary>>, 0}),
ct:sleep(100),
Message1 = emqx_message:make(ClientPidLocal, 0, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientPidLocal, 1, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientPidLocal, 2, Topic, <<"hello3">>),
emqx:publish(Message1),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPidRemote]),
emqx:publish(Message2),
{true, UsedSubPid1} = last_message(<<"hello2">>, [ConnPidRemote]),
emqx:publish(Message3),
{true, UsedSubPid1} = last_message(<<"hello3">>, [ConnPidRemote]),
ok
after
emqtt:stop(ConnPidLocal),
emqtt:stop(ConnPidRemote),
stop_slave(Node)
end.
t_local_fallback(_) ->
ok = ensure_group_config(#{
<<"local_group">> => local,