diff --git a/etc/emqx.conf b/etc/emqx.conf
index 5a7a698d7..554238f2c 100644
--- a/etc/emqx.conf
+++ b/etc/emqx.conf
@@ -1694,11 +1694,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
 ## Value: Number
 ## bridge.aws.subscription.2.qos = 1
 
-## Maximum number of messages in one batch for buffer queue to store
+## Maximum number of messages in one batch when sending to remote brokers
+## NOTE: when bridging via MQTT connection to remote broker, this config is only
+## used for internal message passing optimization as the underlying MQTT
+## protocol does not support batching.
 ##
 ## Value: Integer
-## default: 1000
-## bridge.aws.queue.batch_size = 1000
+## default: 32
+## bridge.aws.queue.batch_size = 32
 
 ## Base directory for replayq to store messages on disk
 ## If this config entry is missing or set to undefined,
@@ -1844,11 +1847,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
 ## Value: Number
 ## bridge.azure.subscription.2.qos = 1
 
-## Batch size for buffer queue stored
+## Maximum number of messages in one batch when sending to remote brokers
+## NOTE: when bridging via MQTT connection to remote broker, this config is only
+## used for internal message passing optimization as the underlying MQTT
+## protocol does not support batching.
 ##
 ## Value: Integer
-## default: 1000
-## bridge.azure.queue.batch_size = 1000
+## default: 32
+## bridge.azure.queue.batch_size = 32
 
 ## Base directory for replayq to store messages on disk
 ## If this config entry is missing or set to undefined,
diff --git a/src/emqx_portal_connect.erl b/src/emqx_portal_connect.erl
index 35f4b53dc..ab3a3f5b8 100644
--- a/src/emqx_portal_connect.erl
+++ b/src/emqx_portal_connect.erl
@@ -20,11 +20,12 @@
 
 -optional_callbacks([]).
 
+%% map fields depend on implementation
 -type config() :: map().
 -type connection() :: term().
 -type conn_ref() :: term().
--type batch() :: emqx_protal:batch().
+-type batch() :: emqx_portal:batch().
 -type batch_ref() :: reference().
+-type ack_ref() :: emqx_portal:ack_ref(). -include("logger.hrl"). @@ -36,7 +37,7 @@ %% send to remote node/cluster %% portal worker (the caller process) should be expecting %% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster --callback send(connection(), batch()) -> {ok, batch_ref()} | {error, any()}. +-callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}. %% called when owner is shutting down. -callback stop(conn_ref(), connection()) -> ok. diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index 7e7d678f6..623f7f233 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -52,6 +52,9 @@ %% to support automatic load-balancing, i.e. in case it can not keep up %% with the amount of messages comming in, administrator should split and %% balance topics between worker/connections manually. +%% +%% NOTES: +%% * Local messages are all normalised to QoS-1 when exporting to remote -module(emqx_portal). -behaviour(gen_statem). @@ -71,17 +74,18 @@ -export_type([config/0, batch/0, - ref/0 + ack_ref/0 ]). -type config() :: map(). -type batch() :: [emqx_portal_msg:msg()]. --type ref() :: reference(). +-type ack_ref() :: term(). -include("logger.hrl"). -include("emqx_mqtt.hrl"). --define(DEFAULT_BATCH_COUNT, 100). +%% same as default in-flight limit for emqx_client +-define(DEFAULT_BATCH_COUNT, 32). -define(DEFAULT_BATCH_BYTES, 1 bsl 20). -define(DEFAULT_SEND_AHEAD, 8). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). @@ -110,7 +114,7 @@ start_link(Name, Config) when is_list(Config) -> start_link(Name, maps:from_list(Config)); start_link(Name, Config) -> - gen_statem:start_link({local, Name}, ?MODULE, Config, []). + gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). stop(Pid) -> gen_statem:stop(Pid). 
@@ -122,7 +126,7 @@ import_batch(Batch, AckFun) ->
 
 %% @doc This function is to be evaluated on message/batch exporter side
 %% when message/batch is accepted by remote node.
--spec handle_ack(pid(), ref()) -> ok.
+-spec handle_ack(pid(), ack_ref()) -> ok.
 handle_ack(Pid, Ref) when node() =:= node(Pid) ->
     Pid ! {batch_ack, Ref},
     ok.
@@ -231,7 +235,8 @@ connected(internal, maybe_send, State) ->
     end;
 connected(info, {disconnected, ConnRef, Reason},
           #{conn_ref := ConnRef, connection := Conn} = State) ->
-    ?INFO("Portal ~p diconnected~nreason=~p", [Conn, Reason]),
+    ?INFO("Portal ~p connection ~p disconnected~nreason=~p",
+          [name(), Conn, Reason]),
     {next_state, connecting,
      State#{conn_ref := undefined,
             connection := undefined
@@ -255,7 +260,8 @@ common(_StateName, info, {dispatch, _, Msg},
     NewQ = replayq:append(Q, collect([Msg])),
     {keep_state, State#{replayq => NewQ}, ?maybe_send};
 common(StateName, Type, Content, State) ->
-    ?INFO("Ignored unknown ~p event ~p at state ~p", [Type, Content, StateName]),
+    ?DEBUG("Portal ~p discarded ~p type event at state ~p:~p",
+          [name(), Type, StateName, Content]),
     {keep_state, State}.
 
 collect(Acc) ->
@@ -300,6 +306,7 @@ pop_and_send(#{replayq := Q,
 do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
     case maybe_send(State, Batch) of
         {ok, Ref} ->
+            %% this is a list of inflight BATCHes, not expecting it to be too long
             NewInflight = Inflight ++ [#{q_ack_ref => QAckRef,
                                          send_ack_ref => Ref,
                                          batch => Batch
@@ -326,8 +333,6 @@ subscribe_local_topics(Topics) ->
                   emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()})
           end, Topics).
 
-name() -> {_, Name} = process_info(self(), registered_name), Name.
-
 disconnect(#{connection := Conn,
              conn_ref := ConnRef,
              connect_module := Module
@@ -347,10 +352,14 @@ maybe_send(#{connect_module := Module,
              connection := Connection,
              mountpoint := Mountpoint
             }, Batch) ->
-    Module:send(Connection, [emqx_portal_msg:apply_mountpoint(M, Mountpoint) || M <- Batch]).
+    Module:send(Connection, [emqx_portal_msg:to_export(M, Mountpoint) || M <- Batch]).
 
 format_mountpoint(undefined) ->
     undefined;
 format_mountpoint(Prefix) ->
     binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
+name() -> {_, Name} = process_info(self(), registered_name), Name.
+
+name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
+
diff --git a/src/portal/emqx_portal_mqtt.erl b/src/portal/emqx_portal_mqtt.erl
new file mode 100644
index 000000000..0ce5140b0
--- /dev/null
+++ b/src/portal/emqx_portal_mqtt.erl
@@ -0,0 +1,165 @@
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+%% @doc This module implements EMQX Portal transport layer on top of MQTT protocol
+
+-module(emqx_portal_mqtt).
+-behaviour(emqx_portal_connect).
+
+%% behaviour callbacks
+-export([start/1,
+         send/2,
+         stop/2
+        ]).
+
+-include("emqx_mqtt.hrl").
+
+-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
+
+%% Messages towards ack collector process
+-define(SENT(MaxPktId), {sent, MaxPktId}).
+-define(ACKED(AnyPktId), {acked, AnyPktId}).
+-define(STOP(Ref), {stop, Ref}).
+
+%% @doc Start an MQTT client connected to the remote broker, plus a linked ack
+%% collector process which turns per-message PUBACKs into one ack per batch.
+%% Returns `{ok, ConnRef, Connection}'; `ConnRef' is a fresh reference which is
+%% also carried by the `{disconnected, ConnRef, Reason}' message sent to the caller.
+start(Config) ->
+    Ref = make_ref(),
+    Parent = self(),
+    AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end),
+    Handlers = make_hdlr(Parent, AckCollector, Ref),
+    case emqx_client:start_link(Config#{msg_handler => Handlers, owner => AckCollector}) of
+        {ok, Pid} ->
+            Conn = #{ack_collector => AckCollector, client_pid => Pid},
+            case emqx_client:connect(Pid) of
+                {ok, _} ->
+                    {ok, Ref, Conn};
+                {error, Reason} ->
+                    %% stop/2 expects the connection reference and the connection map
+                    ok = stop(Ref, Conn),
+                    {error, Reason}
+            end;
+        {error, Reason} ->
+            %% there is no client to stop, but the collector spawned for this
+            %% attempt must not be left running
+            ok = stop_ack_collector(Ref, AckCollector),
+            {error, Reason}
+    end.
+
+%% @doc Stop the ack collector first, then the MQTT client.
+stop(Ref, #{ack_collector := AckCollector,
+            client_pid := Pid}) ->
+    ok = stop_ack_collector(Ref, AckCollector),
+    _ = emqx_client:stop(Pid),
+    ok.
+
+%% Ask the ack collector to exit, kill it if it does not within one second.
+stop_ack_collector(Ref, AckCollector) ->
+    MRef = monitor(process, AckCollector),
+    unlink(AckCollector),
+    _ = AckCollector ! ?STOP(Ref),
+    receive
+        {'DOWN', MRef, _, _, _} ->
+            ok
+    after
+        1000 ->
+            exit(AckCollector, kill),
+            %% do not leave a late 'DOWN' message in the caller's mailbox
+            true = erlang:demonitor(MRef, [flush]),
+            ok
+    end.
+
+%% @doc Publish all messages of a batch one by one.
+%% Returns `{ok, AckRef}' where `AckRef' is the packet id of the last message;
+%% the process which called start/1 gets `{batch_ack, AckRef}' once it is acked.
+send(#{client_pid := ClientPid, ack_collector := AckCollector}, Batch) ->
+    send_loop(ClientPid, AckCollector, Batch).
+
+send_loop(ClientPid, AckCollector, [Msg | Rest]) ->
+    case emqx_client:publish(ClientPid, Msg) of
+        {ok, PktId} when Rest =:= [] ->
+            %% last message of the batch, its packet id identifies the batch
+            AckCollector ! ?SENT(PktId),
+            {ok, PktId};
+        {ok, _PktId} ->
+            send_loop(ClientPid, AckCollector, Rest);
+        {error, {_PacketId, inflight_full}} ->
+            %% inflight window of the client is full, retry after a short pause
+            timer:sleep(100),
+            send_loop(ClientPid, AckCollector, [Msg | Rest]);
+        {error, Reason} ->
+            %% There is no partial success of a batch and recover from the middle
+            %% only to retry all messages in one batch
+            {error, Reason}
+    end.
+
+ack_collector(Parent, ConnRef) ->
+    ack_collector(Parent, ConnRef, [], undefined).
+
+%% PktIds: last packet id of each sent but not yet acked batch, oldest first.
+%% LastAcked: most recently acked packet id that did not complete a batch.
+ack_collector(Parent, ConnRef, PktIds, LastAcked) ->
+    receive
+        ?STOP(ConnRef) ->
+            exit(normal);
+        ?SENT(PktId) when PktIds =:= [], PktId =:= LastAcked ->
+            %% the puback of the last message overtook this notification
+            ok = emqx_portal:handle_ack(Parent, PktId),
+            ack_collector(Parent, ConnRef, [], undefined);
+        ?SENT(PktId) ->
+            %% this ++ only happens per-BATCH, hence no optimization
+            ack_collector(Parent, ConnRef, PktIds ++ [PktId], LastAcked);
+        ?ACKED(PktId) ->
+            {NewIds, NewLastAcked} = handle_ack(Parent, PktId, PktIds),
+            ack_collector(Parent, ConnRef, NewIds, NewLastAcked)
+    end.
+
+%% Returns `{PendingBatchIds, LastAcked}'.
+handle_ack(Parent, PktId, [PktId | Rest]) ->
+    %% A batch is finished, time to ack portal
+    ok = emqx_portal:handle_ack(Parent, PktId),
+    {Rest, undefined};
+handle_ack(_Parent, PktId, PktIds) ->
+    %% Partial ack of a batch, possibly of one which is still being sent
+    %% (PktIds may be empty). Packet ids wrap around after 16#FFFF, hence
+    %% no assumption on their numeric order.
+    {PktIds, PktId}.
+
+%% When puback for QoS-1 message is received from remote MQTT broker
+%% NOTE: no support for QoS-2
+handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) ->
+    ok = assert_puback_ok(RC),
+    AckCollector ! ?ACKED(PktId),
+    ok.
+
+%% MQTT reason codes 16#80 and above report a failure, lower ones
+%% (e.g. ?RC_SUCCESS) mean the message was accepted.
+assert_puback_ok(RC) when is_integer(RC), RC >= 16#80 ->
+    error({puback_error, RC});
+assert_puback_ok(_RC) ->
+    ok.
+
+%% Message published from remote broker. Import to local broker.
+import_msg(Msg) ->
+    %% auto-ack should be enabled in emqx_client, hence dummy ack-fun.
+    emqx_portal:import_batch([Msg], _AckFun = fun() -> ok end).
+
+make_hdlr(Parent, AckCollector, Ref) ->
+    #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,
+      publish => fun(Msg) -> import_msg(Msg) end,
+      disconnected => fun(RC, _Properties) -> Parent ! {disconnected, Ref, RC}, ok end
+     }.
+
diff --git a/src/portal/emqx_portal_msg.erl b/src/portal/emqx_portal_msg.erl
index af9cc1b01..252ea72d0 100644
--- a/src/portal/emqx_portal_msg.erl
+++ b/src/portal/emqx_portal_msg.erl
@@ -16,7 +16,7 @@
 
 -export([ to_binary/1
         , from_binary/1
-        , apply_mountpoint/2
+        , to_export/2
         , to_broker_msgs/1
         , estimate_size/1
         ]).
@@ -28,10 +28,12 @@
 
 -type msg() :: emqx_types:message().
 
-%% @doc Mount topic to a prefix.
--spec apply_mountpoint(msg(), undefined | binary()) -> msg().
-apply_mountpoint(#{topic := Topic} = Msg, Mountpoint) ->
-    Msg#{topic := topic(Mountpoint, Topic)}.
+%% @doc Make export format:
+%% 1. Mount topic to a prefix
+%% 2. fix QoS to 1
+-spec to_export(msg(), undefined | binary()) -> msg().
+to_export(#{topic := Topic} = Msg, Mountpoint) ->
+    Msg#{topic := topic(Mountpoint, Topic), qos => 1}.
 
 %% @doc Make `binary()' in order to make iodata to be persisted on disk.
 -spec to_binary(msg()) -> binary().
diff --git a/src/portal/emqx_portal_rpc.erl b/src/portal/emqx_portal_rpc.erl index 8b1136353..fcd8b24e9 100644 --- a/src/portal/emqx_portal_rpc.erl +++ b/src/portal/emqx_portal_rpc.erl @@ -29,7 +29,7 @@ , heartbeat/2 ]). --type batch_ref() :: emqx_portal:batch_ref(). +-type ack_ref() :: emqx_portal:ack_ref(). -type batch() :: emqx_portal:batch(). -define(HEARTBEAT_INTERVAL, timer:seconds(1)). @@ -59,7 +59,7 @@ stop(Pid, _Remote) when is_pid(Pid) -> ok. %% @doc Callback for `emqx_portal_connect' behaviour --spec send(node(), batch()) -> {ok, batch_ref()} | {error, any()}. +-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. send(Remote, Batch) -> Sender = self(), case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of @@ -68,7 +68,7 @@ send(Remote, Batch) -> end. %% @doc Handle send on receiver side. --spec handle_send(pid(), batch()) -> {ok, batch_ref()} | {error, any()}. +-spec handle_send(pid(), batch()) -> {ok, ack_ref()} | {error, any()}. handle_send(SenderPid, Batch) -> SenderNode = node(SenderPid), Ref = make_ref(), diff --git a/test/emqx_portal_mqtt_tests.erl b/test/emqx_portal_mqtt_tests.erl new file mode 100644 index 000000000..0312bca49 --- /dev/null +++ b/test/emqx_portal_mqtt_tests.erl @@ -0,0 +1,62 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_mqtt_tests). +-include_lib("eunit/include/eunit.hrl"). 
+
+send_and_ack_test() ->
+    %% mock emqx_client so that the test needs no MQTT broker
+    Tester = self(),
+    meck:new(emqx_client, [passthrough, no_history]),
+    meck:expect(emqx_client, start_link, 1,
+                fun(#{msg_handler := Hdlr}) -> {ok, Hdlr} end),
+    meck:expect(emqx_client, connect, 1, {ok, dummy}),
+    meck:expect(emqx_client, stop, 1, ok),
+    meck:expect(emqx_client, publish, 2,
+                fun(_Conn, Msg) ->
+                        case rand:uniform(100) of
+                            1 ->
+                                {error, {dummy, inflight_full}};
+                            _ ->
+                                Tester ! {published, Msg},
+                                {ok, Msg}
+                        end
+                end),
+    try
+        Batch = lists:seq(1, 100),
+        {ok, Ref, Conn} = emqx_portal_mqtt:start(#{}),
+        %% return last packet id as batch reference
+        {ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch),
+        %% expect batch ack
+        {ok, LastId} = collect_acks(Conn, Batch),
+        %% assert received ack matches the batch ref returned in send API
+        ?assertEqual(AckRef, LastId),
+        ok = emqx_portal_mqtt:stop(Ref, Conn)
+    after
+        meck:unload(emqx_client)
+    end.
+
+collect_acks(_Conn, []) ->
+    receive {batch_ack, Id} -> {ok, Id}
+    after 1000 -> error(batch_ack_timeout)
+    end;
+collect_acks(#{client_pid := Client} = Conn, [Id | Rest]) ->
+    %% mocked for testing, should be a pid() at runtime
+    #{puback := PubAckCallback} = Client,
+    receive
+        {published, Id} ->
+            PubAckCallback(#{packet_id => Id, reason_code => dummy}),
+            collect_acks(Conn, Rest)
+    after 1000 -> error({not_published, Id})
+    end.
diff --git a/test/emqx_portal_tests.erl b/test/emqx_portal_tests.erl
index 5826a327e..85975da3e 100644
--- a/test/emqx_portal_tests.erl
+++ b/test/emqx_portal_tests.erl
@@ -18,7 +18,8 @@
 -include_lib("eunit/include/eunit.hrl").
 -include("emqx_mqtt.hrl").
 
--define(PORTAL_NAME, test_portal).
+-define(PORTAL_NAME, test).
+-define(PORTAL_REG_NAME, emqx_portal_test).
-define(WAIT(PATTERN, TIMEOUT), receive PATTERN -> @@ -49,11 +50,11 @@ reconnect_test() -> Config = make_config(Ref, self(), {error, test}), {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), %% assert name registered - ?assertEqual(Pid, whereis(?PORTAL_NAME)), + ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), ?WAIT({connection_start_attempt, Ref}, 1000), %% expect same message again ?WAIT({connection_start_attempt, Ref}, 1000), - ok = emqx_portal:stop(?PORTAL_NAME), + ok = emqx_portal:stop(?PORTAL_REG_NAME), ok. %% connect first, disconnect, then connect again @@ -61,11 +62,11 @@ disturbance_test() -> Ref = make_ref(), Config = make_config(Ref, self(), {ok, Ref, connection}), {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), - ?assertEqual(Pid, whereis(?PORTAL_NAME)), + ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), ?WAIT({connection_start_attempt, Ref}, 1000), Pid ! {disconnected, Ref, test}, ?WAIT({connection_start_attempt, Ref}, 1000), - ok = emqx_portal:stop(?PORTAL_NAME). + ok = emqx_portal:stop(?PORTAL_REG_NAME). %% buffer should continue taking in messages when disconnected buffer_when_disconnected_test_() -> @@ -88,11 +89,11 @@ test_buffer_when_disconnected() -> {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), Sender ! {portal, Pid}, Receiver ! {portal, Pid}, - ?assertEqual(Pid, whereis(?PORTAL_NAME)), + ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), Pid ! {disconnected, Ref, test}, ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 2000), ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), - ok = emqx_portal:stop(?PORTAL_NAME). + ok = emqx_portal:stop(?PORTAL_REG_NAME). %% Feed messages to portal sender_loop(_Pid, [], _) -> exit(normal);