From 9dfa82b4484e79ba6f84541a4e5710e4e9fdb9d2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 5 Sep 2022 13:38:54 +0200 Subject: [PATCH] fix(shared_sub): Dispatch shared messages via gen_rpc --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx/src/emqx_shared_sub.erl | 37 ++++++++----- .../src/proto/emqx_shared_sub_proto_v1.erl | 50 +++++++++++++++++ apps/emqx/test/emqx_shared_sub_SUITE.erl | 53 +++++++++++++++++++ 4 files changed, 129 insertions(+), 12 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index d1176e7ac..e224e6425 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -26,6 +26,7 @@ {emqx_resource,1}. {emqx_retainer,1}. {emqx_rule_engine,1}. +{emqx_shared_sub,1}. {emqx_slow_subs,1}. {emqx_statsd,1}. {emqx_telemetry,1}. diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 3d14dddd0..0527cbfe7 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -38,7 +38,8 @@ -export([ dispatch/3, - dispatch/4 + dispatch/4, + do_dispatch_with_ack/4 ]). -export([ @@ -170,30 +171,31 @@ do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); do_dispatch(SubPid, Group, Topic, Msg, fresh) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Group, Topic, Msg); + %% FIXME: replace with `emqx_shared_sub_proto:dispatch_with_ack' in 5.2 + do_dispatch_with_ack(SubPid, Group, Topic, Msg); false -> - SubPid ! {deliver, Topic, Msg}, - ok + send(SubPid, Topic, {deliver, Topic, Msg}) end. -dispatch_with_ack(SubPid, Group, Topic, Msg) -> +-spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) -> + ok | {error, _}. +do_dispatch_with_ack(SubPid, Group, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, + %% FIXME: replace with regular send in 5.2 + send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}), Timeout = case Msg#message.qos of - ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); - ?QOS_2 -> infinity + ?QOS_2 -> infinity; + _ -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS) end, try receive @@ -412,6 +414,17 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +send(Pid, Topic, Msg) -> + Node = node(Pid), + _ = + case Node =:= node() of + true -> + Pid ! Msg; + false -> + emqx_shared_sub_proto_v1:send(Node, Pid, Topic, Msg) + end, + ok. + maybe_insert_round_robin_count({Group, _Topic} = GroupTopic) -> strategy(Group) =:= round_robin_per_group andalso ets:insert(?SHARED_SUBS_ROUND_ROBIN_COUNTER, {GroupTopic, 0}), diff --git a/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl b/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl new file mode 100644 index 000000000..b224bf705 --- /dev/null +++ b/apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl @@ -0,0 +1,50 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_shared_sub_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + send/4, + dispatch_with_ack/5 +]). + +-include("bpapi.hrl"). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +introduced_in() -> + "5.0.8". + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec send(node(), pid(), emqx_types:topic(), term()) -> true. +send(Node, Pid, Topic, Msg) -> + emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]). + +-spec dispatch_with_ack( + pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message(), timeout() +) -> + ok | {error, _}. +dispatch_with_ack(Pid, Group, Topic, Msg, Timeout) -> + emqx_rpc:call( + Topic, node(Pid), emqx_shared_sub, do_dispatch_with_ack, [Pid, Group, Topic, Msg], Timeout + ). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index aeb44d529..f53e8f374 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -567,6 +567,59 @@ t_local(_) -> ?assertNotEqual(UsedSubPid1, UsedSubPid2), ok. +t_remote(_) -> + %% This testcase verifies dispatching of shared messages to the remote nodes via backplane API. + %% + %% In this testcase we start two EMQX nodes: local and remote. + %% A subscriber connects to the remote node. + %% A publisher connects to the local node and sends three messages with different QoS. + %% The test verifies that the remote side received all three messages. + ok = ensure_config(sticky, true), + GroupConfig = #{ + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }, + + Node = start_slave('remote_shared_sub_testtesttest', 21999), + ok = ensure_group_config(GroupConfig), + ok = ensure_group_config(Node, GroupConfig), + + Topic = <<"foo/bar">>, + ClientIdLocal = <<"ClientId1">>, + ClientIdRemote = <<"ClientId2">>, + + {ok, ConnPidLocal} = emqtt:start_link([{clientid, ClientIdLocal}]), + {ok, ConnPidRemote} = emqtt:start_link([{clientid, ClientIdRemote}, {port, 21999}]), + + try + {ok, ClientPidLocal} = emqtt:connect(ConnPidLocal), + {ok, ClientPidRemote} = emqtt:connect(ConnPidRemote), + + emqtt:subscribe(ConnPidRemote, {<<"$share/remote_group/", Topic/binary>>, 0}), + + ct:sleep(100), + + Message1 = emqx_message:make(ClientPidLocal, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientPidLocal, 1, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientPidLocal, 2, Topic, <<"hello3">>), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPidRemote]), + + emqx:publish(Message2), + {true, UsedSubPid1} = last_message(<<"hello2">>, [ConnPidRemote]), + + emqx:publish(Message3), + {true, UsedSubPid1} = last_message(<<"hello3">>, [ConnPidRemote]), + + ok + after + emqtt:stop(ConnPidLocal), + emqtt:stop(ConnPidRemote), + stop_slave(Node) + end. + t_local_fallback(_) -> ok = ensure_group_config(#{ <<"local_group">> => local,