diff --git a/.gitignore b/.gitignore index 7a4e891d1..ab0cbe156 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ cuttlefish rebar.lock xrefr erlang.mk +*.coverdata diff --git a/Makefile b/Makefile index 6022dbf8f..35c2c0bee 100644 --- a/Makefile +++ b/Makefile @@ -20,24 +20,24 @@ ERLC_OPTS += +debug_info -DAPPLICATION=emqx BUILD_DEPS = cuttlefish dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1 -#TEST_DEPS = emqx_ct_helplers -#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers +TEST_DEPS = meck +dep_meck = hex-emqx 0.8.13 TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx EUNIT_OPTS = verbose -# CT_SUITES = emqx_frame +CT_SUITES = emqx_bridge ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ - emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ - emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ - emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message +# CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ +# emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ +# emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ +# emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ +# emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ +# emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ +# emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ +# emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -96,17 +96,24 @@ rebar-deps: @rebar3 get-deps rebar-eunit: $(CUTTLEFISH_SCRIPT) - @rebar3 eunit + @rebar3 eunit -v rebar-compile: @rebar3 compile -rebar-ct: app.config +rebar-ct-setup: app.config @rebar3 as test compile @ln -s -f '../../../../etc' _build/test/lib/emqx/ @ln -s -f '../../../../data' _build/test/lib/emqx/ + +rebar-ct: rebar-ct-setup @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') +## Run one single CT with rebar3 +## e.g. make ct-one-suite suite=emqx_bridge +ct-one-suite: rebar-ct-setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE + rebar-clean: @rebar3 clean diff --git a/etc/emqx.conf b/etc/emqx.conf index 166fca25c..92d4f59b8 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1596,28 +1596,6 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ##-------------------------------------------------------------------- ## Bridges to aws ##-------------------------------------------------------------------- -## Start type of the bridge. -## -## Value: enum -## manual -## auto -## bridge.aws.start_type = manual - -## Bridge reconnect time. -## -## Value: Duration -## Default: 30 seconds -## bridge.aws.reconnect_interval = 30s - -## Retry interval for bridge QoS1 message delivering. -## -## Value: Duration -## bridge.aws.retry_interval = 20s - -## Inflight size. -## -## Value: Integer -## bridge.aws.max_inflight = 32 ## Bridge address: node name for local bridge, host:port for remote. ## @@ -1662,66 +1640,12 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: String ## bridge.aws.mountpoint = bridge/aws/${node}/ -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -## bridge.aws.keepalive = 60s - ## Forward message topics ## ## Value: String ## Example: topic1/#,topic2/# ## bridge.aws.forwards = topic1/#,topic2/# -## Subscriptions of the bridge topic. -## -## Value: String -## bridge.aws.subscription.1.topic = cmd/topic1 - -## Subscriptions of the bridge qos. -## -## Value: Number -## bridge.aws.subscription.1.qos = 1 - -## Subscriptions of the bridge topic. -## -## Value: String -## bridge.aws.subscription.2.topic = cmd/topic2 - -## Subscriptions of the bridge qos. -## -## Value: Number -## bridge.aws.subscription.2.qos = 1 - -## If enabled, queue would be written into disk more quickly. -## However, If disabled, some message would be dropped in -## the situation emqx crashed. -## -## Value: on | off -## bridge.aws.queue.mem_cache = on - -## Batch size for buffer queue stored -## -## Value: Integer -## default: 1000 -## bridge.aws.queue.batch_size = 1000 - -## Base directory for replayq to store messages on disk -## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. If the config -## entry was set to `bridge.aws.mqueue_type = memory` -## this config entry would have no effect on mqueue -## -## Value: String -## bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ - -## Replayq segment size -## -## Value: Bytesize - -## bridge.aws.queue.replayq_seg_bytes = 10MB - ## Bribge to remote server via SSL. ## ## Value: on | off @@ -1747,36 +1671,89 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: String ## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## Ping interval of a down bridge. +## +## Value: Duration +## Default: 10 seconds +## bridge.aws.keepalive = 60s + ## TLS versions used by the bridge. ## ## Value: String ## bridge.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -##-------------------------------------------------------------------- -## Bridges to azure -##-------------------------------------------------------------------- +## Subscriptions of the bridge topic. +## +## Value: String +## bridge.aws.subscription.1.topic = cmd/topic1 + +## Subscriptions of the bridge qos. +## +## Value: Number +## bridge.aws.subscription.1.qos = 1 + +## Subscriptions of the bridge topic. +## +## Value: String +## bridge.aws.subscription.2.topic = cmd/topic2 + +## Subscriptions of the bridge qos. +## +## Value: Number +## bridge.aws.subscription.2.qos = 1 + ## Start type of the bridge. ## ## Value: enum ## manual ## auto -## bridge.azure.start_type = manual - -## Bridge reconnect count. -## -## Value: Number -## bridge.azure.reconnect_count = 10 +## bridge.aws.start_type = manual ## Bridge reconnect time. ## ## Value: Duration ## Default: 30 seconds -## bridge.azure.reconnect_time = 30s +## bridge.aws.reconnect_interval = 30s ## Retry interval for bridge QoS1 message delivering. ## ## Value: Duration -## bridge.azure.retry_interval = 20s +## bridge.aws.retry_interval = 20s + +## Inflight size. +## +## Value: Integer +## bridge.aws.max_inflight_batches = 32 + +## Max number of messages to collect in a batch for +## each send call towards emqx_bridge_connect +## +## Value: Integer +## default: 32 +## bridge.aws.queue.batch_count_limit = 32 + +## Max number of bytes to collect in a batch for each +## send call towards emqx_bridge_connect +## +## Value: Bytesize +## default: 1000M +## bridge.aws.queue.batch_bytes_limit = 1000MB + +## Base directory for replayq to store messages on disk +## If this config entry is missing or set to undefined, +## replayq works in a mem-only manner. +## +## Value: String +## bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ + +## Replayq segment size +## +## Value: Bytesize +## bridge.aws.queue.replayq_seg_bytes = 10MB + +##-------------------------------------------------------------------- +## Bridges to azure +##-------------------------------------------------------------------- ## Bridge address: node name for local bridge, host:port for remote. ## @@ -1819,13 +1796,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Mountpoint of the bridge. ## ## Value: String -## bridge.azure.mountpoint = bridge/azure/${node}/ - -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -## bridge.azure.keepalive = 10s +## bridge.azure.mountpoint = bridge/aws/${node}/ ## Forward message topics ## @@ -1833,10 +1804,46 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Example: topic1/#,topic2/# ## bridge.azure.forwards = topic1/#,topic2/# +## Bribge to remote server via SSL. +## +## Value: on | off +## bridge.azure.ssl = off + +## PEM-encoded CA certificates of the bridge. +## +## Value: File +## bridge.azure.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## Client SSL Certfile of the bridge. +## +## Value: File +## bridge.azure.certfile = {{ platform_etc_dir }}/certs/client-cert.pem + +## Client SSL Keyfile of the bridge. +## +## Value: File +## bridge.azure.keyfile = {{ platform_etc_dir }}/certs/client-key.pem + +## SSL Ciphers used by the bridge. +## +## Value: String +## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## Ping interval of a down bridge. +## +## Value: Duration +## Default: 10 seconds +## bridge.azure.keepalive = 60s + +## TLS versions used by the bridge. +## +## Value: String +## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1 + ## Subscriptions of the bridge topic. ## ## Value: String -## bridge.azure.subscription.1.topic = $share/cmd/topic1 +## bridge.azure.subscription.1.topic = cmd/topic1 ## Subscriptions of the bridge qos. ## @@ -1846,27 +1853,50 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Subscriptions of the bridge topic. ## ## Value: String -## bridge.azure.subscription.2.topic = $share/cmd/topic2 +## bridge.azure.subscription.2.topic = cmd/topic2 ## Subscriptions of the bridge qos. ## ## Value: Number ## bridge.azure.subscription.2.qos = 1 -## Batch size for buffer queue stored +## Start type of the bridge. +## +## Value: enum +## manual +## auto +## bridge.azure.start_type = manual + +## Bridge reconnect time. +## +## Value: Duration +## Default: 30 seconds +## bridge.azure.reconnect_interval = 30s + +## Retry interval for bridge QoS1 message delivering. +## +## Value: Duration +## bridge.azure.retry_interval = 20s + +## Inflight size. ## ## Value: Integer -## default: 1000 -## bridge.azure.queue.batch_size = 1000 +## bridge.azure.max_inflight_batches = 32 + +## Maximum number of messages in one batch when sending to remote borkers +## NOTE: when bridging via MQTT connection to remote broker, this config is only +## used for internal message passing optimization as the underlying MQTT +## protocol does not supports batching. +## +## Value: Integer +## default: 32 +## bridge.azure.queue.batch_size = 32 ## Base directory for replayq to store messages on disk ## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. If the config -## entry was set to `bridge.aws.mqueue_type = memory` -## this config entry would have no effect on mqueue +## replayq works in a mem-only manner. ## ## Value: String -## Default: {{ platform_data_dir }}/emqx_aws_bridge/ ## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ ## Replayq segment size @@ -1874,30 +1904,6 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: Bytesize ## bridge.azure.queue.replayq_seg_bytes = 10MB -## PEM-encoded CA certificates of the bridge. -## -## Value: File -## bridge.azure.cacertfile = cacert.pem - -## Client SSL Certfile of the bridge. -## -## Value: File -## bridge.azure.certfile = cert.pem - -## Client SSL Keyfile of the bridge. -## -## Value: File -## bridge.azure.keyfile = key.pem - -## SSL Ciphers used by the bridge. -## -## Value: String -## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 - -## TLS versions used by the bridge. -## -## Value: String -## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1 ##-------------------------------------------------------------------- ## Modules diff --git a/src/emqx_local_bridge_sup.erl b/include/emqx_client.hrl similarity index 63% rename from src/emqx_local_bridge_sup.erl rename to include/emqx_client.hrl index db349b94d..535b8ad55 100644 --- a/src/emqx_local_bridge_sup.erl +++ b/include/emqx_client.hrl @@ -12,15 +12,9 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_local_bridge_sup). - --include("emqx.hrl"). - --export([start_link/3]). - --spec(start_link(node(), emqx_topic:topic(), [emqx_local_bridge:option()]) - -> {ok, pid()} | {error, term()}). -start_link(Node, Topic, Options) -> - MFA = {emqx_local_bridge, start_link, [Node, Topic, Options]}, - emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA). +-ifndef(EMQX_CLIENT_HRL). +-define(EMQX_CLIENT_HRL, true). +-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, + packet_id, topic, props, payload}). +-endif. diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 7e7670112..1c2ce1a27 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -171,9 +171,10 @@ -define(RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED, 16#A2). %%-------------------------------------------------------------------- -%% Maximum MQTT Packet Length +%% Maximum MQTT Packet ID and Length %%-------------------------------------------------------------------- +-define(MAX_PACKET_ID, 16#ffff). -define(MAX_PACKET_SIZE, 16#fffffff). %%-------------------------------------------------------------------- diff --git a/include/logger.hrl b/include/logger.hrl index 503b6cf21..1effab695 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -41,3 +41,5 @@ begin (logger:log(Level,#{},#{report_cb => fun(_) -> {(Format), (Args)} end})) end). + +-define(LOG(Level, Format), ?LOG(Level, Format, [])). diff --git a/priv/emqx.schema b/priv/emqx.schema index 9e71248a7..e490e024f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1512,22 +1512,6 @@ end}. %%-------------------------------------------------------------------- %% Bridges %%-------------------------------------------------------------------- -{mapping, "bridge.$name.queue.mem_cache", "emqx.bridges", [ - {datatype, flag} -]}. - -{mapping, "bridge.$name.queue.batch_size", "emqx.bridges", [ - {datatype, integer} -]}. - -{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [ - {datatype, bytesize} -]}. - {mapping, "bridge.$name.address", "emqx.bridges", [ {datatype, string} ]}. @@ -1616,11 +1600,27 @@ end}. {datatype, {duration, ms}} ]}. -{mapping, "bridge.$name.max_inflight", "emqx.bridges", [ +{mapping, "bridge.$name.max_inflight_batches", "emqx.bridges", [ {default, 0}, {datatype, integer} ]}. +{mapping, "bridge.$name.queue.batch_count_limit", "emqx.bridges", [ + {datatype, integer} +]}. + +{mapping, "bridge.$name.queue.batch_bytes_limit", "emqx.bridges", [ + {datatype, bytesize} +]}. + +{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [ + {datatype, bytesize} +]}. + {translation, "emqx.bridges", fun(Conf) -> Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, @@ -1661,17 +1661,59 @@ end}. lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) end, - + IsNodeAddr = fun(Addr) -> + case string:tokens(Addr, "@") of + [_NodeName, _Hostname] -> true; + _ -> false + end + end, + ConnMod = fun(Name) -> + [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".address", Conf), + {_, Addr} = AddrConfig, + Subs = Subscriptions(Name), + case IsNodeAddr(Addr) of + true when Subs =/= [] -> + error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs}); + true -> + emqx_bridge_rpc; + false -> + emqx_bridge_mqtt + end + end, + %% to be backward compatible + Translate = + fun Tr(queue, Q, Cfg) -> + NewQ = maps:fold(Tr, #{}, Q), + Cfg#{queue => NewQ}; + Tr(address, Addr0, Cfg) -> + Addr = case IsNodeAddr(Addr0) of + true -> list_to_atom(Addr0); + false -> Addr0 + end, + Cfg#{address => Addr}; + Tr(batch_size, Count, Cfg) -> + Cfg#{batch_count_limit => Count}; + Tr(reconnect_interval, Ms, Cfg) -> + Cfg#{reconnect_delay_ms => Ms}; + Tr(max_inflight, Count, Cfg) -> + Cfg#{max_inflight_batches => Count}; + Tr(Key, Value, Cfg) -> + Cfg#{Key => Value} + end, maps:to_list( lists:foldl( fun({["bridge", Name, Opt], Val}, Acc) -> %% e.g #{aws => [{OptKey, OptVal}]} - Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}, {queue, Queue(Name)}], - maps:update_with(list_to_atom(Name), - fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); + Init = [{list_to_atom(Opt), Val}, + {connect_module, ConnMod(Name)}, + {subscriptions, Subscriptions(Name)}, + {queue, Queue(Name)} + ], + C = maps:update_with(list_to_atom(Name), + fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc), + maps:fold(Translate, #{}, C); (_, Acc) -> Acc end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf)))) - end}. %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index 7486e7267..b6c68208b 100644 --- a/rebar.config +++ b/rebar.config @@ -27,3 +27,6 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. + +{profiles, [{test, [{deps, [{meck, "0.8.13"}]}]}]}. + diff --git a/src/bridge/emqx_bridge.erl b/src/bridge/emqx_bridge.erl new file mode 100644 index 000000000..af85df68e --- /dev/null +++ b/src/bridge/emqx_bridge.erl @@ -0,0 +1,554 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc Bridge works in two layers (1) batching layer (2) transport layer +%% The `bridge' batching layer collects local messages in batches and sends over +%% to remote MQTT node/cluster via `connetion' transport layer. +%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be +%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection' +%% has to be `emqx_bridge_mqtt'. +%% +%% ``` +%% +------+ +--------+ +%% | EMQX | | REMOTE | +%% | | | | +%% | (bridge) <==(connection)==> | | +%% | | | | +%% | | | | +%% +------+ +--------+ +%% ''' +%% +%% +%% This module implements 2 kinds of APIs with regards to batching and +%% messaging protocol. (1) A `gen_statem' based local batch collector; +%% (2) APIs for incoming remote batches/messages. +%% +%% Batch collector state diagram +%% +%% [standing_by] --(0) --> [connecting] --(2)--> [connected] +%% | ^ | +%% | | | +%% '--(1)---'--------(3)------' +%% +%% (0): auto or manual start +%% (1): retry timeout +%% (2): successfuly connected to remote node/cluster +%% (3): received {disconnected, conn_ref(), Reason} OR +%% failed to send to remote node/cluster. +%% +%% NOTE: A bridge worker may subscribe to multiple (including wildcard) +%% local topics, and the underlying `emqx_bridge_connect' may subscribe to +%% multiple remote topics, however, worker/connections are not designed +%% to support automatic load-balancing, i.e. in case it can not keep up +%% with the amount of messages comming in, administrator should split and +%% balance topics between worker/connections manually. +%% +%% NOTES: +%% * Local messages are all normalised to QoS-1 when exporting to remote + +-module(emqx_bridge). +-behaviour(gen_statem). + +%% APIs +-export([start_link/2, + import_batch/2, + handle_ack/2, + stop/1]). + +%% gen_statem callbacks +-export([terminate/3, code_change/4, init/1, callback_mode/0]). + +%% state functions +-export([standing_by/3, connecting/3, connected/3]). + +%% management APIs +-export([ensure_started/1, ensure_started/2, ensure_stopped/1, ensure_stopped/2, status/1]). +-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). +-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). + +-export_type([config/0, + batch/0, + ack_ref/0]). + +-type id() :: atom() | string() | pid(). +-type qos() :: emqx_mqtt_types:qos(). +-type config() :: map(). +-type batch() :: [emqx_bridge_msg:exp_msg()]. +-type ack_ref() :: term(). +-type topic() :: emqx_topic:topic(). + +-include("logger.hrl"). +-include("emqx_mqtt.hrl"). + +%% same as default in-flight limit for emqx_client +-define(DEFAULT_BATCH_COUNT, 32). +-define(DEFAULT_BATCH_BYTES, 1 bsl 20). +-define(DEFAULT_SEND_AHEAD, 8). +-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). +-define(DEFAULT_SEG_BYTES, (1 bsl 20)). +-define(maybe_send, {next_event, internal, maybe_send}). + +%% @doc Start a bridge worker. Supported configs: +%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay +%% at 'standing_by' state until a manual call to start it. +%% connect_module: The module which implements emqx_bridge_connect behaviour +%% and work as message batch transport layer +%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry +%% in case of transportation failure. +%% max_inflight_batches: Max number of batches allowed to send-ahead before +%% receiving confirmation from remote node/cluster +%% mountpoint: The topic mount point for messages sent to remote node/cluster +%% `undefined', `<<>>' or `""' to disable +%% forwards: Local topics to subscribe. +%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each +%% send call towards emqx_bridge_connect +%% queue.batch_count_limit: Max number of messages to collect in a batch for +%% each send call towards emqx_bridge_connect +%% queue.replayq_dir: Directory where replayq should persist messages +%% queue.replayq_seg_bytes: Size in bytes for each replayq segment file +%% +%% Find more connection specific configs in the callback modules +%% of emqx_bridge_connect behaviour. +start_link(Name, Config) when is_list(Config) -> + start_link(Name, maps:from_list(Config)); +start_link(Name, Config) -> + gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). + +%% @doc Manually start bridge worker. State idempotency ensured. +ensure_started(Name) -> + gen_statem:call(name(Name), ensure_started). + +ensure_started(Name, Config) -> + case start_link(Name, Config) of + {ok, Pid} -> {ok, Pid}; + {error, {already_started,Pid}} -> {ok, Pid} + end. + +%% @doc Manually stop bridge worker. State idempotency ensured. +ensure_stopped(Id) -> + ensure_stopped(Id, 1000). + +ensure_stopped(Id, Timeout) -> + Pid = case id(Id) of + P when is_pid(P) -> P; + N -> whereis(N) + end, + case Pid of + undefined -> + ok; + _ -> + MRef = monitor(process, Pid), + unlink(Pid), + _ = gen_statem:call(id(Id), ensure_stopped, Timeout), + receive + {'DOWN', MRef, _, _, _} -> + ok + after + Timeout -> + exit(Pid, kill) + end + end. + +stop(Pid) -> gen_statem:stop(Pid). + +status(Pid) -> + gen_statem:call(Pid, status). + +%% @doc This function is to be evaluated on message/batch receiver side. +-spec import_batch(batch(), fun(() -> ok)) -> ok. +import_batch(Batch, AckFun) -> + lists:foreach(fun emqx_broker:publish/1, emqx_bridge_msg:to_broker_msgs(Batch)), + AckFun(). + +%% @doc This function is to be evaluated on message/batch exporter side +%% when message/batch is accepted by remote node. +-spec handle_ack(pid(), ack_ref()) -> ok. +handle_ack(Pid, Ref) when node() =:= node(Pid) -> + Pid ! {batch_ack, Ref}, + ok. + +%% @doc Return all forwards (local subscriptions). +-spec get_forwards(id()) -> [topic()]. +get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)). + +%% @doc Return all subscriptions (subscription over mqtt connection to remote broker). +-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. +get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). + +%% @doc Add a new forward (local topic subscription). +-spec ensure_forward_present(id(), topic()) -> ok. +ensure_forward_present(Id, Topic) -> + gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}). + +%% @doc Ensure a forward topic is deleted. +-spec ensure_forward_absent(id(), topic()) -> ok. +ensure_forward_absent(Id, Topic) -> + gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}). + +%% @doc Ensure subscribed to remote topic. +%% NOTE: only applicable when connection module is emqx_bridge_mqtt +%% return `{error, no_remote_subscription_support}' otherwise. +-spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}. +ensure_subscription_present(Id, Topic, QoS) -> + gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}). + +%% @doc Ensure unsubscribed from remote topic. +%% NOTE: only applicable when connection module is emqx_bridge_mqtt +-spec ensure_subscription_absent(id(), topic()) -> ok. +ensure_subscription_absent(Id, Topic) -> + gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}). + +callback_mode() -> [state_functions, state_enter]. + +%% @doc Config should be a map(). +init(Config) -> + erlang:process_flag(trap_exit, true), + Get = fun(K, D) -> maps:get(K, Config, D) end, + QCfg = maps:get(queue, Config, #{}), + GetQ = fun(K, D) -> maps:get(K, QCfg, D) end, + Dir = GetQ(replayq_dir, undefined), + QueueConfig = + case Dir =:= undefined orelse Dir =:= "" of + true -> #{mem_only => true}; + false -> #{dir => Dir, + seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES) + } + end, + Queue = replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1, + marshaller => fun msg_marshaller/1}), + Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]), + Subs = lists:keysort(1, lists:map(fun({T0, QoS}) -> + T = iolist_to_binary(T0), + true = emqx_topic:validate({filter, T}), + {T, QoS} + end, Get(subscriptions, []))), + ConnectModule = maps:get(connect_module, Config), + ConnectConfig = maps:without([connect_module, + queue, + reconnect_delay_ms, + max_inflight_batches, + mountpoint, + forwards + ], Config#{subscriptions => Subs}), + ConnectFun = fun(SubsX) -> emqx_bridge_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end, + {ok, standing_by, + #{connect_module => ConnectModule, + connect_fun => ConnectFun, + start_type => Get(start_type, manual), + reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS), + batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES), + batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT), + max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD), + mountpoint => format_mountpoint(Get(mountpoint, undefined)), + forwards => Topics, + subscriptions => Subs, + replayq => Queue, + inflight => [] + }}. + +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. + +terminate(_Reason, _StateName, #{replayq := Q} = State) -> + _ = disconnect(State), + _ = replayq:close(Q), + ok. + +%% @doc Standing by for manual start. +standing_by(enter, _, #{start_type := auto}) -> + Action = {state_timeout, 0, do_connect}, + {keep_state_and_data, Action}; +standing_by(enter, _, #{start_type := manual}) -> + keep_state_and_data; +standing_by({call, From}, ensure_started, State) -> + {next_state, connecting, State, + [{reply, From, ok}]}; +standing_by(state_timeout, do_connect, State) -> + {next_state, connecting, State}; +standing_by({call, From}, _Call, _State) -> + {keep_state_and_data, [{reply, From, {error,standing_by}}]}; +standing_by(info, Info, State) -> + ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), + {keep_state_and_data, State}; +standing_by(Type, Content, State) -> + common(standing_by, Type, Content, State). + +%% @doc Connecting state is a state with timeout. +%% After each timeout, it re-enters this state and start a retry until +%% successfuly connected to remote node/cluster. +connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> + Action = {state_timeout, Timeout, reconnect}, + {keep_state_and_data, Action}; +connecting(enter, _, #{reconnect_delay_ms := Timeout, + connect_fun := ConnectFun, + subscriptions := Subs, + forwards := Forwards + } = State) -> + ok = subscribe_local_topics(Forwards), + case ConnectFun(Subs) of + {ok, ConnRef, Conn} -> + ?LOG(info, "Bridge ~p connected", [name()]), + Action = {state_timeout, 0, connected}, + {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; + error -> + Action = {state_timeout, Timeout, reconnect}, + {keep_state_and_data, Action} + end; +connecting(state_timeout, connected, State) -> + {next_state, connected, State}; +connecting(state_timeout, reconnect, _State) -> + repeat_state_and_data; +connecting(info, {batch_ack, Ref}, State) -> + case do_ack(State, Ref) of + {ok, NewState} -> + {keep_state, NewState}; + _ -> + keep_state_and_data + end; +connecting(internal, maybe_send, _State) -> + keep_state_and_data; +connecting(info, {disconnected, _Ref, _Reason}, _State) -> + keep_state_and_data; +connecting(Type, Content, State) -> + common(connecting, Type, Content, State). + +%% @doc Send batches to remote node/cluster when in 'connected' state. +connected(enter, _OldState, #{inflight := Inflight} = State) -> + case retry_inflight(State#{inflight := []}, Inflight) of + {ok, NewState} -> + Action = {state_timeout, 0, success}, + {keep_state, NewState, Action}; + {error, NewState} -> + Action = {state_timeout, 0, failure}, + {keep_state, disconnect(NewState), Action} + end; +connected(state_timeout, failure, State) -> + {next_state, connecting, State}; +connected(state_timeout, success, State) -> + {keep_state, State, ?maybe_send}; +connected(internal, maybe_send, State) -> + case pop_and_send(State) of + {ok, NewState} -> + {keep_state, NewState}; + {error, NewState} -> + {next_state, connecting, disconnect(NewState)} + end; +connected(info, {disconnected, ConnRef, Reason}, + #{conn_ref := ConnRefCurrent, connection := Conn} = State) -> + case ConnRefCurrent =:= ConnRef of + true -> + ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Conn, Reason]), + {next_state, connecting, + State#{conn_ref := undefined, connection := undefined}}; + false -> + keep_state_and_data + end; +connected(info, {batch_ack, Ref}, State) -> + case do_ack(State, Ref) of + stale -> + keep_state_and_data; + bad_order -> + %% try re-connect then re-send + ?LOG(error, "Bad order ack received by bridge ~p", [name()]), + {next_state, connecting, disconnect(State)}; + {ok, NewState} -> + {keep_state, NewState, ?maybe_send} + end; +connected(Type, Content, State) -> + common(connected, Type, Content, State). + +%% Common handlers +common(StateName, {call, From}, status, _State) -> + {keep_state_and_data, [{reply, From, StateName}]}; +common(_StateName, {call, From}, ensure_started, _State) -> + {keep_state_and_data, [{reply, From, ok}]}; +common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> + {keep_state_and_data, [{reply, From, Forwards}]}; +common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> + {keep_state_and_data, [{reply, From, Subs}]}; +common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> + {Result, NewState} = ensure_present(What, Topic, State), + {keep_state, NewState, [{reply, From, Result}]}; +common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> + {Result, NewState} = ensure_absent(What, Topic, State), + {keep_state, NewState, [{reply, From, Result}]}; +common(_StateName, {call, From}, ensure_stopped, _State) -> + {stop_and_reply, {shutdown, manual}, + [{reply, From, ok}]}; +common(_StateName, info, {dispatch, _, Msg}, + #{replayq := Q} = State) -> + NewQ = replayq:append(Q, collect([Msg])), + {keep_state, State#{replayq => NewQ}, ?maybe_send}; +common(StateName, Type, Content, State) -> + ?LOG(info, "Bridge ~p discarded ~p type event at state ~p:\n~p", + [name(), Type, StateName, Content]), + {keep_state, State}. + +ensure_present(Key, Topic, State) -> + Topics = maps:get(Key, State), + case is_topic_present(Topic, Topics) of + true -> + {ok, State}; + false -> + R = do_ensure_present(Key, Topic, State), + {R, State#{Key := lists:usort([Topic | Topics])}} + end. + +ensure_absent(Key, Topic, State) -> + Topics = maps:get(Key, State), + case is_topic_present(Topic, Topics) of + true -> + R = do_ensure_absent(Key, Topic, State), + {R, State#{Key := ensure_topic_absent(Topic, Topics)}}; + false -> + {ok, State} + end. + +ensure_topic_absent(_Topic, []) -> []; +ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L); +ensure_topic_absent(Topic, L) -> lists:delete(Topic, L). + +is_topic_present({Topic, _QoS}, Topics) -> + is_topic_present(Topic, Topics); +is_topic_present(Topic, Topics) -> + lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). + +do_ensure_present(forwards, Topic, _) -> + ok = subscribe_local_topic(Topic); +do_ensure_present(subscriptions, {Topic, QoS}, + #{connect_module := ConnectModule, connection := Conn}) -> + case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of + true -> + _ = ConnectModule:ensure_subscribed(Conn, Topic, QoS), + ok; + false -> + {error, no_remote_subscription_support} + end. + +do_ensure_absent(forwards, Topic, _) -> + ok = emqx_broker:unsubscribe(Topic); +do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule, + connection := Conn}) -> + case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of + true -> ConnectModule:ensure_unsubscribed(Conn, Topic); + false -> {error, no_remote_subscription_support} + end. + +collect(Acc) -> + receive + {dispatch, _, Msg} -> + collect([Msg | Acc]) + after + 0 -> + lists:reverse(Acc) + end. + +%% Retry all inflight (previously sent but not acked) batches. +retry_inflight(State, []) -> {ok, State}; +retry_inflight(#{inflight := Inflight} = State, + [#{q_ack_ref := QAckRef, batch := Batch} | T] = Remain) -> + case do_send(State, QAckRef, Batch) of + {ok, NewState} -> + retry_inflight(NewState, T); + {error, Reason} -> + ?LOG(error, "Inflight retry failed\n~p", [Reason]), + {error, State#{inflight := Inflight ++ Remain}} + end. + +pop_and_send(#{inflight := Inflight, + max_inflight_batches := Max + } = State) when length(Inflight) >= Max -> + {ok, State}; +pop_and_send(#{replayq := Q, + batch_count_limit := CountLimit, + batch_bytes_limit := BytesLimit + } = State) -> + case replayq:is_empty(Q) of + true -> + {ok, State}; + false -> + Opts = #{count_limit => CountLimit, bytes_limit => BytesLimit}, + {Q1, QAckRef, Batch} = replayq:pop(Q, Opts), + do_send(State#{replayq := Q1}, QAckRef, Batch) + end. + +%% Assert non-empty batch because we have a is_empty check earlier. +do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> + case maybe_send(State, Batch) of + {ok, Ref} -> + %% this is a list of inflight BATCHes, not expecting it to be too long + NewInflight = Inflight ++ [#{q_ack_ref => QAckRef, + send_ack_ref => Ref, + batch => Batch}], + {ok, State#{inflight := NewInflight}}; + {error, Reason} -> + ?LOG(info, "Batch produce failed\n~p", [Reason]), + {error, State} + end. + +do_ack(State = #{inflight := [#{send_ack_ref := Refx, q_ack_ref := QAckRef} | Rest], + replayq := Q}, Ref) when Refx =:= Ref -> + ok = replayq:ack(Q, QAckRef), + {ok, State#{inflight := Rest}}; +do_ack(#{inflight := Inflight}, Ref) -> + case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of + true -> bad_order; + false -> stale + end. + +subscribe_local_topics(Topics) -> lists:foreach(fun subscribe_local_topic/1, Topics). + +subscribe_local_topic(Topic0) -> + Topic = topic(Topic0), + try + emqx_topic:validate({filter, Topic}) + catch + error : Reason -> + erlang:error({bad_topic, Topic, Reason}) + end, + ok = emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}). + +topic(T) -> iolist_to_binary(T). + +disconnect(#{connection := Conn, + conn_ref := ConnRef, + connect_module := Module + } = State) when Conn =/= undefined -> + ok = Module:stop(ConnRef, Conn), + State#{conn_ref => undefined, + connection => undefined}; +disconnect(State) -> State. + +%% Called only when replayq needs to dump it to disk. +msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); +msg_marshaller(Msg) -> emqx_bridge_msg:to_binary(Msg). + +%% Return {ok, SendAckRef} or {error, Reason} +maybe_send(#{connect_module := Module, + connection := Connection, + mountpoint := Mountpoint + }, Batch) -> + Module:send(Connection, [emqx_bridge_msg:to_export(Module, Mountpoint, M) || M <- Batch]). + +format_mountpoint(undefined) -> + undefined; +format_mountpoint(Prefix) -> + binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + +name() -> {_, Name} = process_info(self(), registered_name), Name. + +name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). + +id(Pid) when is_pid(Pid) -> Pid; +id(Name) -> name(Name). diff --git a/src/bridge/emqx_bridge_mqtt.erl b/src/bridge/emqx_bridge_mqtt.erl new file mode 100644 index 000000000..590fbabb7 --- /dev/null +++ b/src/bridge/emqx_bridge_mqtt.erl @@ -0,0 +1,188 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol + +-module(emqx_bridge_mqtt). +-behaviour(emqx_bridge_connect). + +%% behaviour callbacks +-export([start/1, + send/2, + stop/2 + ]). + +%% optional behaviour callbacks +-export([ensure_subscribed/3, + ensure_unsubscribed/2 + ]). + +-include("emqx_mqtt.hrl"). + +-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}). + +%% Messages towards ack collector process +-define(RANGE(Min, Max), {Min, Max}). +-define(REF_IDS(Ref, Ids), {Ref, Ids}). +-define(SENT(RefIds), {sent, RefIds}). +-define(ACKED(AnyPktId), {acked, AnyPktId}). +-define(STOP(Ref), {stop, Ref}). + +start(Config = #{address := Address}) -> + Ref = make_ref(), + Parent = self(), + AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end), + Handlers = make_hdlr(Parent, AckCollector, Ref), + {Host, Port} = case string:tokens(Address, ":") of + [H] -> {H, 1883}; + [H, P] -> {H, list_to_integer(P)} + end, + ClientConfig = Config#{msg_handler => Handlers, + owner => AckCollector, + host => Host, + port => Port}, + case emqx_client:start_link(ClientConfig) of + {ok, Pid} -> + case emqx_client:connect(Pid) of + {ok, _} -> + try + subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])), + {ok, Ref, #{ack_collector => AckCollector, + client_pid => Pid}} + catch + throw : Reason -> + ok = stop(AckCollector, Pid), + {error, Reason} + end; + {error, Reason} -> + ok = stop(Ref, #{ack_collector => AckCollector, client_pid => Pid}), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> + safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000), + safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000), + ok. + +ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> + emqx_client:subscribe(Pid, Topic, QoS); +ensure_subscribed(_Conn, _Topic, _QoS) -> + %% return ok for now, next re-connect should should call start with new topic added to config + ok. + +ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> + emqx_client:unsubscribe(Pid, Topic); +ensure_unsubscribed(_, _) -> + %% return ok for now, next re-connect should should call start with this topic deleted from config + ok. + +safe_stop(Pid, StopF, Timeout) -> + MRef = monitor(process, Pid), + unlink(Pid), + try + StopF() + catch + _ : _ -> + ok + end, + receive + {'DOWN', MRef, _, _, _} -> + ok + after + Timeout -> + exit(Pid, kill) + end. + +send(Conn, Batch) -> + send(Conn, Batch, []). + +send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) -> + case emqx_client:publish(ClientPid, Msg) of + {ok, PktId} when Rest =:= [] -> + %% last one sent + Ref = make_ref(), + AckCollector ! ?SENT(?REF_IDS(Ref, lists:reverse([PktId | Acc]))), + {ok, Ref}; + {ok, PktId} -> + send(Conn, Rest, [PktId | Acc]); + {error, {_PacketId, inflight_full}} -> + timer:sleep(10), + send(Conn, Batch, Acc); + {error, Reason} -> + %% NOTE: There is no partial sucess of a batch and recover from the middle + %% only to retry all messages in one batch + {error, Reason} + end. + +ack_collector(Parent, ConnRef) -> + ack_collector(Parent, ConnRef, queue:new(), []). + +ack_collector(Parent, ConnRef, Acked, Sent) -> + {NewAcked, NewSent} = + receive + ?STOP(ConnRef) -> + exit(normal); + ?ACKED(PktId) -> + match_acks(Parent, queue:in(PktId, Acked), Sent); + ?SENT(RefIds) -> + %% this message only happens per-batch, hence ++ is ok + match_acks(Parent, Acked, Sent ++ [RefIds]) + after + 200 -> + {Acked, Sent} + end, + ack_collector(Parent, ConnRef, NewAcked, NewSent). + +match_acks(_Parent, Acked, []) -> {Acked, []}; +match_acks(Parent, Acked, Sent) -> + match_acks_1(Parent, queue:out(Acked), Sent). + +match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent}; +match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) -> + %% batch finished + ok = emqx_bridge:handle_ack(Parent, Ref), + match_acks(Parent, Acked, Sent); +match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) -> + %% one message finished, but not the whole batch + match_acks(Parent, Acked, [?REF_IDS(Ref, RestIds) | Sent]). + + +%% When puback for QoS-1 message is received from remote MQTT broker +%% NOTE: no support for QoS-2 +handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) -> + RC =:= ?RC_SUCCESS orelse error({puback_error_code, RC}), + AckCollector ! ?ACKED(PktId), + ok. + +%% Message published from remote broker. Import to local broker. +import_msg(Msg) -> + %% auto-ack should be enabled in emqx_client, hence dummy ack-fun. + emqx_bridge:import_batch([Msg], _AckFun = fun() -> ok end). + +make_hdlr(Parent, AckCollector, Ref) -> + #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end, + publish => fun(Msg) -> import_msg(Msg) end, + disconnected => fun(Reason) -> Parent ! {disconnected, Ref, Reason}, ok end + }. + +subscribe_remote_topics(ClientPid, Subscriptions) -> + lists:foreach(fun({Topic, Qos}) -> + case emqx_client:subscribe(ClientPid, Topic, Qos) of + {ok, _, _} -> ok; + Error -> throw(Error) + end + end, Subscriptions). diff --git a/src/bridge/emqx_bridge_msg.erl b/src/bridge/emqx_bridge_msg.erl new file mode 100644 index 000000000..6633027f9 --- /dev/null +++ b/src/bridge/emqx_bridge_msg.erl @@ -0,0 +1,84 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_msg). + +-export([ to_binary/1 + , from_binary/1 + , to_export/3 + , to_broker_msgs/1 + , estimate_size/1 + ]). + +-export_type([msg/0]). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). +-include("emqx_client.hrl"). + +-type msg() :: emqx_types:message(). +-type exp_msg() :: emqx_types:message() | #mqtt_msg{}. + +%% @doc Make export format: +%% 1. Mount topic to a prefix +%% 2. Fix QoS to 1 +%% @end +%% Shame that we have to know the callback module here +%% would be great if we can get rid of #mqtt_msg{} record +%% and use #message{} in all places. +-spec to_export(emqx_bridge_rpc | emqx_bridge_mqtt, + undefined | binary(), msg()) -> exp_msg(). +to_export(emqx_bridge_mqtt, Mountpoint, + #message{topic = Topic, + payload = Payload, + flags = Flags + }) -> + Retain = maps:get(retain, Flags, false), + #mqtt_msg{qos = ?QOS_1, + retain = Retain, + topic = topic(Mountpoint, Topic), + payload = Payload}; +to_export(_Module, Mountpoint, + #message{topic = Topic} = Msg) -> + Msg#message{topic = topic(Mountpoint, Topic), qos = 1}. + +%% @doc Make `binary()' in order to make iodata to be persisted on disk. +-spec to_binary(msg()) -> binary(). +to_binary(Msg) -> term_to_binary(Msg). + +%% @doc Unmarshal binary into `msg()'. +-spec from_binary(binary()) -> msg(). +from_binary(Bin) -> binary_to_term(Bin). + +%% @doc Estimate the size of a message. +%% Count only the topic length + payload size +-spec estimate_size(msg()) -> integer(). +estimate_size(#message{topic = Topic, payload = Payload}) -> + size(Topic) + size(Payload). + +%% @doc By message/batch receiver, transform received batch into +%% messages to dispatch to local brokers. +to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch). + +to_broker_msg(#message{} = Msg) -> + %% internal format from another EMQX node via rpc + Msg; +to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic, + properties := Props, payload := Payload}) -> + %% published from remote node over a MQTT connection + emqx_message:set_headers(Props, + emqx_message:set_flags(#{dup => Dup, retain => Retain}, + emqx_message:make(bridge, QoS, Topic, Payload))). + +topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic). diff --git a/src/bridge/emqx_bridge_rpc.erl b/src/bridge/emqx_bridge_rpc.erl new file mode 100644 index 000000000..b818d65da --- /dev/null +++ b/src/bridge/emqx_bridge_rpc.erl @@ -0,0 +1,105 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements EMQX Bridge transport layer based on gen_rpc. + +-module(emqx_bridge_rpc). +-behaviour(emqx_bridge_connect). + +%% behaviour callbacks +-export([start/1, + send/2, + stop/2 + ]). + +%% Internal exports +-export([ handle_send/2 + , handle_ack/2 + , heartbeat/2 + ]). + +-type ack_ref() :: emqx_bridge:ack_ref(). +-type batch() :: emqx_bridge:batch(). + +-define(HEARTBEAT_INTERVAL, timer:seconds(1)). + +-define(RPC, gen_rpc). + +start(#{address := Remote}) -> + case poke(Remote) of + ok -> + Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), Remote]), + {ok, Pid, Remote}; + Error -> + Error + end. + +stop(Pid, _Remote) when is_pid(Pid) -> + Ref = erlang:monitor(process, Pid), + unlink(Pid), + Pid ! stop, + receive + {'DOWN', Ref, process, Pid, _Reason} -> + ok + after + 1000 -> + exit(Pid, kill) + end, + ok. + +%% @doc Callback for `emqx_bridge_connect' behaviour +-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. +send(Remote, Batch) -> + Sender = self(), + case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of + {ok, Ref} -> {ok, Ref}; + {badrpc, Reason} -> {error, Reason} + end. + +%% @doc Handle send on receiver side. +-spec handle_send(pid(), batch()) -> {ok, ack_ref()} | {error, any()}. +handle_send(SenderPid, Batch) -> + SenderNode = node(SenderPid), + Ref = make_ref(), + AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end, + case emqx_bridge:import_batch(Batch, AckFun) of + ok -> {ok, Ref}; + Error -> Error + end. + +%% @doc Handle batch ack in sender node. +handle_ack(SenderPid, Ref) -> + ok = emqx_bridge:handle_ack(SenderPid, Ref). + +%% @hidden Heartbeat loop +heartbeat(Parent, RemoteNode) -> + Interval = ?HEARTBEAT_INTERVAL, + receive + stop -> exit(normal) + after + Interval -> + case poke(RemoteNode) of + ok -> + ?MODULE:heartbeat(Parent, RemoteNode); + {error, Reason} -> + Parent ! {disconnected, self(), Reason}, + exit(normal) + end + end. + +poke(Node) -> + case ?RPC:call(Node, erlang, node, []) of + Node -> ok; + {badrpc, Reason} -> {error, Reason} + end. diff --git a/src/bridge/emqx_bridge_sup.erl b/src/bridge/emqx_bridge_sup.erl new file mode 100644 index 000000000..bcacb411c --- /dev/null +++ b/src/bridge/emqx_bridge_sup.erl @@ -0,0 +1,62 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_sup). +-behavior(supervisor). + +-include("logger.hrl"). + +-export([start_link/0, start_link/1, bridges/0]). +-export([create_bridge/2, drop_bridge/1]). +-export([init/1]). + +-define(SUP, ?MODULE). +-define(WORKER_SUP, emqx_bridge_worker_sup). + +start_link() -> start_link(?SUP). + +start_link(Name) -> + supervisor:start_link({local, Name}, ?MODULE, Name). + +init(?SUP) -> + BridgesConf = emqx_config:get_env(bridges, []), + BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf), + SupFlag = #{strategy => one_for_one, + intensity => 100, + period => 10}, + {ok, {SupFlag, BridgeSpec}}. + +bridge_spec({Name, Config}) -> + #{id => Name, + start => {emqx_bridge, start_link, [Name, Config]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_bridge]}. + +-spec(bridges() -> [{node(), map()}]). +bridges() -> + [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. + +create_bridge(Id, Config) -> + supervisor:start_child(?SUP, bridge_spec({Id, Config})). + +drop_bridge(Id) -> + case supervisor:terminate_child(?SUP, Id) of + ok -> + supervisor:delete_child(?SUP, Id); + Error -> + ?LOG(error, "[Bridge] Delete bridge failed", [Error]), + Error + end. diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl deleted file mode 100644 index 1ee5612e6..000000000 --- a/src/emqx_bridge.erl +++ /dev/null @@ -1,463 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge). - --behaviour(gen_server). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - --import(proplists, [get_value/2, get_value/3]). - --export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). - --export([show_forwards/1, add_forward/2, del_forward/2]). - --export([show_subscriptions/1, add_subscription/3, del_subscription/2]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --record(state, {client_pid :: pid(), - options :: list(), - reconnect_interval :: pos_integer(), - mountpoint :: binary(), - readq :: list(), - writeq :: list(), - replayq :: map(), - ackref :: replayq:ack_ref(), - queue_option :: map(), - forwards :: list(), - subscriptions :: list()}). - --record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, - packet_id, topic, props, payload}). - -start_link(Name, Options) -> - gen_server:start_link({local, name(Name)}, ?MODULE, [Options], []). - -start_bridge(Name) -> - gen_server:call(name(Name), start_bridge). - -stop_bridge(Name) -> - gen_server:call(name(Name), stop_bridge). - --spec(show_forwards(atom()) -> list()). -show_forwards(Name) -> - gen_server:call(name(Name), show_forwards). - --spec(add_forward(atom(), binary()) -> ok | {error, already_exists | validate_fail}). -add_forward(Name, Topic) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {add_forward, Topic}) - catch - _Error:_Reason -> - {error, validate_fail} - end. - --spec(del_forward(atom(), binary()) -> ok | {error, validate_fail}). -del_forward(Name, Topic) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {del_forward, Topic}) - catch - _Error:_Reason -> - {error, validate_fail} - end. - --spec(show_subscriptions(atom()) -> list()). -show_subscriptions(Name) -> - gen_server:call(name(Name), show_subscriptions). - --spec(add_subscription(atom(), binary(), integer()) -> ok | {error, already_exists | validate_fail}). -add_subscription(Name, Topic, QoS) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {add_subscription, Topic, QoS}) - catch - _Error:_Reason -> - {error, validate_fail} - end. - --spec(del_subscription(atom(), binary()) -> ok | {error, validate_fail}). -del_subscription(Name, Topic) -> - try emqx_topic:validate({filter, Topic}) of - true -> - gen_server:call(name(Name), {del_subscription, Topic}) - catch - error:_Reason -> - {error, validate_fail} - end. - -status(Pid) -> - gen_server:call(Pid, status). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Options]) -> - process_flag(trap_exit, true), - case get_value(start_type, Options, manual) of - manual -> ok; - auto -> erlang:send_after(1000, self(), start) - end, - ReconnectInterval = get_value(reconnect_interval, Options, 30000), - Mountpoint = format_mountpoint(get_value(mountpoint, Options)), - QueueOptions = get_value(queue, Options), - {ok, #state{mountpoint = Mountpoint, - queue_option = QueueOptions, - readq = [], - writeq = [], - options = Options, - reconnect_interval = ReconnectInterval}}. - -handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> - {Msg, NewState} = bridge(start, State), - {reply, #{msg => Msg}, NewState}; - -handle_call(start_bridge, _From, State) -> - {reply, #{msg => <<"bridge already started">>}, State}; - -handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> - {reply, #{msg => <<"bridge not started">>}, State}; - -handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> - emqx_client:disconnect(Pid), - {reply, #{msg => <<"stop bridge successfully">>}, State}; - -handle_call(status, _From, State = #state{client_pid = undefined}) -> - {reply, #{status => <<"Stopped">>}, State}; -handle_call(status, _From, State = #state{client_pid = _Pid})-> - {reply, #{status => <<"Running">>}, State}; - -handle_call(show_forwards, _From, State = #state{forwards = Forwards}) -> - {reply, Forwards, State}; - -handle_call({add_forward, Topic}, _From, State = #state{forwards = Forwards}) -> - case not lists:member(Topic, Forwards) of - true -> - emqx_broker:subscribe(Topic), - {reply, ok, State#state{forwards = [Topic | Forwards]}}; - false -> - {reply, {error, already_exists}, State} - end; - -handle_call({del_forward, Topic}, _From, State = #state{forwards = Forwards}) -> - case lists:member(Topic, Forwards) of - true -> - emqx_broker:unsubscribe(Topic), - {reply, ok, State#state{forwards = lists:delete(Topic, Forwards)}}; - false -> - {reply, ok, State} - end; - -handle_call(show_subscriptions, _From, State = #state{subscriptions = Subscriptions}) -> - {reply, Subscriptions, State}; - -handle_call({add_subscription, Topic, Qos}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> - case not lists:keymember(Topic, 1, Subscriptions) of - true -> - emqx_client:subscribe(ClientPid, {Topic, Qos}), - {reply, ok, State#state{subscriptions = [{Topic, Qos} | Subscriptions]}}; - false -> - {reply, {error, already_exists}, State} - end; - -handle_call({del_subscription, Topic}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> - case lists:keymember(Topic, 1, Subscriptions) of - true -> - emqx_client:unsubscribe(ClientPid, Topic), - {reply, ok, State#state{subscriptions = lists:keydelete(Topic, 1, Subscriptions)}}; - false -> - {reply, ok, State} - end; - -handle_call(Req, _From, State) -> - emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), - {noreply, State}. - -%%---------------------------------------------------------------- -%% Start or restart bridge -%%---------------------------------------------------------------- -handle_info(start, State) -> - {_Msg, NewState} = bridge(start, State), - {noreply, NewState}; - -handle_info(restart, State) -> - {_Msg, NewState} = bridge(restart, State), - {noreply, NewState}; - -%%---------------------------------------------------------------- -%% pop message from replayq and publish again -%%---------------------------------------------------------------- -handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ, - queue_option = #{batch_size := BatchSize}}) -> - {NewReplayQ, AckRef, NewReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), - {NewReadQ1, NewWriteQ} = case NewReadQ of - [] -> {WriteQ, []}; - _ -> {NewReadQ, WriteQ} - end, - self() ! replay, - {noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}}; - -handle_info(dump, State = #state{writeq = WriteQ, replayq = ReplayQ}) -> - NewReplayQueue = replayq:append(ReplayQ, lists:reverse(WriteQ)), - {noreply, State#state{replayq = NewReplayQueue, writeq = []}}; - -%%---------------------------------------------------------------- -%% replay message from replayq -%%---------------------------------------------------------------- -handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) -> - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {noreply, State#state{readq = NewReadQ}}; - -%%---------------------------------------------------------------- -%% received local node message -%%---------------------------------------------------------------- -handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = undefined, - mountpoint = Mountpoint}) - when QoS =< 1 -> - Msg = #mqtt_msg{qos = 1, - retain = Retain, - topic = mountpoint(Mountpoint, Topic), - payload = Payload}, - {noreply, en_writeq({undefined, Msg}, State)}; -handle_info({dispatch, _, #message{topic = Topic, qos = QoS ,payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = Pid, - mountpoint = Mountpoint}) - when QoS =< 1 -> - Msg = #mqtt_msg{qos = 1, - retain = Retain, - topic = mountpoint(Mountpoint, Topic), - payload = Payload}, - case emqx_client:publish(Pid, Msg) of - {ok, PktId} -> - {noreply, en_writeq({PktId, Msg}, State)}; - {error, {PktId, Reason}} -> - emqx_logger:error("[Bridge] Publish fail:~p", [Reason]), - {noreply, en_writeq({PktId, Msg}, State)} - end; - -%%---------------------------------------------------------------- -%% received remote node message -%%---------------------------------------------------------------- -handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, - properties := Props, payload := Payload}}, State) -> - NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), - NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, NewMsg0)), - emqx_broker:publish(NewMsg1), - {noreply, State}; - -%%---------------------------------------------------------------- -%% received remote puback message -%%---------------------------------------------------------------- -handle_info({puback, #{packet_id := PktId}}, State) -> - {noreply, delete(PktId, State)}; - -handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> - emqx_logger:warning("[Bridge] stop ~p", [normal]), - self() ! dump, - {noreply, State#state{client_pid = undefined}}; - -handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, - reconnect_interval = ReconnectInterval}) -> - emqx_logger:error("[Bridge] stop ~p", [Reason]), - self() ! dump, - erlang:send_after(ReconnectInterval, self(), restart), - {noreply, State#state{client_pid = undefined}}; - -handle_info(Info, State) -> - emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{}) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -subscribe_remote_topics(ClientPid, Subscriptions) -> - [begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end - || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})]. - -subscribe_local_topics(Options) -> - Topics = get_value(forwards, Options, []), - Subid = get_value(client_id, Options, <<"bridge">>), - [begin emqx_broker:subscribe(bin(Topic), #{qos => 1, subid => Subid}), bin(Topic) end - || Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})]. - -proto_ver(mqttv3) -> v3; -proto_ver(mqttv4) -> v4; -proto_ver(mqttv5) -> v5. -address(Address) -> - case string:tokens(Address, ":") of - [Host] -> {Host, 1883}; - [Host, Port] -> {Host, list_to_integer(Port)} - end. -options(Options) -> - options(Options, []). -options([], Acc) -> - Acc; -options([{username, Username}| Options], Acc) -> - options(Options, [{username, Username}|Acc]); -options([{proto_ver, ProtoVer}| Options], Acc) -> - options(Options, [{proto_ver, proto_ver(ProtoVer)}|Acc]); -options([{password, Password}| Options], Acc) -> - options(Options, [{password, Password}|Acc]); -options([{keepalive, Keepalive}| Options], Acc) -> - options(Options, [{keepalive, Keepalive}|Acc]); -options([{client_id, ClientId}| Options], Acc) -> - options(Options, [{client_id, ClientId}|Acc]); -options([{clean_start, CleanStart}| Options], Acc) -> - options(Options, [{clean_start, CleanStart}|Acc]); -options([{address, Address}| Options], Acc) -> - {Host, Port} = address(Address), - options(Options, [{host, Host}, {port, Port}|Acc]); -options([{ssl, Ssl}| Options], Acc) -> - options(Options, [{ssl, Ssl}|Acc]); -options([{ssl_opts, SslOpts}| Options], Acc) -> - options(Options, [{ssl_opts, SslOpts}|Acc]); -options([_Option | Options], Acc) -> - options(Options, Acc). - -name(Id) -> - list_to_atom(lists:concat([?MODULE, "_", Id])). - -bin(L) -> iolist_to_binary(L). - -mountpoint(undefined, Topic) -> - Topic; -mountpoint(Prefix, Topic) -> - <>. - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - -en_writeq(Msg, State = #state{replayq = ReplayQ, - queue_option = #{mem_cache := false}}) -> - NewReplayQ = replayq:append(ReplayQ, [Msg]), - State#state{replayq = NewReplayQ}; -en_writeq(Msg, State = #state{writeq = WriteQ, - queue_option = #{batch_size := BatchSize, - mem_cache := true}}) - when length(WriteQ) < BatchSize-> - State#state{writeq = [Msg | WriteQ]} ; -en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ, - queue_option = #{mem_cache := true}}) -> - NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)), - State#state{writeq = [Msg], replayq = NewReplayQ}. - -publish_readq_msg(_ClientPid, [], NewReadQ) -> - {ok, NewReadQ}; -publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) -> - {ok, PktId} = emqx_client:publish(ClientPid, Msg), - publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]). - -delete(PktId, State = #state{ replayq = ReplayQ, - readq = [], - queue_option = #{ mem_cache := false}}) -> - {NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}), - logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]), - ok = replayq:ack(NewReplayQ, NewAckRef), - case Msgs of - [{PktId, _Msg}] -> - self() ! pop, - State#state{ replayq = NewReplayQ, ackref = NewAckRef }; - [{_PktId, _Msg}] -> - NewReplayQ1 = replayq:append(NewReplayQ, Msgs), - self() ! pop, - State#state{ replayq = NewReplayQ1, ackref = NewAckRef }; - _Empty -> - State#state{ replayq = NewReplayQ, ackref = NewAckRef} - end; -delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) -> - ok = replayq:ack(ReplayQ, AckRef), - self() ! pop, - State; - -delete(PktId, State = #state{readq = [], writeq = WriteQ}) -> - State#state{writeq = lists:keydelete(PktId, 1, WriteQ)}; - -delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) -> - NewReadQ = lists:keydelete(PktId, 1, ReadQ), - case NewReadQ of - [] -> - ok = replayq:ack(ReplayQ, AckRef), - self() ! pop; - _NewReadQ -> - ok - end, - State#state{ readq = NewReadQ }. - -bridge(Action, State = #state{options = Options, - replayq = ReplayQ, - queue_option - = QueueOption - = #{batch_size := BatchSize}}) - when BatchSize > 0 -> - case emqx_client:start_link([{owner, self()} | options(Options)]) of - {ok, ClientPid} -> - case emqx_client:connect(ClientPid) of - {ok, _} -> - emqx_logger:info("[Bridge] connected to remote successfully"), - Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), - Forwards = subscribe_local_topics(Options), - {NewReplayQ, AckRef, ReadQ} = open_replayq(ReplayQ, QueueOption), - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {<<"start bridge successfully">>, - State#state{client_pid = ClientPid, - subscriptions = Subs, - readq = NewReadQ, - replayq = NewReplayQ, - ackref = AckRef, - forwards = Forwards}}; - {error, Reason} -> - emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), - {<<"connect to remote failed">>, - State#state{client_pid = ClientPid}} - end; - {error, Reason} -> - emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]), - {<<"start bridge failed">>, State} - end; -bridge(Action, State) -> - emqx_logger:error("[Bridge] ~p failed! error: batch_size should greater than zero", [Action]), - {<<"Open Replayq failed">>, State}. - -open_replayq(undefined, #{batch_size := BatchSize, - replayq_dir := ReplayqDir, - replayq_seg_bytes := ReplayqSegBytes}) -> - ReplayQ = replayq:open(#{dir => ReplayqDir, - seg_bytes => ReplayqSegBytes, - sizer => fun(Term) -> - size(term_to_binary(Term)) - end, - marshaller => fun({PktId, Msg}) -> - term_to_binary({PktId, Msg}); - (Bin) -> - binary_to_term(Bin) - end}), - replayq:pop(ReplayQ, #{count_limit => BatchSize}); -open_replayq(ReplayQ, #{batch_size := BatchSize}) -> - replayq:pop(ReplayQ, #{count_limit => BatchSize}). diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl new file mode 100644 index 000000000..b2781cc2c --- /dev/null +++ b/src/emqx_bridge_connect.erl @@ -0,0 +1,71 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_connect). + +-export([start/2]). + +-export_type([config/0, connection/0]). + +-optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]). + +%% map fields depend on implementation +-type config() :: map(). +-type connection() :: term(). +-type conn_ref() :: term(). +-type batch() :: emqx_protal:batch(). +-type ack_ref() :: emqx_bridge:ack_ref(). +-type topic() :: emqx_topic:topic(). +-type qos() :: emqx_mqtt_types:qos(). + +-include("logger.hrl"). + +%% establish the connection to remote node/cluster +%% protal worker (the caller process) should be expecting +%% a message {disconnected, conn_ref()} when disconnected. +-callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}. + +%% send to remote node/cluster +%% bridge worker (the caller process) should be expecting +%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster +-callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}. + +%% called when owner is shutting down. +-callback stop(conn_ref(), connection()) -> ok. + +-callback ensure_subscribed(connection(), topic(), qos()) -> ok. + +-callback ensure_unsubscribed(connection(), topic()) -> ok. + +start(Module, Config) -> + case Module:start(Config) of + {ok, Ref, Conn} -> + {ok, Ref, Conn}; + {error, Reason} -> + Config1 = obfuscate(Config), + ?LOG(error, "Failed to connect with module=~p\n" + "config=~p\nreason:~p", [Module, Config1, Reason]), + error + end. + +obfuscate(Map) -> + maps:fold(fun(K, V, Acc) -> + case is_sensitive(K) of + true -> [{K, '***'} | Acc]; + false -> [{K, V} | Acc] + end + end, [], Map). + +is_sensitive(password) -> true; +is_sensitive(_) -> false. diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl deleted file mode 100644 index baa857074..000000000 --- a/src/emqx_bridge_sup.erl +++ /dev/null @@ -1,45 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_sup). - --behavior(supervisor). - --include("emqx.hrl"). - --export([start_link/0, bridges/0]). - -%% Supervisor callbacks --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{node(), map()}]). -bridges() -> - [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. - -init([]) -> - BridgesOpts = emqx_config:get_env(bridges, []), - Bridges = [spec(Opts)|| Opts <- BridgesOpts], - {ok, {{one_for_one, 10, 100}, Bridges}}. - -spec({Id, Options})-> - #{id => Id, - start => {emqx_bridge, start_link, [Id, Options]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_bridge]}. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 7ebd40769..e29e50552 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -18,6 +18,7 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_client.hrl"). -export([start_link/0, start_link/1]). -export([request/5, request/6, request_async/7, receive_response/3]). @@ -42,7 +43,7 @@ -export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, request_input/0, response_payload/0, request_handler/0, - corr_data/0]). + corr_data/0, mqtt_msg/0]). -export_type([host/0, option/0]). @@ -58,7 +59,7 @@ -define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)). --define(NO_HANDLER, undefined). +-define(NO_REQ_HANDLER, undefined). -define(NO_GROUP, <<>>). @@ -66,10 +67,23 @@ -type(host() :: inet:ip_address() | inet:hostname()). --type corr_data() :: binary(). +-type(corr_data() :: binary()). + +%% NOTE: Message handler is different from request handler. +%% Message handler is a set of callbacks defined to handle MQTT messages as well as +%% the disconnect event. +%% Request handler is a callback to handle received MQTT message as in 'request', +%% and publish another MQTT message back to the defined topic as in 'response'. +%% `owner' and `msg_handler' has no effect when `request_handler' is set. +-define(NO_MSG_HDLR, undefined). +-type(msg_handler() :: #{puback := fun((_) -> any()), + publish := fun((emqx_types:message()) -> any()), + disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) + }). -type(option() :: {name, atom()} | {owner, pid()} + | {msg_handler, msg_handler()} | {host, host()} | {hosts, [{host(), inet:port_number()}]} | {port, inet:port_number()} @@ -97,13 +111,11 @@ | {force_ping, boolean()} | {properties, properties()}). --record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, - packet_id, topic, props, payload}). - -type(mqtt_msg() :: #mqtt_msg{}). -record(state, {name :: atom(), owner :: pid(), + msg_handler :: ?NO_MSG_HDLR | msg_handler(), host :: host(), port :: inet:port_number(), hosts :: [{host(), inet:port_number()}], @@ -378,7 +390,7 @@ publish(Client, Topic, Properties, Payload, Opts) payload = iolist_to_binary(Payload)}). -spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}). -publish(Client, Msg) when is_record(Msg, mqtt_msg) -> +publish(Client, Msg) -> gen_statem:call(Client, {publish, Msg}). -spec(unsubscribe(client(), topic() | [topic()]) -> subscribe_ret()). @@ -499,7 +511,7 @@ init([Options]) -> auto_ack = true, ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, - request_handler = ?NO_HANDLER, + request_handler = ?NO_REQ_HANDLER, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, last_packet_id = 1}), {ok, initialized, init_parse_state(State)}. @@ -518,6 +530,8 @@ init([{name, Name} | Opts], State) -> init([{owner, Owner} | Opts], State) when is_pid(Owner) -> link(Owner), init(Opts, State#state{owner = Owner}); +init([{msg_handler, Hdlr} | Opts], State) -> + init(Opts, State#state{msg_handler = Hdlr}); init([{host, Host} | Opts], State) -> init(Opts, State#state{host = Host}); init([{port, Port} | Opts], State) -> @@ -742,9 +756,6 @@ connected({call, From}, pause, State) -> connected({call, From}, resume, State) -> {keep_state, State#state{paused = false}, [{reply, From, ok}]}; -connected({call, From}, stop, _State) -> - {stop_and_reply, normal, [{reply, From, ok}]}; - connected({call, From}, get_properties, State = #state{properties = Properties}) -> {keep_state, State, [{reply, From, Properties}]}; @@ -779,20 +790,20 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) -> connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, State = #state{inflight = Inflight, last_packet_id = PacketId}) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> - case emqx_inflight:is_full(Inflight) of - true -> - {keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]}; - false -> - Msg1 = Msg#mqtt_msg{packet_id = PacketId}, - case send(Msg1, State) of - {ok, NewState} -> - Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), - {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), - [{reply, From, {ok, PacketId}}]}; - {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} - end - end; + case emqx_inflight:is_full(Inflight) of + true -> + {keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]}; + false -> + Msg1 = Msg#mqtt_msg{packet_id = PacketId}, + case send(Msg1, State) of + {ok, NewState} -> + Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), + {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), + [{reply, From, {ok, PacketId}}]}; + {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} + end + end; connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, State = #state{last_packet_id = PacketId}) -> @@ -859,12 +870,12 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> publish_process(?QOS_2, Packet, State); connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), - State = #state{owner = Owner, inflight = Inflight}) -> + State = #state{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> - Owner ! {puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}}, + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; none -> emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), @@ -901,12 +912,12 @@ connected(cast, ?PUBREL_PACKET(PacketId), end; connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), - State = #state{owner = Owner, inflight = Inflight}) -> + State = #state{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {pubrel, _PacketId, _Ts}} -> - Owner ! {puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}}, + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; none -> emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), @@ -945,10 +956,8 @@ connected(cast, ?PACKET(?PINGRESP), State) -> false -> {keep_state, State} end; -connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), - State = #state{owner = Owner}) -> - Owner ! {disconnected, ReasonCode, Properties}, - {stop, disconnected, State}; +connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> + {stop, {disconnected, ReasonCode, Properties}, State}; connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) -> case send(?PACKET(?PINGREQ), State) of @@ -998,6 +1007,8 @@ should_ping(Sock) -> Error end. +handle_event({call, From}, stop, _StateName, _State) -> + {stop_and_reply, normal, [{reply, From, ok}]}; handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> emqx_logger:debug("RECV Data: ~p", [Data]), @@ -1030,10 +1041,18 @@ handle_event(EventType, EventContent, StateName, StateData) -> {keep_state, StateData}. %% Mandatory callback functions -terminate(_Reason, _State, #state{socket = undefined}) -> - ok; -terminate(_Reason, _State, #state{socket = Socket}) -> - emqx_client_sock:close(Socket). +terminate(Reason, _StateName, State = #state{socket = Socket}) -> + case Reason of + {disconnected, ReasonCode, Properties} -> + %% backward compatible + ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}); + _ -> + ok = eval_msg_handler(State, disconnected, Reason) + end, + case Socket =:= undefined of + true -> ok; + _ -> emqx_client_sock:close(Socket) + end. code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. @@ -1103,8 +1122,8 @@ assign_id(?NO_CLIENT_ID, Props) -> assign_id(Id, _Props) -> Id. -publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #state{auto_ack = AutoAck}) -> - _ = deliver(packet_to_msg(Packet), State), +publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State0 = #state{auto_ack = AutoAck}) -> + State = deliver(packet_to_msg(Packet), State0), case AutoAck of true -> send_puback(?PUBACK_PACKET(PacketId), State); false -> {keep_state, State} @@ -1118,18 +1137,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), Stop -> Stop end. -response_publish(undefined, State, _QoS, _Payload) -> - State; -response_publish(Properties, State = #state{request_handler = RequestHandler}, QoS, Payload) -> - case maps:find('Response-Topic', Properties) of - {ok, ResponseTopic} -> - case RequestHandler of - ?NO_HANDLER -> State; - _ -> do_publish(ResponseTopic, Properties, State, QoS, Payload) - end; - _ -> - State - end. +response_publish(#{'Response-Topic' := ResponseTopic} = Properties, + State = #state{request_handler = RequestHandler}, QoS, Payload) + when RequestHandler =/= ?NO_REQ_HANDLER -> + do_publish(ResponseTopic, Properties, State, QoS, Payload); +response_publish(_Properties, State, _QoS, _Payload) -> State. do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) -> Msg = #mqtt_msg{qos = ?QOS_0, @@ -1253,19 +1265,37 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> Error end. +deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER -> + %% message has been terminated by request handler, hence should not continue processing + State; deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, - State = #state{owner = Owner, request_handler = RequestHandler}) -> - case RequestHandler of - ?NO_HANDLER -> - Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, - topic => Topic, properties => Props, payload => Payload, - client_pid => self()}}; - _ -> - ok - end, + State) -> + Msg = #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, + topic => Topic, properties => Props, payload => Payload, + client_pid => self()}, + ok = eval_msg_handler(State, publish, Msg), State. +eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, + owner = Owner}, + disconnected, {ReasonCode, Properties}) -> + %% Special handling for disconnected message when there is no handler callback + Owner ! {disconnected, ReasonCode, Properties}, + ok; +eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER}, + disconnected, _OtherReason) -> + %% do nothing to be backward compatible + ok; +eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, + owner = Owner}, Kind, Msg) -> + Owner ! {Kind, Msg}, + ok; +eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) -> + F = maps:get(Kind, Handler), + _ = F(Msg), + ok. + packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, @@ -1319,9 +1349,9 @@ send(Msg, State) when is_record(Msg, mqtt_msg) -> send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> Data = emqx_frame:serialize(Packet, #{version => Ver}), - emqx_logger:debug("SEND Data: ~p", [Data]), + emqx_logger:debug("SEND Data: ~1000p", [Packet]), case emqx_client_sock:send(Sock, Data) of - ok -> {ok, next_packet_id(State)}; + ok -> {ok, bump_last_packet_id(State)}; Error -> Error end. @@ -1355,10 +1385,11 @@ next_events(Packets) -> [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. %%------------------------------------------------------------------------------ -%% Next packet id +%% packet_id generation -next_packet_id(State = #state{last_packet_id = 16#ffff}) -> - State#state{last_packet_id = 1}; +bump_last_packet_id(State = #state{last_packet_id = Id}) -> + State#state{last_packet_id = next_packet_id(Id)}. -next_packet_id(State = #state{last_packet_id = Id}) -> - State#state{last_packet_id = Id + 1}. +-spec next_packet_id(packet_id()) -> packet_id(). +next_packet_id(?MAX_PACKET_ID) -> 1; +next_packet_id(Id) -> Id + 1. diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl deleted file mode 100644 index 0521e6d3f..000000000 --- a/src/emqx_local_bridge.erl +++ /dev/null @@ -1,157 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_local_bridge). - --behaviour(gen_server). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - --export([start_link/5]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(PING_DOWN_INTERVAL, 1000). - --record(state, {pool, id, - node, subtopic, - qos = ?QOS_0, - topic_suffix = <<>>, - topic_prefix = <<>>, - mqueue :: emqx_mqueue:mqueue(), - max_queue_len = 10000, - ping_down_interval = ?PING_DOWN_INTERVAL, - status = up}). - --type(option() :: {qos, emqx_mqtt_types:qos()} | - {topic_suffix, binary()} | - {topic_prefix, binary()} | - {max_queue_len, pos_integer()} | - {ping_down_interval, pos_integer()}). - --export_type([option/0]). - -%% @doc Start a bridge --spec(start_link(term(), pos_integer(), atom(), binary(), [option()]) - -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id, Node, Topic, Options) -> - gen_server:start_link(?MODULE, [Pool, Id, Node, Topic, Options], [{hibernate_after, 5000}]). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Pool, Id, Node, Topic, Options]) -> - process_flag(trap_exit, true), - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - case net_kernel:connect_node(Node) of - true -> - true = erlang:monitor_node(Node, true), - Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_broker:subscribe(Topic, #{share => Group, qos => ?QOS_0}), - State = parse_opts(Options, #state{node = Node, subtopic = Topic}), - MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len, - store_qos0 => true}), - {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; - false -> - {stop, {cannot_connect_node, Node}} - end. - -parse_opts([], State) -> - State; -parse_opts([{qos, QoS} | Opts], State) -> - parse_opts(Opts, State#state{qos = QoS}); -parse_opts([{topic_suffix, Suffix} | Opts], State) -> - parse_opts(Opts, State#state{topic_suffix= Suffix}); -parse_opts([{topic_prefix, Prefix} | Opts], State) -> - parse_opts(Opts, State#state{topic_prefix = Prefix}); -parse_opts([{max_queue_len, Len} | Opts], State) -> - parse_opts(Opts, State#state{max_queue_len = Len}); -parse_opts([{ping_down_interval, Interval} | Opts], State) -> - parse_opts(Opts, State#state{ping_down_interval = Interval}); -parse_opts([_Opt | Opts], State) -> - parse_opts(Opts, State). - -handle_call(Req, _From, State) -> - emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) -> - %% TODO: how to drop??? - {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - {noreply, State#state{mqueue = NewQ}}; - -handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> - emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), - {noreply, State}; - -handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> - emqx_logger:warning("[Bridge] node down: ~s", [Node]), - erlang:send_after(Interval, self(), ping_down_node), - {noreply, State#state{status = down}, hibernate}; - -handle_info({nodeup, Node}, State = #state{node = Node}) -> - %% TODO: Really fast?? - case emqx:is_running(Node) of - true -> emqx_logger:warning("[Bridge] Node up: ~s", [Node]), - {noreply, dequeue(State#state{status = up})}; - false -> self() ! {nodedown, Node}, - {noreply, State#state{status = down}} - end; - -handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> - Self = self(), - spawn_link(fun() -> - case net_kernel:connect_node(Node) of - true -> Self ! {nodeup, Node}; - false -> erlang:send_after(Interval, Self, ping_down_node) - end - end), - {noreply, State}; - -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; - -handle_info(Info, State) -> - emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -dequeue(State = #state{mqueue = MQ}) -> - case emqx_mqueue:out(MQ) of - {empty, MQ1} -> - State#state{mqueue = MQ1}; - {{value, Msg}, MQ1} -> - handle_info({dispatch, Msg#message.topic, Msg}, State), - dequeue(State#state{mqueue = MQ1}) - end. - -transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> - Msg#message{topic = <>}. - diff --git a/src/emqx_local_bridge_sup_sup.erl b/src/emqx_local_bridge_sup_sup.erl deleted file mode 100644 index 8a61d5936..000000000 --- a/src/emqx_local_bridge_sup_sup.erl +++ /dev/null @@ -1,74 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_local_bridge_sup_sup). - --behavior(supervisor). - --include("emqx.hrl"). - --export([start_link/0, bridges/0]). --export([start_bridge/2, start_bridge/3, stop_bridge/2]). - -%% Supervisor callbacks --export([init/1]). - --define(CHILD_ID(Node, Topic), {bridge_sup, Node, Topic}). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). -bridges() -> - [{Node, Topic, Pid} || {?CHILD_ID(Node, Topic), Pid, supervisor, _} - <- supervisor:which_children(?MODULE)]. - -%% @doc Start a bridge --spec(start_bridge(node(), emqx_topic:topic()) -> {ok, pid()} | {error, term()}). -start_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) -> - start_bridge(Node, Topic, []). - --spec(start_bridge(node(), emqx_topic:topic(), [emqx_bridge:option()]) - -> {ok, pid()} | {error, term()}). -start_bridge(Node, _Topic, _Options) when Node =:= node() -> - {error, bridge_to_self}; -start_bridge(Node, Topic, Options) when is_atom(Node), is_binary(Topic) -> - Options1 = emqx_misc:merge_opts(emqx_config:get_env(bridge, []), Options), - supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). - -%% @doc Stop a bridge --spec(stop_bridge(node(), emqx_topic:topic()) -> ok | {error, term()}). -stop_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) -> - ChildId = ?CHILD_ID(Node, Topic), - case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - Error -> Error - end. - -%%------------------------------------------------------------------------------ -%% Supervisor callbacks -%%------------------------------------------------------------------------------ - -init([]) -> - {ok, {{one_for_one, 10, 3600}, []}}. - -bridge_spec(Node, Topic, Options) -> - #{id => ?CHILD_ID(Node, Topic), - start => {emqx_local_bridge_sup, start_link, [Node, Topic, Options]}, - restart => permanent, - shutdown => infinity, - type => supervisor, - modules => [emqx_local_bridge_sup]}. - diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl index 034eb8d36..dd94cceb6 100644 --- a/src/emqx_logger_formatter.erl +++ b/src/emqx_logger_formatter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2017-2013-2019. All Rights Reserved. +%% Copyright Ericsson AB 2013-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 20a08ba9f..016ff007f 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -67,7 +67,7 @@ default_priority => highest | lowest, store_qos0 => boolean() }). --type(message() :: pemqx_types:message()). +-type(message() :: emqx_types:message()). -type(stat() :: {len, non_neg_integer()} | {max_len, non_neg_integer()} diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index 0245da838..e0d82f400 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2013-2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -28,4 +28,3 @@ multicall(Nodes, Mod, Fun, Args) -> cast(Node, Mod, Fun, Args) -> ?RPC:cast(Node, Mod, Fun, Args). - diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 5f62df904..eff33a841 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -61,9 +61,6 @@ init([]) -> RouterSup = supervisor_spec(emqx_router_sup), %% Broker Sup BrokerSup = supervisor_spec(emqx_broker_sup), - %% BridgeSup - LocalBridgeSup = supervisor_spec(emqx_local_bridge_sup_sup), - BridgeSup = supervisor_spec(emqx_bridge_sup), %% AccessControl AccessControl = worker_spec(emqx_access_control), @@ -77,7 +74,6 @@ init([]) -> [KernelSup, RouterSup, BrokerSup, - LocalBridgeSup, BridgeSup, AccessControl, SMSup, @@ -92,4 +88,3 @@ worker_spec(M) -> {M, {M, start_link, []}, permanent, 30000, worker, [M]}. supervisor_spec(M) -> {M, {M, start_link, []}, permanent, infinity, supervisor, [M]}. - diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 59f592984..bb615ccfe 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -20,7 +20,7 @@ -export([triples/1]). -export([words/1]). -export([wildcard/1]). --export([join/1]). +-export([join/1, prepend/2]). -export([feed_var/3]). -export([systop/1]). -export([parse/1, parse/2]). @@ -129,10 +129,23 @@ join(root, W) -> join(Parent, W) -> <<(bin(Parent))/binary, $/, (bin(W))/binary>>. +%% @doc Prepend a topic prefix. +%% Ensured to have only one / between prefix and suffix. +prepend(root, W) -> bin(W); +prepend(undefined, W) -> bin(W); +prepend(<<>>, W) -> bin(W); +prepend(Parent0, W) -> + Parent = bin(Parent0), + case binary:last(Parent) of + $/ -> <>; + _ -> join(Parent, W) + end. + bin('') -> <<>>; bin('+') -> <<"+">>; bin('#') -> <<"#">>; -bin(B) when is_binary(B) -> B. +bin(B) when is_binary(B) -> B; +bin(L) when is_list(L) -> list_to_binary(L). levels(Topic) when is_binary(Topic) -> length(words(Topic)). diff --git a/src/emqx_types.erl b/src/emqx_types.erl index d84b1099a..2fe34e853 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -31,7 +31,7 @@ -type(pubsub() :: publish | subscribe). -type(topic() :: binary()). -type(subid() :: binary() | atom()). --type(subopts() :: #{qos := integer(), +-type(subopts() :: #{qos := emqx_mqtt_types:qos(), share => binary(), atom() => term() }). @@ -59,4 +59,3 @@ -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). - diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl index 9681c27e6..465b20637 100644 --- a/test/emqx_bridge_SUITE.erl +++ b/test/emqx_bridge_SUITE.erl @@ -14,45 +14,182 @@ -module(emqx_bridge_SUITE). --compile(export_all). --compile(nowarn_export_all). +-export([all/0, init_per_suite/1, end_per_suite/1]). +-export([t_rpc/1, + t_mqtt/1, + t_mngr/1 + ]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include("emqx_mqtt.hrl"). +-include("emqx.hrl"). -all() -> - [bridge_test]. +-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> [t_rpc, + t_mqtt, + t_mngr + ]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), - Config. + case node() of + nonode@nohost -> + net_kernel:start(['emqx@127.0.0.1', longnames]); + _ -> + ok + end, + emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -bridge_test(_) -> - #{msg := <<"start bridge successfully">>} - = emqx_bridge:start_bridge(aws), - test_forwards(), - test_subscriptions(0), - test_subscriptions(1), - test_subscriptions(2), - #{msg := <<"stop bridge successfully">>} - = emqx_bridge:stop_bridge(aws), +t_mngr(Config) when is_list(Config) -> + Subs = [{<<"a">>, 1}, {<<"b">>, 2}], + Cfg = #{address => node(), + forwards => [<<"mngr">>], + connect_module => emqx_bridge_rpc, + mountpoint => <<"forwarded">>, + subscriptions => Subs, + start_type => auto + }, + Name = ?FUNCTION_NAME, + {ok, Pid} = emqx_bridge:start_link(Name, Cfg), + try + ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr")), + ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr2")), + ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge:get_forwards(Pid)), + ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr2")), + ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr3")), + ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Pid)), + ?assertEqual({error, no_remote_subscription_support}, + emqx_bridge:ensure_subscription_present(Pid, <<"t">>, 0)), + ?assertEqual({error, no_remote_subscription_support}, + emqx_bridge:ensure_subscription_absent(Pid, <<"t">>)), + ?assertEqual(Subs, emqx_bridge:get_subscriptions(Pid)) + after + ok = emqx_bridge:stop(Pid) + end. + +%% A loopback RPC to local node +t_rpc(Config) when is_list(Config) -> + Cfg = #{address => node(), + forwards => [<<"t_rpc/#">>], + connect_module => emqx_bridge_rpc, + mountpoint => <<"forwarded">>, + start_type => auto + }, + {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"ClientId">>, + try + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + %% message from a different client, to avoid getting terminated by no-local + Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>), + ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]), + PacketId = 1, + emqx_session:publish(SPid, PacketId, Msg1), + ?wait(case emqx_mock_client:get_last_message(ConnPid) of + {publish, PacketId, #message{topic = <<"forwarded/t_rpc/one">>}} -> true; + Other -> Other + end, 4000), + emqx_mock_client:close_session(ConnPid) + after + ok = emqx_bridge:stop(Pid) + end. + +%% Full data loopback flow explained: +%% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)---> +%% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) --> +%% bridge(import) --(mecked message sending)--> test-pid +t_mqtt(Config) when is_list(Config) -> + SendToTopic = <<"t_mqtt/one">>, + SendToTopic2 = <<"t_mqtt/two">>, + Mountpoint = <<"forwarded/${node}/">>, + ForwardedTopic = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic]), + ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]), + Cfg = #{address => "127.0.0.1:1883", + forwards => [SendToTopic], + connect_module => emqx_bridge_mqtt, + mountpoint => Mountpoint, + username => "user", + clean_start => true, + client_id => "bridge_aws", + keepalive => 60000, + max_inflight => 32, + password => "passwd", + proto_ver => mqttv4, + queue => #{replayq_dir => "data/t_mqtt/", + replayq_seg_bytes => 10000, + batch_bytes_limit => 1000, + batch_count_limit => 10 + }, + reconnect_delay_ms => 1000, + ssl => false, + %% Consume back to forwarded message for verification + %% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2 + subscriptions => [{ForwardedTopic, _QoS = 1}], + start_type => auto + }, + Tester = self(), + Ref = make_ref(), + meck:new(emqx_bridge, [passthrough, no_history]), + meck:expect(emqx_bridge, import_batch, 2, + fun(Batch, AckFun) -> + Tester ! {Ref, Batch}, + AckFun() + end), + {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"client-1">>, + try + ?assertEqual([{ForwardedTopic, 1}], emqx_bridge:get_subscriptions(Pid)), + ok = emqx_bridge:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), + ok = emqx_bridge:ensure_forward_present(Pid, SendToTopic2), + ?assertEqual([{ForwardedTopic, 1}, + {ForwardedTopic2, 1}], emqx_bridge:get_subscriptions(Pid)), + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + %% message from a different client, to avoid getting terminated by no-local + Max = 100, + Msgs = lists:seq(1, Max), + lists:foreach(fun(I) -> + Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)), + emqx_session:publish(SPid, I, Msg) + end, Msgs), + ok = receive_and_match_messages(Ref, Msgs), + Msgs2 = lists:seq(Max + 1, Max * 2), + lists:foreach(fun(I) -> + Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)), + emqx_session:publish(SPid, I, Msg) + end, Msgs2), + ok = receive_and_match_messages(Ref, Msgs2), + emqx_mock_client:close_session(ConnPid) + after + ok = emqx_bridge:stop(Pid), + meck:unload(emqx_bridge) + end. + +receive_and_match_messages(Ref, Msgs) -> + TRef = erlang:send_after(timer:seconds(5), self(), {Ref, timeout}), + try + do_receive_and_match_messages(Ref, Msgs) + after + erlang:cancel_timer(TRef) + end, ok. -test_forwards() -> - emqx_bridge:add_forward(aws, <<"test_forwards">>), - [<<"test_forwards">>, <<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws), - emqx_bridge:del_forward(aws, <<"test_forwards">>), - [<<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws), - ok. - -test_subscriptions(QoS) -> - emqx_bridge:add_subscription(aws, <<"test_subscriptions">>, QoS), - [{<<"test_subscriptions">>, QoS}, - {<<"cmd/topic1">>, 1}, - {<<"cmd/topic2">>, 1}] = emqx_bridge:show_subscriptions(aws), - emqx_bridge:del_subscription(aws, <<"test_subscriptions">>), - [{<<"cmd/topic1">>,1}, {<<"cmd/topic2">>,1}] = emqx_bridge:show_subscriptions(aws), - ok. +do_receive_and_match_messages(_Ref, []) -> ok; +do_receive_and_match_messages(Ref, [I | Rest] = Exp) -> + receive + {Ref, timeout} -> erlang:error(timeout); + {Ref, [#{payload := P} = Msg]} -> + case binary_to_integer(P) of + I -> %% exact match + do_receive_and_match_messages(Ref, Rest); + J when J < I -> %% allow retry + do_receive_and_match_messages(Ref, Exp); + _Other -> + throw({unexpected, Msg, Exp}) + end + end. diff --git a/test/emqx_bridge_mqtt_tests.erl b/test/emqx_bridge_mqtt_tests.erl new file mode 100644 index 000000000..7c094b957 --- /dev/null +++ b/test/emqx_bridge_mqtt_tests.erl @@ -0,0 +1,59 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_mqtt_tests). +-include_lib("eunit/include/eunit.hrl"). +-include("emqx_mqtt.hrl"). + +send_and_ack_test() -> + %% delegate from gen_rpc to rpc for unit test + meck:new(emqx_client, [passthrough, no_history]), + meck:expect(emqx_client, start_link, 1, + fun(#{msg_handler := Hdlr}) -> + {ok, spawn_link(fun() -> fake_client(Hdlr) end)} + end), + meck:expect(emqx_client, connect, 1, {ok, dummy}), + meck:expect(emqx_client, stop, 1, + fun(Pid) -> Pid ! stop end), + meck:expect(emqx_client, publish, 2, + fun(Client, Msg) -> + case rand:uniform(200) of + 1 -> + {error, {dummy, inflight_full}}; + _ -> + Client ! {publish, Msg}, + {ok, Msg} %% as packet id + end + end), + try + Max = 100, + Batch = lists:seq(1, Max), + {ok, Ref, Conn} = emqx_bridge_mqtt:start(#{address => "127.0.0.1:1883"}), + %% return last packet id as batch reference + {ok, AckRef} = emqx_bridge_mqtt:send(Conn, Batch), + %% expect batch ack + receive {batch_ack, AckRef} -> ok end, + ok = emqx_bridge_mqtt:stop(Ref, Conn) + after + meck:unload(emqx_client) + end. + +fake_client(#{puback := PubAckCallback} = Hdlr) -> + receive + {publish, PktId} -> + PubAckCallback(#{packet_id => PktId, reason_code => ?RC_SUCCESS}), + fake_client(Hdlr); + stop -> + exit(normal) + end. diff --git a/test/emqx_bridge_rpc_tests.erl b/test/emqx_bridge_rpc_tests.erl new file mode 100644 index 000000000..28e05b895 --- /dev/null +++ b/test/emqx_bridge_rpc_tests.erl @@ -0,0 +1,43 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_rpc_tests). +-include_lib("eunit/include/eunit.hrl"). + +send_and_ack_test() -> + %% delegate from gen_rpc to rpc for unit test + meck:new(gen_rpc, [passthrough, no_history]), + meck:expect(gen_rpc, call, 4, + fun(Node, Module, Fun, Args) -> + rpc:call(Node, Module, Fun, Args) + end), + meck:expect(gen_rpc, cast, 4, + fun(Node, Module, Fun, Args) -> + rpc:cast(Node, Module, Fun, Args) + end), + meck:new(emqx_bridge, [passthrough, no_history]), + meck:expect(emqx_bridge, import_batch, 2, + fun(batch, AckFun) -> AckFun() end), + try + {ok, Pid, Node} = emqx_bridge_rpc:start(#{address => node()}), + {ok, Ref} = emqx_bridge_rpc:send(Node, batch), + receive + {batch_ack, Ref} -> + ok + end, + ok = emqx_bridge_rpc:stop(Pid, Node) + after + meck:unload(gen_rpc), + meck:unload(emqx_bridge) + end. diff --git a/test/emqx_bridge_tests.erl b/test/emqx_bridge_tests.erl new file mode 100644 index 000000000..22b2c4d49 --- /dev/null +++ b/test/emqx_bridge_tests.erl @@ -0,0 +1,157 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_tests). +-behaviour(emqx_bridge_connect). + +-include_lib("eunit/include/eunit.hrl"). +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +-define(BRIDGE_NAME, test). +-define(BRIDGE_REG_NAME, emqx_bridge_test). +-define(WAIT(PATTERN, TIMEOUT), + receive + PATTERN -> + ok + after + TIMEOUT -> + error(timeout) + end). + +%% stub callbacks +-export([start/1, send/2, stop/2]). + +start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) -> + case is_pid(Pid) of + true -> Pid ! {connection_start_attempt, Ref}; + false -> ok + end, + Result. + +send(SendFun, Batch) when is_function(SendFun, 1) -> + SendFun(Batch). + +stop(_Ref, _Pid) -> ok. + +%% bridge worker should retry connecting remote node indefinitely +reconnect_test() -> + Ref = make_ref(), + Config = make_config(Ref, self(), {error, test}), + {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), + %% assert name registered + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), + ?WAIT({connection_start_attempt, Ref}, 1000), + %% expect same message again + ?WAIT({connection_start_attempt, Ref}, 1000), + ok = emqx_bridge:stop(?BRIDGE_REG_NAME), + ok. + +%% connect first, disconnect, then connect again +disturbance_test() -> + Ref = make_ref(), + Config = make_config(Ref, self(), {ok, Ref, connection}), + {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), + ?WAIT({connection_start_attempt, Ref}, 1000), + Pid ! {disconnected, Ref, test}, + ?WAIT({connection_start_attempt, Ref}, 1000), + ok = emqx_bridge:stop(?BRIDGE_REG_NAME). + +%% buffer should continue taking in messages when disconnected +buffer_when_disconnected_test_() -> + {timeout, 10000, fun test_buffer_when_disconnected/0}. + +test_buffer_when_disconnected() -> + Ref = make_ref(), + Nums = lists:seq(1, 100), + Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end), + SenderMref = monitor(process, Sender), + Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end), + ReceiverMref = monitor(process, Receiver), + SendFun = fun(Batch) -> + BatchRef = make_ref(), + Receiver ! {batch, BatchRef, Batch}, + {ok, BatchRef} + end, + Config0 = make_config(Ref, false, {ok, Ref, SendFun}), + Config = Config0#{reconnect_delay_ms => 100}, + {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), + Sender ! {bridge, Pid}, + Receiver ! {bridge, Pid}, + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), + Pid ! {disconnected, Ref, test}, + ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000), + ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), + ok = emqx_bridge:stop(?BRIDGE_REG_NAME). + +manual_start_stop_test() -> + Ref = make_ref(), + Config0 = make_config(Ref, self(), {ok, Ref, connection}), + Config = Config0#{start_type := manual}, + {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config), + %% call ensure_started again should yeld the same result + {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config), + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), + ?assertEqual({error, standing_by}, + emqx_bridge:ensure_forward_present(Pid, "dummy")), + emqx_bridge:ensure_stopped(unknown), + emqx_bridge:ensure_stopped(Pid), + emqx_bridge:ensure_stopped(?BRIDGE_REG_NAME). + +%% Feed messages to bridge +sender_loop(_Pid, [], _) -> exit(normal); +sender_loop(Pid, [Num | Rest], Interval) -> + random_sleep(Interval), + Pid ! {dispatch, dummy, make_msg(Num)}, + sender_loop(Pid, Rest, Interval). + +%% Feed acknowledgments to bridge +receiver_loop(_Pid, [], _) -> ok; +receiver_loop(Pid, Nums, Interval) -> + receive + {batch, BatchRef, Batch} -> + Rest = match_nums(Batch, Nums), + random_sleep(Interval), + emqx_bridge:handle_ack(Pid, BatchRef), + receiver_loop(Pid, Rest, Interval) + end. + +random_sleep(MaxInterval) -> + case rand:uniform(MaxInterval) - 1 of + 0 -> ok; + T -> timer:sleep(T) + end. + +match_nums([], Rest) -> Rest; +match_nums([#message{payload = P} | Rest], Nums) -> + I = binary_to_integer(P), + case Nums of + [I | NumsLeft] -> match_nums(Rest, NumsLeft); + [J | _] when J > I -> match_nums(Rest, Nums); %% allow retry + _ -> error([{received, I}, {expecting, Nums}]) + end. + +make_config(Ref, TestPid, Result) -> + #{test_pid => TestPid, + test_ref => Ref, + connect_module => ?MODULE, + reconnect_delay_ms => 50, + connect_result => Result, + start_type => auto + }. + +make_msg(I) -> + Payload = integer_to_binary(I), + emqx_message:make(<<"test/topic">>, Payload). diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 1ab79e8a9..88240be85 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -54,10 +54,17 @@ "ECDH-RSA-AES128-SHA","AES128-SHA"]}]). run_setup_steps() -> + _ = run_setup_steps([]), + %% return ok to be backward compatible + ok. + +run_setup_steps(Config) -> NewConfig = generate_config(), lists:foreach(fun set_app_env/1, NewConfig), set_bridge_env(), - application:ensure_all_started(?APP). + {ok, _} = application:ensure_all_started(?APP), + set_log_level(Config), + Config. run_teardown_steps() -> ?APP:shutdown(). @@ -67,6 +74,12 @@ generate_config() -> Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), cuttlefish_generator:map(Schema, Conf). +set_log_level(Config) -> + case proplists:get_value(log_level, Config) of + undefined -> ok; + Level -> emqx_logger:set_log_level(Level) + end. + get_base_dir(Module) -> {file, Here} = code:is_loaded(Module), filename:dirname(filename:dirname(Here)). @@ -156,24 +169,30 @@ flush(Msgs) -> end. bridge_conf() -> - [{aws, - [{username,"user"}, - {address,"127.0.0.1:1883"}, - {clean_start,true}, - {client_id,"bridge_aws"}, - {forwards,["topic1/#","topic2/#"]}, - {keepalive,60000}, - {max_inflight,32}, - {mountpoint,"bridge/aws/${node}/"}, - {password,"passwd"}, - {proto_ver,mqttv4}, - {queue, - #{batch_size => 1000,mem_cache => true, - replayq_dir => "data/emqx_aws_bridge/", - replayq_seg_bytes => 10485760}}, - {reconnect_interval,30000}, - {retry_interval,20000}, - {ssl,false}, - {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]}, - {start_type,manual}, - {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}]. \ No newline at end of file + [ {local_rpc, + [{connect_module, emqx_bridge_rpc}, + {address, node()}, + {forwards, ["bridge-1/#", "bridge-2/#"]} + ]} + ]. + % [{aws, + % [{connect_module, emqx_bridge_mqtt}, + % {username,"user"}, + % {address,"127.0.0.1:1883"}, + % {clean_start,true}, + % {client_id,"bridge_aws"}, + % {forwards,["topic1/#","topic2/#"]}, + % {keepalive,60000}, + % {max_inflight,32}, + % {mountpoint,"bridge/aws/${node}/"}, + % {password,"passwd"}, + % {proto_ver,mqttv4}, + % {queue, + % #{batch_coun t_limit => 1000, + % replayq_dir => "data/emqx_aws_bridge/", + % replayq_seg_bytes => 10485760}}, + % {reconnect_delay_ms,30000}, + % {ssl,false}, + % {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]}, + % {start_type,manual}, + % {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}]. diff --git a/test/emqx_ct_helpers.erl b/test/emqx_ct_helpers.erl index eae22d6ab..361a6b4a9 100644 --- a/test/emqx_ct_helpers.erl +++ b/test/emqx_ct_helpers.erl @@ -14,9 +14,56 @@ -module(emqx_ct_helpers). --export([ensure_mnesia_stopped/0]). +-export([ensure_mnesia_stopped/0, wait_for/4]). ensure_mnesia_stopped() -> ekka_mnesia:ensure_stopped(), ekka_mnesia:delete_schema(). +%% Help function to wait for Fun to yeild 'true'. +wait_for(Fn, Ln, F, Timeout) -> + {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), + wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). + +wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> + receive + {'DOWN', Mref, process, Pid, normal} -> + ok; + {'DOWN', Mref, process, Pid, {unexpected, Result}} -> + erlang:error({unexpected, Fn, Ln, Result}); + {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} -> + erlang:raise(C, {Fn, Ln, E}, S) + after + Timeout -> + case Kill of + true -> + erlang:demonitor(Mref, [flush]), + erlang:exit(Pid, kill), + erlang:error({Fn, Ln, timeout}); + false -> + Pid ! stop, + wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) + end + end. + +wait_loop(_F, ok) -> exit(normal); +wait_loop(F, LastRes) -> + receive + stop -> erlang:exit(LastRes) + after + 100 -> + Res = catch_call(F), + wait_loop(F, Res) + end. + +catch_call(F) -> + try + case F() of + true -> ok; + Other -> {unexpected, Other} + end + catch + C : E : S -> + {crashed, {C, E, S}} + end. + diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl index 333ef309f..580fec4c2 100644 --- a/test/emqx_mqtt_packet_SUITE.erl +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -10,7 +10,6 @@ %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and -%% limitations under the License. -module(emqx_mqtt_packet_SUITE). @@ -88,7 +87,7 @@ case1_protocol_name(_) -> {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). - + case2_protocol_ver(_) -> {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), Packet = serialize(?CASE2_PROTOCAL_VER), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 1fd7bef9b..6cd6c98d0 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -29,7 +29,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). +-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). all() -> [t_random_basic, t_random, @@ -259,49 +259,3 @@ ensure_config(Strategy, AckEnabled) -> subscribed(Group, Topic, Pid) -> lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). -wait_for(Fn, Ln, F, Timeout) -> - {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), - wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). - -wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> - receive - {'DOWN', Mref, process, Pid, normal} -> - ok; - {'DOWN', Mref, process, Pid, {unexpected, Result}} -> - erlang:error({unexpected, Fn, Ln, Result}); - {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} -> - erlang:raise(C, {Fn, Ln, E}, S) - after - Timeout -> - case Kill of - true -> - erlang:demonitor(Mref, [flush]), - erlang:exit(Pid, kill), - erlang:error({Fn, Ln, timeout}); - false -> - Pid ! stop, - wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) - end - end. - -wait_loop(_F, ok) -> exit(normal); -wait_loop(F, LastRes) -> - receive - stop -> erlang:exit(LastRes) - after - 100 -> - Res = catch_call(F), - wait_loop(F, Res) - end. - -catch_call(F) -> - try - case F() of - true -> ok; - Other -> {unexpected, Other} - end - catch - C : E : S -> - {crashed, {C, E, S}} - end. -