diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0fac31625..61a697948 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -17,6 +17,7 @@ -behaviour(emqx_config_handler). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([post_config_update/5]). @@ -46,6 +47,30 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). +load() -> + %% set wait_for_resource_ready => 0 to start resources async + Opts = #{auto_retry_interval => 60000, wait_for_resource_ready => 0}, + Bridges = emqx:get_config([bridges], #{}), + lists:foreach( + fun({Type, NamedConf}) -> + lists:foreach( + fun({Name, Conf}) -> + _Res = emqx_bridge_resource:create(Type, Name, Conf, Opts), + ?tp( + emqx_bridge_loaded, + #{ + type => Type, + name => Name, + res => _Res + } + ) + end, + maps:to_list(NamedConf) + ) + end, + maps:to_list(Bridges) + ). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). @@ -138,10 +163,6 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> ok = load_hook(NewConf), Result. -load() -> - Bridges = emqx:get_config([bridges], #{}), - emqx_bridge_monitor:ensure_all_started(Bridges). - list() -> lists:foldl( fun({Type, NameAndConf}, Bridges) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl deleted file mode 100644 index 70c89f352..000000000 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ /dev/null @@ -1,90 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- -%% This process monitors all the data bridges, and try to restart a bridge -%% when one of it stopped. --module(emqx_bridge_monitor). - --behaviour(gen_server). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - -%% API functions --export([ - start_link/0, - ensure_all_started/1 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --record(state, {}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -ensure_all_started(Configs) -> - gen_server:cast(?MODULE, {start_and_monitor, Configs}). - -init([]) -> - {ok, #state{}}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast({start_and_monitor, Configs}, State) -> - ok = load_bridges(Configs), - {noreply, State}; -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%============================================================================ -load_bridges(Configs) -> - lists:foreach( - fun({Type, NamedConf}) -> - lists:foreach( - fun({Name, Conf}) -> - _Res = emqx_bridge_resource:create(Type, Name, Conf), - ?tp( - emqx_bridge_monitor_loaded_bridge, - #{ - type => Type, - name => Name, - res => _Res - } - ) - end, - maps:to_list(NamedConf) - ) - end, - maps:to_list(Configs) - ). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 0ff04c4f5..186f99557 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -28,6 +28,7 @@ -export([ create/2, create/3, + create/4, recreate/2, recreate/3, create_dry_run/2, @@ -79,6 +80,9 @@ create(BridgeId, Conf) -> create(BridgeType, BridgeName, Conf). create(Type, Name, Conf) -> + create(Type, Name, Conf, #{auto_retry_interval => 60000}). + +create(Type, Name, Conf, Opts) -> ?SLOG(info, #{ msg => "create bridge", type => Type, @@ -90,7 +94,7 @@ create(Type, Name, Conf) -> <<"emqx_bridge">>, bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - #{auto_retry_interval => 60000} + Opts ), maybe_disable_bridge(Type, Name, Conf). @@ -132,13 +136,14 @@ update(Type, Name, {OldConf, Conf}) -> true -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' - case maps:get(enable, Conf, true) of - true -> - _ = restart(Type, Name), - ok; - false -> - stop(Type, Name) - end + _ = + case maps:get(enable, Conf, true) of + true -> + restart(Type, Name); + false -> + stop(Type, Name) + end, + ok end. recreate(Type, Name) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_sup.erl b/apps/emqx_bridge/src/emqx_bridge_sup.erl index cce3a066b..ba8181f83 100644 --- a/apps/emqx_bridge/src/emqx_bridge_sup.erl +++ b/apps/emqx_bridge/src/emqx_bridge_sup.erl @@ -32,15 +32,7 @@ init([]) -> intensity => 10, period => 10 }, - ChildSpecs = [ - #{ - id => emqx_bridge_monitor, - start => {emqx_bridge_monitor, start_link, []}, - restart => permanent, - type => worker, - modules => [emqx_bridge_monitor] - } - ], + ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index 894322c61..d8266f83a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -147,7 +147,7 @@ setup_fake_telemetry_data() -> ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts), ok = snabbkaffe:start_trace(), - Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end, + Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end, NEvents = 3, BackInTime = 0, Timeout = 11_000,