Merge pull request #6648 from DDDHuang/fix_update2
fix(config): restart app after config update in cluster
This commit is contained in:
commit
db0e4948a1
|
@ -22,9 +22,11 @@
|
||||||
%% be used by the prometheus application
|
%% be used by the prometheus application
|
||||||
-behaviour(prometheus_collector).
|
-behaviour(prometheus_collector).
|
||||||
|
|
||||||
|
-include("emqx_prometheus.hrl").
|
||||||
|
|
||||||
-include_lib("prometheus/include/prometheus.hrl").
|
-include_lib("prometheus/include/prometheus.hrl").
|
||||||
-include_lib("prometheus/include/prometheus_model.hrl").
|
-include_lib("prometheus/include/prometheus_model.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-import(prometheus_model_helpers,
|
-import(prometheus_model_helpers,
|
||||||
[ create_mf/5
|
[ create_mf/5
|
||||||
|
@ -32,6 +34,16 @@
|
||||||
, counter_metric/1
|
, counter_metric/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ update/1
|
||||||
|
, start/0
|
||||||
|
, stop/0
|
||||||
|
, restart/0
|
||||||
|
% for rpc
|
||||||
|
, do_start/0
|
||||||
|
, do_stop/0
|
||||||
|
, do_restart/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
|
|
||||||
|
@ -58,6 +70,56 @@
|
||||||
|
|
||||||
-record(state, {push_gateway, timer, interval}).
|
-record(state, {push_gateway, timer, interval}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% update new config
|
||||||
|
update(Config) ->
|
||||||
|
case emqx_conf:update([prometheus], Config,
|
||||||
|
#{rawconf_with_defaults => true, override_to => cluster}) of
|
||||||
|
{ok, #{raw_config := NewConfigRows}} ->
|
||||||
|
case maps:get(<<"enable">>, Config, true) of
|
||||||
|
true ->
|
||||||
|
ok = restart();
|
||||||
|
false ->
|
||||||
|
ok = stop()
|
||||||
|
end,
|
||||||
|
{ok, NewConfigRows};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
start() -> cluster_call(do_start, []).
|
||||||
|
stop() -> cluster_call(do_stop, []).
|
||||||
|
restart() -> cluster_call(do_restart, []).
|
||||||
|
|
||||||
|
do_start() ->
|
||||||
|
emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])).
|
||||||
|
|
||||||
|
do_stop() ->
|
||||||
|
case emqx_prometheus_sup:stop_child(?APP) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
{error, not_found} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_restart() ->
|
||||||
|
ok = do_start(),
|
||||||
|
ok = do_stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
cluster_call(F, A) ->
|
||||||
|
[ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()],
|
||||||
|
ok.
|
||||||
|
|
||||||
|
rpc_call(N, F, A) ->
|
||||||
|
case rpc:call(N, ?MODULE, F, A, 5000) of
|
||||||
|
{badrpc, R} ->
|
||||||
|
?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]),
|
||||||
|
{error, {badrpc, R}};
|
||||||
|
Result ->
|
||||||
|
Result
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -67,17 +67,8 @@ prometheus(get, _Params) ->
|
||||||
{200, emqx:get_raw_config([<<"prometheus">>], #{})};
|
{200, emqx:get_raw_config([<<"prometheus">>], #{})};
|
||||||
|
|
||||||
prometheus(put, #{body := Body}) ->
|
prometheus(put, #{body := Body}) ->
|
||||||
case emqx:update_config([prometheus],
|
case emqx_prometheus:update(Body) of
|
||||||
Body,
|
{ok, NewConfig} ->
|
||||||
#{rawconf_with_defaults => true, override_to => cluster}) of
|
|
||||||
{ok, #{raw_config := NewConfig, config := Config}} ->
|
|
||||||
case maps:get(<<"enable">>, Body, true) of
|
|
||||||
true ->
|
|
||||||
ok = emqx_prometheus_sup:stop_child(?APP),
|
|
||||||
ok = emqx_prometheus_sup:start_child(?APP, Config);
|
|
||||||
false ->
|
|
||||||
ok = emqx_prometheus_sup:stop_child(?APP)
|
|
||||||
end,
|
|
||||||
{200, NewConfig};
|
{200, NewConfig};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
||||||
|
|
|
@ -25,6 +25,18 @@
|
||||||
|
|
||||||
-include("emqx_statsd.hrl").
|
-include("emqx_statsd.hrl").
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-export([ update/1
|
||||||
|
, start/0
|
||||||
|
, stop/0
|
||||||
|
, restart/0
|
||||||
|
%% for rpc
|
||||||
|
, do_start/0
|
||||||
|
, do_stop/0
|
||||||
|
, do_restart/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% Interface
|
%% Interface
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
|
|
||||||
|
@ -44,6 +56,52 @@
|
||||||
estatsd_pid :: pid()
|
estatsd_pid :: pid()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
update(Config) ->
|
||||||
|
case emqx_conf:update([statsd],
|
||||||
|
Config,
|
||||||
|
#{rawconf_with_defaults => true, override_to => cluster}) of
|
||||||
|
{ok, #{raw_config := NewConfigRows}} ->
|
||||||
|
ok = stop(),
|
||||||
|
case maps:get(<<"enable">>, Config, true) of
|
||||||
|
true ->
|
||||||
|
ok = start();
|
||||||
|
false ->
|
||||||
|
ignore
|
||||||
|
end,
|
||||||
|
{ok, NewConfigRows};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
start() -> cluster_call(do_start, []).
|
||||||
|
stop() -> cluster_call(do_stop, []).
|
||||||
|
restart() -> cluster_call(do_restart, []).
|
||||||
|
|
||||||
|
do_start() ->
|
||||||
|
emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})).
|
||||||
|
|
||||||
|
do_stop() ->
|
||||||
|
emqx_statsd_sup:ensure_child_stopped(?APP).
|
||||||
|
|
||||||
|
do_restart() ->
|
||||||
|
ok = do_start(),
|
||||||
|
ok = do_stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
cluster_call(F, A) ->
|
||||||
|
[ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()],
|
||||||
|
ok.
|
||||||
|
|
||||||
|
rpc_call(N, F, A) ->
|
||||||
|
case rpc:call(N, ?MODULE, F, A, 5000) of
|
||||||
|
{badrpc, R} ->
|
||||||
|
?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]),
|
||||||
|
{error, {badrpc, R}};
|
||||||
|
Result ->
|
||||||
|
Result
|
||||||
|
end.
|
||||||
|
|
||||||
start_link(Opts) ->
|
start_link(Opts) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
||||||
|
|
||||||
|
|
|
@ -55,15 +55,8 @@ statsd(get, _Params) ->
|
||||||
{200, emqx:get_raw_config([<<"statsd">>], #{})};
|
{200, emqx:get_raw_config([<<"statsd">>], #{})};
|
||||||
|
|
||||||
statsd(put, #{body := Body}) ->
|
statsd(put, #{body := Body}) ->
|
||||||
case emqx:update_config([statsd],
|
case emqx_statsd:update(Body) of
|
||||||
Body,
|
{ok, NewConfig} ->
|
||||||
#{rawconf_with_defaults => true, override_to => cluster}) of
|
|
||||||
{ok, #{raw_config := NewConfig, config := Config}} ->
|
|
||||||
ok = emqx_statsd_sup:ensure_child_stopped(?APP),
|
|
||||||
case maps:get(<<"enable">>, Body) of
|
|
||||||
true -> emqx_statsd_sup:ensure_child_started(?APP, Config);
|
|
||||||
false -> ok
|
|
||||||
end,
|
|
||||||
{200, NewConfig};
|
{200, NewConfig};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
||||||
|
|
Loading…
Reference in New Issue