diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index 3a2b5d972..45219c3f6 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -1190,37 +1190,44 @@ tr_prometheus_collectors(Conf) ->
emqx_prometheus,
emqx_prometheus_mria
%% builtin vm collectors
- | tr_vm_dist_collector(Conf) ++
- tr_mnesia_collector(Conf) ++
- tr_vm_statistics_collector(Conf) ++
- tr_vm_system_info_collector(Conf) ++
- tr_vm_memory_collector(Conf) ++
- tr_vm_msacc_collector(Conf)
+ | prometheus_collectors(Conf)
].
-tr_vm_dist_collector(Conf) ->
- Enabled = conf_get("prometheus.vm_dist_collector", Conf, disabled),
- collector_enabled(Enabled, prometheus_vm_dist_collector).
+prometheus_collectors(Conf) ->
+ case conf_get("prometheus.enable_basic_auth", Conf, undefined) of
+ %% legacy
+ undefined ->
+ tr_collector("prometheus.vm_dist_collector", prometheus_vm_dist_collector, Conf) ++
+ tr_collector("prometheus.mnesia_collector", prometheus_mnesia_collector, Conf) ++
+ tr_collector(
+ "prometheus.vm_statistics_collector", prometheus_vm_statistics_collector, Conf
+ ) ++
+ tr_collector(
+ "prometheus.vm_system_info_collector", prometheus_vm_system_info_collector, Conf
+ ) ++
+ tr_collector("prometheus.vm_memory_collector", prometheus_vm_memory_collector, Conf) ++
+ tr_collector("prometheus.vm_msacc_collector", prometheus_vm_msacc_collector, Conf);
+ %% new
+ _ ->
+ tr_collector("prometheus.collectors.vm_dist", prometheus_vm_dist_collector, Conf) ++
+ tr_collector("prometheus.collectors.mnesia", prometheus_mnesia_collector, Conf) ++
+ tr_collector(
+ "prometheus.collectors.vm_statistics", prometheus_vm_statistics_collector, Conf
+ ) ++
+ tr_collector(
+ "prometheus.collectors.vm_system_info",
+ prometheus_vm_system_info_collector,
+ Conf
+ ) ++
+ tr_collector(
+ "prometheus.collectors.vm_memory", prometheus_vm_memory_collector, Conf
+ ) ++
+ tr_collector("prometheus.collectors.vm_msacc", prometheus_vm_msacc_collector, Conf)
+ end.
-tr_mnesia_collector(Conf) ->
- Enabled = conf_get("prometheus.mnesia_collector", Conf, disabled),
- collector_enabled(Enabled, prometheus_mnesia_collector).
-
-tr_vm_statistics_collector(Conf) ->
- Enabled = conf_get("prometheus.vm_statistics_collector", Conf, disabled),
- collector_enabled(Enabled, prometheus_vm_statistics_collector).
-
-tr_vm_system_info_collector(Conf) ->
- Enabled = conf_get("prometheus.vm_system_info_collector", Conf, disabled),
- collector_enabled(Enabled, prometheus_vm_system_info_collector).
-
-tr_vm_memory_collector(Conf) ->
- Enabled = conf_get("prometheus.vm_memory_collector", Conf, disabled),
- collector_enabled(Enabled, prometheus_vm_memory_collector).
-
-tr_vm_msacc_collector(Conf) ->
- Enabled = conf_get("prometheus.vm_msacc_collector", Conf, disabled),
- collector_enabled(Enabled, prometheus_vm_msacc_collector).
+tr_collector(Key, Collect, Conf) ->
+ Enabled = conf_get(Key, Conf, disabled),
+ collector_enabled(Enabled, Collect).
collector_enabled(enabled, Collector) -> [Collector];
collector_enabled(disabled, _) -> [].
diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl
index a242931c4..29d531140 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus.erl
+++ b/apps/emqx_prometheus/src/emqx_prometheus.erl
@@ -42,6 +42,7 @@
%% gen_server callbacks
-export([
init/1,
+ handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
@@ -58,12 +59,6 @@
-export([collect/1]).
--export([
- %% For bpapi, deprecated_since 5.0.10, remove this when 5.1.x
- do_start/0,
- do_stop/0
-]).
-
-define(C(K, L), proplists:get_value(K, L, 0)).
-define(TIMER_MSG, '#interval').
@@ -74,8 +69,8 @@
%% APIs
%%--------------------------------------------------------------------
-start_link([]) ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(Conf) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
info() ->
gen_server:call(?MODULE, info).
@@ -84,49 +79,42 @@ info() ->
%% gen_server callbacks
%%--------------------------------------------------------------------
-init([]) ->
- #{interval := Interval} = opts(),
- {ok, #{timer => ensure_timer(Interval), ok => 0, failed => 0}}.
+init(Conf) ->
+ {ok, #{}, {continue, Conf}}.
-handle_call(info, _From, State = #{timer := Timer}) ->
- {reply, State#{opts => opts(), next_push_ms => erlang:read_timer(Timer)}, State};
+handle_continue(Conf, State) ->
+ Opts = #{interval := Interval} = opts(Conf),
+ {noreply, State#{
+ timer => ensure_timer(Interval),
+ opts => Opts,
+ ok => 0,
+ failed => 0
+ }}.
+
+handle_call(info, _From, State = #{timer := Timer, opts := Opts}) ->
+ {reply, State#{opts => Opts, next_push_ms => erlang:read_timer(Timer)}, State};
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
- #{
- interval := Interval,
- headers := Headers,
- job_name := JobName,
- push_gateway_server := Server
- } = opts(),
- PushRes = push_to_push_gateway(Server, Headers, JobName),
+handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer, opts := Opts}) ->
+ #{interval := Interval, headers := Headers, url := Server} = Opts,
+ PushRes = push_to_push_gateway(Server, Headers),
NewTimer = ensure_timer(Interval),
NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}),
%% Data is too big, hibernate for saving memory and stop system monitor warning.
{noreply, NewState, hibernate};
+handle_info({update, Conf}, State = #{timer := Timer}) ->
+ emqx_utils:cancel_timer(Timer),
+ handle_continue(Conf, State);
handle_info(_Msg, State) ->
{noreply, State}.
-push_to_push_gateway(Uri, Headers, JobName) when is_list(Headers) ->
- [Name, Ip] = string:tokens(atom_to_list(node()), "@"),
- % NOTE: allowing errors here to keep rough backward compatibility
- {JobName1, Errors} = emqx_template:render(
- emqx_template:parse(JobName),
- #{<<"name">> => Name, <<"host">> => Ip}
- ),
- _ =
- Errors == [] orelse
- ?SLOG(warning, #{
- msg => "prometheus_job_name_template_invalid",
- errors => Errors,
- template => JobName
- }),
+
+push_to_push_gateway(Url, Headers) when is_list(Headers) ->
Data = prometheus_text_format:format(),
- Url = lists:concat([Uri, "/metrics/job/", unicode:characters_to_list(JobName1)]),
case httpc:request(post, {Url, Headers, "text/plain", Data}, ?HTTP_OPTIONS, []) of
{ok, {{"HTTP/1.1", 200, _}, _RespHeaders, _RespBody}} ->
ok;
@@ -152,8 +140,26 @@ ensure_timer(Interval) ->
%%--------------------------------------------------------------------
%% prometheus callbacks
%%--------------------------------------------------------------------
-opts() ->
- emqx_conf:get(?PROMETHEUS).
+opts(#{interval := Interval, headers := Headers, job_name := JobName, push_gateway_server := Url}) ->
+ #{interval => Interval, headers => Headers, url => join_url(Url, JobName)};
+opts(#{push_gateway := #{url := Url, job_name := JobName} = PushGateway}) ->
+ maps:put(url, join_url(Url, JobName), PushGateway).
+
+join_url(Url, JobName0) ->
+ [Name, Ip] = string:tokens(atom_to_list(node()), "@"),
+ % NOTE: allowing errors here to keep rough backward compatibility
+ {JobName1, Errors} = emqx_template:render(
+ emqx_template:parse(JobName0),
+ #{<<"name">> => Name, <<"host">> => Ip}
+ ),
+ _ =
+ Errors == [] orelse
+ ?SLOG(warning, #{
+ msg => "prometheus_job_name_template_invalid",
+ errors => Errors,
+ template => JobName0
+ }),
+ lists:concat([Url, "/metrics/job/", unicode:characters_to_list(JobName1)]).
deregister_cleanup(_Registry) ->
ok.
@@ -672,11 +678,3 @@ emqx_cluster_data() ->
{nodes_running, length(Running)},
{nodes_stopped, length(Stopped)}
].
-
-%% deprecated_since 5.0.10, remove this when 5.1.x
-do_start() ->
- emqx_prometheus_sup:start_child(?APP).
-
-%% deprecated_since 5.0.10, remove this when 5.1.x
-do_stop() ->
- emqx_prometheus_sup:stop_child(?APP).
diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl
index 987386b61..3072ab5a7 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl
+++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl
@@ -20,8 +20,6 @@
-include_lib("hocon/include/hoconsc.hrl").
--import(hoconsc, [ref/2]).
-
-export([
api_spec/0,
paths/0,
@@ -29,11 +27,10 @@
]).
-export([
- prometheus/2,
+ setting/2,
stats/2
]).
--define(SCHEMA_MODULE, emqx_prometheus_schema).
-define(TAGS, [<<"Monitor">>]).
api_spec() ->
@@ -47,21 +44,21 @@ paths() ->
schema("/prometheus") ->
#{
- 'operationId' => prometheus,
+ 'operationId' => setting,
get =>
#{
description => ?DESC(get_prom_conf_info),
tags => ?TAGS,
responses =>
- #{200 => prometheus_config_schema()}
+ #{200 => prometheus_setting_schema()}
},
put =>
#{
description => ?DESC(update_prom_conf_info),
tags => ?TAGS,
- 'requestBody' => prometheus_config_schema(),
+ 'requestBody' => prometheus_setting_schema(),
responses =>
- #{200 => prometheus_config_schema()}
+ #{200 => prometheus_setting_schema()}
}
};
schema("/prometheus/stats") ->
@@ -81,9 +78,9 @@ schema("/prometheus/stats") ->
%% API Handler funcs
%%--------------------------------------------------------------------
-prometheus(get, _Params) ->
+setting(get, _Params) ->
{200, emqx:get_raw_config([<<"prometheus">>], #{})};
-prometheus(put, #{body := Body}) ->
+setting(put, #{body := Body}) ->
case emqx_prometheus_config:update(Body) of
{ok, NewConfig} ->
{200, NewConfig};
@@ -110,20 +107,57 @@ stats(get, #{headers := Headers}) ->
%% Internal funcs
%%--------------------------------------------------------------------
-prometheus_config_schema() ->
- emqx_dashboard_swagger:schema_with_example(
- ref(?SCHEMA_MODULE, "prometheus"),
- prometheus_config_example()
+prometheus_setting_schema() ->
+ [{prometheus, #{type := Setting}}] = emqx_prometheus_schema:roots(),
+ emqx_dashboard_swagger:schema_with_examples(
+ Setting,
+ [
+ recommend_setting_example(),
+ legacy_setting_example()
+ ]
).
-prometheus_config_example() ->
- #{
- enable => true,
- interval => "15s",
- push_gateway_server => <<"http://127.0.0.1:9091">>,
- headers => #{'header-name' => 'header-value'},
- job_name => <<"${name}/instance/${name}~${host}">>
- }.
+legacy_setting_example() ->
+ Summary = <<"legacy_deprecated_setting">>,
+ {Summary, #{
+ summary => Summary,
+ value => #{
+ enable => true,
+ interval => <<"15s">>,
+ push_gateway_server => <<"http://127.0.0.1:9091">>,
+ headers => #{<<"Authorization">> => <<"Basic YWRtaW46Y2JraG55eWd5QDE=">>},
+ job_name => <<"${name}/instance/${name}~${host}">>,
+ vm_dist_collector => <<"disabled">>,
+ vm_memory_collector => <<"disabled">>,
+ vm_msacc_collector => <<"disabled">>,
+ mnesia_collector => <<"disabled">>,
+ vm_statistics_collector => <<"disabled">>,
+ vm_system_info_collector => <<"disabled">>
+ }
+ }}.
+
+recommend_setting_example() ->
+ Summary = <<"recommend_setting">>,
+ {Summary, #{
+ summary => Summary,
+ value => #{
+ enable_basic_auth => false,
+ push_gateway => #{
+ interval => <<"15s">>,
+ url => <<"http://127.0.0.1:9091">>,
+ headers => #{<<"Authorization">> => <<"Basic YWRtaW46Y2JraG55eWd5QDE=">>},
+ job_name => <<"${name}/instance/${name}~${host}">>
+ },
+ collectors => #{
+ vm_dist => <<"disabled">>,
+ vm_memory => <<"disabled">>,
+ vm_msacc => <<"disabled">>,
+ mnesia => <<"disabled">>,
+ vm_statistics => <<"disabled">>,
+ vm_system_info => <<"disabled">>
+ }
+ }
+ }}.
prometheus_data_schema() ->
#{
diff --git a/apps/emqx_prometheus/src/emqx_prometheus_config.erl b/apps/emqx_prometheus/src/emqx_prometheus_config.erl
index 00dad47f9..7cc5c7f8b 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus_config.erl
+++ b/apps/emqx_prometheus/src/emqx_prometheus_config.erl
@@ -22,6 +22,7 @@
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([update/1]).
+-export([conf/0, is_push_gateway_server_enabled/1]).
update(Config) ->
case
@@ -64,7 +65,20 @@ update_prometheus(AppEnvs) ->
),
application:set_env(AppEnvs).
-update_push_gateway(#{enable := true}) ->
- emqx_prometheus_sup:start_child(?APP);
-update_push_gateway(#{enable := false}) ->
- emqx_prometheus_sup:stop_child(?APP).
+update_push_gateway(Prometheus) ->
+ case is_push_gateway_server_enabled(Prometheus) of
+ true ->
+ case erlang:whereis(?APP) of
+ undefined -> emqx_prometheus_sup:start_child(?APP, Prometheus);
+ Pid -> emqx_prometheus_sup:update_child(Pid, Prometheus)
+ end;
+ false ->
+ emqx_prometheus_sup:stop_child(?APP)
+ end.
+
+conf() ->
+ emqx_config:get(?PROMETHEUS).
+
+is_push_gateway_server_enabled(#{enable := true, push_gateway_server := Url}) -> Url =/= "";
+is_push_gateway_server_enabled(#{push_gateway := #{url := Url}}) -> Url =/= "";
+is_push_gateway_server_enabled(_) -> false.
diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl
index 3aaf4292f..1e67b06ca 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl
+++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl
@@ -27,23 +27,58 @@
desc/1,
translation/1,
convert_headers/2,
- validate_push_gateway_server/1
+ validate_url/1
]).
-namespace() -> "prometheus".
+namespace() -> prometheus.
-roots() -> [{"prometheus", ?HOCON(?R_REF("prometheus"), #{translate_to => ["prometheus"]})}].
-
-fields("prometheus") ->
+roots() ->
[
- {push_gateway_server,
+ {prometheus,
+ ?HOCON(
+ ?UNION(setting_union_schema()),
+ #{translate_to => ["prometheus"], default => #{}}
+ )}
+ ].
+
+fields(recommend_setting) ->
+ [
+ {enable_basic_auth,
+ ?HOCON(
+ boolean(),
+ #{
+ default => false,
+ required => true,
+ importance => ?IMPORTANCE_HIGH,
+ desc => ?DESC(enable_basic_auth)
+ }
+ )},
+ {push_gateway,
+ ?HOCON(
+ ?R_REF(push_gateway),
+ #{
+ required => false,
+ importance => ?IMPORTANCE_MEDIUM,
+ desc => ?DESC(push_gateway)
+ }
+ )},
+ {collectors,
+ ?HOCON(?R_REF(collector), #{
+ required => false,
+ importance => ?IMPORTANCE_LOW,
+ desc => ?DESC(collectors)
+ })}
+ ];
+fields(push_gateway) ->
+ [
+ {url,
?HOCON(
string(),
#{
- default => <<"http://127.0.0.1:9091">>,
- required => true,
- validator => fun ?MODULE:validate_push_gateway_server/1,
- desc => ?DESC(push_gateway_server)
+ required => false,
+ default => <<"">>,
+ validator => fun ?MODULE:validate_url/1,
+ desc => ?DESC(push_gateway_url)
}
)},
{interval,
@@ -51,7 +86,7 @@ fields("prometheus") ->
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
- required => true,
+ required => false,
desc => ?DESC(interval)
}
)},
@@ -70,9 +105,113 @@ fields("prometheus") ->
binary(),
#{
default => <<"${name}/instance/${name}~${host}">>,
- required => true,
+ required => false,
desc => ?DESC(job_name)
}
+ )}
+ ];
+fields(collector) ->
+ [
+ {vm_dist,
+ ?HOCON(
+ hoconsc:enum([disabled, enabled]),
+ #{
+ default => disabled,
+ required => true,
+ desc => ?DESC(vm_dist_collector)
+ }
+ )},
+ %% Mnesia metrics mainly using mnesia:system_info/1
+ {mnesia,
+ ?HOCON(
+ hoconsc:enum([enabled, disabled]),
+ #{
+ default => disabled,
+ required => true,
+ desc => ?DESC(mnesia_collector)
+ }
+ )},
+ %% Collects Erlang VM metrics using erlang:statistics/1.
+ {vm_statistics,
+ ?HOCON(
+ hoconsc:enum([enabled, disabled]),
+ #{
+ default => disabled,
+ required => true,
+ desc => ?DESC(vm_statistics_collector)
+ }
+ )},
+ %% Collects Erlang VM metrics using erlang:system_info/1.
+ {vm_system_info,
+ ?HOCON(
+ hoconsc:enum([enabled, disabled]),
+ #{
+ default => disabled,
+ required => true,
+ desc => ?DESC(vm_system_info_collector)
+ }
+ )},
+ %% Collects information about memory dynamically allocated by the Erlang VM using erlang:memory/0,
+ %% it also provides basic (D)ETS statistics.
+ {vm_memory,
+ ?HOCON(
+ hoconsc:enum([enabled, disabled]),
+ #{
+ default => disabled,
+ required => true,
+ desc => ?DESC(vm_memory_collector)
+ }
+ )},
+ %% Collects microstate accounting metrics using erlang:statistics(microstate_accounting).
+ {vm_msacc,
+ ?HOCON(
+ hoconsc:enum([enabled, disabled]),
+ #{
+ default => disabled,
+ required => true,
+ desc => ?DESC(vm_msacc_collector)
+ }
+ )}
+ ];
+fields(legacy_deprecated_setting) ->
+ [
+ {push_gateway_server,
+ ?HOCON(
+ string(),
+ #{
+ default => <<"http://127.0.0.1:9091">>,
+ required => true,
+ validator => fun ?MODULE:validate_url/1,
+ desc => ?DESC(legacy_push_gateway_server)
+ }
+ )},
+ {interval,
+ ?HOCON(
+ emqx_schema:timeout_duration_ms(),
+ #{
+ default => <<"15s">>,
+ required => true,
+ desc => ?DESC(legacy_interval)
+ }
+ )},
+ {headers,
+ ?HOCON(
+ list({string(), string()}),
+ #{
+ default => #{},
+ required => false,
+ converter => fun ?MODULE:convert_headers/2,
+ desc => ?DESC(legacy_headers)
+ }
+ )},
+ {job_name,
+ ?HOCON(
+ binary(),
+ #{
+ default => <<"${name}/instance/${name}~${host}">>,
+ required => true,
+ desc => ?DESC(legacy_job_name)
+ }
)},
{enable,
@@ -81,7 +220,7 @@ fields("prometheus") ->
#{
default => false,
required => true,
- desc => ?DESC(enable)
+ desc => ?DESC(legacy_enable)
}
)},
{vm_dist_collector,
@@ -91,7 +230,7 @@ fields("prometheus") ->
default => disabled,
required => true,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC(vm_dist_collector)
+ desc => ?DESC(legacy_vm_dist_collector)
}
)},
%% Mnesia metrics mainly using mnesia:system_info/1
@@ -102,7 +241,7 @@ fields("prometheus") ->
default => disabled,
required => true,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC(mnesia_collector)
+ desc => ?DESC(legacy_mnesia_collector)
}
)},
%% Collects Erlang VM metrics using erlang:statistics/1.
@@ -113,7 +252,7 @@ fields("prometheus") ->
default => disabled,
required => true,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC(vm_statistics_collector)
+ desc => ?DESC(legacy_vm_statistics_collector)
}
)},
%% Collects Erlang VM metrics using erlang:system_info/1.
@@ -124,7 +263,7 @@ fields("prometheus") ->
default => disabled,
required => true,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC(vm_system_info_collector)
+ desc => ?DESC(legacy_vm_system_info_collector)
}
)},
%% Collects information about memory dynamically allocated by the Erlang VM using erlang:memory/0,
@@ -136,7 +275,7 @@ fields("prometheus") ->
default => disabled,
required => true,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC(vm_memory_collector)
+ desc => ?DESC(legacy_vm_memory_collector)
}
)},
%% Collects microstate accounting metrics using erlang:statistics(microstate_accounting).
@@ -147,14 +286,26 @@ fields("prometheus") ->
default => disabled,
required => true,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC(vm_msacc_collector)
+ desc => ?DESC(legacy_vm_msacc_collector)
}
)}
].
-desc("prometheus") -> ?DESC(prometheus);
+setting_union_schema() ->
+ RecommendSetting = ?R_REF(recommend_setting),
+ LegacySetting = ?R_REF(legacy_deprecated_setting),
+ fun
+ (all_union_members) -> [RecommendSetting, LegacySetting];
+ ({value, #{<<"enable">> := _}}) -> [LegacySetting];
+ %% all other cases treat as new config, include init empty config.
+ ({value, _}) -> [RecommendSetting]
+ end.
+
+desc(prometheus) -> ?DESC(prometheus);
desc(_) -> undefined.
+convert_headers(undefined, _) ->
+ undefined;
convert_headers(Headers, #{make_serializable := true}) ->
Headers;
convert_headers(<<>>, _Opts) ->
@@ -170,10 +321,22 @@ convert_headers(Headers, _Opts) when is_map(Headers) ->
convert_headers(Headers, _Opts) when is_list(Headers) ->
Headers.
-validate_push_gateway_server(Url) ->
+validate_url(Url) ->
case uri_string:parse(Url) of
- #{scheme := S} when S =:= "https" orelse S =:= "http" -> ok;
- _ -> {error, "Invalid url"}
+ #{scheme := S} when
+ S =:= "https";
+ S =:= "http";
+ S =:= <<"https">>;
+ S =:= <<"http">>
+ ->
+ ok;
+ %% default is ""
+ #{path := []} ->
+ ok;
+ #{path := <<>>} ->
+ ok;
+ _ ->
+ {error, "Invalid url"}
end.
%% for CI test, CI don't load the whole emqx_conf_schema.
diff --git a/apps/emqx_prometheus/src/emqx_prometheus_sup.erl b/apps/emqx_prometheus/src/emqx_prometheus_sup.erl
index c284328ff..85281654f 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus_sup.erl
+++ b/apps/emqx_prometheus/src/emqx_prometheus_sup.erl
@@ -20,7 +20,8 @@
-export([
start_link/0,
- start_child/1,
+ start_child/2,
+ update_child/2,
stop_child/1
]).
@@ -39,11 +40,13 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
--spec start_child(supervisor:child_spec() | atom()) -> ok.
-start_child(ChildSpec) when is_map(ChildSpec) ->
- assert_started(supervisor:start_child(?MODULE, ChildSpec));
-start_child(Mod) when is_atom(Mod) ->
- assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))).
+-spec start_child(atom(), map()) -> ok.
+start_child(Mod, Conf) when is_atom(Mod) ->
+ assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Conf))).
+
+update_child(Pid, Conf) ->
+ erlang:send(Pid, {update, Conf}),
+ ok.
-spec stop_child(any()) -> ok | {error, term()}.
stop_child(ChildId) ->
@@ -54,10 +57,11 @@ stop_child(ChildId) ->
end.
init([]) ->
+ Conf = emqx_prometheus_config:conf(),
Children =
- case emqx_conf:get([prometheus, enable], false) of
+ case emqx_prometheus_config:is_push_gateway_server_enabled(Conf) of
false -> [];
- true -> [?CHILD(emqx_prometheus, [])]
+ true -> [?CHILD(emqx_prometheus, Conf)]
end,
{ok, {{one_for_one, 10, 3600}, Children}}.
diff --git a/apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v1.erl b/apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v1.erl
deleted file mode 100644
index b3ba1b6ce..000000000
--- a/apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v1.erl
+++ /dev/null
@@ -1,41 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_prometheus_proto_v1).
-
--behaviour(emqx_bpapi).
-
--export([
- introduced_in/0,
- deprecated_since/0,
- start/1,
- stop/1
-]).
-
--include_lib("emqx/include/bpapi.hrl").
-
-deprecated_since() -> "5.0.10".
-
-introduced_in() ->
- "5.0.0".
-
--spec start([node()]) -> emqx_rpc:multicall_result().
-start(Nodes) ->
- rpc:multicall(Nodes, emqx_prometheus, do_start, [], 5000).
-
--spec stop([node()]) -> emqx_rpc:multicall_result().
-stop(Nodes) ->
- rpc:multicall(Nodes, emqx_prometheus, do_stop, [], 5000).
diff --git a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl
index 3f9e743f3..4ac935604 100644
--- a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl
+++ b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl
@@ -22,7 +22,7 @@
-compile(export_all).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
--define(CONF_DEFAULT, <<
+-define(LEGACY_CONF_DEFAULT, <<
"\n"
"prometheus {\n"
" push_gateway_server = \"http://127.0.0.1:9091\"\n"
@@ -38,45 +38,120 @@
" vm_msacc_collector = disabled\n"
"}\n"
>>).
+-define(CONF_DEFAULT, #{
+ <<"prometheus">> =>
+ #{
+ <<"enable_basic_auth">> => false,
+ <<"collectors">> =>
+ #{
+ <<"mnesia">> => <<"disabled">>,
+ <<"vm_dist">> => <<"disabled">>,
+ <<"vm_memory">> => <<"disabled">>,
+ <<"vm_msacc">> => <<"disabled">>,
+ <<"vm_statistics">> => <<"disabled">>,
+ <<"vm_system_info">> => <<"disabled">>
+ },
+ <<"push_gateway">> =>
+ #{
+ <<"headers">> => #{<<"Authorization">> => <<"some-authz-tokens">>},
+ <<"interval">> => <<"1s">>,
+ <<"job_name">> => <<"${name}~${host}">>,
+ <<"url">> => <<"http://127.0.0.1:9091">>
+ }
+ }
+}).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
+all() ->
+ [
+ {group, new_config},
+ {group, legacy_config}
+ ].
-all() -> emqx_common_test_helpers:all(?MODULE).
+groups() ->
+ [
+ {new_config, [sequence], common_tests()},
+ {legacy_config, [sequence], common_tests()}
+ ].
-init_per_suite(Cfg) ->
+suite() ->
+ [{timetrap, {seconds, 30}}].
+
+common_tests() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+init_per_group(new_config, Config) ->
+ init_group(),
+ load_config(),
+ emqx_common_test_helpers:start_apps([emqx_prometheus]),
+ %% coverage olp metrics
+ {ok, _} = emqx:update_config([overload_protection, enable], true),
+ Config;
+init_per_group(legacy_config, Config) ->
+ init_group(),
+ load_legacy_config(),
+ emqx_common_test_helpers:start_apps([emqx_prometheus]),
+ {ok, _} = emqx:update_config([overload_protection, enable], false),
+ Config.
+
+init_group() ->
application:load(emqx_conf),
ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
- meck:expect(emqx_alarm, deactivate, 3, ok),
+ meck:expect(emqx_alarm, deactivate, 3, ok).
- load_config(),
- emqx_common_test_helpers:start_apps([emqx_prometheus]),
- Cfg.
-
-end_per_suite(_Cfg) ->
+end_group() ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
-
emqx_common_test_helpers:stop_apps([emqx_prometheus]).
+end_per_group(_Group, Config) ->
+ end_group(),
+ Config.
+
+init_per_testcase(t_assert_push, Config) ->
+ meck:new(httpc, [passthrough]),
+ Config;
+init_per_testcase(t_push_gateway, Config) ->
+ start_mock_pushgateway(9091),
+ Config;
+init_per_testcase(_Testcase, Config) ->
+ Config.
+
+end_per_testcase(t_push_gateway, Config) ->
+ stop_mock_pushgateway(),
+ Config;
+end_per_testcase(t_assert_push, _Config) ->
+ meck:unload(httpc),
+ ok;
+end_per_testcase(_Testcase, _Config) ->
+ ok.
+
load_config() ->
ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?CONF_DEFAULT).
+load_legacy_config() ->
+ ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?LEGACY_CONF_DEFAULT).
+
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_start_stop(_) ->
App = emqx_prometheus,
- ?assertMatch(ok, emqx_prometheus_sup:start_child(App)),
+ Conf = emqx_prometheus_config:conf(),
+ ?assertMatch(ok, emqx_prometheus_sup:start_child(App, Conf)),
%% start twice return ok.
- ?assertMatch(ok, emqx_prometheus_sup:start_child(App)),
+ ?assertMatch(ok, emqx_prometheus_sup:start_child(App, Conf)),
+ ok = gen_server:call(emqx_prometheus, dump, 1000),
+ ok = gen_server:cast(emqx_prometheus, dump),
+ dump = erlang:send(emqx_prometheus, dump),
?assertMatch(ok, emqx_prometheus_sup:stop_child(App)),
%% stop twice return ok.
?assertMatch(ok, emqx_prometheus_sup:stop_child(App)),
@@ -88,7 +163,6 @@ t_collector_no_crash_test(_) ->
ok.
t_assert_push(_) ->
- meck:new(httpc, [passthrough]),
Self = self(),
AssertPush = fun(Method, Req = {Url, Headers, ContentType, _Data}, HttpOpts, Opts) ->
?assertEqual(post, Method),
@@ -99,13 +173,51 @@ t_assert_push(_) ->
meck:passthrough([Method, Req, HttpOpts, Opts])
end,
meck:expect(httpc, request, AssertPush),
- ?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus)),
+ Conf = emqx_prometheus_config:conf(),
+ ?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus, Conf)),
receive
pass -> ok
after 2000 ->
ct:fail(assert_push_request_failed)
end.
-t_only_for_coverage(_) ->
- ?assertEqual("5.0.0", emqx_prometheus_proto_v1:introduced_in()),
+t_push_gateway(_) ->
+ Conf = emqx_prometheus_config:conf(),
+ ?assertMatch(ok, emqx_prometheus_sup:stop_child(emqx_prometheus)),
+ ?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus, Conf)),
+ ?assertMatch(#{ok := 0, failed := 0}, emqx_prometheus:info()),
+ timer:sleep(1100),
+ ?assertMatch(#{ok := 1, failed := 0}, emqx_prometheus:info()),
+ ok = emqx_prometheus_sup:update_child(emqx_prometheus, Conf),
+ ?assertMatch(#{ok := 0, failed := 0}, emqx_prometheus:info()),
+
ok.
+
+start_mock_pushgateway(Port) ->
+ application:ensure_all_started(cowboy),
+ Dispatch = cowboy_router:compile([{'_', [{'_', ?MODULE, []}]}]),
+ {ok, _} = cowboy:start_clear(
+ mock_pushgateway_listener,
+ [{port, Port}],
+ #{env => #{dispatch => Dispatch}}
+ ).
+
+stop_mock_pushgateway() ->
+ cowboy:stop_listener(mock_pushgateway_listener).
+
+init(Req0, Opts) ->
+ Method = cowboy_req:method(Req0),
+ Headers = cowboy_req:headers(Req0),
+ ?assertEqual(<<"POST">>, Method),
+ ?assertMatch(
+ #{
+ <<"authorization">> := <<"some-authz-tokens">>,
+ <<"content-length">> := _,
+ <<"content-type">> := <<"text/plain">>,
+ <<"host">> := <<"127.0.0.1:9091">>
+ },
+ Headers
+ ),
+ RespHeader = #{<<"content-type">> => <<"text/plain; charset=utf-8">>},
+ Req = -cowboy_req:reply(200, RespHeader, <<"OK">>, Req0),
+ {ok, Req, Opts}.
diff --git a/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl
index eb909baf5..26957ed23 100644
--- a/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl
+++ b/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl
@@ -28,40 +28,59 @@
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
-
all() ->
- emqx_common_test_helpers:all(?MODULE).
+ [
+ {group, new_config},
+ {group, legacy_config}
+ ].
+
+groups() ->
+ [
+ {new_config, [sequence], [t_stats_api, t_prometheus_api]},
+ {legacy_config, [sequence], [t_stats_api, t_legacy_prometheus_api]}
+ ].
init_per_suite(Config) ->
- application:load(emqx_conf),
- ok = ekka:start(),
- ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
-
- meck:new(mria_rlog, [non_strict, passthrough, no_link]),
-
- emqx_prometheus_SUITE:load_config(),
- emqx_mgmt_api_test_util:init_suite([emqx_prometheus]),
-
+ emqx_prometheus_SUITE:init_group(),
+ emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config.
-
end_per_suite(Config) ->
- ekka:stop(),
- mria:stop(),
- mria_mnesia:delete_schema(),
-
- meck:unload(mria_rlog),
-
- emqx_mgmt_api_test_util:end_suite([emqx_prometheus]),
+ emqx_prometheus_SUITE:end_group(),
+ emqx_mgmt_api_test_util:end_suite([emqx_conf]),
Config.
-init_per_testcase(_, Config) ->
- {ok, _} = emqx_cluster_rpc:start_link(),
+init_per_group(new_config, Config) ->
+ emqx_common_test_helpers:start_apps(
+ [emqx_prometheus],
+ fun(App) -> set_special_configs(App, new_config) end
+ ),
+ Config;
+init_per_group(legacy_config, Config) ->
+ emqx_common_test_helpers:start_apps(
+ [emqx_prometheus],
+ fun(App) -> set_special_configs(App, legacy_config) end
+ ),
Config.
+end_per_group(_Group, Config) ->
+ _ = application:stop(emqx_prometheus),
+ Config.
+
+set_special_configs(emqx_dashboard, _) ->
+ emqx_dashboard_api_test_helpers:set_default_config();
+set_special_configs(emqx_prometheus, new_config) ->
+ emqx_prometheus_SUITE:load_config(),
+ ok;
+set_special_configs(emqx_prometheus, legacy_config) ->
+ emqx_prometheus_SUITE:load_legacy_config(),
+ ok;
+set_special_configs(_App, _) ->
+ ok.
+
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
-t_prometheus_api(_) ->
+t_legacy_prometheus_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["prometheus"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, "", Auth),
@@ -145,6 +164,105 @@ t_prometheus_api(_) ->
),
ok.
+t_prometheus_api(_) ->
+ Path = emqx_mgmt_api_test_util:api_path(["prometheus"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, "", Auth),
+
+ Conf = emqx_utils_json:decode(Response, [return_maps]),
+ ?assertMatch(
+ #{
+ <<"push_gateway">> := #{},
+ <<"collectors">> := _,
+ <<"enable_basic_auth">> := _
+ },
+ Conf
+ ),
+ #{
+ <<"push_gateway">> :=
+ #{<<"url">> := Url} = PushGateway,
+ <<"collectors">> := Collector
+ } = Conf,
+ Pid = erlang:whereis(emqx_prometheus),
+ ?assertEqual(Url =/= "", undefined =/= Pid, {Url, Pid}),
+
+ NewConf = Conf#{
+ <<"push_gateway">> => PushGateway#{
+ <<"interval">> => <<"2s">>,
+ <<"headers">> => #{
+ <<"test-str1">> => <<"test-value">>,
+ <<"test-str2">> => <<"42">>
+ }
+ },
+ <<"collectors">> => Collector#{
+ <<"vm_dist">> => <<"enabled">>,
+ <<"vm_system_info">> => <<"enabled">>,
+ <<"vm_memory">> => <<"enabled">>,
+ <<"vm_msacc">> => <<"enabled">>,
+ <<"mnesia">> => <<"enabled">>,
+ <<"vm_statistics">> => <<"enabled">>
+ }
+ },
+ {ok, Response2} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf),
+
+ Conf2 = emqx_utils_json:decode(Response2, [return_maps]),
+ ?assertMatch(NewConf, Conf2),
+
+ EnvCollectors = application:get_env(prometheus, collectors, []),
+ PromCollectors = prometheus_registry:collectors(default),
+ ?assertEqual(lists:sort(EnvCollectors), lists:sort(PromCollectors)),
+ ?assert(lists:member(prometheus_vm_statistics_collector, EnvCollectors), EnvCollectors),
+
+ lists:foreach(
+ fun({C, Enabled}) ->
+ ?assertEqual(Enabled, lists:member(C, EnvCollectors), EnvCollectors)
+ end,
+ [
+ {prometheus_vm_dist_collector, true},
+ {prometheus_vm_system_info_collector, true},
+ {prometheus_vm_memory_collector, true},
+ {prometheus_mnesia_collector, true},
+ {prometheus_vm_msacc_collector, true},
+ {prometheus_vm_statistics_collector, true}
+ ]
+ ),
+
+ ?assertMatch(
+ #{
+ <<"push_gateway">> := #{
+ <<"headers">> := #{
+ <<"test-str1">> := <<"test-value">>,
+ <<"test-str2">> := <<"42">>
+ }
+ }
+ },
+ emqx_config:get_raw([prometheus])
+ ),
+ ?assertMatch(
+ #{
+ push_gateway := #{
+ headers := [
+ {"test-str2", "42"},
+ {"test-str1", "test-value"}
+ ]
+ }
+ },
+ emqx_config:get([prometheus])
+ ),
+
+ NewConf1 = Conf#{<<"push_gateway">> => PushGateway#{<<"url">> => <<"">>}},
+ {ok, _Response3} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf1),
+ ?assertEqual(undefined, erlang:whereis(emqx_prometheus)),
+
+ ConfWithoutScheme = Conf#{
+ <<"push_gateway">> => PushGateway#{<<"url">> => <<"127.0.0.1:8081">>}
+ },
+ ?assertMatch(
+ {error, {"HTTP/1.1", 400, _}},
+ emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, ConfWithoutScheme)
+ ),
+ ok.
+
t_stats_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["prometheus", "stats"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
diff --git a/rel/i18n/emqx_prometheus_api.hocon b/rel/i18n/emqx_prometheus_api.hocon
index c468ccc2f..0d9b5dc5f 100644
--- a/rel/i18n/emqx_prometheus_api.hocon
+++ b/rel/i18n/emqx_prometheus_api.hocon
@@ -11,8 +11,8 @@ update_prom_conf_info.label:
"""Update Prometheus config"""
get_prom_data.desc:
-"""Get Prometheus Data"""
+"""Get Prometheus Metrics"""
get_prom_data.label:
-"""Get Prometheus Data"""
+"""Prometheus Metrics"""
}
diff --git a/rel/i18n/emqx_prometheus_schema.hocon b/rel/i18n/emqx_prometheus_schema.hocon
index d665343e9..d97f1884f 100644
--- a/rel/i18n/emqx_prometheus_schema.hocon
+++ b/rel/i18n/emqx_prometheus_schema.hocon
@@ -1,8 +1,5 @@
emqx_prometheus_schema {
-enable.desc:
-"""Turn Prometheus data pushing on or off"""
-
headers.desc:
"""An HTTP Headers when pushing to Push Gateway.
For example, { Authorization = "some-authz-tokens"}
"""
@@ -17,19 +14,25 @@ job_name.desc:
For example, when the EMQX node name is emqx@127.0.0.1
then the name
variable takes value emqx
and the host
variable takes value 127.0.0.1
.
Default value is: ${name}/instance/${name}~${host}
"""
-mnesia_collector.desc:
-"""Enable or disable Mnesia metrics collector"""
-
prometheus.desc:
"""EMQX's Prometheus scraping endpoint is enabled by default without authentication.
-You can inspect it with a `curl` command like this: `curl -f "127.0.0.1:18083/api/v5/prometheus/stats"`
-The 'enable' flag is used to turn on and off for the push-gateway integration."""
+You can inspect it with a `curl` command like this: `curl -f "127.0.0.1:18083/api/v5/prometheus/stats"`
"""
prometheus.label:
"""Prometheus"""
-push_gateway_server.desc:
-"""URL of Prometheus server. Pushgateway is optional, should not be configured if prometheus is to scrape EMQX."""
+push_gateway.desc:
+"""Push Gateway is optional, should not be configured if prometheus is to scrape EMQX."""
+
+collectors.desc:
+"""Metrics collectors to be enabled."""
+
+push_gateway_url.desc:
+"""URL of Prometheus server. Pushgateway is optional, should not be configured if prometheus is to scrape EMQX.
+Set url to "" to disable push gateway"""
+
+mnesia_collector.desc:
+"""Enable or disable Mnesia metrics collector"""
vm_dist_collector.desc:
"""Enable or disable VM distribution collector, collects information about the sockets and processes involved in the Erlang distribution mechanism."""
@@ -46,4 +49,37 @@ vm_statistics_collector.desc:
vm_system_info_collector.desc:
"""Enable or disable VM system info collector."""
+legacy_enable.desc:
+"""Deprecated, use prometheus.push_gateway.url instead"""
+
+legacy_headers.desc:
+"""Deprecated, use prometheus.push_gateway.headers instead"""
+
+legacy_interval.desc:
+"""Deprecated, use prometheus.push_gateway.interval instead"""
+
+legacy_job_name.desc:
+"""Deprecated, use prometheus.push_gateway.job_name instead"""
+
+legacy_push_gateway_server.desc:
+"""Deprecated, use prometheus.push_gateway.url instead"""
+
+legacy_mnesia_collector.desc:
+"""Deprecated, use prometheus.collectors.mnesia instead"""
+
+legacy_vm_dist_collector.desc:
+"""Deprecated, use prometheus.collectors.vm_dist instead"""
+
+legacy_vm_memory_collector.desc:
+"""Deprecated, use prometheus.collectors.vm_memory instead"""
+
+legacy_vm_msacc_collector.desc:
+"""Deprecated, use prometheus.collectors.vm_msacc instead"""
+
+legacy_vm_statistics_collector.desc:
+"""Deprecated, use prometheus.collectors.vm_statistics instead"""
+
+legacy_vm_system_info_collector.desc:
+"""Deprecated, use prometheus.collectors.vm_system_info instead"""
+
}