From 72713cb85fc692bc5b6284c0b7c8eef483b669c4 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 15:05:53 +0800 Subject: [PATCH 1/3] fix(config): restart app after config update in cluster --- apps/emqx_prometheus/src/emqx_prometheus.erl | 61 ++++++++++++++++++- .../src/emqx_prometheus_api.erl | 13 +--- apps/emqx_statsd/src/emqx_statsd.erl | 58 ++++++++++++++++++ apps/emqx_statsd/src/emqx_statsd_api.erl | 11 +--- 4 files changed, 122 insertions(+), 21 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 109564929..bd15d3c87 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -22,9 +22,11 @@ %% be used by the prometheus application -behaviour(prometheus_collector). +-include("emqx_prometheus.hrl"). + -include_lib("prometheus/include/prometheus.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). - +-include_lib("emqx/include/logger.hrl"). -import(prometheus_model_helpers, [ create_mf/5 @@ -32,6 +34,16 @@ , counter_metric/1 ]). +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + % for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% APIs -export([start_link/1]). @@ -58,6 +70,53 @@ -record(state, {push_gateway, timer, interval}). +%%-------------------------------------------------------------------- +%% update new config +update(Config) -> + case emqx:update_config([prometheus], Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + case maps:get(<<"enable">>, Config, true) of + true -> + ok = restart(); + false -> + ok = stop() + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). + +do_stop() -> + emqx_prometheus_sup:stop_child(?APP). + +do_restart() -> + case {stop(), start()} of + {ok, ok} -> + ok; + {Error1, Error2} -> + ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) + end. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index dda8bbd13..52120971b 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -67,17 +67,8 @@ prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; prometheus(put, #{body := Body}) -> - case emqx:update_config([prometheus], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - case maps:get(<<"enable">>, Body, true) of - true -> - ok = emqx_prometheus_sup:stop_child(?APP), - ok = emqx_prometheus_sup:start_child(?APP, Config); - false -> - ok = emqx_prometheus_sup:stop_child(?APP) - end, + case emqx_prometheus:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 5f88e5bdd..4b7852997 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -25,6 +25,18 @@ -include("emqx_statsd.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + %% for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% Interface -export([start_link/1]). @@ -44,6 +56,52 @@ estatsd_pid :: pid() }). +update(Config) -> + case emqx:update_config([statsd], + Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + start(), + case maps:get(<<"enable">>, Config) of + true -> stop(); + false -> ok + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})). + +do_stop() -> + emqx_statsd_sup:ensure_child_stopped(?APP). + +do_restart() -> + case {stop(), start()} of + {ok, ok} -> + ok; + {Error1, Error2} -> + ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) + end. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + start_link(Opts) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index c7ac94003..ad28bae95 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -55,15 +55,8 @@ statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; statsd(put, #{body := Body}) -> - case emqx:update_config([statsd], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - ok = emqx_statsd_sup:ensure_child_stopped(?APP), - case maps:get(<<"enable">>, Body) of - true -> emqx_statsd_sup:ensure_child_started(?APP, Config); - false -> ok - end, + case emqx_statsd:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), From 31aed3ea8ecf5e5d41986c0aa0fd9e85cfe1d94b Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 15:59:17 +0800 Subject: [PATCH 2/3] fix(prometheus): stop app with error check & dialyzer --- apps/emqx_prometheus/src/emqx_prometheus.erl | 19 ++++++++++------- apps/emqx_statsd/src/emqx_statsd.erl | 22 ++++++++++---------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index bd15d3c87..8ffc1c0eb 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -95,18 +95,21 @@ do_start() -> emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). do_stop() -> - emqx_prometheus_sup:stop_child(?APP). - -do_restart() -> - case {stop(), start()} of - {ok, ok} -> + case emqx_prometheus_sup:stop_child(?APP) of + ok -> ok; - {Error1, Error2} -> - ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) + {error, not_found} -> + ok end. +do_restart() -> + ok = do_start(), + ok = do_stop(), + ok. + cluster_call(F, A) -> - [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. rpc_call(N, F, A) -> case rpc:call(N, ?MODULE, F, A, 5000) of diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 4b7852997..99ba7c06e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -61,10 +61,12 @@ update(Config) -> Config, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfigRows}} -> - start(), - case maps:get(<<"enable">>, Config) of - true -> stop(); - false -> ok + _ = start(), + case maps:get(<<"enable">>, Config, true) of + true -> + ok = stop(); + false -> + ignore end, {ok, NewConfigRows}; {error, Reason} -> @@ -83,15 +85,13 @@ do_stop() -> emqx_statsd_sup:ensure_child_stopped(?APP). do_restart() -> - case {stop(), start()} of - {ok, ok} -> - ok; - {Error1, Error2} -> - ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) - end. + ok = do_start(), + ok = do_stop(), + ok. cluster_call(F, A) -> - [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. rpc_call(N, F, A) -> case rpc:call(N, ?MODULE, F, A, 5000) of From 6c574c08b8cb63e9a45aa449aa5472b97a08e0ac Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 16:11:55 +0800 Subject: [PATCH 3/3] fix(config): update config by emqx_conf --- apps/emqx_prometheus/src/emqx_prometheus.erl | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 8ffc1c0eb..92513ed71 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -73,7 +73,7 @@ %%-------------------------------------------------------------------- %% update new config update(Config) -> - case emqx:update_config([prometheus], Config, + case emqx_conf:update([prometheus], Config, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfigRows}} -> case maps:get(<<"enable">>, Config, true) of diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 99ba7c06e..545019884 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -57,14 +57,14 @@ }). update(Config) -> - case emqx:update_config([statsd], + case emqx_conf:update([statsd], Config, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfigRows}} -> - _ = start(), + ok = stop(), case maps:get(<<"enable">>, Config, true) of true -> - ok = stop(); + ok = start(); false -> ignore end,