diff --git a/Makefile b/Makefile index bf1da5bc5..791aed2f2 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,11 @@ CT_NODE_NAME = emqxct@127.0.0.1 RUN_NODE_NAME = emqxdebug@127.0.0.1 +.PHONY: all +all: compile + .PHONY: run -run: run_setup +run: run_setup unlock @rebar3 as test get-deps @rebar3 as test auto --name $(RUN_NODE_NAME) --script test/run_emqx.escript @@ -31,7 +34,8 @@ run_setup: lists:keyreplace(plugins, 1, Term, {plugins, NewPlugins}) \ end, \ ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]); \ - _ -> \ + _Enoent -> \ + os:cmd(\"mkdir -p ~/.config/rebar3/ \"), \ NewTerm=[{plugins, [rebar3_auto]}], \ ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]) \ end, \ @@ -41,9 +45,12 @@ run_setup: shell: @rebar3 as test auto -compile: +compile: unlock @rebar3 compile +unlock: + @rebar3 unlock + clean: distclean ## Cuttlefish escript is built by default when cuttlefish app (as dependency) was built diff --git a/etc/emqx.conf b/etc/emqx.conf index 4ad11388a..9ce950aae 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -354,8 +354,8 @@ log.to = both ## Note: Only the messages with severity level higher than or equal to ## this level will be logged. ## -## Default: error -log.level = error +## Default: warning +log.level = warning ## The dir for log files. ## @@ -1806,334 +1806,6 @@ listener.wss.external.send_timeout_close = on ## Value: Number ## listener.wss.external.max_frame_size = 0 -##-------------------------------------------------------------------- -## Bridges -##-------------------------------------------------------------------- - -##-------------------------------------------------------------------- -## Bridges to aws -##-------------------------------------------------------------------- - -## Bridge address: node name for local bridge, host:port for remote. -## -## Value: String -## Example: emqx@127.0.0.1, 127.0.0.1:1883 -## bridge.aws.address = 127.0.0.1:1883 - -## Protocol version of the bridge. -## -## Value: Enum -## - mqttv5 -## - mqttv4 -## - mqttv3 -## bridge.aws.proto_ver = mqttv4 - -## The ClientId of a remote bridge. -## -## Value: String -## bridge.aws.client_id = bridge_aws - -## The Clean start flag of a remote bridge. -## -## Value: boolean -## Default: true -## -## NOTE: Some IoT platforms require clean_start -## must be set to 'true' -## bridge.aws.clean_start = true - -## The username for a remote bridge. -## -## Value: String -## bridge.aws.username = user - -## The password for a remote bridge. -## -## Value: String -## bridge.aws.password = passwd - -## Mountpoint of the bridge. -## -## Value: String -## bridge.aws.mountpoint = bridge/aws/${node}/ - -## Forward message topics -## -## Value: String -## Example: topic1/#,topic2/# -## bridge.aws.forwards = topic1/#,topic2/# - -## Bribge to remote server via SSL. -## -## Value: on | off -## bridge.aws.ssl = off - -## PEM-encoded CA certificates of the bridge. -## -## Value: File -## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem - -## Client SSL Certfile of the bridge. -## -## Value: File -## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem - -## Client SSL Keyfile of the bridge. -## -## Value: File -## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem - -## SSL Ciphers used by the bridge. -## -## Value: String -## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 - -## Ciphers for TLS PSK. -## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot -## be configured at the same time. -## See 'https://tools.ietf.org/html/rfc4279#section-2'. -## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA - -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -## bridge.aws.keepalive = 60s - -## TLS versions used by the bridge. -## -## Value: String -## bridge.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - -## Subscriptions of the bridge topic. -## -## Value: String -## bridge.aws.subscription.1.topic = cmd/topic1 - -## Subscriptions of the bridge qos. -## -## Value: Number -## bridge.aws.subscription.1.qos = 1 - -## Subscriptions of the bridge topic. -## -## Value: String -## bridge.aws.subscription.2.topic = cmd/topic2 - -## Subscriptions of the bridge qos. -## -## Value: Number -## bridge.aws.subscription.2.qos = 1 - -## Start type of the bridge. -## -## Value: enum -## manual -## auto -## bridge.aws.start_type = manual - -## Bridge reconnect time. -## -## Value: Duration -## Default: 30 seconds -## bridge.aws.reconnect_interval = 30s - -## Retry interval for bridge QoS1 message delivering. -## -## Value: Duration -## bridge.aws.retry_interval = 20s - -## Inflight size. -## -## Value: Integer -## bridge.aws.max_inflight_batches = 32 - -## Max number of messages to collect in a batch for -## each send call towards emqx_bridge_connect -## -## Value: Integer -## default: 32 -## bridge.aws.queue.batch_count_limit = 32 - -## Max number of bytes to collect in a batch for each -## send call towards emqx_bridge_connect -## -## Value: Bytesize -## default: 1000M -## bridge.aws.queue.batch_bytes_limit = 1000MB - -## Base directory for replayq to store messages on disk -## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. -## -## Value: String -## bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ - -## Replayq segment size -## -## Value: Bytesize -## bridge.aws.queue.replayq_seg_bytes = 10MB - -##-------------------------------------------------------------------- -## Bridges to azure -##-------------------------------------------------------------------- - -## Bridge address: node name for local bridge, host:port for remote. -## -## Value: String -## Example: emqx@127.0.0.1, 127.0.0.1:1883 -## bridge.azure.address = 127.0.0.1:1883 - -## Protocol version of the bridge. -## -## Value: Enum -## - mqttv5 -## - mqttv4 -## - mqttv3 -## bridge.azure.proto_ver = mqttv4 - -## The ClientId of a remote bridge. -## -## Value: String -## bridge.azure.client_id = bridge_azure - -## The Clean start flag of a remote bridge. -## -## Value: boolean -## Default: true -## -## NOTE: Some IoT platforms require clean_start -## must be set to 'true' -## bridge.azure.clean_start = true - -## The username for a remote bridge. -## -## Value: String -## bridge.azure.username = user - -## The password for a remote bridge. -## -## Value: String -## bridge.azure.password = passwd - -## Mountpoint of the bridge. -## -## Value: String -## bridge.azure.mountpoint = bridge/aws/${node}/ - -## Forward message topics -## -## Value: String -## Example: topic1/#,topic2/# -## bridge.azure.forwards = topic1/#,topic2/# - -## Bribge to remote server via SSL. -## -## Value: on | off -## bridge.azure.ssl = off - -## PEM-encoded CA certificates of the bridge. -## -## Value: File -## bridge.azure.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem - -## Client SSL Certfile of the bridge. -## -## Value: File -## bridge.azure.certfile = {{ platform_etc_dir }}/certs/client-cert.pem - -## Client SSL Keyfile of the bridge. -## -## Value: File -## bridge.azure.keyfile = {{ platform_etc_dir }}/certs/client-key.pem - -## SSL Ciphers used by the bridge. -## -## Value: String -## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 - -## Ciphers for TLS PSK. -## Note that 'bridge.*.ciphers' and 'bridge.*.psk_ciphers' cannot -## be configured at the same time. -## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#bridge.azure.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA - -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -## bridge.azure.keepalive = 60s - -## TLS versions used by the bridge. -## -## Value: String -## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - -## Subscriptions of the bridge topic. -## -## Value: String -## bridge.azure.subscription.1.topic = cmd/topic1 - -## Subscriptions of the bridge qos. -## -## Value: Number -## bridge.azure.subscription.1.qos = 1 - -## Subscriptions of the bridge topic. -## -## Value: String -## bridge.azure.subscription.2.topic = cmd/topic2 - -## Subscriptions of the bridge qos. -## -## Value: Number -## bridge.azure.subscription.2.qos = 1 - -## Start type of the bridge. -## -## Value: enum -## manual -## auto -## bridge.azure.start_type = manual - -## Bridge reconnect time. -## -## Value: Duration -## Default: 30 seconds -## bridge.azure.reconnect_interval = 30s - -## Retry interval for bridge QoS1 message delivering. -## -## Value: Duration -## bridge.azure.retry_interval = 20s - -## Inflight size. -## -## Value: Integer -## bridge.azure.max_inflight_batches = 32 - -## Maximum number of messages in one batch when sending to remote borkers -## NOTE: when bridging via MQTT connection to remote broker, this config is only -## used for internal message passing optimization as the underlying MQTT -## protocol does not supports batching. -## -## Value: Integer -## default: 32 -## bridge.azure.queue.batch_size = 32 - -## Base directory for replayq to store messages on disk -## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. -## -## Value: String -## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ - -## Replayq segment size -## -## Value: Bytesize -## bridge.azure.queue.replayq_seg_bytes = 10MB - - ##-------------------------------------------------------------------- ## Modules ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 561d1e770..1366665ca 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -396,12 +396,12 @@ end}. ]}. {mapping, "log.level", "kernel.logger", [ - {default, error}, + {default, warning}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. {mapping, "log.primary_log_level", "kernel.logger_level", [ - {default, error}, + {default, warning}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. @@ -1712,237 +1712,6 @@ end}. ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. -%%-------------------------------------------------------------------- -%% Bridges -%%-------------------------------------------------------------------- -{mapping, "bridge.$name.address", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.proto_ver", "emqx.bridges", [ - {datatype, {enum, [mqttv3, mqttv4, mqttv5]}} -]}. - -{mapping, "bridge.$name.client_id", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.clean_start", "emqx.bridges", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "bridge.$name.username", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.password", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.mountpoint", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.forwards", "emqx.bridges", [ - {datatype, string}, - {default, ""} -]}. - -{mapping, "bridge.$name.ssl", "emqx.bridges", [ - {datatype, flag}, - {default, off} -]}. - -{mapping, "bridge.$name.cacertfile", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.certfile", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.keyfile", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.ciphers", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.psk_ciphers", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.keepalive", "emqx.bridges", [ - {default, "10s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "bridge.$name.tls_versions", "emqx.bridges", [ - {datatype, string}, - {default, "tlsv1,tlsv1.1,tlsv1.2"} -]}. - -{mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.subscription.$id.qos", "emqx.bridges", [ - {datatype, integer} -]}. - -{mapping, "bridge.$name.start_type", "emqx.bridges", [ - {datatype, {enum, [manual, auto]}}, - {default, auto} -]}. - -{mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [ - {default, "30s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "bridge.$name.retry_interval", "emqx.bridges", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "bridge.$name.max_inflight_batches", "emqx.bridges", [ - {default, 0}, - {datatype, integer} -]}. - -{mapping, "bridge.$name.queue.batch_count_limit", "emqx.bridges", [ - {datatype, integer} -]}. - -{mapping, "bridge.$name.queue.batch_bytes_limit", "emqx.bridges", [ - {datatype, bytesize} -]}. - -{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [ - {datatype, bytesize} -]}. - -{translation, "emqx.bridges", fun(Conf) -> - MapPSKCiphers = fun(PSKCiphers) -> - lists:map( - fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; - ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; - ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; - ("PSK-RC4-SHA") -> {psk, rc4_128, sha} - end, PSKCiphers) - end, - - Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - - IsSsl = fun(cacertfile) -> true; - (certfile) -> true; - (keyfile) -> true; - (ciphers) -> true; - (psk_ciphers) -> true; - (tls_versions) -> true; - (_Opt) -> false - end, - - Parse = fun(tls_versions, Vers) -> - [{versions, [list_to_atom(S) || S <- Split(Vers)]}]; - (ciphers, Ciphers) -> - [{ciphers, Split(Ciphers)}]; - (psk_ciphers, Ciphers) -> - [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}]; - (Opt, Val) -> - [{Opt, Val}] - end, - - Merge = fun(forwards, Val, Opts) -> - [{forwards, string:tokens(Val, ",")}|Opts]; - (Opt, Val, Opts) -> - case IsSsl(Opt) of - true -> - SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []), - lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); - false -> - [{Opt, Val}|Opts] - end - end, - Queue = fun(Name) -> - Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".queue", Conf), - QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, "queue", QOpt], QValue} <- Configs], - maps:from_list(QOpts) - end, - Subscriptions = fun(Name) -> - Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), - lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], - [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) - end, - IsNodeAddr = fun(Addr) -> - case string:tokens(Addr, "@") of - [_NodeName, _Hostname] -> true; - _ -> false - end - end, - ConnMod = fun(Name) -> - [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".address", Conf), - {_, Addr} = AddrConfig, - Subs = Subscriptions(Name), - case IsNodeAddr(Addr) of - true when Subs =/= [] -> - error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs}); - true -> - emqx_bridge_rpc; - false -> - emqx_bridge_mqtt - end - end, - %% to be backward compatible - Translate = - fun Tr(queue, Q, Cfg) -> - NewQ = maps:fold(Tr, #{}, Q), - Cfg#{queue => NewQ}; - Tr(address, Addr0, Cfg) -> - Addr = case IsNodeAddr(Addr0) of - true -> list_to_atom(Addr0); - false -> Addr0 - end, - Cfg#{address => Addr}; - Tr(batch_size, Count, Cfg) -> - Cfg#{batch_count_limit => Count}; - Tr(reconnect_interval, Ms, Cfg) -> - Cfg#{reconnect_delay_ms => Ms}; - Tr(max_inflight, Count, Cfg) -> - Cfg#{max_inflight_batches => Count}; - Tr(proto_ver, Ver, Cfg) -> - Cfg#{proto_ver => - case Ver of - mqttv3 -> v3; - mqttv4 -> v4; - mqttv5 -> v5; - _ -> v4 - end}; - Tr(Key, Value, Cfg) -> - Cfg#{Key => Value} - end, - C = lists:foldl( - fun({["bridge", Name, Opt], Val}, Acc) -> - %% e.g #{aws => [{OptKey, OptVal}]} - Init = [{list_to_atom(Opt), Val}, - {connect_module, ConnMod(Name)}, - {subscriptions, Subscriptions(Name)}, - {queue, Queue(Name)}], - maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); - (_, Acc) -> Acc - end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))), - C1 = maps:map(fun(Bn, Bc) -> - maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc))) - end, C), - maps:to_list(C1) -end}. - %%-------------------------------------------------------------------- %% Modules %%-------------------------------------------------------------------- diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index a7654e32c..e84665488 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -38,10 +38,11 @@ -export([ load/0 , unload/0 , get_alarms/0 + , get_alarms/1 ]). -record(common_alarm, {id, desc}). --record(alarm_history, {id, clear_at}). +-record(alarm_history, {id, desc, clear_at}). -define(ALARM_TAB, emqx_alarm). -define(ALARM_HISTORY_TAB, emqx_alarm_history). @@ -79,7 +80,14 @@ unload() -> gen_event:swap_handler(alarm_handler, {?MODULE, swap}, {alarm_handler, []}). get_alarms() -> - gen_event:call(alarm_handler, ?MODULE, get_alarms). + get_alarms(present). + +get_alarms(present) -> + Alarms = ets:tab2list(?ALARM_TAB), + [{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms]; +get_alarms(history) -> + Alarms = ets:tab2list(?ALARM_HISTORY_TAB), + [{Id, Desc, ClearAt} || #alarm_history{id = Id, desc = Desc, clear_at = ClearAt} <- Alarms]. %%---------------------------------------------------------------------- %% gen_event callbacks @@ -95,7 +103,7 @@ init(_) -> handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State); handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> - ?LOG(warning, "~p set", [Alarm]), + ?LOG(warning, "New Alarm: ~p, Alarm Info: ~p", [AlarmId, AlarmDesc]), case encode_alarm(Alarm) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(topic(alert), Json)); @@ -105,7 +113,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> set_alarm_(AlarmId, AlarmDesc), {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> - ?LOG(notice, "~p clear", [AlarmId]), + ?LOG(warning, "Clear Alarm: ~p", [AlarmId]), case encode_alarm({AlarmId, undefined}) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(topic(clear), Json)); @@ -117,14 +125,14 @@ handle_event({clear_alarm, AlarmId}, State) -> handle_event(_, State) -> {ok, State}. -handle_info(_, State) -> {ok, State}. +handle_info(_, State) -> + {ok, State}. -handle_call(get_alarms, State) -> - {ok, get_alarms_(), State}; -handle_call(_Query, State) -> {ok, {error, bad_query}, State}. +handle_call(_Query, State) -> + {ok, {error, bad_query}, State}. terminate(swap, _State) -> - {emqx_alarm_handler, get_alarms_()}; + {emqx_alarm_handler, get_alarms()}; terminate(_, _) -> ok. @@ -134,24 +142,24 @@ terminate(_, _) -> init_tables(ExistingAlarms) -> mnesia:clear_table(?ALARM_TAB), - lists:foreach(fun({Id, _Desc}) -> - set_alarm_history(Id) + lists:foreach(fun({Id, Desc}) -> + set_alarm_history(Id, Desc) end, ExistingAlarms). -encode_alarm({AlarmId, #alarm{severity = Severity, +encode_alarm({AlarmId, #alarm{severity = Severity, title = Title, - summary = Summary, + summary = Summary, timestamp = Ts}}) -> emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, {desc, [{severity, Severity}, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, - {ts, emqx_time:now_secs(Ts)}]}]); + {timestamp, emqx_time:now_ms(Ts)}]}]); encode_alarm({AlarmId, undefined}) -> emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]); encode_alarm({AlarmId, AlarmDesc}) -> emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, - {description, maybe_to_binary(AlarmDesc)}]). + {desc, maybe_to_binary(AlarmDesc)}]). alarm_msg(Topic, Payload) -> Msg = emqx_message:make(?MODULE, Topic, Payload), @@ -172,14 +180,15 @@ set_alarm_(Id, Desc) -> mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}). clear_alarm_(Id) -> - mnesia:dirty_delete(?ALARM_TAB, Id), - set_alarm_history(Id). + case mnesia:dirty_read(?ALARM_TAB, Id) of + [#common_alarm{desc = Desc}] -> + set_alarm_history(Id, Desc), + mnesia:dirty_delete(?ALARM_TAB, Id); + [] -> ok + end. -get_alarms_() -> - Alarms = ets:tab2list(?ALARM_TAB), - [{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms]. - -set_alarm_history(Id) -> +set_alarm_history(Id, Desc) -> mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id, - clear_at = undefined}). + desc = Desc, + clear_at = os:timestamp()}). diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl deleted file mode 100644 index 5b5ab36bd..000000000 --- a/src/emqx_bridge.erl +++ /dev/null @@ -1,610 +0,0 @@ -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - -%% @doc Bridge works in two layers (1) batching layer (2) transport layer -%% The `bridge' batching layer collects local messages in batches and sends over -%% to remote MQTT node/cluster via `connetion' transport layer. -%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be -%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection' -%% has to be `emqx_bridge_mqtt'. -%% -%% ``` -%% +------+ +--------+ -%% | EMQX | | REMOTE | -%% | | | | -%% | (bridge) <==(connection)==> | | -%% | | | | -%% | | | | -%% +------+ +--------+ -%% ''' -%% -%% -%% This module implements 2 kinds of APIs with regards to batching and -%% messaging protocol. (1) A `gen_statem' based local batch collector; -%% (2) APIs for incoming remote batches/messages. -%% -%% Batch collector state diagram -%% -%% [standing_by] --(0) --> [connecting] --(2)--> [connected] -%% | ^ | -%% | | | -%% '--(1)---'--------(3)------' -%% -%% (0): auto or manual start -%% (1): retry timeout -%% (2): successfuly connected to remote node/cluster -%% (3): received {disconnected, conn_ref(), Reason} OR -%% failed to send to remote node/cluster. -%% -%% NOTE: A bridge worker may subscribe to multiple (including wildcard) -%% local topics, and the underlying `emqx_bridge_connect' may subscribe to -%% multiple remote topics, however, worker/connections are not designed -%% to support automatic load-balancing, i.e. in case it can not keep up -%% with the amount of messages comming in, administrator should split and -%% balance topics between worker/connections manually. -%% -%% NOTES: -%% * Local messages are all normalised to QoS-1 when exporting to remote - --module(emqx_bridge). --behaviour(gen_statem). - -%% APIs --export([ start_link/2 - , import_batch/2 - , handle_ack/2 - , stop/1 - ]). - -%% gen_statem callbacks --export([ terminate/3 - , code_change/4 - , init/1 - , callback_mode/0 - ]). - -%% state functions --export([ standing_by/3 - , connecting/3 - , connected/3 - ]). - -%% management APIs --export([ ensure_started/1 - , ensure_started/2 - , ensure_stopped/1 - , ensure_stopped/2 - , status/1 - ]). - --export([ get_forwards/1 - , ensure_forward_present/2 - , ensure_forward_absent/2 - ]). - --export([ get_subscriptions/1 - , ensure_subscription_present/3 - , ensure_subscription_absent/2 - ]). - --export_type([ config/0 - , batch/0 - , ack_ref/0 - ]). - --type id() :: atom() | string() | pid(). --type qos() :: emqx_mqtt_types:qos(). --type config() :: map(). --type batch() :: [emqx_bridge_msg:exp_msg()]. --type ack_ref() :: term(). --type topic() :: emqx_topic:topic(). - --include("logger.hrl"). --include("emqx_mqtt.hrl"). - --logger_header("[Bridge]"). - -%% same as default in-flight limit for emqx_client --define(DEFAULT_BATCH_COUNT, 32). --define(DEFAULT_BATCH_BYTES, 1 bsl 20). --define(DEFAULT_SEND_AHEAD, 8). --define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). --define(DEFAULT_SEG_BYTES, (1 bsl 20)). --define(NO_BRIDGE_HANDLER, undefined). --define(NO_FROM, undefined). --define(maybe_send, {next_event, internal, maybe_send}). - -%% @doc Start a bridge worker. Supported configs: -%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay -%% at 'standing_by' state until a manual call to start it. -%% connect_module: The module which implements emqx_bridge_connect behaviour -%% and work as message batch transport layer -%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry -%% in case of transportation failure. -%% max_inflight_batches: Max number of batches allowed to send-ahead before -%% receiving confirmation from remote node/cluster -%% mountpoint: The topic mount point for messages sent to remote node/cluster -%% `undefined', `<<>>' or `""' to disable -%% forwards: Local topics to subscribe. -%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each -%% send call towards emqx_bridge_connect -%% queue.batch_count_limit: Max number of messages to collect in a batch for -%% each send call towards emqx_bridge_connect -%% queue.replayq_dir: Directory where replayq should persist messages -%% queue.replayq_seg_bytes: Size in bytes for each replayq segment file -%% -%% Find more connection specific configs in the callback modules -%% of emqx_bridge_connect behaviour. -start_link(Name, Config) when is_list(Config) -> - start_link(Name, maps:from_list(Config)); -start_link(Name, Config) -> - gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). - -%% @doc Manually start bridge worker. State idempotency ensured. -ensure_started(Name) -> - gen_statem:call(name(Name), ensure_started). - -ensure_started(Name, Config) -> - case start_link(Name, Config) of - {ok, Pid} -> {ok, Pid}; - {error, {already_started,Pid}} -> {ok, Pid} - end. - -%% @doc Manually stop bridge worker. State idempotency ensured. -ensure_stopped(Id) -> - ensure_stopped(Id, 1000). - -ensure_stopped(Id, Timeout) -> - Pid = case id(Id) of - P when is_pid(P) -> P; - N -> whereis(N) - end, - case Pid of - undefined -> - ok; - _ -> - MRef = monitor(process, Pid), - unlink(Pid), - _ = gen_statem:call(id(Id), ensure_stopped, Timeout), - receive - {'DOWN', MRef, _, _, _} -> - ok - after - Timeout -> - exit(Pid, kill) - end - end. - -stop(Pid) -> gen_statem:stop(Pid). - -status(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, status); -status(Id) -> - status(name(Id)). - -%% @doc This function is to be evaluated on message/batch receiver side. --spec import_batch(batch(), fun(() -> ok)) -> ok. -import_batch(Batch, AckFun) -> - lists:foreach(fun emqx_broker:publish/1, emqx_bridge_msg:to_broker_msgs(Batch)), - AckFun(). - -%% @doc This function is to be evaluated on message/batch exporter side -%% when message/batch is accepted by remote node. --spec handle_ack(pid(), ack_ref()) -> ok. -handle_ack(Pid, Ref) when node() =:= node(Pid) -> - Pid ! {batch_ack, Ref}, - ok. - -%% @doc Return all forwards (local subscriptions). --spec get_forwards(id()) -> [topic()]. -get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)). - -%% @doc Return all subscriptions (subscription over mqtt connection to remote broker). --spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. -get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). - -%% @doc Add a new forward (local topic subscription). --spec ensure_forward_present(id(), topic()) -> ok. -ensure_forward_present(Id, Topic) -> - gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}). - -%% @doc Ensure a forward topic is deleted. --spec ensure_forward_absent(id(), topic()) -> ok. -ensure_forward_absent(Id, Topic) -> - gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}). - -%% @doc Ensure subscribed to remote topic. -%% NOTE: only applicable when connection module is emqx_bridge_mqtt -%% return `{error, no_remote_subscription_support}' otherwise. --spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}. -ensure_subscription_present(Id, Topic, QoS) -> - gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}). - -%% @doc Ensure unsubscribed from remote topic. -%% NOTE: only applicable when connection module is emqx_bridge_mqtt --spec ensure_subscription_absent(id(), topic()) -> ok. -ensure_subscription_absent(Id, Topic) -> - gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}). - -callback_mode() -> [state_functions, state_enter]. - -%% @doc Config should be a map(). -init(Config) -> - erlang:process_flag(trap_exit, true), - Get = fun(K, D) -> maps:get(K, Config, D) end, - QCfg = maps:get(queue, Config, #{}), - GetQ = fun(K, D) -> maps:get(K, QCfg, D) end, - Dir = GetQ(replayq_dir, undefined), - QueueConfig = - case Dir =:= undefined orelse Dir =:= "" of - true -> #{mem_only => true}; - false -> #{dir => Dir, - seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES) - } - end, - Queue = replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1, - marshaller => fun msg_marshaller/1}), - Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]), - Subs = lists:keysort(1, lists:map(fun({T0, QoS}) -> - T = iolist_to_binary(T0), - true = emqx_topic:validate({filter, T}), - {T, QoS} - end, Get(subscriptions, []))), - ConnectModule = maps:get(connect_module, Config), - ConnectConfig = maps:without([connect_module, - queue, - reconnect_delay_ms, - max_inflight_batches, - mountpoint, - forwards - ], Config#{subscriptions => Subs}), - ConnectFun = fun(SubsX) -> emqx_bridge_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end, - {ok, standing_by, - #{connect_module => ConnectModule, - connect_fun => ConnectFun, - start_type => Get(start_type, manual), - reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS), - batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES), - batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT), - max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD), - mountpoint => format_mountpoint(Get(mountpoint, undefined)), - forwards => Topics, - subscriptions => Subs, - replayq => Queue, - inflight => [], - connection => undefined, - bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER) - }}. - -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. - -terminate(_Reason, _StateName, #{replayq := Q} = State) -> - _ = disconnect(State), - _ = replayq:close(Q), - ok. - -%% @doc Standing by for manual start. -standing_by(enter, _, #{start_type := auto}) -> - Action = {state_timeout, 0, do_connect}, - {keep_state_and_data, Action}; -standing_by(enter, _, #{start_type := manual}) -> - keep_state_and_data; -standing_by({call, From}, ensure_started, State) -> - do_connect({call, From}, standing_by, State); -standing_by(state_timeout, do_connect, State) -> - {next_state, connecting, State}; -standing_by(info, Info, State) -> - ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), - {keep_state_and_data, State}; -standing_by(Type, Content, State) -> - common(standing_by, Type, Content, State). - -%% @doc Connecting state is a state with timeout. -%% After each timeout, it re-enters this state and start a retry until -%% successfuly connected to remote node/cluster. -connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> - Action = {state_timeout, Timeout, reconnect}, - {keep_state_and_data, Action}; -connecting(enter, _, State) -> - do_connect(enter, connecting, State); -connecting(state_timeout, connected, State) -> - {next_state, connected, State}; -connecting(state_timeout, reconnect, _State) -> - repeat_state_and_data; -connecting(info, {batch_ack, Ref}, State) -> - case do_ack(State, Ref) of - {ok, NewState} -> - {keep_state, NewState}; - _ -> - keep_state_and_data - end; -connecting(internal, maybe_send, _State) -> - keep_state_and_data; -connecting(info, {disconnected, _Ref, _Reason}, _State) -> - keep_state_and_data; -connecting(Type, Content, State) -> - common(connecting, Type, Content, State). - -%% @doc Send batches to remote node/cluster when in 'connected' state. -connected(enter, _OldState, #{inflight := Inflight} = State) -> - case retry_inflight(State#{inflight := []}, Inflight) of - {ok, NewState} -> - Action = {state_timeout, 0, success}, - {keep_state, NewState, Action}; - {error, NewState} -> - Action = {state_timeout, 0, failure}, - {keep_state, disconnect(NewState), Action} - end; -connected(state_timeout, failure, State) -> - {next_state, connecting, State}; -connected(state_timeout, success, State) -> - {keep_state, State, ?maybe_send}; -connected(internal, maybe_send, State) -> - case pop_and_send(State) of - {ok, NewState} -> - {keep_state, NewState}; - {error, NewState} -> - {next_state, connecting, disconnect(NewState)} - end; -connected(info, {disconnected, ConnRef, Reason}, - #{conn_ref := ConnRefCurrent} = State) -> - case ConnRefCurrent =:= ConnRef of - true -> - ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]), - {next_state, connecting, - State#{conn_ref => undefined, connection => undefined}}; - false -> - keep_state_and_data - end; -connected(info, {batch_ack, Ref}, State) -> - case do_ack(State, Ref) of - stale -> - keep_state_and_data; - bad_order -> - %% try re-connect then re-send - ?LOG(error, "Bad order ack received by bridge ~p", [name()]), - {next_state, connecting, disconnect(State)}; - {ok, NewState} -> - {keep_state, NewState, ?maybe_send} - end; -connected(Type, Content, State) -> - common(connected, Type, Content, State). - -%% Common handlers -common(StateName, {call, From}, status, _State) -> - {keep_state_and_data, [{reply, From, StateName}]}; -common(_StateName, {call, From}, ensure_started, _State) -> - {keep_state_and_data, [{reply, From, ok}]}; -common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> - {keep_state_and_data, [{reply, From, Forwards}]}; -common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> - {keep_state_and_data, [{reply, From, Subs}]}; -common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> - {Result, NewState} = ensure_present(What, Topic, State), - {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> - {Result, NewState} = ensure_absent(What, Topic, State), - {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, {call, From}, ensure_stopped, _State) -> - {stop_and_reply, {shutdown, manual}, - [{reply, From, ok}]}; -common(_StateName, info, {dispatch, _, Msg}, - #{replayq := Q} = State) -> - NewQ = replayq:append(Q, collect([Msg])), - {keep_state, State#{replayq => NewQ}, ?maybe_send}; -common(StateName, Type, Content, State) -> - ?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:\n~p", - [name(), Type, StateName, Content]), - {keep_state, State}. - -eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> - State; -eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> - Handler(Msg), - State. - -ensure_present(Key, Topic, State) -> - Topics = maps:get(Key, State), - case is_topic_present(Topic, Topics) of - true -> - {ok, State}; - false -> - R = do_ensure_present(Key, Topic, State), - {R, State#{Key := lists:usort([Topic | Topics])}} - end. - -ensure_absent(Key, Topic, State) -> - Topics = maps:get(Key, State), - case is_topic_present(Topic, Topics) of - true -> - R = do_ensure_absent(Key, Topic, State), - {R, State#{Key := ensure_topic_absent(Topic, Topics)}}; - false -> - {ok, State} - end. - -ensure_topic_absent(_Topic, []) -> []; -ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L); -ensure_topic_absent(Topic, L) -> lists:delete(Topic, L). - -is_topic_present({Topic, _QoS}, Topics) -> - is_topic_present(Topic, Topics); -is_topic_present(Topic, Topics) -> - lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). - -do_connect(Type, StateName, #{ forwards := Forwards - , subscriptions := Subs - , connect_fun := ConnectFun - , reconnect_delay_ms := Timeout - } = State) -> - ok = subscribe_local_topics(Forwards), - From = case StateName of - standing_by -> {call, Pid} = Type, Pid; - connecting -> ?NO_FROM - end, - DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) -> - StandingbyAction; - (connecting, _StandingbyAction, ConnectingAction) -> - ConnectingAction - end, - case ConnectFun(Subs) of - {ok, ConnRef, Conn} -> - ?LOG(info, "Bridge ~p connected", [name()]), - State0 = State#{conn_ref => ConnRef, connection => Conn}, - State1 = eval_bridge_handler(State0, connected), - StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, - ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}}, - DoEvent(StateName, StandingbyAction, ConnectingAction); - {error, Reason} -> - StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]}, - ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}}, - DoEvent(StateName, StandingbyAction, ConnectingAction) - end. - -do_ensure_present(forwards, Topic, _) -> - ok = subscribe_local_topic(Topic); -do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, - connection := undefined}) -> - {error, no_connection}; -do_ensure_present(subscriptions, {Topic, QoS}, - #{connect_module := ConnectModule, connection := Conn}) -> - case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of - true -> - _ = ConnectModule:ensure_subscribed(Conn, Topic, QoS), - ok; - false -> - {error, no_remote_subscription_support} - end. - -do_ensure_absent(forwards, Topic, _) -> - ok = emqx_broker:unsubscribe(Topic); -do_ensure_absent(subscriptions, _Topic, #{connect_module := _ConnectModule, - connection := undefined}) -> - {error, no_connection}; -do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule, - connection := Conn}) -> - case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of - true -> ConnectModule:ensure_unsubscribed(Conn, Topic); - false -> {error, no_remote_subscription_support} - end. - -collect(Acc) -> - receive - {dispatch, _, Msg} -> - collect([Msg | Acc]) - after - 0 -> - lists:reverse(Acc) - end. - -%% Retry all inflight (previously sent but not acked) batches. -retry_inflight(State, []) -> {ok, State}; -retry_inflight(#{inflight := Inflight} = State, - [#{q_ack_ref := QAckRef, batch := Batch} | T] = Remain) -> - case do_send(State, QAckRef, Batch) of - {ok, NewState} -> - retry_inflight(NewState, T); - {error, Reason} -> - ?LOG(error, "Inflight retry failed\n~p", [Reason]), - {error, State#{inflight := Inflight ++ Remain}} - end. - -pop_and_send(#{inflight := Inflight, - max_inflight_batches := Max - } = State) when length(Inflight) >= Max -> - {ok, State}; -pop_and_send(#{replayq := Q, - batch_count_limit := CountLimit, - batch_bytes_limit := BytesLimit - } = State) -> - case replayq:is_empty(Q) of - true -> - {ok, State}; - false -> - Opts = #{count_limit => CountLimit, bytes_limit => BytesLimit}, - {Q1, QAckRef, Batch} = replayq:pop(Q, Opts), - do_send(State#{replayq := Q1}, QAckRef, Batch) - end. - -%% Assert non-empty batch because we have a is_empty check earlier. -do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> - case maybe_send(State, Batch) of - {ok, Ref} -> - %% this is a list of inflight BATCHes, not expecting it to be too long - NewInflight = Inflight ++ [#{q_ack_ref => QAckRef, - send_ack_ref => Ref, - batch => Batch}], - {ok, State#{inflight := NewInflight}}; - {error, Reason} -> - ?LOG(info, "Batch produce failed\n~p", [Reason]), - {error, State} - end. - -do_ack(State = #{inflight := [#{send_ack_ref := Refx, q_ack_ref := QAckRef} | Rest], - replayq := Q}, Ref) when Refx =:= Ref -> - ok = replayq:ack(Q, QAckRef), - {ok, State#{inflight := Rest}}; -do_ack(#{inflight := Inflight}, Ref) -> - case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of - true -> bad_order; - false -> stale - end. - -subscribe_local_topics(Topics) -> lists:foreach(fun subscribe_local_topic/1, Topics). - -subscribe_local_topic(Topic0) -> - Topic = topic(Topic0), - try - emqx_topic:validate({filter, Topic}) - catch - error : Reason -> - erlang:error({bad_topic, Topic, Reason}) - end, - ok = emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}). - -topic(T) -> iolist_to_binary(T). - -disconnect(#{connection := Conn, - conn_ref := ConnRef, - connect_module := Module - } = State) when Conn =/= undefined -> - ok = Module:stop(ConnRef, Conn), - State0 = State#{conn_ref => undefined, connection => undefined}, - eval_bridge_handler(State0, disconnected); -disconnect(State) -> - eval_bridge_handler(State, disconnected). - -%% Called only when replayq needs to dump it to disk. -msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); -msg_marshaller(Msg) -> emqx_bridge_msg:to_binary(Msg). - -%% Return {ok, SendAckRef} or {error, Reason} -maybe_send(#{connect_module := Module, - connection := Connection, - mountpoint := Mountpoint - }, Batch) -> - Module:send(Connection, [emqx_bridge_msg:to_export(Module, Mountpoint, M) || M <- Batch]). - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - -name() -> {_, Name} = process_info(self(), registered_name), Name. - -name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). - -id(Pid) when is_pid(Pid) -> Pid; -id(Name) -> name(Name). diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl deleted file mode 100644 index 480129903..000000000 --- a/src/emqx_bridge_connect.erl +++ /dev/null @@ -1,73 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_connect). - --export([start/2]). - --export_type([config/0, connection/0]). - --optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]). - -%% map fields depend on implementation --type config() :: map(). --type connection() :: term(). --type conn_ref() :: term(). --type batch() :: emqx_protal:batch(). --type ack_ref() :: emqx_bridge:ack_ref(). --type topic() :: emqx_topic:topic(). --type qos() :: emqx_mqtt_types:qos(). - --include("logger.hrl"). - --logger_header("[Bridge Connect]"). - -%% establish the connection to remote node/cluster -%% protal worker (the caller process) should be expecting -%% a message {disconnected, conn_ref()} when disconnected. --callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}. - -%% send to remote node/cluster -%% bridge worker (the caller process) should be expecting -%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster --callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}. - -%% called when owner is shutting down. --callback stop(conn_ref(), connection()) -> ok. - --callback ensure_subscribed(connection(), topic(), qos()) -> ok. - --callback ensure_unsubscribed(connection(), topic()) -> ok. - -start(Module, Config) -> - case Module:start(Config) of - {ok, Ref, Conn} -> - {ok, Ref, Conn}; - {error, Reason} -> - Config1 = obfuscate(Config), - ?LOG(error, "Failed to connect with module=~p\n" - "config=~p\nreason:~p", [Module, Config1, Reason]), - {error, Reason} - end. - -obfuscate(Map) -> - maps:fold(fun(K, V, Acc) -> - case is_sensitive(K) of - true -> [{K, '***'} | Acc]; - false -> [{K, V} | Acc] - end - end, [], Map). - -is_sensitive(password) -> true; -is_sensitive(_) -> false. diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl deleted file mode 100644 index 8a66f77a0..000000000 --- a/src/emqx_bridge_mqtt.erl +++ /dev/null @@ -1,198 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - -%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol - --module(emqx_bridge_mqtt). - --behaviour(emqx_bridge_connect). - -%% behaviour callbacks --export([ start/1 - , send/2 - , stop/2 - ]). - -%% optional behaviour callbacks --export([ ensure_subscribed/3 - , ensure_unsubscribed/2 - ]). - --include("emqx_mqtt.hrl"). - --define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}). - -%% Messages towards ack collector process --define(RANGE(Min, Max), {Min, Max}). --define(REF_IDS(Ref, Ids), {Ref, Ids}). --define(SENT(RefIds), {sent, RefIds}). --define(ACKED(AnyPktId), {acked, AnyPktId}). --define(STOP(Ref), {stop, Ref}). - -%%------------------------------------------------------------------------------ -%% emqx_bridge_connect callbacks -%%------------------------------------------------------------------------------ - -start(Config = #{address := Address}) -> - Ref = make_ref(), - Parent = self(), - AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end), - Handlers = make_hdlr(Parent, AckCollector, Ref), - {Host, Port} = case string:tokens(Address, ":") of - [H] -> {H, 1883}; - [H, P] -> {H, list_to_integer(P)} - end, - ClientConfig = Config#{msg_handler => Handlers, - owner => AckCollector, - host => Host, - port => Port, - bridge_mode => true - }, - case emqx_client:start_link(ClientConfig) of - {ok, Pid} -> - case emqx_client:connect(Pid) of - {ok, _} -> - try - subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])), - {ok, Ref, #{ack_collector => AckCollector, - client_pid => Pid}} - catch - throw : Reason -> - ok = stop(AckCollector, Pid), - {error, Reason} - end; - {error, Reason} -> - ok = stop(Ref, #{ack_collector => AckCollector, client_pid => Pid}), - {error, Reason} - end; - {error, Reason} -> - {error, Reason} - end. - -stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> - safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000), - safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000), - ok. - -ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> - case emqx_client:subscribe(Pid, Topic, QoS) of - {ok, _, _} -> ok; - Error -> Error - end; -ensure_subscribed(_Conn, _Topic, _QoS) -> - %% return ok for now, next re-connect should should call start with new topic added to config - ok. - -ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> - case emqx_client:unsubscribe(Pid, Topic) of - {ok, _, _} -> ok; - Error -> Error - end; -ensure_unsubscribed(_, _) -> - %% return ok for now, next re-connect should should call start with this topic deleted from config - ok. - -safe_stop(Pid, StopF, Timeout) -> - MRef = monitor(process, Pid), - unlink(Pid), - try - StopF() - catch - _ : _ -> - ok - end, - receive - {'DOWN', MRef, _, _, _} -> - ok - after - Timeout -> - exit(Pid, kill) - end. - -send(Conn, Batch) -> - send(Conn, Batch, []). - -send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest], Acc) -> - case emqx_client:publish(ClientPid, Msg) of - {ok, PktId} when Rest =:= [] -> - %% last one sent - Ref = make_ref(), - AckCollector ! ?SENT(?REF_IDS(Ref, lists:reverse([PktId | Acc]))), - {ok, Ref}; - {ok, PktId} -> - send(Conn, Rest, [PktId | Acc]); - {error, Reason} -> - %% NOTE: There is no partial sucess of a batch and recover from the middle - %% only to retry all messages in one batch - {error, Reason} - end. - -ack_collector(Parent, ConnRef) -> - ack_collector(Parent, ConnRef, queue:new(), []). - -ack_collector(Parent, ConnRef, Acked, Sent) -> - {NewAcked, NewSent} = - receive - ?STOP(ConnRef) -> - exit(normal); - ?ACKED(PktId) -> - match_acks(Parent, queue:in(PktId, Acked), Sent); - ?SENT(RefIds) -> - %% this message only happens per-batch, hence ++ is ok - match_acks(Parent, Acked, Sent ++ [RefIds]) - after - 200 -> - {Acked, Sent} - end, - ack_collector(Parent, ConnRef, NewAcked, NewSent). - -match_acks(_Parent, Acked, []) -> {Acked, []}; -match_acks(Parent, Acked, Sent) -> - match_acks_1(Parent, queue:out(Acked), Sent). - -match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent}; -match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) -> - %% batch finished - ok = emqx_bridge:handle_ack(Parent, Ref), - match_acks(Parent, Acked, Sent); -match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) -> - %% one message finished, but not the whole batch - match_acks(Parent, Acked, [?REF_IDS(Ref, RestIds) | Sent]). - - -%% When puback for QoS-1 message is received from remote MQTT broker -%% NOTE: no support for QoS-2 -handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) -> - RC =:= ?RC_SUCCESS orelse error({puback_error_code, RC}), - AckCollector ! ?ACKED(PktId), - ok. - -%% Message published from remote broker. Import to local broker. -import_msg(Msg) -> - %% auto-ack should be enabled in emqx_client, hence dummy ack-fun. - emqx_bridge:import_batch([Msg], _AckFun = fun() -> ok end). - -make_hdlr(Parent, AckCollector, Ref) -> - #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end, - publish => fun(Msg) -> import_msg(Msg) end, - disconnected => fun(Reason) -> Parent ! {disconnected, Ref, Reason}, ok end - }. - -subscribe_remote_topics(ClientPid, Subscriptions) -> - lists:foreach(fun({Topic, Qos}) -> - case emqx_client:subscribe(ClientPid, Topic, Qos) of - {ok, _, _} -> ok; - Error -> throw(Error) - end - end, Subscriptions). diff --git a/src/emqx_bridge_msg.erl b/src/emqx_bridge_msg.erl deleted file mode 100644 index 6633027f9..000000000 --- a/src/emqx_bridge_msg.erl +++ /dev/null @@ -1,84 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_msg). - --export([ to_binary/1 - , from_binary/1 - , to_export/3 - , to_broker_msgs/1 - , estimate_size/1 - ]). - --export_type([msg/0]). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). --include("emqx_client.hrl"). - --type msg() :: emqx_types:message(). --type exp_msg() :: emqx_types:message() | #mqtt_msg{}. - -%% @doc Make export format: -%% 1. Mount topic to a prefix -%% 2. Fix QoS to 1 -%% @end -%% Shame that we have to know the callback module here -%% would be great if we can get rid of #mqtt_msg{} record -%% and use #message{} in all places. --spec to_export(emqx_bridge_rpc | emqx_bridge_mqtt, - undefined | binary(), msg()) -> exp_msg(). -to_export(emqx_bridge_mqtt, Mountpoint, - #message{topic = Topic, - payload = Payload, - flags = Flags - }) -> - Retain = maps:get(retain, Flags, false), - #mqtt_msg{qos = ?QOS_1, - retain = Retain, - topic = topic(Mountpoint, Topic), - payload = Payload}; -to_export(_Module, Mountpoint, - #message{topic = Topic} = Msg) -> - Msg#message{topic = topic(Mountpoint, Topic), qos = 1}. - -%% @doc Make `binary()' in order to make iodata to be persisted on disk. --spec to_binary(msg()) -> binary(). -to_binary(Msg) -> term_to_binary(Msg). - -%% @doc Unmarshal binary into `msg()'. --spec from_binary(binary()) -> msg(). -from_binary(Bin) -> binary_to_term(Bin). - -%% @doc Estimate the size of a message. -%% Count only the topic length + payload size --spec estimate_size(msg()) -> integer(). -estimate_size(#message{topic = Topic, payload = Payload}) -> - size(Topic) + size(Payload). - -%% @doc By message/batch receiver, transform received batch into -%% messages to dispatch to local brokers. -to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch). - -to_broker_msg(#message{} = Msg) -> - %% internal format from another EMQX node via rpc - Msg; -to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic, - properties := Props, payload := Payload}) -> - %% published from remote node over a MQTT connection - emqx_message:set_headers(Props, - emqx_message:set_flags(#{dup => Dup, retain => Retain}, - emqx_message:make(bridge, QoS, Topic, Payload))). - -topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic). diff --git a/src/emqx_bridge_rpc.erl b/src/emqx_bridge_rpc.erl deleted file mode 100644 index 9674fdcf1..000000000 --- a/src/emqx_bridge_rpc.erl +++ /dev/null @@ -1,105 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - -%% @doc This module implements EMQX Bridge transport layer based on gen_rpc. - --module(emqx_bridge_rpc). --behaviour(emqx_bridge_connect). - -%% behaviour callbacks --export([ start/1 - , send/2 - , stop/2 - ]). - -%% Internal exports --export([ handle_send/2 - , handle_ack/2 - , heartbeat/2 - ]). - --type ack_ref() :: emqx_bridge:ack_ref(). --type batch() :: emqx_bridge:batch(). - --define(HEARTBEAT_INTERVAL, timer:seconds(1)). - --define(RPC, gen_rpc). - -start(#{address := Remote}) -> - case poke(Remote) of - ok -> - Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), Remote]), - {ok, Pid, Remote}; - Error -> - Error - end. - -stop(Pid, _Remote) when is_pid(Pid) -> - Ref = erlang:monitor(process, Pid), - unlink(Pid), - Pid ! stop, - receive - {'DOWN', Ref, process, Pid, _Reason} -> - ok - after - 1000 -> - exit(Pid, kill) - end, - ok. - -%% @doc Callback for `emqx_bridge_connect' behaviour --spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. -send(Remote, Batch) -> - Sender = self(), - case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of - {ok, Ref} -> {ok, Ref}; - {badrpc, Reason} -> {error, Reason} - end. - -%% @doc Handle send on receiver side. --spec handle_send(pid(), batch()) -> {ok, ack_ref()} | {error, any()}. -handle_send(SenderPid, Batch) -> - SenderNode = node(SenderPid), - Ref = make_ref(), - AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end, - case emqx_bridge:import_batch(Batch, AckFun) of - ok -> {ok, Ref}; - Error -> Error - end. - -%% @doc Handle batch ack in sender node. -handle_ack(SenderPid, Ref) -> - ok = emqx_bridge:handle_ack(SenderPid, Ref). - -%% @hidden Heartbeat loop -heartbeat(Parent, RemoteNode) -> - Interval = ?HEARTBEAT_INTERVAL, - receive - stop -> exit(normal) - after - Interval -> - case poke(RemoteNode) of - ok -> - ?MODULE:heartbeat(Parent, RemoteNode); - {error, Reason} -> - Parent ! {disconnected, self(), Reason}, - exit(normal) - end - end. - -poke(Node) -> - case ?RPC:call(Node, erlang, node, []) of - Node -> ok; - {badrpc, Reason} -> {error, Reason} - end. diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl deleted file mode 100644 index baf86fd70..000000000 --- a/src/emqx_bridge_sup.erl +++ /dev/null @@ -1,81 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_sup). --behavior(supervisor). - --include("logger.hrl"). - --logger_header("[Bridge]"). - -%% APIs --export([ start_link/0 - , start_link/1 - ]). - --export([ create_bridge/2 - , drop_bridge/1 - , bridges/0 - , is_bridge_exist/1 - ]). - -%% supervisor callbacks --export([init/1]). - --define(SUP, ?MODULE). --define(WORKER_SUP, emqx_bridge_worker_sup). - -start_link() -> start_link(?SUP). - -start_link(Name) -> - supervisor:start_link({local, Name}, ?MODULE, Name). - -init(?SUP) -> - BridgesConf = emqx_config:get_env(bridges, []), - BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf), - SupFlag = #{strategy => one_for_one, - intensity => 100, - period => 10}, - {ok, {SupFlag, BridgeSpec}}. - -bridge_spec({Name, Config}) -> - #{id => Name, - start => {emqx_bridge, start_link, [Name, Config]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_bridge]}. - --spec(bridges() -> [{node(), map()}]). -bridges() -> - [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. - --spec(is_bridge_exist(atom() | pid()) -> boolean()). -is_bridge_exist(Id) -> - case supervisor:get_childspec(?SUP, Id) of - {ok, _ChildSpec} -> true; - {error, _Error} -> false - end. - -create_bridge(Id, Config) -> - supervisor:start_child(?SUP, bridge_spec({Id, Config})). - -drop_bridge(Id) -> - case supervisor:terminate_child(?SUP, Id) of - ok -> - supervisor:delete_child(?SUP, Id); - Error -> - ?LOG(error, "Delete bridge failed, error : ~p", [Error]), - Error - end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 49454d8ac..b4a3032ce 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -321,7 +321,7 @@ resume(SPid, SessAttrs) -> %% @doc Discard the session -spec(discard(spid(), ByPid :: pid()) -> ok). discard(SPid, ByPid) -> - gen_server:call(SPid, {discard, ByPid}, infinity). + gen_server:call(SPid, {discard, ByPid}). -spec(update_expiry_interval(spid(), timeout()) -> ok). update_expiry_interval(SPid, Interval) -> @@ -329,7 +329,7 @@ update_expiry_interval(SPid, Interval) -> -spec(close(spid()) -> ok). close(SPid) -> - gen_server:call(SPid, close, infinity). + gen_server:call(SPid, close). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -798,7 +798,7 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) -> SessProps = #{client_id => ClientId, username => Username}, %% Drain the mailbox and batch deliver - Msgs1 = drain_m(batch_n(Inflight), Msgs), + Msgs1 = Msgs ++ drain_m(batch_n(Inflight)), %% Ack the messages for shared subscription Msgs2 = maybe_ack_shared(Msgs1, State), %% Process suboptions @@ -821,6 +821,9 @@ batch_n(Inflight) -> Sz -> Sz - emqx_inflight:size(Inflight) end. +drain_m(Cnt) -> + drain_m(Cnt, []). + drain_m(Cnt, Msgs) when Cnt =< 0 -> lists:reverse(Msgs); drain_m(Cnt, Msgs) -> diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index b271bd85d..8c176edb2 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -119,7 +119,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow {noreply, State#state{sessions = SessMap1}}; handle_info(Info, State) -> - ?LOG(notice, "Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, State) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 256425f5a..f6f2fbf47 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -116,6 +116,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> try emqx_session:discard(SessPid, ConnPid) catch _:Error:_Stk -> + unregister_session(ClientId, SessPid), ?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error]) end end, lookup_session_pids(ClientId)). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index dc30f4af1..b517e4f07 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -65,7 +65,6 @@ init([]) -> RouterSup = supervisor_spec(emqx_router_sup), %% Broker Sup BrokerSup = supervisor_spec(emqx_broker_sup), - BridgeSup = supervisor_spec(emqx_bridge_sup), %% Session Manager SMSup = supervisor_spec(emqx_sm_sup), %% Connection Manager @@ -76,7 +75,6 @@ init([]) -> [KernelSup, RouterSup, BrokerSup, - BridgeSup, SMSup, CMSup, SysSup]}}. diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index 1e7223320..c918ce4a7 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -111,6 +111,7 @@ t_logger_handler(_) -> {child_type, worker}]}]}, #{logger_formatter => #{title => "SUPERVISOR REPORT"}, report_cb => fun logger:format_otp_report/1}), + timer:sleep(20), ?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())). raw_send_serialize(Packet) -> diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl deleted file mode 100644 index 7d94091e1..000000000 --- a/test/emqx_bridge_SUITE.erl +++ /dev/null @@ -1,195 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_SUITE). - --export([ all/0 - , init_per_suite/1 - , end_per_suite/1]). --export([ t_rpc/1 - , t_mqtt/1 - , t_mngr/1]). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). --include("emqx_mqtt.hrl"). --include("emqx.hrl"). - --define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). - -all() -> [ t_rpc - , t_mqtt - , t_mngr]. - -init_per_suite(Config) -> - case node() of - nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]); - _ -> ok - end, - emqx_ct_helpers:start_apps([]), - emqx_logger:set_log_level(error), - [{log_level, error} | Config]. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -t_mngr(Config) when is_list(Config) -> - Subs = [{<<"a">>, 1}, {<<"b">>, 2}], - Cfg = #{address => node(), - forwards => [<<"mngr">>], - connect_module => emqx_bridge_rpc, - mountpoint => <<"forwarded">>, - subscriptions => Subs, - start_type => auto}, - Name = ?FUNCTION_NAME, - {ok, Pid} = emqx_bridge:start_link(Name, Cfg), - try - ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Name)), - ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr")), - ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr2")), - ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge:get_forwards(Pid)), - ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr2")), - ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr3")), - ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Pid)), - ?assertEqual({error, no_remote_subscription_support}, - emqx_bridge:ensure_subscription_present(Pid, <<"t">>, 0)), - ?assertEqual({error, no_remote_subscription_support}, - emqx_bridge:ensure_subscription_absent(Pid, <<"t">>)), - ?assertEqual(Subs, emqx_bridge:get_subscriptions(Pid)) - after - ok = emqx_bridge:stop(Pid) - end. - -%% A loopback RPC to local node -t_rpc(Config) when is_list(Config) -> - Cfg = #{address => node(), - forwards => [<<"t_rpc/#">>], - connect_module => emqx_bridge_rpc, - mountpoint => <<"forwarded">>, - start_type => auto}, - {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), - ClientId = <<"ClientId">>, - try - {ok, ConnPid} = emqx_mock_client:start_link(ClientId), - {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), - %% message from a different client, to avoid getting terminated by no-local - Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>), - ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]), - ct:sleep(100), - PacketId = 1, - emqx_session:publish(SPid, PacketId, Msg1), - ?wait(case emqx_mock_client:get_last_message(ConnPid) of - [{publish, PacketId, #message{topic = <<"forwarded/t_rpc/one">>}}] -> - true; - Other -> - Other - end, 4000), - emqx_mock_client:close_session(ConnPid) - after - ok = emqx_bridge:stop(Pid) - end. - -%% Full data loopback flow explained: -%% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)---> -%% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) --> -%% bridge(import) --(mecked message sending)--> test-pid -t_mqtt(Config) when is_list(Config) -> - SendToTopic = <<"t_mqtt/one">>, - SendToTopic2 = <<"t_mqtt/two">>, - Mountpoint = <<"forwarded/${node}/">>, - ForwardedTopic = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic]), - ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]), - Cfg = #{address => "127.0.0.1:1883", - forwards => [SendToTopic], - connect_module => emqx_bridge_mqtt, - mountpoint => Mountpoint, - username => "user", - clean_start => true, - client_id => "bridge_aws", - keepalive => 60000, - max_inflight => 32, - password => "passwd", - proto_ver => mqttv4, - queue => #{replayq_dir => "data/t_mqtt/", - replayq_seg_bytes => 10000, - batch_bytes_limit => 1000, - batch_count_limit => 10 - }, - reconnect_delay_ms => 1000, - ssl => false, - %% Consume back to forwarded message for verification - %% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2 - subscriptions => [{ForwardedTopic, _QoS = 1}], - start_type => auto}, - Tester = self(), - Ref = make_ref(), - meck:new(emqx_bridge, [passthrough, no_history]), - meck:expect(emqx_bridge, import_batch, 2, - fun(Batch, AckFun) -> - Tester ! {Ref, Batch}, - AckFun() - end), - {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), - ClientId = <<"client-1">>, - try - ?assertEqual([{ForwardedTopic, 1}], emqx_bridge:get_subscriptions(Pid)), - ok = emqx_bridge:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), - ok = emqx_bridge:ensure_forward_present(Pid, SendToTopic2), - ?assertEqual([{ForwardedTopic, 1}, - {ForwardedTopic2, 1}], emqx_bridge:get_subscriptions(Pid)), - {ok, ConnPid} = emqx_mock_client:start_link(ClientId), - {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), - %% message from a different client, to avoid getting terminated by no-local - Max = 100, - Msgs = lists:seq(1, Max), - lists:foreach(fun(I) -> - Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)), - emqx_session:publish(SPid, I, Msg) - end, Msgs), - ok = receive_and_match_messages(Ref, Msgs), - Msgs2 = lists:seq(Max + 1, Max * 2), - lists:foreach(fun(I) -> - Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)), - emqx_session:publish(SPid, I, Msg) - end, Msgs2), - ok = receive_and_match_messages(Ref, Msgs2), - emqx_mock_client:close_session(ConnPid) - after - ok = emqx_bridge:stop(Pid), - meck:unload(emqx_bridge) - end. - -receive_and_match_messages(Ref, Msgs) -> - TRef = erlang:send_after(timer:seconds(5), self(), {Ref, timeout}), - try - do_receive_and_match_messages(Ref, Msgs) - after - erlang:cancel_timer(TRef) - end, - ok. - -do_receive_and_match_messages(_Ref, []) -> ok; -do_receive_and_match_messages(Ref, [I | Rest] = Exp) -> - receive - {Ref, timeout} -> erlang:error(timeout); - {Ref, [#{payload := P} = Msg]} -> - case binary_to_integer(P) of - I -> %% exact match - do_receive_and_match_messages(Ref, Rest); - J when J < I -> %% allow retry - do_receive_and_match_messages(Ref, Exp); - _Other -> - throw({unexpected, Msg, Exp}) - end - end. diff --git a/test/emqx_bridge_mqtt_tests.erl b/test/emqx_bridge_mqtt_tests.erl deleted file mode 100644 index e9eb0b0a0..000000000 --- a/test/emqx_bridge_mqtt_tests.erl +++ /dev/null @@ -1,54 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_mqtt_tests). --include_lib("eunit/include/eunit.hrl"). --include("emqx_mqtt.hrl"). - -send_and_ack_test() -> - %% delegate from gen_rpc to rpc for unit test - meck:new(emqx_client, [passthrough, no_history]), - meck:expect(emqx_client, start_link, 1, - fun(#{msg_handler := Hdlr}) -> - {ok, spawn_link(fun() -> fake_client(Hdlr) end)} - end), - meck:expect(emqx_client, connect, 1, {ok, dummy}), - meck:expect(emqx_client, stop, 1, - fun(Pid) -> Pid ! stop end), - meck:expect(emqx_client, publish, 2, - fun(Client, Msg) -> - Client ! {publish, Msg}, - {ok, Msg} %% as packet id - end), - try - Max = 100, - Batch = lists:seq(1, Max), - {ok, Ref, Conn} = emqx_bridge_mqtt:start(#{address => "127.0.0.1:1883"}), - %% return last packet id as batch reference - {ok, AckRef} = emqx_bridge_mqtt:send(Conn, Batch), - %% expect batch ack - receive {batch_ack, AckRef} -> ok end, - ok = emqx_bridge_mqtt:stop(Ref, Conn) - after - meck:unload(emqx_client) - end. - -fake_client(#{puback := PubAckCallback} = Hdlr) -> - receive - {publish, PktId} -> - PubAckCallback(#{packet_id => PktId, reason_code => ?RC_SUCCESS}), - fake_client(Hdlr); - stop -> - exit(normal) - end. diff --git a/test/emqx_bridge_rpc_tests.erl b/test/emqx_bridge_rpc_tests.erl deleted file mode 100644 index 28e05b895..000000000 --- a/test/emqx_bridge_rpc_tests.erl +++ /dev/null @@ -1,43 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_rpc_tests). --include_lib("eunit/include/eunit.hrl"). - -send_and_ack_test() -> - %% delegate from gen_rpc to rpc for unit test - meck:new(gen_rpc, [passthrough, no_history]), - meck:expect(gen_rpc, call, 4, - fun(Node, Module, Fun, Args) -> - rpc:call(Node, Module, Fun, Args) - end), - meck:expect(gen_rpc, cast, 4, - fun(Node, Module, Fun, Args) -> - rpc:cast(Node, Module, Fun, Args) - end), - meck:new(emqx_bridge, [passthrough, no_history]), - meck:expect(emqx_bridge, import_batch, 2, - fun(batch, AckFun) -> AckFun() end), - try - {ok, Pid, Node} = emqx_bridge_rpc:start(#{address => node()}), - {ok, Ref} = emqx_bridge_rpc:send(Node, batch), - receive - {batch_ack, Ref} -> - ok - end, - ok = emqx_bridge_rpc:stop(Pid, Node) - after - meck:unload(gen_rpc), - meck:unload(emqx_bridge) - end. diff --git a/test/emqx_bridge_tests.erl b/test/emqx_bridge_tests.erl deleted file mode 100644 index 4bd86ad5e..000000000 --- a/test/emqx_bridge_tests.erl +++ /dev/null @@ -1,155 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge_tests). --behaviour(emqx_bridge_connect). - --include_lib("eunit/include/eunit.hrl"). --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - --define(BRIDGE_NAME, test). --define(BRIDGE_REG_NAME, emqx_bridge_test). --define(WAIT(PATTERN, TIMEOUT), - receive - PATTERN -> - ok - after - TIMEOUT -> - error(timeout) - end). - -%% stub callbacks --export([start/1, send/2, stop/2]). - -start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) -> - case is_pid(Pid) of - true -> Pid ! {connection_start_attempt, Ref}; - false -> ok - end, - Result. - -send(SendFun, Batch) when is_function(SendFun, 1) -> - SendFun(Batch). - -stop(_Ref, _Pid) -> ok. - -%% bridge worker should retry connecting remote node indefinitely -reconnect_test() -> - Ref = make_ref(), - Config = make_config(Ref, self(), {error, test}), - {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), - %% assert name registered - ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), - ?WAIT({connection_start_attempt, Ref}, 1000), - %% expect same message again - ?WAIT({connection_start_attempt, Ref}, 1000), - ok = emqx_bridge:stop(?BRIDGE_REG_NAME), - ok. - -%% connect first, disconnect, then connect again -disturbance_test() -> - Ref = make_ref(), - Config = make_config(Ref, self(), {ok, Ref, connection}), - {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), - ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), - ?WAIT({connection_start_attempt, Ref}, 1000), - Pid ! {disconnected, Ref, test}, - ?WAIT({connection_start_attempt, Ref}, 1000), - ok = emqx_bridge:stop(?BRIDGE_REG_NAME). - -%% buffer should continue taking in messages when disconnected -buffer_when_disconnected_test_() -> - {timeout, 10000, fun test_buffer_when_disconnected/0}. - -test_buffer_when_disconnected() -> - Ref = make_ref(), - Nums = lists:seq(1, 100), - Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end), - SenderMref = monitor(process, Sender), - Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end), - ReceiverMref = monitor(process, Receiver), - SendFun = fun(Batch) -> - BatchRef = make_ref(), - Receiver ! {batch, BatchRef, Batch}, - {ok, BatchRef} - end, - Config0 = make_config(Ref, false, {ok, Ref, SendFun}), - Config = Config0#{reconnect_delay_ms => 100}, - {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), - Sender ! {bridge, Pid}, - Receiver ! {bridge, Pid}, - ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), - Pid ! {disconnected, Ref, test}, - ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000), - ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), - ok = emqx_bridge:stop(?BRIDGE_REG_NAME). - -manual_start_stop_test() -> - Ref = make_ref(), - Config0 = make_config(Ref, self(), {ok, Ref, connection}), - Config = Config0#{start_type := manual}, - {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config), - %% call ensure_started again should yeld the same result - {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config), - ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), - emqx_bridge:ensure_stopped(unknown), - emqx_bridge:ensure_stopped(Pid), - emqx_bridge:ensure_stopped(?BRIDGE_REG_NAME). - -%% Feed messages to bridge -sender_loop(_Pid, [], _) -> exit(normal); -sender_loop(Pid, [Num | Rest], Interval) -> - random_sleep(Interval), - Pid ! {dispatch, dummy, make_msg(Num)}, - sender_loop(Pid, Rest, Interval). - -%% Feed acknowledgments to bridge -receiver_loop(_Pid, [], _) -> ok; -receiver_loop(Pid, Nums, Interval) -> - receive - {batch, BatchRef, Batch} -> - Rest = match_nums(Batch, Nums), - random_sleep(Interval), - emqx_bridge:handle_ack(Pid, BatchRef), - receiver_loop(Pid, Rest, Interval) - end. - -random_sleep(MaxInterval) -> - case rand:uniform(MaxInterval) - 1 of - 0 -> ok; - T -> timer:sleep(T) - end. - -match_nums([], Rest) -> Rest; -match_nums([#message{payload = P} | Rest], Nums) -> - I = binary_to_integer(P), - case Nums of - [I | NumsLeft] -> match_nums(Rest, NumsLeft); - [J | _] when J > I -> match_nums(Rest, Nums); %% allow retry - _ -> error([{received, I}, {expecting, Nums}]) - end. - -make_config(Ref, TestPid, Result) -> - #{test_pid => TestPid, - test_ref => Ref, - connect_module => ?MODULE, - reconnect_delay_ms => 50, - connect_result => Result, - start_type => auto - }. - -make_msg(I) -> - Payload = integer_to_binary(I), - emqx_message:make(<<"test/topic">>, Payload). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index ec707e30d..17c55f517 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -245,7 +245,7 @@ connect_v5(_) -> #{version => ?MQTT_PROTO_V5} )), - {ok, Data3} = gen_tcp:recv(Sock, 0), + {ok, Data3} = gen_tcp:recv(Sock, 6), {ok, ?PUBACK_PACKET(1, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 0e04de963..8d516e94d 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -75,4 +75,4 @@ start_traces(_Config) -> ok = emqx_tracer:stop_trace({topic, <<"a/#">>}), emqx_client:disconnect(T), - emqx_logger:set_log_level(error). + emqx_logger:set_log_level(warning).