Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-06-29 18:38:45 +08:00
commit 86feae6adc
20 changed files with 47 additions and 2192 deletions

View File

@ -354,8 +354,8 @@ log.to = both
## Note: Only the messages with severity level higher than or equal to
## this level will be logged.
##
## Default: error
log.level = error
## Default: warning
log.level = warning
## The dir for log files.
##
@ -1806,334 +1806,6 @@ listener.wss.external.send_timeout_close = on
## Value: Number
## listener.wss.external.max_frame_size = 0
##--------------------------------------------------------------------
## Bridges
##--------------------------------------------------------------------
##--------------------------------------------------------------------
## Bridges to aws
##--------------------------------------------------------------------
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1, 127.0.0.1:1883
## bridge.aws.address = 127.0.0.1:1883
## Protocol version of the bridge.
##
## Value: Enum
## - mqttv5
## - mqttv4
## - mqttv3
## bridge.aws.proto_ver = mqttv4
## The ClientId of a remote bridge.
##
## Value: String
## bridge.aws.client_id = bridge_aws
## The Clean start flag of a remote bridge.
##
## Value: boolean
## Default: true
##
## NOTE: Some IoT platforms require clean_start
## must be set to 'true'
## bridge.aws.clean_start = true
## The username for a remote bridge.
##
## Value: String
## bridge.aws.username = user
## The password for a remote bridge.
##
## Value: String
## bridge.aws.password = passwd
## Mountpoint of the bridge.
##
## Value: String
## bridge.aws.mountpoint = bridge/aws/${node}/
## Forward message topics
##
## Value: String
## Example: topic1/#,topic2/#
## bridge.aws.forwards = topic1/#,topic2/#
## Bribge to remote server via SSL.
##
## Value: on | off
## bridge.aws.ssl = off
## PEM-encoded CA certificates of the bridge.
##
## Value: File
## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## Client SSL Certfile of the bridge.
##
## Value: File
## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
## Client SSL Keyfile of the bridge.
##
## Value: File
## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
## Value: Duration
## Default: 10 seconds
## bridge.aws.keepalive = 60s
## TLS versions used by the bridge.
##
## Value: String
## bridge.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## Subscriptions of the bridge topic.
##
## Value: String
## bridge.aws.subscription.1.topic = cmd/topic1
## Subscriptions of the bridge qos.
##
## Value: Number
## bridge.aws.subscription.1.qos = 1
## Subscriptions of the bridge topic.
##
## Value: String
## bridge.aws.subscription.2.topic = cmd/topic2
## Subscriptions of the bridge qos.
##
## Value: Number
## bridge.aws.subscription.2.qos = 1
## Start type of the bridge.
##
## Value: enum
## manual
## auto
## bridge.aws.start_type = manual
## Bridge reconnect time.
##
## Value: Duration
## Default: 30 seconds
## bridge.aws.reconnect_interval = 30s
## Retry interval for bridge QoS1 message delivering.
##
## Value: Duration
## bridge.aws.retry_interval = 20s
## Inflight size.
##
## Value: Integer
## bridge.aws.max_inflight_batches = 32
## Max number of messages to collect in a batch for
## each send call towards emqx_bridge_connect
##
## Value: Integer
## default: 32
## bridge.aws.queue.batch_count_limit = 32
## Max number of bytes to collect in a batch for each
## send call towards emqx_bridge_connect
##
## Value: Bytesize
## default: 1000M
## bridge.aws.queue.batch_bytes_limit = 1000MB
## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
## replayq works in a mem-only manner.
##
## Value: String
## bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/
## Replayq segment size
##
## Value: Bytesize
## bridge.aws.queue.replayq_seg_bytes = 10MB
##--------------------------------------------------------------------
## Bridges to azure
##--------------------------------------------------------------------
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1, 127.0.0.1:1883
## bridge.azure.address = 127.0.0.1:1883
## Protocol version of the bridge.
##
## Value: Enum
## - mqttv5
## - mqttv4
## - mqttv3
## bridge.azure.proto_ver = mqttv4
## The ClientId of a remote bridge.
##
## Value: String
## bridge.azure.client_id = bridge_azure
## The Clean start flag of a remote bridge.
##
## Value: boolean
## Default: true
##
## NOTE: Some IoT platforms require clean_start
## must be set to 'true'
## bridge.azure.clean_start = true
## The username for a remote bridge.
##
## Value: String
## bridge.azure.username = user
## The password for a remote bridge.
##
## Value: String
## bridge.azure.password = passwd
## Mountpoint of the bridge.
##
## Value: String
## bridge.azure.mountpoint = bridge/aws/${node}/
## Forward message topics
##
## Value: String
## Example: topic1/#,topic2/#
## bridge.azure.forwards = topic1/#,topic2/#
## Bribge to remote server via SSL.
##
## Value: on | off
## bridge.azure.ssl = off
## PEM-encoded CA certificates of the bridge.
##
## Value: File
## bridge.azure.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## Client SSL Certfile of the bridge.
##
## Value: File
## bridge.azure.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
## Client SSL Keyfile of the bridge.
##
## Value: File
## bridge.azure.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'bridge.*.ciphers' and 'bridge.*.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.azure.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
## Value: Duration
## Default: 10 seconds
## bridge.azure.keepalive = 60s
## TLS versions used by the bridge.
##
## Value: String
## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## Subscriptions of the bridge topic.
##
## Value: String
## bridge.azure.subscription.1.topic = cmd/topic1
## Subscriptions of the bridge qos.
##
## Value: Number
## bridge.azure.subscription.1.qos = 1
## Subscriptions of the bridge topic.
##
## Value: String
## bridge.azure.subscription.2.topic = cmd/topic2
## Subscriptions of the bridge qos.
##
## Value: Number
## bridge.azure.subscription.2.qos = 1
## Start type of the bridge.
##
## Value: enum
## manual
## auto
## bridge.azure.start_type = manual
## Bridge reconnect time.
##
## Value: Duration
## Default: 30 seconds
## bridge.azure.reconnect_interval = 30s
## Retry interval for bridge QoS1 message delivering.
##
## Value: Duration
## bridge.azure.retry_interval = 20s
## Inflight size.
##
## Value: Integer
## bridge.azure.max_inflight_batches = 32
## Maximum number of messages in one batch when sending to remote borkers
## NOTE: when bridging via MQTT connection to remote broker, this config is only
## used for internal message passing optimization as the underlying MQTT
## protocol does not supports batching.
##
## Value: Integer
## default: 32
## bridge.azure.queue.batch_size = 32
## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
## replayq works in a mem-only manner.
##
## Value: String
## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/
## Replayq segment size
##
## Value: Bytesize
## bridge.azure.queue.replayq_seg_bytes = 10MB
##--------------------------------------------------------------------
## Modules
##--------------------------------------------------------------------

View File

@ -396,12 +396,12 @@ end}.
]}.
{mapping, "log.level", "kernel.logger", [
{default, error},
{default, warning},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
]}.
{mapping, "log.primary_log_level", "kernel.logger_level", [
{default, error},
{default, warning},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
]}.
@ -1712,237 +1712,6 @@ end}.
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
end}.
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------
{mapping, "bridge.$name.address", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.proto_ver", "emqx.bridges", [
{datatype, {enum, [mqttv3, mqttv4, mqttv5]}}
]}.
{mapping, "bridge.$name.client_id", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.clean_start", "emqx.bridges", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "bridge.$name.username", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.password", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.mountpoint", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.forwards", "emqx.bridges", [
{datatype, string},
{default, ""}
]}.
{mapping, "bridge.$name.ssl", "emqx.bridges", [
{datatype, flag},
{default, off}
]}.
{mapping, "bridge.$name.cacertfile", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.certfile", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.keyfile", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.ciphers", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.psk_ciphers", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.keepalive", "emqx.bridges", [
{default, "10s"},
{datatype, {duration, ms}}
]}.
{mapping, "bridge.$name.tls_versions", "emqx.bridges", [
{datatype, string},
{default, "tlsv1,tlsv1.1,tlsv1.2"}
]}.
{mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.subscription.$id.qos", "emqx.bridges", [
{datatype, integer}
]}.
{mapping, "bridge.$name.start_type", "emqx.bridges", [
{datatype, {enum, [manual, auto]}},
{default, auto}
]}.
{mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [
{default, "30s"},
{datatype, {duration, ms}}
]}.
{mapping, "bridge.$name.retry_interval", "emqx.bridges", [
{default, "20s"},
{datatype, {duration, ms}}
]}.
{mapping, "bridge.$name.max_inflight_batches", "emqx.bridges", [
{default, 0},
{datatype, integer}
]}.
{mapping, "bridge.$name.queue.batch_count_limit", "emqx.bridges", [
{datatype, integer}
]}.
{mapping, "bridge.$name.queue.batch_bytes_limit", "emqx.bridges", [
{datatype, bytesize}
]}.
{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [
{datatype, bytesize}
]}.
{translation, "emqx.bridges", fun(Conf) ->
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers)
end,
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
IsSsl = fun(cacertfile) -> true;
(certfile) -> true;
(keyfile) -> true;
(ciphers) -> true;
(psk_ciphers) -> true;
(tls_versions) -> true;
(_Opt) -> false
end,
Parse = fun(tls_versions, Vers) ->
[{versions, [list_to_atom(S) || S <- Split(Vers)]}];
(ciphers, Ciphers) ->
[{ciphers, Split(Ciphers)}];
(psk_ciphers, Ciphers) ->
[{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
(Opt, Val) ->
[{Opt, Val}]
end,
Merge = fun(forwards, Val, Opts) ->
[{forwards, string:tokens(Val, ",")}|Opts];
(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
false ->
[{Opt, Val}|Opts]
end
end,
Queue = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".queue", Conf),
QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, "queue", QOpt], QValue} <- Configs],
maps:from_list(QOpts)
end,
Subscriptions = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
[QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])])
end,
IsNodeAddr = fun(Addr) ->
case string:tokens(Addr, "@") of
[_NodeName, _Hostname] -> true;
_ -> false
end
end,
ConnMod = fun(Name) ->
[AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".address", Conf),
{_, Addr} = AddrConfig,
Subs = Subscriptions(Name),
case IsNodeAddr(Addr) of
true when Subs =/= [] ->
error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
true ->
emqx_bridge_rpc;
false ->
emqx_bridge_mqtt
end
end,
%% to be backward compatible
Translate =
fun Tr(queue, Q, Cfg) ->
NewQ = maps:fold(Tr, #{}, Q),
Cfg#{queue => NewQ};
Tr(address, Addr0, Cfg) ->
Addr = case IsNodeAddr(Addr0) of
true -> list_to_atom(Addr0);
false -> Addr0
end,
Cfg#{address => Addr};
Tr(batch_size, Count, Cfg) ->
Cfg#{batch_count_limit => Count};
Tr(reconnect_interval, Ms, Cfg) ->
Cfg#{reconnect_delay_ms => Ms};
Tr(max_inflight, Count, Cfg) ->
Cfg#{max_inflight_batches => Count};
Tr(proto_ver, Ver, Cfg) ->
Cfg#{proto_ver =>
case Ver of
mqttv3 -> v3;
mqttv4 -> v4;
mqttv5 -> v5;
_ -> v4
end};
Tr(Key, Value, Cfg) ->
Cfg#{Key => Value}
end,
C = lists:foldl(
fun({["bridge", Name, Opt], Val}, Acc) ->
%% e.g #{aws => [{OptKey, OptVal}]}
Init = [{list_to_atom(Opt), Val},
{connect_module, ConnMod(Name)},
{subscriptions, Subscriptions(Name)},
{queue, Queue(Name)}],
maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
(_, Acc) -> Acc
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))),
C1 = maps:map(fun(Bn, Bc) ->
maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc)))
end, C),
maps:to_list(C1)
end}.
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------

View File

@ -38,10 +38,11 @@
-export([ load/0
, unload/0
, get_alarms/0
, get_alarms/1
]).
-record(common_alarm, {id, desc}).
-record(alarm_history, {id, clear_at}).
-record(alarm_history, {id, desc, clear_at}).
-define(ALARM_TAB, emqx_alarm).
-define(ALARM_HISTORY_TAB, emqx_alarm_history).
@ -79,7 +80,14 @@ unload() ->
gen_event:swap_handler(alarm_handler, {?MODULE, swap}, {alarm_handler, []}).
get_alarms() ->
gen_event:call(alarm_handler, ?MODULE, get_alarms).
get_alarms(present).
get_alarms(present) ->
Alarms = ets:tab2list(?ALARM_TAB),
[{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms];
get_alarms(history) ->
Alarms = ets:tab2list(?ALARM_HISTORY_TAB),
[{Id, Desc, ClearAt} || #alarm_history{id = Id, desc = Desc, clear_at = ClearAt} <- Alarms].
%%----------------------------------------------------------------------
%% gen_event callbacks
@ -95,7 +103,7 @@ init(_) ->
handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) ->
handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State);
handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
?LOG(warning, "~p set", [Alarm]),
?LOG(warning, "New Alarm: ~p, Alarm Info: ~p", [AlarmId, AlarmDesc]),
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(alert), Json));
@ -105,7 +113,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
set_alarm_(AlarmId, AlarmDesc),
{ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(notice, "~p clear", [AlarmId]),
?LOG(warning, "Clear Alarm: ~p", [AlarmId]),
case encode_alarm({AlarmId, undefined}) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(clear), Json));
@ -117,14 +125,14 @@ handle_event({clear_alarm, AlarmId}, State) ->
handle_event(_, State) ->
{ok, State}.
handle_info(_, State) -> {ok, State}.
handle_info(_, State) ->
{ok, State}.
handle_call(get_alarms, State) ->
{ok, get_alarms_(), State};
handle_call(_Query, State) -> {ok, {error, bad_query}, State}.
handle_call(_Query, State) ->
{ok, {error, bad_query}, State}.
terminate(swap, _State) ->
{emqx_alarm_handler, get_alarms_()};
{emqx_alarm_handler, get_alarms()};
terminate(_, _) ->
ok.
@ -134,8 +142,8 @@ terminate(_, _) ->
init_tables(ExistingAlarms) ->
mnesia:clear_table(?ALARM_TAB),
lists:foreach(fun({Id, _Desc}) ->
set_alarm_history(Id)
lists:foreach(fun({Id, Desc}) ->
set_alarm_history(Id, Desc)
end, ExistingAlarms).
encode_alarm({AlarmId, #alarm{severity = Severity,
@ -146,12 +154,12 @@ encode_alarm({AlarmId, #alarm{severity = Severity,
{desc, [{severity, Severity},
{title, iolist_to_binary(Title)},
{summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(Ts)}]}]);
{timestamp, emqx_time:now_ms(Ts)}]}]);
encode_alarm({AlarmId, undefined}) ->
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]);
encode_alarm({AlarmId, AlarmDesc}) ->
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
{description, maybe_to_binary(AlarmDesc)}]).
{desc, maybe_to_binary(AlarmDesc)}]).
alarm_msg(Topic, Payload) ->
Msg = emqx_message:make(?MODULE, Topic, Payload),
@ -172,14 +180,15 @@ set_alarm_(Id, Desc) ->
mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}).
clear_alarm_(Id) ->
mnesia:dirty_delete(?ALARM_TAB, Id),
set_alarm_history(Id).
case mnesia:dirty_read(?ALARM_TAB, Id) of
[#common_alarm{desc = Desc}] ->
set_alarm_history(Id, Desc),
mnesia:dirty_delete(?ALARM_TAB, Id);
[] -> ok
end.
get_alarms_() ->
Alarms = ets:tab2list(?ALARM_TAB),
[{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms].
set_alarm_history(Id) ->
set_alarm_history(Id, Desc) ->
mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id,
clear_at = undefined}).
desc = Desc,
clear_at = os:timestamp()}).

View File

@ -1,610 +0,0 @@
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc Bridge works in two layers (1) batching layer (2) transport layer
%% The `bridge' batching layer collects local messages in batches and sends over
%% to remote MQTT node/cluster via `connetion' transport layer.
%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be
%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection'
%% has to be `emqx_bridge_mqtt'.
%%
%% ```
%% +------+ +--------+
%% | EMQX | | REMOTE |
%% | | | |
%% | (bridge) <==(connection)==> | |
%% | | | |
%% | | | |
%% +------+ +--------+
%% '''
%%
%%
%% This module implements 2 kinds of APIs with regards to batching and
%% messaging protocol. (1) A `gen_statem' based local batch collector;
%% (2) APIs for incoming remote batches/messages.
%%
%% Batch collector state diagram
%%
%% [standing_by] --(0) --> [connecting] --(2)--> [connected]
%% | ^ |
%% | | |
%% '--(1)---'--------(3)------'
%%
%% (0): auto or manual start
%% (1): retry timeout
%% (2): successfuly connected to remote node/cluster
%% (3): received {disconnected, conn_ref(), Reason} OR
%% failed to send to remote node/cluster.
%%
%% NOTE: A bridge worker may subscribe to multiple (including wildcard)
%% local topics, and the underlying `emqx_bridge_connect' may subscribe to
%% multiple remote topics, however, worker/connections are not designed
%% to support automatic load-balancing, i.e. in case it can not keep up
%% with the amount of messages comming in, administrator should split and
%% balance topics between worker/connections manually.
%%
%% NOTES:
%% * Local messages are all normalised to QoS-1 when exporting to remote
-module(emqx_bridge).
-behaviour(gen_statem).
%% APIs
-export([ start_link/2
, import_batch/2
, handle_ack/2
, stop/1
]).
%% gen_statem callbacks
-export([ terminate/3
, code_change/4
, init/1
, callback_mode/0
]).
%% state functions
-export([ standing_by/3
, connecting/3
, connected/3
]).
%% management APIs
-export([ ensure_started/1
, ensure_started/2
, ensure_stopped/1
, ensure_stopped/2
, status/1
]).
-export([ get_forwards/1
, ensure_forward_present/2
, ensure_forward_absent/2
]).
-export([ get_subscriptions/1
, ensure_subscription_present/3
, ensure_subscription_absent/2
]).
-export_type([ config/0
, batch/0
, ack_ref/0
]).
-type id() :: atom() | string() | pid().
-type qos() :: emqx_mqtt_types:qos().
-type config() :: map().
-type batch() :: [emqx_bridge_msg:exp_msg()].
-type ack_ref() :: term().
-type topic() :: emqx_topic:topic().
-include("logger.hrl").
-include("emqx_mqtt.hrl").
-logger_header("[Bridge]").
%% same as default in-flight limit for emqx_client
-define(DEFAULT_BATCH_COUNT, 32).
-define(DEFAULT_BATCH_BYTES, 1 bsl 20).
-define(DEFAULT_SEND_AHEAD, 8).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
-define(NO_BRIDGE_HANDLER, undefined).
-define(NO_FROM, undefined).
-define(maybe_send, {next_event, internal, maybe_send}).
%% @doc Start a bridge worker. Supported configs:
%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay
%% at 'standing_by' state until a manual call to start it.
%% connect_module: The module which implements emqx_bridge_connect behaviour
%% and work as message batch transport layer
%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry
%% in case of transportation failure.
%% max_inflight_batches: Max number of batches allowed to send-ahead before
%% receiving confirmation from remote node/cluster
%% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe.
%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each
%% send call towards emqx_bridge_connect
%% queue.batch_count_limit: Max number of messages to collect in a batch for
%% each send call towards emqx_bridge_connect
%% queue.replayq_dir: Directory where replayq should persist messages
%% queue.replayq_seg_bytes: Size in bytes for each replayq segment file
%%
%% Find more connection specific configs in the callback modules
%% of emqx_bridge_connect behaviour.
start_link(Name, Config) when is_list(Config) ->
start_link(Name, maps:from_list(Config));
start_link(Name, Config) ->
gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
%% @doc Manually start bridge worker. State idempotency ensured.
ensure_started(Name) ->
gen_statem:call(name(Name), ensure_started).
ensure_started(Name, Config) ->
case start_link(Name, Config) of
{ok, Pid} -> {ok, Pid};
{error, {already_started,Pid}} -> {ok, Pid}
end.
%% @doc Manually stop bridge worker. State idempotency ensured.
ensure_stopped(Id) ->
ensure_stopped(Id, 1000).
ensure_stopped(Id, Timeout) ->
Pid = case id(Id) of
P when is_pid(P) -> P;
N -> whereis(N)
end,
case Pid of
undefined ->
ok;
_ ->
MRef = monitor(process, Pid),
unlink(Pid),
_ = gen_statem:call(id(Id), ensure_stopped, Timeout),
receive
{'DOWN', MRef, _, _, _} ->
ok
after
Timeout ->
exit(Pid, kill)
end
end.
stop(Pid) -> gen_statem:stop(Pid).
status(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, status);
status(Id) ->
status(name(Id)).
%% @doc This function is to be evaluated on message/batch receiver side.
-spec import_batch(batch(), fun(() -> ok)) -> ok.
import_batch(Batch, AckFun) ->
lists:foreach(fun emqx_broker:publish/1, emqx_bridge_msg:to_broker_msgs(Batch)),
AckFun().
%% @doc This function is to be evaluated on message/batch exporter side
%% when message/batch is accepted by remote node.
-spec handle_ack(pid(), ack_ref()) -> ok.
handle_ack(Pid, Ref) when node() =:= node(Pid) ->
Pid ! {batch_ack, Ref},
ok.
%% @doc Return all forwards (local subscriptions).
-spec get_forwards(id()) -> [topic()].
get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)).
%% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
%% @doc Add a new forward (local topic subscription).
-spec ensure_forward_present(id(), topic()) -> ok.
ensure_forward_present(Id, Topic) ->
gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}).
%% @doc Ensure a forward topic is deleted.
-spec ensure_forward_absent(id(), topic()) -> ok.
ensure_forward_absent(Id, Topic) ->
gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}).
%% @doc Ensure subscribed to remote topic.
%% NOTE: only applicable when connection module is emqx_bridge_mqtt
%% return `{error, no_remote_subscription_support}' otherwise.
-spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
ensure_subscription_present(Id, Topic, QoS) ->
gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}).
%% @doc Ensure unsubscribed from remote topic.
%% NOTE: only applicable when connection module is emqx_bridge_mqtt
-spec ensure_subscription_absent(id(), topic()) -> ok.
ensure_subscription_absent(Id, Topic) ->
gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}).
callback_mode() -> [state_functions, state_enter].
%% @doc Config should be a map().
init(Config) ->
erlang:process_flag(trap_exit, true),
Get = fun(K, D) -> maps:get(K, Config, D) end,
QCfg = maps:get(queue, Config, #{}),
GetQ = fun(K, D) -> maps:get(K, QCfg, D) end,
Dir = GetQ(replayq_dir, undefined),
QueueConfig =
case Dir =:= undefined orelse Dir =:= "" of
true -> #{mem_only => true};
false -> #{dir => Dir,
seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES)
}
end,
Queue = replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1,
marshaller => fun msg_marshaller/1}),
Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]),
Subs = lists:keysort(1, lists:map(fun({T0, QoS}) ->
T = iolist_to_binary(T0),
true = emqx_topic:validate({filter, T}),
{T, QoS}
end, Get(subscriptions, []))),
ConnectModule = maps:get(connect_module, Config),
ConnectConfig = maps:without([connect_module,
queue,
reconnect_delay_ms,
max_inflight_batches,
mountpoint,
forwards
], Config#{subscriptions => Subs}),
ConnectFun = fun(SubsX) -> emqx_bridge_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
{ok, standing_by,
#{connect_module => ConnectModule,
connect_fun => ConnectFun,
start_type => Get(start_type, manual),
reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS),
batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES),
batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT),
max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD),
mountpoint => format_mountpoint(Get(mountpoint, undefined)),
forwards => Topics,
subscriptions => Subs,
replayq => Queue,
inflight => [],
connection => undefined,
bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER)
}}.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
terminate(_Reason, _StateName, #{replayq := Q} = State) ->
_ = disconnect(State),
_ = replayq:close(Q),
ok.
%% @doc Standing by for manual start.
standing_by(enter, _, #{start_type := auto}) ->
Action = {state_timeout, 0, do_connect},
{keep_state_and_data, Action};
standing_by(enter, _, #{start_type := manual}) ->
keep_state_and_data;
standing_by({call, From}, ensure_started, State) ->
do_connect({call, From}, standing_by, State);
standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State};
standing_by(info, Info, State) ->
?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
{keep_state_and_data, State};
standing_by(Type, Content, State) ->
common(standing_by, Type, Content, State).
%% @doc Connecting state is a state with timeout.
%% After each timeout, it re-enters this state and start a retry until
%% successfuly connected to remote node/cluster.
connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action};
connecting(enter, _, State) ->
do_connect(enter, connecting, State);
connecting(state_timeout, connected, State) ->
{next_state, connected, State};
connecting(state_timeout, reconnect, _State) ->
repeat_state_and_data;
connecting(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of
{ok, NewState} ->
{keep_state, NewState};
_ ->
keep_state_and_data
end;
connecting(internal, maybe_send, _State) ->
keep_state_and_data;
connecting(info, {disconnected, _Ref, _Reason}, _State) ->
keep_state_and_data;
connecting(Type, Content, State) ->
common(connecting, Type, Content, State).
%% @doc Send batches to remote node/cluster when in 'connected' state.
connected(enter, _OldState, #{inflight := Inflight} = State) ->
case retry_inflight(State#{inflight := []}, Inflight) of
{ok, NewState} ->
Action = {state_timeout, 0, success},
{keep_state, NewState, Action};
{error, NewState} ->
Action = {state_timeout, 0, failure},
{keep_state, disconnect(NewState), Action}
end;
connected(state_timeout, failure, State) ->
{next_state, connecting, State};
connected(state_timeout, success, State) ->
{keep_state, State, ?maybe_send};
connected(internal, maybe_send, State) ->
case pop_and_send(State) of
{ok, NewState} ->
{keep_state, NewState};
{error, NewState} ->
{next_state, connecting, disconnect(NewState)}
end;
connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRefCurrent} = State) ->
case ConnRefCurrent =:= ConnRef of
true ->
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]),
{next_state, connecting,
State#{conn_ref => undefined, connection => undefined}};
false ->
keep_state_and_data
end;
connected(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of
stale ->
keep_state_and_data;
bad_order ->
%% try re-connect then re-send
?LOG(error, "Bad order ack received by bridge ~p", [name()]),
{next_state, connecting, disconnect(State)};
{ok, NewState} ->
{keep_state, NewState, ?maybe_send}
end;
connected(Type, Content, State) ->
common(connected, Type, Content, State).
%% Common handlers
common(StateName, {call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, StateName}]};
common(_StateName, {call, From}, ensure_started, _State) ->
{keep_state_and_data, [{reply, From, ok}]};
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
{keep_state_and_data, [{reply, From, Subs}]};
common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
{Result, NewState} = ensure_present(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
{Result, NewState} = ensure_absent(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, {call, From}, ensure_stopped, _State) ->
{stop_and_reply, {shutdown, manual},
[{reply, From, ok}]};
common(_StateName, info, {dispatch, _, Msg},
#{replayq := Q} = State) ->
NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) ->
?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:\n~p",
[name(), Type, StateName, Content]),
{keep_state, State}.
eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
State;
eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
Handler(Msg),
State.
ensure_present(Key, Topic, State) ->
Topics = maps:get(Key, State),
case is_topic_present(Topic, Topics) of
true ->
{ok, State};
false ->
R = do_ensure_present(Key, Topic, State),
{R, State#{Key := lists:usort([Topic | Topics])}}
end.
ensure_absent(Key, Topic, State) ->
Topics = maps:get(Key, State),
case is_topic_present(Topic, Topics) of
true ->
R = do_ensure_absent(Key, Topic, State),
{R, State#{Key := ensure_topic_absent(Topic, Topics)}};
false ->
{ok, State}
end.
ensure_topic_absent(_Topic, []) -> [];
ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L);
ensure_topic_absent(Topic, L) -> lists:delete(Topic, L).
is_topic_present({Topic, _QoS}, Topics) ->
is_topic_present(Topic, Topics);
is_topic_present(Topic, Topics) ->
lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
do_connect(Type, StateName, #{ forwards := Forwards
, subscriptions := Subs
, connect_fun := ConnectFun
, reconnect_delay_ms := Timeout
} = State) ->
ok = subscribe_local_topics(Forwards),
From = case StateName of
standing_by -> {call, Pid} = Type, Pid;
connecting -> ?NO_FROM
end,
DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) ->
StandingbyAction;
(connecting, _StandingbyAction, ConnectingAction) ->
ConnectingAction
end,
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "Bridge ~p connected", [name()]),
State0 = State#{conn_ref => ConnRef, connection => Conn},
State1 = eval_bridge_handler(State0, connected),
StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]},
ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}},
DoEvent(StateName, StandingbyAction, ConnectingAction);
{error, Reason} ->
StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]},
ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}},
DoEvent(StateName, StandingbyAction, ConnectingAction)
end.
do_ensure_present(forwards, Topic, _) ->
ok = subscribe_local_topic(Topic);
do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,
connection := undefined}) ->
{error, no_connection};
do_ensure_present(subscriptions, {Topic, QoS},
#{connect_module := ConnectModule, connection := Conn}) ->
case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of
true ->
_ = ConnectModule:ensure_subscribed(Conn, Topic, QoS),
ok;
false ->
{error, no_remote_subscription_support}
end.
do_ensure_absent(forwards, Topic, _) ->
ok = emqx_broker:unsubscribe(Topic);
do_ensure_absent(subscriptions, _Topic, #{connect_module := _ConnectModule,
connection := undefined}) ->
{error, no_connection};
do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
connection := Conn}) ->
case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of
true -> ConnectModule:ensure_unsubscribed(Conn, Topic);
false -> {error, no_remote_subscription_support}
end.
collect(Acc) ->
receive
{dispatch, _, Msg} ->
collect([Msg | Acc])
after
0 ->
lists:reverse(Acc)
end.
%% Retry all inflight (previously sent but not acked) batches.
retry_inflight(State, []) -> {ok, State};
retry_inflight(#{inflight := Inflight} = State,
[#{q_ack_ref := QAckRef, batch := Batch} | T] = Remain) ->
case do_send(State, QAckRef, Batch) of
{ok, NewState} ->
retry_inflight(NewState, T);
{error, Reason} ->
?LOG(error, "Inflight retry failed\n~p", [Reason]),
{error, State#{inflight := Inflight ++ Remain}}
end.
pop_and_send(#{inflight := Inflight,
max_inflight_batches := Max
} = State) when length(Inflight) >= Max ->
{ok, State};
pop_and_send(#{replayq := Q,
batch_count_limit := CountLimit,
batch_bytes_limit := BytesLimit
} = State) ->
case replayq:is_empty(Q) of
true ->
{ok, State};
false ->
Opts = #{count_limit => CountLimit, bytes_limit => BytesLimit},
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
do_send(State#{replayq := Q1}, QAckRef, Batch)
end.
%% Assert non-empty batch because we have a is_empty check earlier.
do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
case maybe_send(State, Batch) of
{ok, Ref} ->
%% this is a list of inflight BATCHes, not expecting it to be too long
NewInflight = Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => Ref,
batch => Batch}],
{ok, State#{inflight := NewInflight}};
{error, Reason} ->
?LOG(info, "Batch produce failed\n~p", [Reason]),
{error, State}
end.
do_ack(State = #{inflight := [#{send_ack_ref := Refx, q_ack_ref := QAckRef} | Rest],
replayq := Q}, Ref) when Refx =:= Ref ->
ok = replayq:ack(Q, QAckRef),
{ok, State#{inflight := Rest}};
do_ack(#{inflight := Inflight}, Ref) ->
case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of
true -> bad_order;
false -> stale
end.
subscribe_local_topics(Topics) -> lists:foreach(fun subscribe_local_topic/1, Topics).
subscribe_local_topic(Topic0) ->
Topic = topic(Topic0),
try
emqx_topic:validate({filter, Topic})
catch
error : Reason ->
erlang:error({bad_topic, Topic, Reason})
end,
ok = emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}).
topic(T) -> iolist_to_binary(T).
disconnect(#{connection := Conn,
conn_ref := ConnRef,
connect_module := Module
} = State) when Conn =/= undefined ->
ok = Module:stop(ConnRef, Conn),
State0 = State#{conn_ref => undefined, connection => undefined},
eval_bridge_handler(State0, disconnected);
disconnect(State) ->
eval_bridge_handler(State, disconnected).
%% Called only when replayq needs to dump it to disk.
msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);
msg_marshaller(Msg) -> emqx_bridge_msg:to_binary(Msg).
%% Return {ok, SendAckRef} or {error, Reason}
maybe_send(#{connect_module := Module,
connection := Connection,
mountpoint := Mountpoint
}, Batch) ->
Module:send(Connection, [emqx_bridge_msg:to_export(Module, Mountpoint, M) || M <- Batch]).
format_mountpoint(undefined) ->
undefined;
format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
name() -> {_, Name} = process_info(self(), registered_name), Name.
name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
id(Pid) when is_pid(Pid) -> Pid;
id(Name) -> name(Name).

View File

@ -1,73 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_connect).
-export([start/2]).
-export_type([config/0, connection/0]).
-optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]).
%% map fields depend on implementation
-type config() :: map().
-type connection() :: term().
-type conn_ref() :: term().
-type batch() :: emqx_protal:batch().
-type ack_ref() :: emqx_bridge:ack_ref().
-type topic() :: emqx_topic:topic().
-type qos() :: emqx_mqtt_types:qos().
-include("logger.hrl").
-logger_header("[Bridge Connect]").
%% establish the connection to remote node/cluster
%% protal worker (the caller process) should be expecting
%% a message {disconnected, conn_ref()} when disconnected.
-callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}.
%% send to remote node/cluster
%% bridge worker (the caller process) should be expecting
%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster
-callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}.
%% called when owner is shutting down.
-callback stop(conn_ref(), connection()) -> ok.
-callback ensure_subscribed(connection(), topic(), qos()) -> ok.
-callback ensure_unsubscribed(connection(), topic()) -> ok.
start(Module, Config) ->
case Module:start(Config) of
{ok, Ref, Conn} ->
{ok, Ref, Conn};
{error, Reason} ->
Config1 = obfuscate(Config),
?LOG(error, "Failed to connect with module=~p\n"
"config=~p\nreason:~p", [Module, Config1, Reason]),
{error, Reason}
end.
obfuscate(Map) ->
maps:fold(fun(K, V, Acc) ->
case is_sensitive(K) of
true -> [{K, '***'} | Acc];
false -> [{K, V} | Acc]
end
end, [], Map).
is_sensitive(password) -> true;
is_sensitive(_) -> false.

View File

@ -1,198 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
-module(emqx_bridge_mqtt).
-behaviour(emqx_bridge_connect).
%% behaviour callbacks
-export([ start/1
, send/2
, stop/2
]).
%% optional behaviour callbacks
-export([ ensure_subscribed/3
, ensure_unsubscribed/2
]).
-include("emqx_mqtt.hrl").
-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
%% Messages towards ack collector process
-define(RANGE(Min, Max), {Min, Max}).
-define(REF_IDS(Ref, Ids), {Ref, Ids}).
-define(SENT(RefIds), {sent, RefIds}).
-define(ACKED(AnyPktId), {acked, AnyPktId}).
-define(STOP(Ref), {stop, Ref}).
%%------------------------------------------------------------------------------
%% emqx_bridge_connect callbacks
%%------------------------------------------------------------------------------
start(Config = #{address := Address}) ->
Ref = make_ref(),
Parent = self(),
AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end),
Handlers = make_hdlr(Parent, AckCollector, Ref),
{Host, Port} = case string:tokens(Address, ":") of
[H] -> {H, 1883};
[H, P] -> {H, list_to_integer(P)}
end,
ClientConfig = Config#{msg_handler => Handlers,
owner => AckCollector,
host => Host,
port => Port,
bridge_mode => true
},
case emqx_client:start_link(ClientConfig) of
{ok, Pid} ->
case emqx_client:connect(Pid) of
{ok, _} ->
try
subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])),
{ok, Ref, #{ack_collector => AckCollector,
client_pid => Pid}}
catch
throw : Reason ->
ok = stop(AckCollector, Pid),
{error, Reason}
end;
{error, Reason} ->
ok = stop(Ref, #{ack_collector => AckCollector, client_pid => Pid}),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000),
safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000),
ok.
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
case emqx_client:subscribe(Pid, Topic, QoS) of
{ok, _, _} -> ok;
Error -> Error
end;
ensure_subscribed(_Conn, _Topic, _QoS) ->
%% return ok for now, next re-connect should should call start with new topic added to config
ok.
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
case emqx_client:unsubscribe(Pid, Topic) of
{ok, _, _} -> ok;
Error -> Error
end;
ensure_unsubscribed(_, _) ->
%% return ok for now, next re-connect should should call start with this topic deleted from config
ok.
safe_stop(Pid, StopF, Timeout) ->
MRef = monitor(process, Pid),
unlink(Pid),
try
StopF()
catch
_ : _ ->
ok
end,
receive
{'DOWN', MRef, _, _, _} ->
ok
after
Timeout ->
exit(Pid, kill)
end.
send(Conn, Batch) ->
send(Conn, Batch, []).
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest], Acc) ->
case emqx_client:publish(ClientPid, Msg) of
{ok, PktId} when Rest =:= [] ->
%% last one sent
Ref = make_ref(),
AckCollector ! ?SENT(?REF_IDS(Ref, lists:reverse([PktId | Acc]))),
{ok, Ref};
{ok, PktId} ->
send(Conn, Rest, [PktId | Acc]);
{error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
ack_collector(Parent, ConnRef) ->
ack_collector(Parent, ConnRef, queue:new(), []).
ack_collector(Parent, ConnRef, Acked, Sent) ->
{NewAcked, NewSent} =
receive
?STOP(ConnRef) ->
exit(normal);
?ACKED(PktId) ->
match_acks(Parent, queue:in(PktId, Acked), Sent);
?SENT(RefIds) ->
%% this message only happens per-batch, hence ++ is ok
match_acks(Parent, Acked, Sent ++ [RefIds])
after
200 ->
{Acked, Sent}
end,
ack_collector(Parent, ConnRef, NewAcked, NewSent).
match_acks(_Parent, Acked, []) -> {Acked, []};
match_acks(Parent, Acked, Sent) ->
match_acks_1(Parent, queue:out(Acked), Sent).
match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent};
match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) ->
%% batch finished
ok = emqx_bridge:handle_ack(Parent, Ref),
match_acks(Parent, Acked, Sent);
match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) ->
%% one message finished, but not the whole batch
match_acks(Parent, Acked, [?REF_IDS(Ref, RestIds) | Sent]).
%% When puback for QoS-1 message is received from remote MQTT broker
%% NOTE: no support for QoS-2
handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) ->
RC =:= ?RC_SUCCESS orelse error({puback_error_code, RC}),
AckCollector ! ?ACKED(PktId),
ok.
%% Message published from remote broker. Import to local broker.
import_msg(Msg) ->
%% auto-ack should be enabled in emqx_client, hence dummy ack-fun.
emqx_bridge:import_batch([Msg], _AckFun = fun() -> ok end).
make_hdlr(Parent, AckCollector, Ref) ->
#{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,
publish => fun(Msg) -> import_msg(Msg) end,
disconnected => fun(Reason) -> Parent ! {disconnected, Ref, Reason}, ok end
}.
subscribe_remote_topics(ClientPid, Subscriptions) ->
lists:foreach(fun({Topic, Qos}) ->
case emqx_client:subscribe(ClientPid, Topic, Qos) of
{ok, _, _} -> ok;
Error -> throw(Error)
end
end, Subscriptions).

View File

@ -1,84 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_msg).
-export([ to_binary/1
, from_binary/1
, to_export/3
, to_broker_msgs/1
, estimate_size/1
]).
-export_type([msg/0]).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_client.hrl").
-type msg() :: emqx_types:message().
-type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
%% @doc Make export format:
%% 1. Mount topic to a prefix
%% 2. Fix QoS to 1
%% @end
%% Shame that we have to know the callback module here
%% would be great if we can get rid of #mqtt_msg{} record
%% and use #message{} in all places.
-spec to_export(emqx_bridge_rpc | emqx_bridge_mqtt,
undefined | binary(), msg()) -> exp_msg().
to_export(emqx_bridge_mqtt, Mountpoint,
#message{topic = Topic,
payload = Payload,
flags = Flags
}) ->
Retain = maps:get(retain, Flags, false),
#mqtt_msg{qos = ?QOS_1,
retain = Retain,
topic = topic(Mountpoint, Topic),
payload = Payload};
to_export(_Module, Mountpoint,
#message{topic = Topic} = Msg) ->
Msg#message{topic = topic(Mountpoint, Topic), qos = 1}.
%% @doc Make `binary()' in order to make iodata to be persisted on disk.
-spec to_binary(msg()) -> binary().
to_binary(Msg) -> term_to_binary(Msg).
%% @doc Unmarshal binary into `msg()'.
-spec from_binary(binary()) -> msg().
from_binary(Bin) -> binary_to_term(Bin).
%% @doc Estimate the size of a message.
%% Count only the topic length + payload size
-spec estimate_size(msg()) -> integer().
estimate_size(#message{topic = Topic, payload = Payload}) ->
size(Topic) + size(Payload).
%% @doc By message/batch receiver, transform received batch into
%% messages to dispatch to local brokers.
to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch).
to_broker_msg(#message{} = Msg) ->
%% internal format from another EMQX node via rpc
Msg;
to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic,
properties := Props, payload := Payload}) ->
%% published from remote node over a MQTT connection
emqx_message:set_headers(Props,
emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, Topic, Payload))).
topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic).

View File

@ -1,105 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements EMQX Bridge transport layer based on gen_rpc.
-module(emqx_bridge_rpc).
-behaviour(emqx_bridge_connect).
%% behaviour callbacks
-export([ start/1
, send/2
, stop/2
]).
%% Internal exports
-export([ handle_send/2
, handle_ack/2
, heartbeat/2
]).
-type ack_ref() :: emqx_bridge:ack_ref().
-type batch() :: emqx_bridge:batch().
-define(HEARTBEAT_INTERVAL, timer:seconds(1)).
-define(RPC, gen_rpc).
start(#{address := Remote}) ->
case poke(Remote) of
ok ->
Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), Remote]),
{ok, Pid, Remote};
Error ->
Error
end.
stop(Pid, _Remote) when is_pid(Pid) ->
Ref = erlang:monitor(process, Pid),
unlink(Pid),
Pid ! stop,
receive
{'DOWN', Ref, process, Pid, _Reason} ->
ok
after
1000 ->
exit(Pid, kill)
end,
ok.
%% @doc Callback for `emqx_bridge_connect' behaviour
-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}.
send(Remote, Batch) ->
Sender = self(),
case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of
{ok, Ref} -> {ok, Ref};
{badrpc, Reason} -> {error, Reason}
end.
%% @doc Handle send on receiver side.
-spec handle_send(pid(), batch()) -> {ok, ack_ref()} | {error, any()}.
handle_send(SenderPid, Batch) ->
SenderNode = node(SenderPid),
Ref = make_ref(),
AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end,
case emqx_bridge:import_batch(Batch, AckFun) of
ok -> {ok, Ref};
Error -> Error
end.
%% @doc Handle batch ack in sender node.
handle_ack(SenderPid, Ref) ->
ok = emqx_bridge:handle_ack(SenderPid, Ref).
%% @hidden Heartbeat loop
heartbeat(Parent, RemoteNode) ->
Interval = ?HEARTBEAT_INTERVAL,
receive
stop -> exit(normal)
after
Interval ->
case poke(RemoteNode) of
ok ->
?MODULE:heartbeat(Parent, RemoteNode);
{error, Reason} ->
Parent ! {disconnected, self(), Reason},
exit(normal)
end
end.
poke(Node) ->
case ?RPC:call(Node, erlang, node, []) of
Node -> ok;
{badrpc, Reason} -> {error, Reason}
end.

View File

@ -1,81 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_sup).
-behavior(supervisor).
-include("logger.hrl").
-logger_header("[Bridge]").
%% APIs
-export([ start_link/0
, start_link/1
]).
-export([ create_bridge/2
, drop_bridge/1
, bridges/0
, is_bridge_exist/1
]).
%% supervisor callbacks
-export([init/1]).
-define(SUP, ?MODULE).
-define(WORKER_SUP, emqx_bridge_worker_sup).
start_link() -> start_link(?SUP).
start_link(Name) ->
supervisor:start_link({local, Name}, ?MODULE, Name).
init(?SUP) ->
BridgesConf = emqx_config:get_env(bridges, []),
BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf),
SupFlag = #{strategy => one_for_one,
intensity => 100,
period => 10},
{ok, {SupFlag, BridgeSpec}}.
bridge_spec({Name, Config}) ->
#{id => Name,
start => {emqx_bridge, start_link, [Name, Config]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_bridge]}.
-spec(bridges() -> [{node(), map()}]).
bridges() ->
[{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
-spec(is_bridge_exist(atom() | pid()) -> boolean()).
is_bridge_exist(Id) ->
case supervisor:get_childspec(?SUP, Id) of
{ok, _ChildSpec} -> true;
{error, _Error} -> false
end.
create_bridge(Id, Config) ->
supervisor:start_child(?SUP, bridge_spec({Id, Config})).
drop_bridge(Id) ->
case supervisor:terminate_child(?SUP, Id) of
ok ->
supervisor:delete_child(?SUP, Id);
Error ->
?LOG(error, "Delete bridge failed, error : ~p", [Error]),
Error
end.

View File

@ -321,7 +321,7 @@ resume(SPid, SessAttrs) ->
%% @doc Discard the session
-spec(discard(spid(), ByPid :: pid()) -> ok).
discard(SPid, ByPid) ->
gen_server:call(SPid, {discard, ByPid}, infinity).
gen_server:call(SPid, {discard, ByPid}).
-spec(update_expiry_interval(spid(), timeout()) -> ok).
update_expiry_interval(SPid, Interval) ->
@ -329,7 +329,7 @@ update_expiry_interval(SPid, Interval) ->
-spec(close(spid()) -> ok).
close(SPid) ->
gen_server:call(SPid, close, infinity).
gen_server:call(SPid, close).
%%------------------------------------------------------------------------------
%% gen_server callbacks
@ -798,7 +798,7 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight,
subscriptions = SubMap}) ->
SessProps = #{client_id => ClientId, username => Username},
%% Drain the mailbox and batch deliver
Msgs1 = drain_m(batch_n(Inflight), Msgs),
Msgs1 = Msgs ++ drain_m(batch_n(Inflight)),
%% Ack the messages for shared subscription
Msgs2 = maybe_ack_shared(Msgs1, State),
%% Process suboptions
@ -821,6 +821,9 @@ batch_n(Inflight) ->
Sz -> Sz - emqx_inflight:size(Inflight)
end.
drain_m(Cnt) ->
drain_m(Cnt, []).
drain_m(Cnt, Msgs) when Cnt =< 0 ->
lists:reverse(Msgs);
drain_m(Cnt, Msgs) ->

View File

@ -119,7 +119,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow
{noreply, State#state{sessions = SessMap1}};
handle_info(Info, State) ->
?LOG(notice, "Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, State) ->

View File

@ -116,6 +116,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
try emqx_session:discard(SessPid, ConnPid)
catch
_:Error:_Stk ->
unregister_session(ClientId, SessPid),
?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error])
end
end, lookup_session_pids(ClientId)).

View File

@ -65,7 +65,6 @@ init([]) ->
RouterSup = supervisor_spec(emqx_router_sup),
%% Broker Sup
BrokerSup = supervisor_spec(emqx_broker_sup),
BridgeSup = supervisor_spec(emqx_bridge_sup),
%% Session Manager
SMSup = supervisor_spec(emqx_sm_sup),
%% Connection Manager
@ -76,7 +75,6 @@ init([]) ->
[KernelSup,
RouterSup,
BrokerSup,
BridgeSup,
SMSup,
CMSup,
SysSup]}}.

View File

@ -111,6 +111,7 @@ t_logger_handler(_) ->
{child_type, worker}]}]},
#{logger_formatter => #{title => "SUPERVISOR REPORT"},
report_cb => fun logger:format_otp_report/1}),
timer:sleep(20),
?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())).
raw_send_serialize(Packet) ->

View File

@ -1,195 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_SUITE).
-export([ all/0
, init_per_suite/1
, end_per_suite/1]).
-export([ t_rpc/1
, t_mqtt/1
, t_mngr/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
all() -> [ t_rpc
, t_mqtt
, t_mngr].
init_per_suite(Config) ->
case node() of
nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]);
_ -> ok
end,
emqx_ct_helpers:start_apps([]),
emqx_logger:set_log_level(error),
[{log_level, error} | Config].
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_mngr(Config) when is_list(Config) ->
Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
Cfg = #{address => node(),
forwards => [<<"mngr">>],
connect_module => emqx_bridge_rpc,
mountpoint => <<"forwarded">>,
subscriptions => Subs,
start_type => auto},
Name = ?FUNCTION_NAME,
{ok, Pid} = emqx_bridge:start_link(Name, Cfg),
try
?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Name)),
?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr")),
?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr2")),
?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge:get_forwards(Pid)),
?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr2")),
?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr3")),
?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Pid)),
?assertEqual({error, no_remote_subscription_support},
emqx_bridge:ensure_subscription_present(Pid, <<"t">>, 0)),
?assertEqual({error, no_remote_subscription_support},
emqx_bridge:ensure_subscription_absent(Pid, <<"t">>)),
?assertEqual(Subs, emqx_bridge:get_subscriptions(Pid))
after
ok = emqx_bridge:stop(Pid)
end.
%% A loopback RPC to local node
t_rpc(Config) when is_list(Config) ->
Cfg = #{address => node(),
forwards => [<<"t_rpc/#">>],
connect_module => emqx_bridge_rpc,
mountpoint => <<"forwarded">>,
start_type => auto},
{ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
try
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
%% message from a different client, to avoid getting terminated by no-local
Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>),
ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]),
ct:sleep(100),
PacketId = 1,
emqx_session:publish(SPid, PacketId, Msg1),
?wait(case emqx_mock_client:get_last_message(ConnPid) of
[{publish, PacketId, #message{topic = <<"forwarded/t_rpc/one">>}}] ->
true;
Other ->
Other
end, 4000),
emqx_mock_client:close_session(ConnPid)
after
ok = emqx_bridge:stop(Pid)
end.
%% Full data loopback flow explained:
%% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)--->
%% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) -->
%% bridge(import) --(mecked message sending)--> test-pid
t_mqtt(Config) when is_list(Config) ->
SendToTopic = <<"t_mqtt/one">>,
SendToTopic2 = <<"t_mqtt/two">>,
Mountpoint = <<"forwarded/${node}/">>,
ForwardedTopic = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic]),
ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]),
Cfg = #{address => "127.0.0.1:1883",
forwards => [SendToTopic],
connect_module => emqx_bridge_mqtt,
mountpoint => Mountpoint,
username => "user",
clean_start => true,
client_id => "bridge_aws",
keepalive => 60000,
max_inflight => 32,
password => "passwd",
proto_ver => mqttv4,
queue => #{replayq_dir => "data/t_mqtt/",
replayq_seg_bytes => 10000,
batch_bytes_limit => 1000,
batch_count_limit => 10
},
reconnect_delay_ms => 1000,
ssl => false,
%% Consume back to forwarded message for verification
%% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2
subscriptions => [{ForwardedTopic, _QoS = 1}],
start_type => auto},
Tester = self(),
Ref = make_ref(),
meck:new(emqx_bridge, [passthrough, no_history]),
meck:expect(emqx_bridge, import_batch, 2,
fun(Batch, AckFun) ->
Tester ! {Ref, Batch},
AckFun()
end),
{ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"client-1">>,
try
?assertEqual([{ForwardedTopic, 1}], emqx_bridge:get_subscriptions(Pid)),
ok = emqx_bridge:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1),
ok = emqx_bridge:ensure_forward_present(Pid, SendToTopic2),
?assertEqual([{ForwardedTopic, 1},
{ForwardedTopic2, 1}], emqx_bridge:get_subscriptions(Pid)),
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
%% message from a different client, to avoid getting terminated by no-local
Max = 100,
Msgs = lists:seq(1, Max),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs),
ok = receive_and_match_messages(Ref, Msgs),
Msgs2 = lists:seq(Max + 1, Max * 2),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs2),
ok = receive_and_match_messages(Ref, Msgs2),
emqx_mock_client:close_session(ConnPid)
after
ok = emqx_bridge:stop(Pid),
meck:unload(emqx_bridge)
end.
receive_and_match_messages(Ref, Msgs) ->
TRef = erlang:send_after(timer:seconds(5), self(), {Ref, timeout}),
try
do_receive_and_match_messages(Ref, Msgs)
after
erlang:cancel_timer(TRef)
end,
ok.
do_receive_and_match_messages(_Ref, []) -> ok;
do_receive_and_match_messages(Ref, [I | Rest] = Exp) ->
receive
{Ref, timeout} -> erlang:error(timeout);
{Ref, [#{payload := P} = Msg]} ->
case binary_to_integer(P) of
I -> %% exact match
do_receive_and_match_messages(Ref, Rest);
J when J < I -> %% allow retry
do_receive_and_match_messages(Ref, Exp);
_Other ->
throw({unexpected, Msg, Exp})
end
end.

View File

@ -1,54 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_mqtt_tests).
-include_lib("eunit/include/eunit.hrl").
-include("emqx_mqtt.hrl").
send_and_ack_test() ->
%% delegate from gen_rpc to rpc for unit test
meck:new(emqx_client, [passthrough, no_history]),
meck:expect(emqx_client, start_link, 1,
fun(#{msg_handler := Hdlr}) ->
{ok, spawn_link(fun() -> fake_client(Hdlr) end)}
end),
meck:expect(emqx_client, connect, 1, {ok, dummy}),
meck:expect(emqx_client, stop, 1,
fun(Pid) -> Pid ! stop end),
meck:expect(emqx_client, publish, 2,
fun(Client, Msg) ->
Client ! {publish, Msg},
{ok, Msg} %% as packet id
end),
try
Max = 100,
Batch = lists:seq(1, Max),
{ok, Ref, Conn} = emqx_bridge_mqtt:start(#{address => "127.0.0.1:1883"}),
%% return last packet id as batch reference
{ok, AckRef} = emqx_bridge_mqtt:send(Conn, Batch),
%% expect batch ack
receive {batch_ack, AckRef} -> ok end,
ok = emqx_bridge_mqtt:stop(Ref, Conn)
after
meck:unload(emqx_client)
end.
fake_client(#{puback := PubAckCallback} = Hdlr) ->
receive
{publish, PktId} ->
PubAckCallback(#{packet_id => PktId, reason_code => ?RC_SUCCESS}),
fake_client(Hdlr);
stop ->
exit(normal)
end.

View File

@ -1,43 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_rpc_tests).
-include_lib("eunit/include/eunit.hrl").
send_and_ack_test() ->
%% delegate from gen_rpc to rpc for unit test
meck:new(gen_rpc, [passthrough, no_history]),
meck:expect(gen_rpc, call, 4,
fun(Node, Module, Fun, Args) ->
rpc:call(Node, Module, Fun, Args)
end),
meck:expect(gen_rpc, cast, 4,
fun(Node, Module, Fun, Args) ->
rpc:cast(Node, Module, Fun, Args)
end),
meck:new(emqx_bridge, [passthrough, no_history]),
meck:expect(emqx_bridge, import_batch, 2,
fun(batch, AckFun) -> AckFun() end),
try
{ok, Pid, Node} = emqx_bridge_rpc:start(#{address => node()}),
{ok, Ref} = emqx_bridge_rpc:send(Node, batch),
receive
{batch_ack, Ref} ->
ok
end,
ok = emqx_bridge_rpc:stop(Pid, Node)
after
meck:unload(gen_rpc),
meck:unload(emqx_bridge)
end.

View File

@ -1,155 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_bridge_tests).
-behaviour(emqx_bridge_connect).
-include_lib("eunit/include/eunit.hrl").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-define(BRIDGE_NAME, test).
-define(BRIDGE_REG_NAME, emqx_bridge_test).
-define(WAIT(PATTERN, TIMEOUT),
receive
PATTERN ->
ok
after
TIMEOUT ->
error(timeout)
end).
%% stub callbacks
-export([start/1, send/2, stop/2]).
start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) ->
case is_pid(Pid) of
true -> Pid ! {connection_start_attempt, Ref};
false -> ok
end,
Result.
send(SendFun, Batch) when is_function(SendFun, 1) ->
SendFun(Batch).
stop(_Ref, _Pid) -> ok.
%% bridge worker should retry connecting remote node indefinitely
reconnect_test() ->
Ref = make_ref(),
Config = make_config(Ref, self(), {error, test}),
{ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config),
%% assert name registered
?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
?WAIT({connection_start_attempt, Ref}, 1000),
%% expect same message again
?WAIT({connection_start_attempt, Ref}, 1000),
ok = emqx_bridge:stop(?BRIDGE_REG_NAME),
ok.
%% connect first, disconnect, then connect again
disturbance_test() ->
Ref = make_ref(),
Config = make_config(Ref, self(), {ok, Ref, connection}),
{ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config),
?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
?WAIT({connection_start_attempt, Ref}, 1000),
Pid ! {disconnected, Ref, test},
?WAIT({connection_start_attempt, Ref}, 1000),
ok = emqx_bridge:stop(?BRIDGE_REG_NAME).
%% buffer should continue taking in messages when disconnected
buffer_when_disconnected_test_() ->
{timeout, 10000, fun test_buffer_when_disconnected/0}.
test_buffer_when_disconnected() ->
Ref = make_ref(),
Nums = lists:seq(1, 100),
Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end),
SenderMref = monitor(process, Sender),
Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end),
ReceiverMref = monitor(process, Receiver),
SendFun = fun(Batch) ->
BatchRef = make_ref(),
Receiver ! {batch, BatchRef, Batch},
{ok, BatchRef}
end,
Config0 = make_config(Ref, false, {ok, Ref, SendFun}),
Config = Config0#{reconnect_delay_ms => 100},
{ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config),
Sender ! {bridge, Pid},
Receiver ! {bridge, Pid},
?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
Pid ! {disconnected, Ref, test},
?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000),
?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
ok = emqx_bridge:stop(?BRIDGE_REG_NAME).
manual_start_stop_test() ->
Ref = make_ref(),
Config0 = make_config(Ref, self(), {ok, Ref, connection}),
Config = Config0#{start_type := manual},
{ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config),
%% call ensure_started again should yeld the same result
{ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config),
?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
emqx_bridge:ensure_stopped(unknown),
emqx_bridge:ensure_stopped(Pid),
emqx_bridge:ensure_stopped(?BRIDGE_REG_NAME).
%% Feed messages to bridge
sender_loop(_Pid, [], _) -> exit(normal);
sender_loop(Pid, [Num | Rest], Interval) ->
random_sleep(Interval),
Pid ! {dispatch, dummy, make_msg(Num)},
sender_loop(Pid, Rest, Interval).
%% Feed acknowledgments to bridge
receiver_loop(_Pid, [], _) -> ok;
receiver_loop(Pid, Nums, Interval) ->
receive
{batch, BatchRef, Batch} ->
Rest = match_nums(Batch, Nums),
random_sleep(Interval),
emqx_bridge:handle_ack(Pid, BatchRef),
receiver_loop(Pid, Rest, Interval)
end.
random_sleep(MaxInterval) ->
case rand:uniform(MaxInterval) - 1 of
0 -> ok;
T -> timer:sleep(T)
end.
match_nums([], Rest) -> Rest;
match_nums([#message{payload = P} | Rest], Nums) ->
I = binary_to_integer(P),
case Nums of
[I | NumsLeft] -> match_nums(Rest, NumsLeft);
[J | _] when J > I -> match_nums(Rest, Nums); %% allow retry
_ -> error([{received, I}, {expecting, Nums}])
end.
make_config(Ref, TestPid, Result) ->
#{test_pid => TestPid,
test_ref => Ref,
connect_module => ?MODULE,
reconnect_delay_ms => 50,
connect_result => Result,
start_type => auto
}.
make_msg(I) ->
Payload = integer_to_binary(I),
emqx_message:make(<<"test/topic">>, Payload).

View File

@ -245,7 +245,7 @@ connect_v5(_) ->
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data3} = gen_tcp:recv(Sock, 0),
{ok, Data3} = gen_tcp:recv(Sock, 6),
{ok, ?PUBACK_PACKET(1, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),

View File

@ -75,4 +75,4 @@ start_traces(_Config) ->
ok = emqx_tracer:stop_trace({topic, <<"a/#">>}),
emqx_client:disconnect(T),
emqx_logger:set_log_level(error).
emqx_logger:set_log_level(warning).