refactor: remove the emqx_bridge_monitor module
This commit is contained in:
parent
88ca25c60c
commit
69fba6958b
|
@ -17,6 +17,7 @@
|
||||||
-behaviour(emqx_config_handler).
|
-behaviour(emqx_config_handler).
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-export([post_config_update/5]).
|
-export([post_config_update/5]).
|
||||||
|
|
||||||
|
@ -46,6 +47,30 @@
|
||||||
%% exported for `emqx_telemetry'
|
%% exported for `emqx_telemetry'
|
||||||
-export([get_basic_usage_info/0]).
|
-export([get_basic_usage_info/0]).
|
||||||
|
|
||||||
|
load() ->
|
||||||
|
%% set wait_for_resource_ready => 0 to start resources async
|
||||||
|
Opts = #{auto_retry_interval => 60000, wait_for_resource_ready => 0},
|
||||||
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
|
lists:foreach(
|
||||||
|
fun({Type, NamedConf}) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Name, Conf}) ->
|
||||||
|
_Res = emqx_bridge_resource:create(Type, Name, Conf, Opts),
|
||||||
|
?tp(
|
||||||
|
emqx_bridge_loaded,
|
||||||
|
#{
|
||||||
|
type => Type,
|
||||||
|
name => Name,
|
||||||
|
res => _Res
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
maps:to_list(NamedConf)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
maps:to_list(Bridges)
|
||||||
|
).
|
||||||
|
|
||||||
load_hook() ->
|
load_hook() ->
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
load_hook(Bridges).
|
load_hook(Bridges).
|
||||||
|
@ -138,10 +163,6 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
|
||||||
ok = load_hook(NewConf),
|
ok = load_hook(NewConf),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
load() ->
|
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
|
||||||
emqx_bridge_monitor:ensure_all_started(Bridges).
|
|
||||||
|
|
||||||
list() ->
|
list() ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({Type, NameAndConf}, Bridges) ->
|
fun({Type, NameAndConf}, Bridges) ->
|
||||||
|
|
|
@ -1,90 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% This process monitors all the data bridges, and try to restart a bridge
|
|
||||||
%% when one of it stopped.
|
|
||||||
-module(emqx_bridge_monitor).
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
|
|
||||||
%% API functions
|
|
||||||
-export([
|
|
||||||
start_link/0,
|
|
||||||
ensure_all_started/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([
|
|
||||||
init/1,
|
|
||||||
handle_call/3,
|
|
||||||
handle_cast/2,
|
|
||||||
handle_info/2,
|
|
||||||
terminate/2,
|
|
||||||
code_change/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-record(state, {}).
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
ensure_all_started(Configs) ->
|
|
||||||
gen_server:cast(?MODULE, {start_and_monitor, Configs}).
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
{ok, #state{}}.
|
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
|
||||||
Reply = ok,
|
|
||||||
{reply, Reply, State}.
|
|
||||||
|
|
||||||
handle_cast({start_and_monitor, Configs}, State) ->
|
|
||||||
ok = load_bridges(Configs),
|
|
||||||
{noreply, State};
|
|
||||||
handle_cast(_Msg, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%============================================================================
|
|
||||||
load_bridges(Configs) ->
|
|
||||||
lists:foreach(
|
|
||||||
fun({Type, NamedConf}) ->
|
|
||||||
lists:foreach(
|
|
||||||
fun({Name, Conf}) ->
|
|
||||||
_Res = emqx_bridge_resource:create(Type, Name, Conf),
|
|
||||||
?tp(
|
|
||||||
emqx_bridge_monitor_loaded_bridge,
|
|
||||||
#{
|
|
||||||
type => Type,
|
|
||||||
name => Name,
|
|
||||||
res => _Res
|
|
||||||
}
|
|
||||||
)
|
|
||||||
end,
|
|
||||||
maps:to_list(NamedConf)
|
|
||||||
)
|
|
||||||
end,
|
|
||||||
maps:to_list(Configs)
|
|
||||||
).
|
|
|
@ -28,6 +28,7 @@
|
||||||
-export([
|
-export([
|
||||||
create/2,
|
create/2,
|
||||||
create/3,
|
create/3,
|
||||||
|
create/4,
|
||||||
recreate/2,
|
recreate/2,
|
||||||
recreate/3,
|
recreate/3,
|
||||||
create_dry_run/2,
|
create_dry_run/2,
|
||||||
|
@ -79,6 +80,9 @@ create(BridgeId, Conf) ->
|
||||||
create(BridgeType, BridgeName, Conf).
|
create(BridgeType, BridgeName, Conf).
|
||||||
|
|
||||||
create(Type, Name, Conf) ->
|
create(Type, Name, Conf) ->
|
||||||
|
create(Type, Name, Conf, #{auto_retry_interval => 60000}).
|
||||||
|
|
||||||
|
create(Type, Name, Conf, Opts) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "create bridge",
|
msg => "create bridge",
|
||||||
type => Type,
|
type => Type,
|
||||||
|
@ -90,7 +94,7 @@ create(Type, Name, Conf) ->
|
||||||
<<"emqx_bridge">>,
|
<<"emqx_bridge">>,
|
||||||
bridge_to_resource_type(Type),
|
bridge_to_resource_type(Type),
|
||||||
parse_confs(Type, Name, Conf),
|
parse_confs(Type, Name, Conf),
|
||||||
#{auto_retry_interval => 60000}
|
Opts
|
||||||
),
|
),
|
||||||
maybe_disable_bridge(Type, Name, Conf).
|
maybe_disable_bridge(Type, Name, Conf).
|
||||||
|
|
||||||
|
@ -132,13 +136,14 @@ update(Type, Name, {OldConf, Conf}) ->
|
||||||
true ->
|
true ->
|
||||||
%% we don't need to recreate the bridge if this config change is only to
|
%% we don't need to recreate the bridge if this config change is only to
|
||||||
%% toggole the config 'bridge.{type}.{name}.enable'
|
%% toggole the config 'bridge.{type}.{name}.enable'
|
||||||
|
_ =
|
||||||
case maps:get(enable, Conf, true) of
|
case maps:get(enable, Conf, true) of
|
||||||
true ->
|
true ->
|
||||||
_ = restart(Type, Name),
|
restart(Type, Name);
|
||||||
ok;
|
|
||||||
false ->
|
false ->
|
||||||
stop(Type, Name)
|
stop(Type, Name)
|
||||||
end
|
end,
|
||||||
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
recreate(Type, Name) ->
|
recreate(Type, Name) ->
|
||||||
|
|
|
@ -32,15 +32,7 @@ init([]) ->
|
||||||
intensity => 10,
|
intensity => 10,
|
||||||
period => 10
|
period => 10
|
||||||
},
|
},
|
||||||
ChildSpecs = [
|
ChildSpecs = [],
|
||||||
#{
|
|
||||||
id => emqx_bridge_monitor,
|
|
||||||
start => {emqx_bridge_monitor, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_bridge_monitor]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
|
|
@ -147,7 +147,7 @@ setup_fake_telemetry_data() ->
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts),
|
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts),
|
||||||
|
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end,
|
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end,
|
||||||
NEvents = 3,
|
NEvents = 3,
|
||||||
BackInTime = 0,
|
BackInTime = 0,
|
||||||
Timeout = 11_000,
|
Timeout = 11_000,
|
||||||
|
|
Loading…
Reference in New Issue