From fbe67e67842edeeabd765be465ce7456909f31c6 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Mon, 14 Jan 2019 07:38:43 +0100 Subject: [PATCH] Introduce new bridge impl --- Makefile | 8 +- etc/emqx.conf | 21 +- .../emqx_client.hrl | 18 +- priv/emqx.schema | 85 +++- rebar.config | 3 + src/emqx_bridge.erl | 463 ------------------ src/emqx_bridge_sup.erl | 45 -- src/emqx_client.erl | 6 +- src/emqx_local_bridge.erl | 157 ------ src/emqx_local_bridge_sup_sup.erl | 74 --- src/emqx_mqueue.erl | 2 +- src/emqx_portal_connect.erl | 65 +++ src/emqx_sup.erl | 8 +- src/emqx_topic.erl | 14 +- src/portal/emqx_portal.erl | 356 ++++++++++++++ src/portal/emqx_portal_msg.erl | 61 +++ src/portal/emqx_portal_rpc.erl | 106 ++++ src/portal/emqx_portal_sup.erl | 54 ++ test/emqx_bridge_SUITE.erl | 58 --- test/emqx_ct_broker_helpers.erl | 48 +- test/emqx_pool_SUITE.erl | 2 +- test/emqx_portal_rpc_tests.erl | 43 ++ test/emqx_portal_tests.erl | 146 ++++++ 23 files changed, 956 insertions(+), 887 deletions(-) rename src/emqx_local_bridge_sup.erl => include/emqx_client.hrl (55%) delete mode 100644 src/emqx_bridge.erl delete mode 100644 src/emqx_bridge_sup.erl delete mode 100644 src/emqx_local_bridge.erl delete mode 100644 src/emqx_local_bridge_sup_sup.erl create mode 100644 src/emqx_portal_connect.erl create mode 100644 src/portal/emqx_portal.erl create mode 100644 src/portal/emqx_portal_msg.erl create mode 100644 src/portal/emqx_portal_rpc.erl create mode 100644 src/portal/emqx_portal_sup.erl delete mode 100644 test/emqx_bridge_SUITE.erl create mode 100644 test/emqx_portal_rpc_tests.erl create mode 100644 test/emqx_portal_tests.erl diff --git a/Makefile b/Makefile index 6022dbf8f..f6230d2c7 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,8 @@ ERLC_OPTS += +debug_info -DAPPLICATION=emqx BUILD_DEPS = cuttlefish dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1 -#TEST_DEPS = emqx_ct_helplers -#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers +TEST_DEPS = meck +dep_meck = hex-emqx 0.8.13 TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx @@ -35,7 +35,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message @@ -96,7 +96,7 @@ rebar-deps: @rebar3 get-deps rebar-eunit: $(CUTTLEFISH_SCRIPT) - @rebar3 eunit + @rebar3 eunit -v rebar-compile: @rebar3 compile diff --git a/etc/emqx.conf b/etc/emqx.conf index 166fca25c..5a7a698d7 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1694,14 +1694,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: Number ## bridge.aws.subscription.2.qos = 1 -## If enabled, queue would be written into disk more quickly. -## However, If disabled, some message would be dropped in -## the situation emqx crashed. -## -## Value: on | off -## bridge.aws.queue.mem_cache = on - -## Batch size for buffer queue stored +## Maximum number of messages in one batch for buffer queue to store ## ## Value: Integer ## default: 1000 @@ -1709,9 +1702,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Base directory for replayq to store messages on disk ## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. If the config -## entry was set to `bridge.aws.mqueue_type = memory` -## this config entry would have no effect on mqueue +## replayq works in a mem-only manner. ## ## Value: String ## bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ @@ -1861,13 +1852,11 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Base directory for replayq to store messages on disk ## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. If the config -## entry was set to `bridge.aws.mqueue_type = memory` -## this config entry would have no effect on mqueue +## replayq works in a mem-only manner. ## ## Value: String -## Default: {{ platform_data_dir }}/emqx_aws_bridge/ -## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ +## Default: "" +## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_azure.bridge/ ## Replayq segment size ## diff --git a/src/emqx_local_bridge_sup.erl b/include/emqx_client.hrl similarity index 55% rename from src/emqx_local_bridge_sup.erl rename to include/emqx_client.hrl index db349b94d..ce66c98d0 100644 --- a/src/emqx_local_bridge_sup.erl +++ b/include/emqx_client.hrl @@ -1,4 +1,4 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -12,15 +12,9 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_local_bridge_sup). - --include("emqx.hrl"). - --export([start_link/3]). - --spec(start_link(node(), emqx_topic:topic(), [emqx_local_bridge:option()]) - -> {ok, pid()} | {error, term()}). -start_link(Node, Topic, Options) -> - MFA = {emqx_local_bridge, start_link, [Node, Topic, Options]}, - emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA). +-ifndef(EMQX_CLIENT_HRL). +-define(EMQX_CLIENT_HRL, true). +-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, + packet_id, topic, props, payload}). +-endif. diff --git a/priv/emqx.schema b/priv/emqx.schema index 9e71248a7..7082d4f87 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1512,20 +1512,9 @@ end}. %%-------------------------------------------------------------------- %% Bridges %%-------------------------------------------------------------------- -{mapping, "bridge.$name.queue.mem_cache", "emqx.bridges", [ - {datatype, flag} -]}. - -{mapping, "bridge.$name.queue.batch_size", "emqx.bridges", [ - {datatype, integer} -]}. - -{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [ - {datatype, bytesize} +{mapping, "bridge.$name.transport", "emqx.bridges", [ + {default, mqtt_client}, + {datatype, {enum, [emqx_portal, mqtt_client]}} ]}. {mapping, "bridge.$name.address", "emqx.bridges", [ @@ -1611,16 +1600,23 @@ end}. {datatype, {duration, ms}} ]}. -{mapping, "bridge.$name.retry_interval", "emqx.bridges", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - {mapping, "bridge.$name.max_inflight", "emqx.bridges", [ {default, 0}, {datatype, integer} ]}. +{mapping, "bridge.$name.queue.batch_size", "emqx.bridges", [ + {datatype, integer} +]}. + +{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [ + {datatype, bytesize} +]}. + {translation, "emqx.bridges", fun(Conf) -> Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, @@ -1661,17 +1657,58 @@ end}. lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) end, - + IsNodeAddr = fun(Addr) -> + case string:tokens(Addr, "@") of + [_NodeName, _Hostname] -> true; + _ -> false + end + end, + ConnMod = fun(Name) -> + [Addr] = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".address", Conf), + Subs = Subscriptions(Name), + case IsNodeAddr(Addr) of + true when Subs =/= [] -> + error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs}); + true -> + emqx_portal_rpc; + false -> + emqx_portal_mqtt + end + end, + %% to be backward compatible + Translate = + fun Tr(queue, Q, Cfg) -> + NewQ = maps:fold(Tr, #{}, Q), + Cfg#{queue => NewQ}; + Tr(address, Addr0, Cfg) -> + Addr = case IsNodeAddr(Addr0) of + true -> list_to_atom(Addr0); + false -> Addr0 + end, + Cfg#{address => Addr}; + Tr(batch_size, Count, Cfg) -> + Cfg#{batch_count_limit => Count}; + Tr(reconnect_interval, Ms, Cfg) -> + Cfg#{reconnect_delay_ms => Ms}; + Tr(max_inflight, Count, Cfg) -> + Cfg#{max_inflight_batches => Count}; + Tr(Key, Value, Cfg) -> + Cfg#{Key => Value} + end, maps:to_list( lists:foldl( fun({["bridge", Name, Opt], Val}, Acc) -> %% e.g #{aws => [{OptKey, OptVal}]} - Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}, {queue, Queue(Name)}], - maps:update_with(list_to_atom(Name), - fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); + Init = [{list_to_atom(Opt), Val}, + {connect_module, ConnMod(Name)}, + {subscriptions, Subscriptions(Name)}, + {queue, Queue(Name)} + ], + C = maps:update_with(list_to_atom(Name), + fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc), + maps:fold(Translate, #{}, C); (_, Acc) -> Acc end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf)))) - end}. %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index 7486e7267..b6c68208b 100644 --- a/rebar.config +++ b/rebar.config @@ -27,3 +27,6 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. + +{profiles, [{test, [{deps, [{meck, "0.8.13"}]}]}]}. + diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl deleted file mode 100644 index 1ee5612e6..000000000 --- a/src/emqx_bridge.erl +++ /dev/null @@ -1,463 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge). - --behaviour(gen_server). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - --import(proplists, [get_value/2, get_value/3]). - --export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). - --export([show_forwards/1, add_forward/2, del_forward/2]). - --export([show_subscriptions/1, add_subscription/3, del_subscription/2]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --record(state, {client_pid :: pid(), - options :: list(), - reconnect_interval :: pos_integer(), - mountpoint :: binary(), - readq :: list(), - writeq :: list(), - replayq :: map(), - ackref :: replayq:ack_ref(), - queue_option :: map(), - forwards :: list(), - subscriptions :: list()}). - --record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, - packet_id, topic, props, payload}). - -start_link(Name, Options) -> - gen_server:start_link({local, name(Name)}, ?MODULE, [Options], []). - -start_bridge(Name) -> - gen_server:call(name(Name), start_bridge). - -stop_bridge(Name) -> - gen_server:call(name(Name), stop_bridge). - --spec(show_forwards(atom()) -> list()). -show_forwards(Name) -> - gen_server:call(name(Name), show_forwards). - --spec(add_forward(atom(), binary()) -> ok | {error, already_exists | validate_fail}). -add_forward(Name, Topic) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {add_forward, Topic}) - catch - _Error:_Reason -> - {error, validate_fail} - end. - --spec(del_forward(atom(), binary()) -> ok | {error, validate_fail}). -del_forward(Name, Topic) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {del_forward, Topic}) - catch - _Error:_Reason -> - {error, validate_fail} - end. - --spec(show_subscriptions(atom()) -> list()). -show_subscriptions(Name) -> - gen_server:call(name(Name), show_subscriptions). - --spec(add_subscription(atom(), binary(), integer()) -> ok | {error, already_exists | validate_fail}). -add_subscription(Name, Topic, QoS) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {add_subscription, Topic, QoS}) - catch - _Error:_Reason -> - {error, validate_fail} - end. - --spec(del_subscription(atom(), binary()) -> ok | {error, validate_fail}). -del_subscription(Name, Topic) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {del_subscription, Topic}) - catch - error:_Reason -> - {error, validate_fail} - end. - -status(Pid) -> - gen_server:call(Pid, status). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Options]) -> - process_flag(trap_exit, true), - case get_value(start_type, Options, manual) of - manual -> ok; - auto -> erlang:send_after(1000, self(), start) - end, - ReconnectInterval = get_value(reconnect_interval, Options, 30000), - Mountpoint = format_mountpoint(get_value(mountpoint, Options)), - QueueOptions = get_value(queue, Options), - {ok, #state{mountpoint = Mountpoint, - queue_option = QueueOptions, - readq = [], - writeq = [], - options = Options, - reconnect_interval = ReconnectInterval}}. - -handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> - {Msg, NewState} = bridge(start, State), - {reply, #{msg => Msg}, NewState}; - -handle_call(start_bridge, _From, State) -> - {reply, #{msg => <<"bridge already started">>}, State}; - -handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> - {reply, #{msg => <<"bridge not started">>}, State}; - -handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> - emqx_client:disconnect(Pid), - {reply, #{msg => <<"stop bridge successfully">>}, State}; - -handle_call(status, _From, State = #state{client_pid = undefined}) -> - {reply, #{status => <<"Stopped">>}, State}; -handle_call(status, _From, State = #state{client_pid = _Pid})-> - {reply, #{status => <<"Running">>}, State}; - -handle_call(show_forwards, _From, State = #state{forwards = Forwards}) -> - {reply, Forwards, State}; - -handle_call({add_forward, Topic}, _From, State = #state{forwards = Forwards}) -> - case not lists:member(Topic, Forwards) of - true -> - emqx_broker:subscribe(Topic), - {reply, ok, State#state{forwards = [Topic | Forwards]}}; - false -> - {reply, {error, already_exists}, State} - end; - -handle_call({del_forward, Topic}, _From, State = #state{forwards = Forwards}) -> - case lists:member(Topic, Forwards) of - true -> - emqx_broker:unsubscribe(Topic), - {reply, ok, State#state{forwards = lists:delete(Topic, Forwards)}}; - false -> - {reply, ok, State} - end; - -handle_call(show_subscriptions, _From, State = #state{subscriptions = Subscriptions}) -> - {reply, Subscriptions, State}; - -handle_call({add_subscription, Topic, Qos}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> - case not lists:keymember(Topic, 1, Subscriptions) of - true -> - emqx_client:subscribe(ClientPid, {Topic, Qos}), - {reply, ok, State#state{subscriptions = [{Topic, Qos} | Subscriptions]}}; - false -> - {reply, {error, already_exists}, State} - end; - -handle_call({del_subscription, Topic}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> - case lists:keymember(Topic, 1, Subscriptions) of - true -> - emqx_client:unsubscribe(ClientPid, Topic), - {reply, ok, State#state{subscriptions = lists:keydelete(Topic, 1, Subscriptions)}}; - false -> - {reply, ok, State} - end; - -handle_call(Req, _From, State) -> - emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), - {noreply, State}. - -%%---------------------------------------------------------------- -%% Start or restart bridge -%%---------------------------------------------------------------- -handle_info(start, State) -> - {_Msg, NewState} = bridge(start, State), - {noreply, NewState}; - -handle_info(restart, State) -> - {_Msg, NewState} = bridge(restart, State), - {noreply, NewState}; - -%%---------------------------------------------------------------- -%% pop message from replayq and publish again -%%---------------------------------------------------------------- -handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ, - queue_option = #{batch_size := BatchSize}}) -> - {NewReplayQ, AckRef, NewReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), - {NewReadQ1, NewWriteQ} = case NewReadQ of - [] -> {WriteQ, []}; - _ -> {NewReadQ, WriteQ} - end, - self() ! replay, - {noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}}; - -handle_info(dump, State = #state{writeq = WriteQ, replayq = ReplayQ}) -> - NewReplayQueue = replayq:append(ReplayQ, lists:reverse(WriteQ)), - {noreply, State#state{replayq = NewReplayQueue, writeq = []}}; - -%%---------------------------------------------------------------- -%% replay message from replayq -%%---------------------------------------------------------------- -handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) -> - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {noreply, State#state{readq = NewReadQ}}; - -%%---------------------------------------------------------------- -%% received local node message -%%---------------------------------------------------------------- -handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = undefined, - mountpoint = Mountpoint}) - when QoS =< 1 -> - Msg = #mqtt_msg{qos = 1, - retain = Retain, - topic = mountpoint(Mountpoint, Topic), - payload = Payload}, - {noreply, en_writeq({undefined, Msg}, State)}; -handle_info({dispatch, _, #message{topic = Topic, qos = QoS ,payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = Pid, - mountpoint = Mountpoint}) - when QoS =< 1 -> - Msg = #mqtt_msg{qos = 1, - retain = Retain, - topic = mountpoint(Mountpoint, Topic), - payload = Payload}, - case emqx_client:publish(Pid, Msg) of - {ok, PktId} -> - {noreply, en_writeq({PktId, Msg}, State)}; - {error, {PktId, Reason}} -> - emqx_logger:error("[Bridge] Publish fail:~p", [Reason]), - {noreply, en_writeq({PktId, Msg}, State)} - end; - -%%---------------------------------------------------------------- -%% received remote node message -%%---------------------------------------------------------------- -handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, - properties := Props, payload := Payload}}, State) -> - NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), - NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, NewMsg0)), - emqx_broker:publish(NewMsg1), - {noreply, State}; - -%%---------------------------------------------------------------- -%% received remote puback message -%%---------------------------------------------------------------- -handle_info({puback, #{packet_id := PktId}}, State) -> - {noreply, delete(PktId, State)}; - -handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> - emqx_logger:warning("[Bridge] stop ~p", [normal]), - self() ! dump, - {noreply, State#state{client_pid = undefined}}; - -handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, - reconnect_interval = ReconnectInterval}) -> - emqx_logger:error("[Bridge] stop ~p", [Reason]), - self() ! dump, - erlang:send_after(ReconnectInterval, self(), restart), - {noreply, State#state{client_pid = undefined}}; - -handle_info(Info, State) -> - emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{}) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -subscribe_remote_topics(ClientPid, Subscriptions) -> - [begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end - || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})]. - -subscribe_local_topics(Options) -> - Topics = get_value(forwards, Options, []), - Subid = get_value(client_id, Options, <<"bridge">>), - [begin emqx_broker:subscribe(bin(Topic), #{qos => 1, subid => Subid}), bin(Topic) end - || Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})]. - -proto_ver(mqttv3) -> v3; -proto_ver(mqttv4) -> v4; -proto_ver(mqttv5) -> v5. -address(Address) -> - case string:tokens(Address, ":") of - [Host] -> {Host, 1883}; - [Host, Port] -> {Host, list_to_integer(Port)} - end. -options(Options) -> - options(Options, []). -options([], Acc) -> - Acc; -options([{username, Username}| Options], Acc) -> - options(Options, [{username, Username}|Acc]); -options([{proto_ver, ProtoVer}| Options], Acc) -> - options(Options, [{proto_ver, proto_ver(ProtoVer)}|Acc]); -options([{password, Password}| Options], Acc) -> - options(Options, [{password, Password}|Acc]); -options([{keepalive, Keepalive}| Options], Acc) -> - options(Options, [{keepalive, Keepalive}|Acc]); -options([{client_id, ClientId}| Options], Acc) -> - options(Options, [{client_id, ClientId}|Acc]); -options([{clean_start, CleanStart}| Options], Acc) -> - options(Options, [{clean_start, CleanStart}|Acc]); -options([{address, Address}| Options], Acc) -> - {Host, Port} = address(Address), - options(Options, [{host, Host}, {port, Port}|Acc]); -options([{ssl, Ssl}| Options], Acc) -> - options(Options, [{ssl, Ssl}|Acc]); -options([{ssl_opts, SslOpts}| Options], Acc) -> - options(Options, [{ssl_opts, SslOpts}|Acc]); -options([_Option | Options], Acc) -> - options(Options, Acc). - -name(Id) -> - list_to_atom(lists:concat([?MODULE, "_", Id])). - -bin(L) -> iolist_to_binary(L). - -mountpoint(undefined, Topic) -> - Topic; -mountpoint(Prefix, Topic) -> - <>. - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - -en_writeq(Msg, State = #state{replayq = ReplayQ, - queue_option = #{mem_cache := false}}) -> - NewReplayQ = replayq:append(ReplayQ, [Msg]), - State#state{replayq = NewReplayQ}; -en_writeq(Msg, State = #state{writeq = WriteQ, - queue_option = #{batch_size := BatchSize, - mem_cache := true}}) - when length(WriteQ) < BatchSize-> - State#state{writeq = [Msg | WriteQ]} ; -en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ, - queue_option = #{mem_cache := true}}) -> - NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)), - State#state{writeq = [Msg], replayq = NewReplayQ}. - -publish_readq_msg(_ClientPid, [], NewReadQ) -> - {ok, NewReadQ}; -publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) -> - {ok, PktId} = emqx_client:publish(ClientPid, Msg), - publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]). - -delete(PktId, State = #state{ replayq = ReplayQ, - readq = [], - queue_option = #{ mem_cache := false}}) -> - {NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}), - logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]), - ok = replayq:ack(NewReplayQ, NewAckRef), - case Msgs of - [{PktId, _Msg}] -> - self() ! pop, - State#state{ replayq = NewReplayQ, ackref = NewAckRef }; - [{_PktId, _Msg}] -> - NewReplayQ1 = replayq:append(NewReplayQ, Msgs), - self() ! pop, - State#state{ replayq = NewReplayQ1, ackref = NewAckRef }; - _Empty -> - State#state{ replayq = NewReplayQ, ackref = NewAckRef} - end; -delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) -> - ok = replayq:ack(ReplayQ, AckRef), - self() ! pop, - State; - -delete(PktId, State = #state{readq = [], writeq = WriteQ}) -> - State#state{writeq = lists:keydelete(PktId, 1, WriteQ)}; - -delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) -> - NewReadQ = lists:keydelete(PktId, 1, ReadQ), - case NewReadQ of - [] -> - ok = replayq:ack(ReplayQ, AckRef), - self() ! pop; - _NewReadQ -> - ok - end, - State#state{ readq = NewReadQ }. - -bridge(Action, State = #state{options = Options, - replayq = ReplayQ, - queue_option - = QueueOption - = #{batch_size := BatchSize}}) - when BatchSize > 0 -> - case emqx_client:start_link([{owner, self()} | options(Options)]) of - {ok, ClientPid} -> - case emqx_client:connect(ClientPid) of - {ok, _} -> - emqx_logger:info("[Bridge] connected to remote successfully"), - Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), - Forwards = subscribe_local_topics(Options), - {NewReplayQ, AckRef, ReadQ} = open_replayq(ReplayQ, QueueOption), - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {<<"start bridge successfully">>, - State#state{client_pid = ClientPid, - subscriptions = Subs, - readq = NewReadQ, - replayq = NewReplayQ, - ackref = AckRef, - forwards = Forwards}}; - {error, Reason} -> - emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), - {<<"connect to remote failed">>, - State#state{client_pid = ClientPid}} - end; - {error, Reason} -> - emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]), - {<<"start bridge failed">>, State} - end; -bridge(Action, State) -> - emqx_logger:error("[Bridge] ~p failed! error: batch_size should greater than zero", [Action]), - {<<"Open Replayq failed">>, State}. - -open_replayq(undefined, #{batch_size := BatchSize, - replayq_dir := ReplayqDir, - replayq_seg_bytes := ReplayqSegBytes}) -> - ReplayQ = replayq:open(#{dir => ReplayqDir, - seg_bytes => ReplayqSegBytes, - sizer => fun(Term) -> - size(term_to_binary(Term)) - end, - marshaller => fun({PktId, Msg}) -> - term_to_binary({PktId, Msg}); - (Bin) -> - binary_to_term(Bin) - end}), - replayq:pop(ReplayQ, #{count_limit => BatchSize}); -open_replayq(ReplayQ, #{batch_size := BatchSize}) -> - replayq:pop(ReplayQ, #{count_limit => BatchSize}). diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl deleted file mode 100644 index baa857074..000000000 --- a/src/emqx_bridge_sup.erl +++ /dev/null @@ -1,45 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_sup). - --behavior(supervisor). - --include("emqx.hrl"). - --export([start_link/0, bridges/0]). - -%% Supervisor callbacks --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{node(), map()}]). -bridges() -> - [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. - -init([]) -> - BridgesOpts = emqx_config:get_env(bridges, []), - Bridges = [spec(Opts)|| Opts <- BridgesOpts], - {ok, {{one_for_one, 10, 100}, Bridges}}. - -spec({Id, Options})-> - #{id => Id, - start => {emqx_bridge, start_link, [Id, Options]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_bridge]}. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 7ebd40769..0153f6570 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -18,6 +18,7 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_client.hrl"). -export([start_link/0, start_link/1]). -export([request/5, request/6, request_async/7, receive_response/3]). @@ -42,7 +43,7 @@ -export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, request_input/0, response_payload/0, request_handler/0, - corr_data/0]). + corr_data/0, mqtt_msg/0]). -export_type([host/0, option/0]). @@ -97,9 +98,6 @@ | {force_ping, boolean()} | {properties, properties()}). --record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, - packet_id, topic, props, payload}). - -type(mqtt_msg() :: #mqtt_msg{}). -record(state, {name :: atom(), diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl deleted file mode 100644 index 0521e6d3f..000000000 --- a/src/emqx_local_bridge.erl +++ /dev/null @@ -1,157 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_local_bridge). - --behaviour(gen_server). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - --export([start_link/5]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(PING_DOWN_INTERVAL, 1000). - --record(state, {pool, id, - node, subtopic, - qos = ?QOS_0, - topic_suffix = <<>>, - topic_prefix = <<>>, - mqueue :: emqx_mqueue:mqueue(), - max_queue_len = 10000, - ping_down_interval = ?PING_DOWN_INTERVAL, - status = up}). - --type(option() :: {qos, emqx_mqtt_types:qos()} | - {topic_suffix, binary()} | - {topic_prefix, binary()} | - {max_queue_len, pos_integer()} | - {ping_down_interval, pos_integer()}). - --export_type([option/0]). - -%% @doc Start a bridge --spec(start_link(term(), pos_integer(), atom(), binary(), [option()]) - -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id, Node, Topic, Options) -> - gen_server:start_link(?MODULE, [Pool, Id, Node, Topic, Options], [{hibernate_after, 5000}]). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Pool, Id, Node, Topic, Options]) -> - process_flag(trap_exit, true), - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - case net_kernel:connect_node(Node) of - true -> - true = erlang:monitor_node(Node, true), - Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_broker:subscribe(Topic, #{share => Group, qos => ?QOS_0}), - State = parse_opts(Options, #state{node = Node, subtopic = Topic}), - MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len, - store_qos0 => true}), - {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; - false -> - {stop, {cannot_connect_node, Node}} - end. - -parse_opts([], State) -> - State; -parse_opts([{qos, QoS} | Opts], State) -> - parse_opts(Opts, State#state{qos = QoS}); -parse_opts([{topic_suffix, Suffix} | Opts], State) -> - parse_opts(Opts, State#state{topic_suffix= Suffix}); -parse_opts([{topic_prefix, Prefix} | Opts], State) -> - parse_opts(Opts, State#state{topic_prefix = Prefix}); -parse_opts([{max_queue_len, Len} | Opts], State) -> - parse_opts(Opts, State#state{max_queue_len = Len}); -parse_opts([{ping_down_interval, Interval} | Opts], State) -> - parse_opts(Opts, State#state{ping_down_interval = Interval}); -parse_opts([_Opt | Opts], State) -> - parse_opts(Opts, State). - -handle_call(Req, _From, State) -> - emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) -> - %% TODO: how to drop??? - {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - {noreply, State#state{mqueue = NewQ}}; - -handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> - emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), - {noreply, State}; - -handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> - emqx_logger:warning("[Bridge] node down: ~s", [Node]), - erlang:send_after(Interval, self(), ping_down_node), - {noreply, State#state{status = down}, hibernate}; - -handle_info({nodeup, Node}, State = #state{node = Node}) -> - %% TODO: Really fast?? - case emqx:is_running(Node) of - true -> emqx_logger:warning("[Bridge] Node up: ~s", [Node]), - {noreply, dequeue(State#state{status = up})}; - false -> self() ! {nodedown, Node}, - {noreply, State#state{status = down}} - end; - -handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> - Self = self(), - spawn_link(fun() -> - case net_kernel:connect_node(Node) of - true -> Self ! {nodeup, Node}; - false -> erlang:send_after(Interval, Self, ping_down_node) - end - end), - {noreply, State}; - -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; - -handle_info(Info, State) -> - emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -dequeue(State = #state{mqueue = MQ}) -> - case emqx_mqueue:out(MQ) of - {empty, MQ1} -> - State#state{mqueue = MQ1}; - {{value, Msg}, MQ1} -> - handle_info({dispatch, Msg#message.topic, Msg}, State), - dequeue(State#state{mqueue = MQ1}) - end. - -transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> - Msg#message{topic = <>}. - diff --git a/src/emqx_local_bridge_sup_sup.erl b/src/emqx_local_bridge_sup_sup.erl deleted file mode 100644 index 8a61d5936..000000000 --- a/src/emqx_local_bridge_sup_sup.erl +++ /dev/null @@ -1,74 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_local_bridge_sup_sup). - --behavior(supervisor). - --include("emqx.hrl"). - --export([start_link/0, bridges/0]). --export([start_bridge/2, start_bridge/3, stop_bridge/2]). - -%% Supervisor callbacks --export([init/1]). - --define(CHILD_ID(Node, Topic), {bridge_sup, Node, Topic}). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). -bridges() -> - [{Node, Topic, Pid} || {?CHILD_ID(Node, Topic), Pid, supervisor, _} - <- supervisor:which_children(?MODULE)]. - -%% @doc Start a bridge --spec(start_bridge(node(), emqx_topic:topic()) -> {ok, pid()} | {error, term()}). -start_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) -> - start_bridge(Node, Topic, []). - --spec(start_bridge(node(), emqx_topic:topic(), [emqx_bridge:option()]) - -> {ok, pid()} | {error, term()}). -start_bridge(Node, _Topic, _Options) when Node =:= node() -> - {error, bridge_to_self}; -start_bridge(Node, Topic, Options) when is_atom(Node), is_binary(Topic) -> - Options1 = emqx_misc:merge_opts(emqx_config:get_env(bridge, []), Options), - supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). - -%% @doc Stop a bridge --spec(stop_bridge(node(), emqx_topic:topic()) -> ok | {error, term()}). -stop_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) -> - ChildId = ?CHILD_ID(Node, Topic), - case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - Error -> Error - end. - -%%------------------------------------------------------------------------------ -%% Supervisor callbacks -%%------------------------------------------------------------------------------ - -init([]) -> - {ok, {{one_for_one, 10, 3600}, []}}. - -bridge_spec(Node, Topic, Options) -> - #{id => ?CHILD_ID(Node, Topic), - start => {emqx_local_bridge_sup, start_link, [Node, Topic, Options]}, - restart => permanent, - shutdown => infinity, - type => supervisor, - modules => [emqx_local_bridge_sup]}. - diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 20a08ba9f..016ff007f 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -67,7 +67,7 @@ default_priority => highest | lowest, store_qos0 => boolean() }). --type(message() :: pemqx_types:message()). +-type(message() :: emqx_types:message()). -type(stat() :: {len, non_neg_integer()} | {max_len, non_neg_integer()} diff --git a/src/emqx_portal_connect.erl b/src/emqx_portal_connect.erl new file mode 100644 index 000000000..35f4b53dc --- /dev/null +++ b/src/emqx_portal_connect.erl @@ -0,0 +1,65 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_connect). + +-export([start/2]). + +-export_type([config/0, connection/0]). + +-optional_callbacks([]). + +-type config() :: map(). +-type connection() :: term(). +-type conn_ref() :: term(). +-type batch() :: emqx_protal:batch(). +-type batch_ref() :: reference(). + +-include("logger.hrl"). + +%% establish the connection to remote node/cluster +%% protal worker (the caller process) should be expecting +%% a message {disconnected, conn_ref()} when disconnected. +-callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}. + +%% send to remote node/cluster +%% portal worker (the caller process) should be expecting +%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster +-callback send(connection(), batch()) -> {ok, batch_ref()} | {error, any()}. + +%% called when owner is shutting down. +-callback stop(conn_ref(), connection()) -> ok. + +start(Module, Config) -> + case Module:start(Config) of + {ok, Ref, Conn} -> + {ok, Ref, Conn}; + {error, Reason} -> + Config1 = obfuscate(Config), + ?ERROR("Failed to connect with module=~p\n" + "config=~p\nreason:~p", [Module, Config1, Reason]), + error + end. + +obfuscate(Map) -> + maps:fold(fun(K, V, Acc) -> + case is_sensitive(K) of + true -> [{K, '***'} | Acc]; + false -> [{K, V} | Acc] + end + end, [], Map). + +is_sensitive(passsword) -> true; +is_sensitive(_) -> false. + diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 5f62df904..0e6ebb08a 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -61,10 +61,7 @@ init([]) -> RouterSup = supervisor_spec(emqx_router_sup), %% Broker Sup BrokerSup = supervisor_spec(emqx_broker_sup), - %% BridgeSup - LocalBridgeSup = supervisor_spec(emqx_local_bridge_sup_sup), - - BridgeSup = supervisor_spec(emqx_bridge_sup), + PortalSup = supervisor_spec(emqx_portal_sup), %% AccessControl AccessControl = worker_spec(emqx_access_control), %% Session Manager @@ -77,8 +74,7 @@ init([]) -> [KernelSup, RouterSup, BrokerSup, - LocalBridgeSup, - BridgeSup, + PortalSup, AccessControl, SMSup, CMSup, diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 59f592984..4c90c3f39 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -20,7 +20,7 @@ -export([triples/1]). -export([words/1]). -export([wildcard/1]). --export([join/1]). +-export([join/1, prepend/2]). -export([feed_var/3]). -export([systop/1]). -export([parse/1, parse/2]). @@ -129,6 +129,18 @@ join(root, W) -> join(Parent, W) -> <<(bin(Parent))/binary, $/, (bin(W))/binary>>. +%% @doc Prepend a topic prefix. +%% Ensured to have only one / between prefix and suffix. +prepend(root, W) -> bin(W); +prepend(undefined, W) -> bin(W); +prepend(<<>>, W) -> bin(W); +prepend(Parent0, W) -> + Parent = bin(Parent0), + case binary:last(Parent) of + $/ -> <>; + _ -> join(Parent, W) + end. + bin('') -> <<>>; bin('+') -> <<"+">>; bin('#') -> <<"#">>; diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl new file mode 100644 index 000000000..7e7d678f6 --- /dev/null +++ b/src/portal/emqx_portal.erl @@ -0,0 +1,356 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc Portal works in two layers (1) batching layer (2) transport layer +%% The `portal' batching layer collects local messages in batches and sends over +%% to remote MQTT node/cluster via `connetion' transport layer. +%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be +%% the `gen_rpc' based implementation `emqx_portal_rpc'. Otherwise `connection' +%% has to be `emqx_portal_mqtt'. +%% +%% ``` +%% +------+ +--------+ +%% | EMQX | | REMOTE | +%% | | | | +%% | (portal) <==(connection)==> | | +%% | | | | +%% | | | | +%% +------+ +--------+ +%% ''' +%% +%% +%% This module implements 2 kinds of APIs with regards to batching and +%% messaging protocol. (1) A `gen_statem' based local batch collector; +%% (2) APIs for incoming remote batches/messages. +%% +%% Batch collector state diagram +%% +%% [connecting] --(2)--> [connected] +%% | ^ | +%% | | | +%% '--(1)---'--------(3)------' +%% +%% (1): timeout +%% (2): successfuly connected to remote node/cluster +%% (3): received {disconnected, conn_ref(), Reason} OR +%% failed to send to remote node/cluster. +%% +%% NOTE: A portal worker may subscribe to multiple (including wildcard) +%% local topics, and the underlying `emqx_portal_connect' may subscribe to +%% multiple remote topics, however, worker/connections are not designed +%% to support automatic load-balancing, i.e. in case it can not keep up +%% with the amount of messages comming in, administrator should split and +%% balance topics between worker/connections manually. + +-module(emqx_portal). +-behaviour(gen_statem). + +%% APIs +-export([start_link/2, + import_batch/2, + handle_ack/2, + stop/1 + ]). + +%% gen_statem callbacks +-export([terminate/3, code_change/4, init/1, callback_mode/0]). + +%% state functions +-export([connecting/3, connected/3]). + +-export_type([config/0, + batch/0, + ref/0 + ]). + +-type config() :: map(). +-type batch() :: [emqx_portal_msg:msg()]. +-type ref() :: reference(). + +-include("logger.hrl"). +-include("emqx_mqtt.hrl"). + +-define(DEFAULT_BATCH_COUNT, 100). +-define(DEFAULT_BATCH_BYTES, 1 bsl 20). +-define(DEFAULT_SEND_AHEAD, 8). +-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). +-define(DEFAULT_SEG_BYTES, (1 bsl 20)). +-define(maybe_send, {next_event, internal, maybe_send}). + +%% @doc Start a portal worker. Supported configs: +%% connect_module: The module which implements emqx_portal_connect behaviour +%% and work as message batch transport layer +%% reconnect_delay_ms: Delay in milli-seconds for the portal worker to retry +%% in case of transportation failure. +%% max_inflight_batches: Max number of batches allowed to send-ahead before +%% receiving confirmation from remote node/cluster +%% mountpoint: The topic mount point for messages sent to remote node/cluster +%% `undefined', `<<>>' or `""' to disalble +%% forwards: Local topics to subscribe. +%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each +%% send call towards emqx_portal_connect +%% queue.batch_count_limit: Max number of messages to collect in a batch for +%% each send call towards eqmx_portal_connect +%% queue.replayq_dir: Directory where replayq should persist messages +%% queue.replayq_seg_bytes: Size in bytes for each replqyq segnment file +%% +%% Find more connection specific configs in the callback modules +%% of emqx_portal_connect behaviour. +start_link(Name, Config) when is_list(Config) -> + start_link(Name, maps:from_list(Config)); +start_link(Name, Config) -> + gen_statem:start_link({local, Name}, ?MODULE, Config, []). + +stop(Pid) -> gen_statem:stop(Pid). + +%% @doc This function is to be evaluated on message/batch receiver side. +-spec import_batch(batch(), fun(() -> ok)) -> ok. +import_batch(Batch, AckFun) -> + lists:foreach(fun emqx_broker:publish/1, emqx_portal_msg:to_broker_msgs(Batch)), + AckFun(). + +%% @doc This function is to be evaluated on message/batch exporter side +%% when message/batch is accepted by remote node. +-spec handle_ack(pid(), ref()) -> ok. +handle_ack(Pid, Ref) when node() =:= node(Pid) -> + Pid ! {batch_ack, Ref}, + ok. + + +callback_mode() -> [state_functions, state_enter]. + +%% @doc Config should be a map(). +init(Config) -> + erlang:process_flag(trap_exit, true), + Get = fun(K, D) -> maps:get(K, Config, D) end, + QCfg = maps:get(queue, Config, #{}), + GetQ = fun(K, D) -> maps:get(K, QCfg, D) end, + Dir = GetQ(replayq_dir, undefined), + QueueConfig = + case Dir =:= undefined orelse Dir =:= "" of + true -> #{mem_only => true}; + false -> #{dir => Dir, + seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES) + } + end, + Queue = replayq:open(QueueConfig#{sizer => fun emqx_portal_msg:estimate_size/1, + marshaller => fun msg_marshaller/1}), + Topics = Get(forwards, []), + ok = subscribe_local_topics(Topics), + ConnectModule = maps:get(connect_module, Config), + ConnectConfig = maps:without([connect_module, + queue, + reconnect_delay_ms, + max_inflight_batches, + mountpoint, + forwards + ], Config), + ConnectFun = fun() -> emqx_portal_connect:start(ConnectModule, ConnectConfig) end, + {ok, connecting, + #{connect_module => ConnectModule, + connect_fun => ConnectFun, + reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS), + batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES), + batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT), + max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD), + mountpoint => format_mountpoint(Get(mountpoint, undefined)), + topics => Topics, + replayq => Queue, + inflight => [] + }}. + +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. + +terminate(_Reason, _StateName, #{replayq := Q} = State) -> + _ = disconnect(State), + _ = replayq:close(Q), + ok. + +%% @doc Connecting state is a state with timeout. +%% After each timeout, it re-enters this state and start a retry until +%% successfuly connected to remote node/cluster. +connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> + Action = {state_timeout, Timeout, reconnect}, + {keep_state_and_data, Action}; +connecting(enter, connecting, #{reconnect_delay_ms := Timeout, + connect_fun := ConnectFun} = State) -> + case ConnectFun() of + {ok, ConnRef, Conn} -> + Action = {state_timeout, 0, connected}, + {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; + error -> + Action = {state_timeout, Timeout, reconnect}, + {keep_state_and_data, Action} + end; +connecting(state_timeout, connected, State) -> + {next_state, connected, State}; +connecting(state_timeout, reconnect, _State) -> + repeat_state_and_data; +connecting(info, {batch_ack, Ref}, State) -> + case do_ack(State, Ref) of + {ok, NewState} -> + {keep_state, NewState}; + _ -> + keep_state_and_data + end; +connecting(Type, Content, State) -> + common(connecting, Type, Content, State). + +%% @doc Send batches to remote node/cluster when in 'connected' state. +connected(enter, _OldState, #{inflight := Inflight} = State) -> + case retry_inflight(State#{inflight := []}, Inflight) of + {ok, NewState} -> + Action = {state_timeout, 0, success}, + {keep_state, NewState, Action}; + {error, NewState} -> + Action = {state_timeout, 0, failure}, + {keep_state, disconnect(NewState), Action} + end; +connected(state_timeout, failure, State) -> + {next_state, connecting, State}; +connected(state_timeout, success, State) -> + {keep_state, State, ?maybe_send}; +connected(internal, maybe_send, State) -> + case pop_and_send(State) of + {ok, NewState} -> + {keep_state, NewState}; + {error, NewState} -> + {next_state, connecting, disconnect(NewState)} + end; +connected(info, {disconnected, ConnRef, Reason}, + #{conn_ref := ConnRef, connection := Conn} = State) -> + ?INFO("Portal ~p diconnected~nreason=~p", [Conn, Reason]), + {next_state, connecting, + State#{conn_ref := undefined, + connection := undefined + }}; +connected(info, {batch_ack, Ref}, State) -> + case do_ack(State, Ref) of + stale -> + keep_state_and_data; + bad_order -> + %% try re-connect then re-send + {next_state, connecting, disconnect(State)}; + {ok, NewState} -> + {keep_state, NewState} + end; +connected(Type, Content, State) -> + common(connected, Type, Content, State). + +%% Common handlers +common(_StateName, info, {dispatch, _, Msg}, + #{replayq := Q} = State) -> + NewQ = replayq:append(Q, collect([Msg])), + {keep_state, State#{replayq => NewQ}, ?maybe_send}; +common(StateName, Type, Content, State) -> + ?INFO("Ignored unknown ~p event ~p at state ~p", [Type, Content, StateName]), + {keep_state, State}. + +collect(Acc) -> + receive + {dispatch, _, Msg} -> + collect([Msg | Acc]) + after + 0 -> + lists:reverse(Acc) + end. + +%% Retry all inflight (previously sent but not acked) batches. +retry_inflight(State, []) -> {ok, State}; +retry_inflight(#{inflight := Inflight} = State, + [#{q_ack_ref := QAckRef, batch := Batch} | T] = Remain) -> + case do_send(State, QAckRef, Batch) of + {ok, NewState} -> + retry_inflight(NewState, T); + {error, Reason} -> + ?ERROR("Inflight retry failed\n~p", [Reason]), + {error, State#{inflight := Inflight ++ Remain}} + end. + +pop_and_send(#{inflight := Inflight, + max_inflight_batches := Max + } = State) when length(Inflight) >= Max -> + {ok, State}; +pop_and_send(#{replayq := Q, + batch_count_limit := CountLimit, + batch_bytes_limit := BytesLimit + } = State) -> + case replayq:is_empty(Q) of + true -> + {ok, State}; + false -> + Opts = #{count_limit => CountLimit, bytes_limit => BytesLimit}, + {Q1, QAckRef, Batch} = replayq:pop(Q, Opts), + do_send(State#{replayq := Q1}, QAckRef, Batch) + end. + +%% Assert non-empty batch because we have a is_empty check earlier. +do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> + case maybe_send(State, Batch) of + {ok, Ref} -> + NewInflight = Inflight ++ [#{q_ack_ref => QAckRef, + send_ack_ref => Ref, + batch => Batch + }], + {ok, State#{inflight := NewInflight}}; + {error, Reason} -> + ?INFO("Batch produce failed\n~p", [Reason]), + {error, State} + end. + +do_ack(State = #{inflight := [#{send_ack_ref := Ref} | Rest]}, Ref) -> + {ok, State#{inflight := Rest}}; +do_ack(#{inflight := Inflight}, Ref) -> + case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of + true -> bad_order; + false -> stale + end. + +subscribe_local_topics(Topics) -> + lists:foreach( + fun(Topic0) -> + Topic = iolist_to_binary(Topic0), + emqx_topic:validate({filter, Topic}) orelse erlang:error({bad_topic, Topic}), + emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}) + end, Topics). + +name() -> {_, Name} = process_info(self(), registered_name), Name. + +disconnect(#{connection := Conn, + conn_ref := ConnRef, + connect_module := Module + } = State) when Conn =/= undefined -> + ok = Module:stop(ConnRef, Conn), + State#{conn_ref => undefined, + connection => undefined + }; +disconnect(State) -> State. + +%% Called only when replayq needs to dump it to disk. +msg_marshaller(Bin) when is_binary(Bin) -> emqx_portal_msg:from_binary(Bin); +msg_marshaller(Msg) -> emqx_portal_msg:to_binary(Msg). + +%% Return {ok, SendAckRef} or {error, Reason} +maybe_send(#{connect_module := Module, + connection := Connection, + mountpoint := Mountpoint + }, Batch) -> + Module:send(Connection, [emqx_portal_msg:apply_mountpoint(M, Mountpoint) || M <- Batch]). + +format_mountpoint(undefined) -> + undefined; +format_mountpoint(Prefix) -> + binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + diff --git a/src/portal/emqx_portal_msg.erl b/src/portal/emqx_portal_msg.erl new file mode 100644 index 000000000..af9cc1b01 --- /dev/null +++ b/src/portal/emqx_portal_msg.erl @@ -0,0 +1,61 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_msg). + +-export([ to_binary/1 + , from_binary/1 + , apply_mountpoint/2 + , to_broker_msgs/1 + , estimate_size/1 + ]). + +-export_type([msg/0]). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +-type msg() :: emqx_types:message(). + +%% @doc Mount topic to a prefix. +-spec apply_mountpoint(msg(), undefined | binary()) -> msg(). +apply_mountpoint(#{topic := Topic} = Msg, Mountpoint) -> + Msg#{topic := topic(Mountpoint, Topic)}. + +%% @doc Make `binary()' in order to make iodata to be persisted on disk. +-spec to_binary(msg()) -> binary(). +to_binary(Msg) -> term_to_binary(Msg). + +%% @doc Unmarshal binary into `msg()'. +-spec from_binary(binary()) -> msg(). +from_binary(Bin) -> binary_to_term(Bin). + +%% @doc Estimate the size of a message. +%% Count only the topic length + payload size +-spec estimate_size(msg()) -> integer(). +estimate_size(#{topic := Topic, payload := Payload}) -> + size(Topic) + size(Payload). + +%% @doc By message/batch receiver, transform received batch into +%% messages to dispatch to local brokers. +to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch). + +to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic, + properties := Props, payload := Payload}) -> + emqx_message:set_headers(Props, + emqx_message:set_flags(#{dup => Dup, retain => Retain}, + emqx_message:make(portal, QoS, Topic, Payload))). + +topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic). + diff --git a/src/portal/emqx_portal_rpc.erl b/src/portal/emqx_portal_rpc.erl new file mode 100644 index 000000000..8b1136353 --- /dev/null +++ b/src/portal/emqx_portal_rpc.erl @@ -0,0 +1,106 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements EMQX Portal transport layer based on gen_rpc. + +-module(emqx_portal_rpc). +-behaviour(emqx_portal_connect). + +%% behaviour callbacks +-export([start/1, + send/2, + stop/2 + ]). + +%% Internal exports +-export([ handle_send/2 + , handle_ack/2 + , heartbeat/2 + ]). + +-type batch_ref() :: emqx_portal:batch_ref(). +-type batch() :: emqx_portal:batch(). + +-define(HEARTBEAT_INTERVAL, timer:seconds(1)). + +-define(RPC, gen_rpc). + +start(#{address := Remote}) -> + case poke(Remote) of + ok -> + Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), Remote]), + {ok, Pid, Remote}; + Error -> + Error + end. + +stop(Pid, _Remote) when is_pid(Pid) -> + Ref = erlang:monitor(process, Pid), + unlink(Pid), + Pid ! stop, + receive + {'DOWN', Ref, process, Pid, _Reason} -> + ok + after + 1000 -> + exit(Pid, kill) + end, + ok. + +%% @doc Callback for `emqx_portal_connect' behaviour +-spec send(node(), batch()) -> {ok, batch_ref()} | {error, any()}. +send(Remote, Batch) -> + Sender = self(), + case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of + {ok, Ref} -> {ok, Ref}; + {badrpc, Reason} -> {error, Reason} + end. + +%% @doc Handle send on receiver side. +-spec handle_send(pid(), batch()) -> {ok, batch_ref()} | {error, any()}. +handle_send(SenderPid, Batch) -> + SenderNode = node(SenderPid), + Ref = make_ref(), + AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end, + case emqx_portal:import_batch(Batch, AckFun) of + ok -> {ok, Ref}; + Error -> Error + end. + +%% @doc Handle batch ack in sender node. +handle_ack(SenderPid, Ref) -> + ok = emqx_portal:handle_ack(SenderPid, Ref). + +%% @hidden Heartbeat loop +heartbeat(Parent, RemoteNode) -> + Interval = ?HEARTBEAT_INTERVAL, + receive + stop -> exit(normal) + after + Interval -> + case poke(RemoteNode) of + ok -> + ?MODULE:heartbeat(Parent, RemoteNode); + {error, Reason} -> + Parent ! {disconnected, self(), Reason}, + exit(normal) + end + end. + +poke(Node) -> + case ?RPC:call(Node, erlang, node, []) of + Node -> ok; + {badrpc, Reason} -> {error, Reason} + end. + diff --git a/src/portal/emqx_portal_sup.erl b/src/portal/emqx_portal_sup.erl new file mode 100644 index 000000000..79afd6352 --- /dev/null +++ b/src/portal/emqx_portal_sup.erl @@ -0,0 +1,54 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_sup). +-behavior(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +-define(SUP, ?MODULE). +-define(WORKER_SUP, emqx_portal_worker_sup). + +start_link() -> start_link(?SUP). + +start_link(Name) -> + supervisor:start_link({local, Name}, ?MODULE, Name). + +init(?SUP) -> + Sp = fun(Name) -> + #{id => Name, + start => {?MODULE, start_link, [Name]}, + restart => permanent, + shutdown => 5000, + type => supervisor, + modules => [?MODULE] + } + end, + {ok, {{one_for_one, 5, 10}, [Sp(?WORKER_SUP)]}}; +init(?WORKER_SUP) -> + BridgesConf = emqx_config:get_env(bridges, []), + BridgesSpec = lists:map(fun portal_spec/1, BridgesConf), + {ok, {{one_for_one, 10, 100}, BridgesSpec}}. + +portal_spec({Name, Config}) -> + #{id => Name, + start => {emqx_portal, start_link, [Name, Config]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_portal] + }. + diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl deleted file mode 100644 index 9681c27e6..000000000 --- a/test/emqx_bridge_SUITE.erl +++ /dev/null @@ -1,58 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - -all() -> - [bridge_test]. - -init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), - Config. - -end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). - -bridge_test(_) -> - #{msg := <<"start bridge successfully">>} - = emqx_bridge:start_bridge(aws), - test_forwards(), - test_subscriptions(0), - test_subscriptions(1), - test_subscriptions(2), - #{msg := <<"stop bridge successfully">>} - = emqx_bridge:stop_bridge(aws), - ok. - -test_forwards() -> - emqx_bridge:add_forward(aws, <<"test_forwards">>), - [<<"test_forwards">>, <<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws), - emqx_bridge:del_forward(aws, <<"test_forwards">>), - [<<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws), - ok. - -test_subscriptions(QoS) -> - emqx_bridge:add_subscription(aws, <<"test_subscriptions">>, QoS), - [{<<"test_subscriptions">>, QoS}, - {<<"cmd/topic1">>, 1}, - {<<"cmd/topic2">>, 1}] = emqx_bridge:show_subscriptions(aws), - emqx_bridge:del_subscription(aws, <<"test_subscriptions">>), - [{<<"cmd/topic1">>,1}, {<<"cmd/topic2">>,1}] = emqx_bridge:show_subscriptions(aws), - ok. diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 1ab79e8a9..1ef5b1fa3 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -156,24 +156,30 @@ flush(Msgs) -> end. bridge_conf() -> - [{aws, - [{username,"user"}, - {address,"127.0.0.1:1883"}, - {clean_start,true}, - {client_id,"bridge_aws"}, - {forwards,["topic1/#","topic2/#"]}, - {keepalive,60000}, - {max_inflight,32}, - {mountpoint,"bridge/aws/${node}/"}, - {password,"passwd"}, - {proto_ver,mqttv4}, - {queue, - #{batch_size => 1000,mem_cache => true, - replayq_dir => "data/emqx_aws_bridge/", - replayq_seg_bytes => 10485760}}, - {reconnect_interval,30000}, - {retry_interval,20000}, - {ssl,false}, - {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]}, - {start_type,manual}, - {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}]. \ No newline at end of file + [ {local_rpc, + [{connect_module, emqx_portal_rpc}, + {address, node()}, + {forwards, ["portal-1/#", "portal-2/#"]} + ]} + ]. + % [{aws, + % [{connect_module, emqx_portal_mqtt}, + % {username,"user"}, + % {address,"127.0.0.1:1883"}, + % {clean_start,true}, + % {client_id,"bridge_aws"}, + % {forwards,["topic1/#","topic2/#"]}, + % {keepalive,60000}, + % {max_inflight,32}, + % {mountpoint,"bridge/aws/${node}/"}, + % {password,"passwd"}, + % {proto_ver,mqttv4}, + % {queue, + % #{batch_coun t_limit => 1000, + % replayq_dir => "data/emqx_aws_bridge/", + % replayq_seg_bytes => 10485760}}, + % {reconnect_delay_ms,30000}, + % {ssl,false}, + % {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]}, + % {start_type,manual}, + % {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}]. diff --git a/test/emqx_pool_SUITE.erl b/test/emqx_pool_SUITE.erl index ea648709f..d2fb90afc 100644 --- a/test/emqx_pool_SUITE.erl +++ b/test/emqx_pool_SUITE.erl @@ -62,7 +62,7 @@ async_submit_mfa(_Config) -> emqx_pool:async_submit(fun ?MODULE:test_mfa/0, []). async_submit_crash(_) -> - emqx_pool:async_submit(fun() -> A = 1, A = 0 end). + emqx_pool:async_submit(fun() -> error(test) end). t_unexpected(_) -> Pid = emqx_pool:worker(), diff --git a/test/emqx_portal_rpc_tests.erl b/test/emqx_portal_rpc_tests.erl new file mode 100644 index 000000000..5fd7608c0 --- /dev/null +++ b/test/emqx_portal_rpc_tests.erl @@ -0,0 +1,43 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_rpc_tests). +-include_lib("eunit/include/eunit.hrl"). + +send_and_ack_test() -> + %% delegate from gen_rpc to rpc for unit test + meck:new(gen_rpc, [passthrough, no_history]), + meck:expect(gen_rpc, call, 4, + fun(Node, Module, Fun, Args) -> + rpc:call(Node, Module, Fun, Args) + end), + meck:expect(gen_rpc, cast, 4, + fun(Node, Module, Fun, Args) -> + rpc:cast(Node, Module, Fun, Args) + end), + meck:new(emqx_portal, [passthrough, no_history]), + meck:expect(emqx_portal, import_batch, 2, + fun(batch, AckFun) -> AckFun() end), + try + {ok, Pid, Node} = emqx_portal_rpc:start(#{address => node()}), + {ok, Ref} = emqx_portal_rpc:send(Node, batch), + receive + {batch_ack, Ref} -> + ok + end, + ok = emqx_portal_rpc:stop(Pid, Node) + after + meck:unload(gen_rpc), + meck:unload(emqx_portal) + end. diff --git a/test/emqx_portal_tests.erl b/test/emqx_portal_tests.erl new file mode 100644 index 000000000..5826a327e --- /dev/null +++ b/test/emqx_portal_tests.erl @@ -0,0 +1,146 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_tests). +-behaviour(emqx_portal_connect). + +-include_lib("eunit/include/eunit.hrl"). +-include("emqx_mqtt.hrl"). + +-define(PORTAL_NAME, test_portal). +-define(WAIT(PATTERN, TIMEOUT), + receive + PATTERN -> + ok + after + TIMEOUT -> + error(timeout) + end). + +%% stub callbacks +-export([start/1, send/2, stop/2]). + +start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) -> + case is_pid(Pid) of + true -> Pid ! {connection_start_attempt, Ref}; + false -> ok + end, + Result. + +send(SendFun, Batch) when is_function(SendFun, 1) -> + SendFun(Batch). + +stop(_Ref, _Pid) -> ok. + +%% portal worker should retry connecting remote node indefinitely +reconnect_test() -> + Ref = make_ref(), + Config = make_config(Ref, self(), {error, test}), + {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), + %% assert name registered + ?assertEqual(Pid, whereis(?PORTAL_NAME)), + ?WAIT({connection_start_attempt, Ref}, 1000), + %% expect same message again + ?WAIT({connection_start_attempt, Ref}, 1000), + ok = emqx_portal:stop(?PORTAL_NAME), + ok. + +%% connect first, disconnect, then connect again +disturbance_test() -> + Ref = make_ref(), + Config = make_config(Ref, self(), {ok, Ref, connection}), + {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), + ?assertEqual(Pid, whereis(?PORTAL_NAME)), + ?WAIT({connection_start_attempt, Ref}, 1000), + Pid ! {disconnected, Ref, test}, + ?WAIT({connection_start_attempt, Ref}, 1000), + ok = emqx_portal:stop(?PORTAL_NAME). + +%% buffer should continue taking in messages when disconnected +buffer_when_disconnected_test_() -> + {timeout, 5000, fun test_buffer_when_disconnected/0}. + +test_buffer_when_disconnected() -> + Ref = make_ref(), + Nums = lists:seq(1, 100), + Sender = spawn_link(fun() -> receive {portal, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end), + SenderMref = monitor(process, Sender), + Receiver = spawn_link(fun() -> receive {portal, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end), + ReceiverMref = monitor(process, Receiver), + SendFun = fun(Batch) -> + BatchRef = make_ref(), + Receiver ! {batch, BatchRef, Batch}, + {ok, BatchRef} + end, + Config0 = make_config(Ref, false, {ok, Ref, SendFun}), + Config = Config0#{reconnect_delay_ms => 100}, + {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), + Sender ! {portal, Pid}, + Receiver ! {portal, Pid}, + ?assertEqual(Pid, whereis(?PORTAL_NAME)), + Pid ! {disconnected, Ref, test}, + ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 2000), + ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), + ok = emqx_portal:stop(?PORTAL_NAME). + +%% Feed messages to portal +sender_loop(_Pid, [], _) -> exit(normal); +sender_loop(Pid, [Num | Rest], Interval) -> + random_sleep(Interval), + Pid ! {dispatch, dummy, make_msg(Num)}, + sender_loop(Pid, Rest, Interval). + +%% Feed acknowledgments to portal +receiver_loop(_Pid, [], _) -> ok; +receiver_loop(Pid, Nums, Interval) -> + receive + {batch, BatchRef, Batch} -> + Rest = match_nums(Batch, Nums), + random_sleep(Interval), + emqx_portal:handle_ack(Pid, BatchRef), + receiver_loop(Pid, Rest, Interval) + end. + +random_sleep(MaxInterval) -> + case rand:uniform(MaxInterval) - 1 of + 0 -> ok; + T -> timer:sleep(T) + end. + +match_nums([], Rest) -> Rest; +match_nums([#{payload := P} | Rest], Nums) -> + I = binary_to_integer(P), + case Nums of + [I | NumsLeft] -> match_nums(Rest, NumsLeft); + _ -> error({I, Nums}) + end. + +make_config(Ref, TestPid, Result) -> + #{test_pid => TestPid, + test_ref => Ref, + connect_module => ?MODULE, + reconnect_delay_ms => 50, + connect_result => Result + }. + +make_msg(I) -> + Payload = integer_to_binary(I), + #{qos => ?QOS_1, + dup => false, + retain => false, + topic => <<"test/topic">>, + properties => [], + payload => Payload + }. +