diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 109564929..92513ed71 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -22,9 +22,11 @@ %% be used by the prometheus application -behaviour(prometheus_collector). +-include("emqx_prometheus.hrl"). + -include_lib("prometheus/include/prometheus.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). - +-include_lib("emqx/include/logger.hrl"). -import(prometheus_model_helpers, [ create_mf/5 @@ -32,6 +34,16 @@ , counter_metric/1 ]). +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + % for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% APIs -export([start_link/1]). @@ -58,6 +70,56 @@ -record(state, {push_gateway, timer, interval}). +%%-------------------------------------------------------------------- +%% update new config +update(Config) -> + case emqx_conf:update([prometheus], Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + case maps:get(<<"enable">>, Config, true) of + true -> + ok = restart(); + false -> + ok = stop() + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). + +do_stop() -> + case emqx_prometheus_sup:stop_child(?APP) of + ok -> + ok; + {error, not_found} -> + ok + end. + +do_restart() -> + ok = do_start(), + ok = do_stop(), + ok. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index dda8bbd13..52120971b 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -67,17 +67,8 @@ prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; prometheus(put, #{body := Body}) -> - case emqx:update_config([prometheus], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - case maps:get(<<"enable">>, Body, true) of - true -> - ok = emqx_prometheus_sup:stop_child(?APP), - ok = emqx_prometheus_sup:start_child(?APP, Config); - false -> - ok = emqx_prometheus_sup:stop_child(?APP) - end, + case emqx_prometheus:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 5f88e5bdd..545019884 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -25,6 +25,18 @@ -include("emqx_statsd.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + %% for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% Interface -export([start_link/1]). @@ -44,6 +56,52 @@ estatsd_pid :: pid() }). +update(Config) -> + case emqx_conf:update([statsd], + Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + ok = stop(), + case maps:get(<<"enable">>, Config, true) of + true -> + ok = start(); + false -> + ignore + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})). + +do_stop() -> + emqx_statsd_sup:ensure_child_stopped(?APP). + +do_restart() -> + ok = do_start(), + ok = do_stop(), + ok. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + start_link(Opts) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index c7ac94003..ad28bae95 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -55,15 +55,8 @@ statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; statsd(put, #{body := Body}) -> - case emqx:update_config([statsd], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - ok = emqx_statsd_sup:ensure_child_stopped(?APP), - case maps:get(<<"enable">>, Body) of - true -> emqx_statsd_sup:ensure_child_started(?APP, Config); - false -> ok - end, + case emqx_statsd:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),