refactor: prometheus config

This commit is contained in:
zhongwencool 2023-11-05 00:42:30 +08:00
parent df6221a219
commit c376a5db29
11 changed files with 667 additions and 222 deletions

View File

@ -1190,37 +1190,44 @@ tr_prometheus_collectors(Conf) ->
emqx_prometheus, emqx_prometheus,
emqx_prometheus_mria emqx_prometheus_mria
%% builtin vm collectors %% builtin vm collectors
| tr_vm_dist_collector(Conf) ++ | prometheus_collectors(Conf)
tr_mnesia_collector(Conf) ++
tr_vm_statistics_collector(Conf) ++
tr_vm_system_info_collector(Conf) ++
tr_vm_memory_collector(Conf) ++
tr_vm_msacc_collector(Conf)
]. ].
tr_vm_dist_collector(Conf) -> prometheus_collectors(Conf) ->
Enabled = conf_get("prometheus.vm_dist_collector", Conf, disabled), case conf_get("prometheus.enable_basic_auth", Conf, undefined) of
collector_enabled(Enabled, prometheus_vm_dist_collector). %% legacy
undefined ->
tr_collector("prometheus.vm_dist_collector", prometheus_vm_dist_collector, Conf) ++
tr_collector("prometheus.mnesia_collector", prometheus_mnesia_collector, Conf) ++
tr_collector(
"prometheus.vm_statistics_collector", prometheus_vm_statistics_collector, Conf
) ++
tr_collector(
"prometheus.vm_system_info_collector", prometheus_vm_system_info_collector, Conf
) ++
tr_collector("prometheus.vm_memory_collector", prometheus_vm_memory_collector, Conf) ++
tr_collector("prometheus.vm_msacc_collector", prometheus_vm_msacc_collector, Conf);
%% new
_ ->
tr_collector("prometheus.collectors.vm_dist", prometheus_vm_dist_collector, Conf) ++
tr_collector("prometheus.collectors.mnesia", prometheus_mnesia_collector, Conf) ++
tr_collector(
"prometheus.collectors.vm_statistics", prometheus_vm_statistics_collector, Conf
) ++
tr_collector(
"prometheus.collectors.vm_system_info",
prometheus_vm_system_info_collector,
Conf
) ++
tr_collector(
"prometheus.collectors.vm_memory", prometheus_vm_memory_collector, Conf
) ++
tr_collector("prometheus.collectors.vm_msacc", prometheus_vm_msacc_collector, Conf)
end.
tr_mnesia_collector(Conf) -> tr_collector(Key, Collect, Conf) ->
Enabled = conf_get("prometheus.mnesia_collector", Conf, disabled), Enabled = conf_get(Key, Conf, disabled),
collector_enabled(Enabled, prometheus_mnesia_collector). collector_enabled(Enabled, Collect).
tr_vm_statistics_collector(Conf) ->
Enabled = conf_get("prometheus.vm_statistics_collector", Conf, disabled),
collector_enabled(Enabled, prometheus_vm_statistics_collector).
tr_vm_system_info_collector(Conf) ->
Enabled = conf_get("prometheus.vm_system_info_collector", Conf, disabled),
collector_enabled(Enabled, prometheus_vm_system_info_collector).
tr_vm_memory_collector(Conf) ->
Enabled = conf_get("prometheus.vm_memory_collector", Conf, disabled),
collector_enabled(Enabled, prometheus_vm_memory_collector).
tr_vm_msacc_collector(Conf) ->
Enabled = conf_get("prometheus.vm_msacc_collector", Conf, disabled),
collector_enabled(Enabled, prometheus_vm_msacc_collector).
collector_enabled(enabled, Collector) -> [Collector]; collector_enabled(enabled, Collector) -> [Collector];
collector_enabled(disabled, _) -> []. collector_enabled(disabled, _) -> [].

View File

@ -42,6 +42,7 @@
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
init/1, init/1,
handle_continue/2,
handle_call/3, handle_call/3,
handle_cast/2, handle_cast/2,
handle_info/2, handle_info/2,
@ -58,12 +59,6 @@
-export([collect/1]). -export([collect/1]).
-export([
%% For bpapi, deprecated_since 5.0.10, remove this when 5.1.x
do_start/0,
do_stop/0
]).
-define(C(K, L), proplists:get_value(K, L, 0)). -define(C(K, L), proplists:get_value(K, L, 0)).
-define(TIMER_MSG, '#interval'). -define(TIMER_MSG, '#interval').
@ -74,8 +69,8 @@
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link([]) -> start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
info() -> info() ->
gen_server:call(?MODULE, info). gen_server:call(?MODULE, info).
@ -84,49 +79,42 @@ info() ->
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init(Conf) ->
#{interval := Interval} = opts(), {ok, #{}, {continue, Conf}}.
{ok, #{timer => ensure_timer(Interval), ok => 0, failed => 0}}.
handle_call(info, _From, State = #{timer := Timer}) -> handle_continue(Conf, State) ->
{reply, State#{opts => opts(), next_push_ms => erlang:read_timer(Timer)}, State}; Opts = #{interval := Interval} = opts(Conf),
{noreply, State#{
timer => ensure_timer(Interval),
opts => Opts,
ok => 0,
failed => 0
}}.
handle_call(info, _From, State = #{timer := Timer, opts := Opts}) ->
{reply, State#{opts => Opts, next_push_ms => erlang:read_timer(Timer)}, State};
handle_call(_Msg, _From, State) -> handle_call(_Msg, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) -> handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer, opts := Opts}) ->
#{ #{interval := Interval, headers := Headers, url := Server} = Opts,
interval := Interval, PushRes = push_to_push_gateway(Server, Headers),
headers := Headers,
job_name := JobName,
push_gateway_server := Server
} = opts(),
PushRes = push_to_push_gateway(Server, Headers, JobName),
NewTimer = ensure_timer(Interval), NewTimer = ensure_timer(Interval),
NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}), NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}),
%% Data is too big, hibernate for saving memory and stop system monitor warning. %% Data is too big, hibernate for saving memory and stop system monitor warning.
{noreply, NewState, hibernate}; {noreply, NewState, hibernate};
handle_info({update, Conf}, State = #{timer := Timer}) ->
emqx_utils:cancel_timer(Timer),
handle_continue(Conf, State);
handle_info(_Msg, State) -> handle_info(_Msg, State) ->
{noreply, State}. {noreply, State}.
push_to_push_gateway(Uri, Headers, JobName) when is_list(Headers) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"), push_to_push_gateway(Url, Headers) when is_list(Headers) ->
% NOTE: allowing errors here to keep rough backward compatibility
{JobName1, Errors} = emqx_template:render(
emqx_template:parse(JobName),
#{<<"name">> => Name, <<"host">> => Ip}
),
_ =
Errors == [] orelse
?SLOG(warning, #{
msg => "prometheus_job_name_template_invalid",
errors => Errors,
template => JobName
}),
Data = prometheus_text_format:format(), Data = prometheus_text_format:format(),
Url = lists:concat([Uri, "/metrics/job/", unicode:characters_to_list(JobName1)]),
case httpc:request(post, {Url, Headers, "text/plain", Data}, ?HTTP_OPTIONS, []) of case httpc:request(post, {Url, Headers, "text/plain", Data}, ?HTTP_OPTIONS, []) of
{ok, {{"HTTP/1.1", 200, _}, _RespHeaders, _RespBody}} -> {ok, {{"HTTP/1.1", 200, _}, _RespHeaders, _RespBody}} ->
ok; ok;
@ -152,8 +140,26 @@ ensure_timer(Interval) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% prometheus callbacks %% prometheus callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
opts() -> opts(#{interval := Interval, headers := Headers, job_name := JobName, push_gateway_server := Url}) ->
emqx_conf:get(?PROMETHEUS). #{interval => Interval, headers => Headers, url => join_url(Url, JobName)};
opts(#{push_gateway := #{url := Url, job_name := JobName} = PushGateway}) ->
maps:put(url, join_url(Url, JobName), PushGateway).
join_url(Url, JobName0) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"),
% NOTE: allowing errors here to keep rough backward compatibility
{JobName1, Errors} = emqx_template:render(
emqx_template:parse(JobName0),
#{<<"name">> => Name, <<"host">> => Ip}
),
_ =
Errors == [] orelse
?SLOG(warning, #{
msg => "prometheus_job_name_template_invalid",
errors => Errors,
template => JobName0
}),
lists:concat([Url, "/metrics/job/", unicode:characters_to_list(JobName1)]).
deregister_cleanup(_Registry) -> deregister_cleanup(_Registry) ->
ok. ok.
@ -672,11 +678,3 @@ emqx_cluster_data() ->
{nodes_running, length(Running)}, {nodes_running, length(Running)},
{nodes_stopped, length(Stopped)} {nodes_stopped, length(Stopped)}
]. ].
%% deprecated_since 5.0.10, remove this when 5.1.x
do_start() ->
emqx_prometheus_sup:start_child(?APP).
%% deprecated_since 5.0.10, remove this when 5.1.x
do_stop() ->
emqx_prometheus_sup:stop_child(?APP).

View File

@ -20,8 +20,6 @@
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [ref/2]).
-export([ -export([
api_spec/0, api_spec/0,
paths/0, paths/0,
@ -29,11 +27,10 @@
]). ]).
-export([ -export([
prometheus/2, setting/2,
stats/2 stats/2
]). ]).
-define(SCHEMA_MODULE, emqx_prometheus_schema).
-define(TAGS, [<<"Monitor">>]). -define(TAGS, [<<"Monitor">>]).
api_spec() -> api_spec() ->
@ -47,21 +44,21 @@ paths() ->
schema("/prometheus") -> schema("/prometheus") ->
#{ #{
'operationId' => prometheus, 'operationId' => setting,
get => get =>
#{ #{
description => ?DESC(get_prom_conf_info), description => ?DESC(get_prom_conf_info),
tags => ?TAGS, tags => ?TAGS,
responses => responses =>
#{200 => prometheus_config_schema()} #{200 => prometheus_setting_schema()}
}, },
put => put =>
#{ #{
description => ?DESC(update_prom_conf_info), description => ?DESC(update_prom_conf_info),
tags => ?TAGS, tags => ?TAGS,
'requestBody' => prometheus_config_schema(), 'requestBody' => prometheus_setting_schema(),
responses => responses =>
#{200 => prometheus_config_schema()} #{200 => prometheus_setting_schema()}
} }
}; };
schema("/prometheus/stats") -> schema("/prometheus/stats") ->
@ -81,9 +78,9 @@ schema("/prometheus/stats") ->
%% API Handler funcs %% API Handler funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
prometheus(get, _Params) -> setting(get, _Params) ->
{200, emqx:get_raw_config([<<"prometheus">>], #{})}; {200, emqx:get_raw_config([<<"prometheus">>], #{})};
prometheus(put, #{body := Body}) -> setting(put, #{body := Body}) ->
case emqx_prometheus_config:update(Body) of case emqx_prometheus_config:update(Body) of
{ok, NewConfig} -> {ok, NewConfig} ->
{200, NewConfig}; {200, NewConfig};
@ -110,20 +107,57 @@ stats(get, #{headers := Headers}) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
prometheus_config_schema() -> prometheus_setting_schema() ->
emqx_dashboard_swagger:schema_with_example( [{prometheus, #{type := Setting}}] = emqx_prometheus_schema:roots(),
ref(?SCHEMA_MODULE, "prometheus"), emqx_dashboard_swagger:schema_with_examples(
prometheus_config_example() Setting,
[
recommend_setting_example(),
legacy_setting_example()
]
). ).
prometheus_config_example() -> legacy_setting_example() ->
#{ Summary = <<"legacy_deprecated_setting">>,
{Summary, #{
summary => Summary,
value => #{
enable => true, enable => true,
interval => "15s", interval => <<"15s">>,
push_gateway_server => <<"http://127.0.0.1:9091">>, push_gateway_server => <<"http://127.0.0.1:9091">>,
headers => #{'header-name' => 'header-value'}, headers => #{<<"Authorization">> => <<"Basic YWRtaW46Y2JraG55eWd5QDE=">>},
job_name => <<"${name}/instance/${name}~${host}">>,
vm_dist_collector => <<"disabled">>,
vm_memory_collector => <<"disabled">>,
vm_msacc_collector => <<"disabled">>,
mnesia_collector => <<"disabled">>,
vm_statistics_collector => <<"disabled">>,
vm_system_info_collector => <<"disabled">>
}
}}.
recommend_setting_example() ->
Summary = <<"recommend_setting">>,
{Summary, #{
summary => Summary,
value => #{
enable_basic_auth => false,
push_gateway => #{
interval => <<"15s">>,
url => <<"http://127.0.0.1:9091">>,
headers => #{<<"Authorization">> => <<"Basic YWRtaW46Y2JraG55eWd5QDE=">>},
job_name => <<"${name}/instance/${name}~${host}">> job_name => <<"${name}/instance/${name}~${host}">>
}. },
collectors => #{
vm_dist => <<"disabled">>,
vm_memory => <<"disabled">>,
vm_msacc => <<"disabled">>,
mnesia => <<"disabled">>,
vm_statistics => <<"disabled">>,
vm_system_info => <<"disabled">>
}
}
}}.
prometheus_data_schema() -> prometheus_data_schema() ->
#{ #{

View File

@ -22,6 +22,7 @@
-export([add_handler/0, remove_handler/0]). -export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]). -export([post_config_update/5]).
-export([update/1]). -export([update/1]).
-export([conf/0, is_push_gateway_server_enabled/1]).
update(Config) -> update(Config) ->
case case
@ -64,7 +65,20 @@ update_prometheus(AppEnvs) ->
), ),
application:set_env(AppEnvs). application:set_env(AppEnvs).
update_push_gateway(#{enable := true}) -> update_push_gateway(Prometheus) ->
emqx_prometheus_sup:start_child(?APP); case is_push_gateway_server_enabled(Prometheus) of
update_push_gateway(#{enable := false}) -> true ->
emqx_prometheus_sup:stop_child(?APP). case erlang:whereis(?APP) of
undefined -> emqx_prometheus_sup:start_child(?APP, Prometheus);
Pid -> emqx_prometheus_sup:update_child(Pid, Prometheus)
end;
false ->
emqx_prometheus_sup:stop_child(?APP)
end.
conf() ->
emqx_config:get(?PROMETHEUS).
is_push_gateway_server_enabled(#{enable := true, push_gateway_server := Url}) -> Url =/= "";
is_push_gateway_server_enabled(#{push_gateway := #{url := Url}}) -> Url =/= "";
is_push_gateway_server_enabled(_) -> false.

View File

@ -27,23 +27,58 @@
desc/1, desc/1,
translation/1, translation/1,
convert_headers/2, convert_headers/2,
validate_push_gateway_server/1 validate_url/1
]). ]).
namespace() -> "prometheus". namespace() -> prometheus.
roots() -> [{"prometheus", ?HOCON(?R_REF("prometheus"), #{translate_to => ["prometheus"]})}]. roots() ->
fields("prometheus") ->
[ [
{push_gateway_server, {prometheus,
?HOCON(
?UNION(setting_union_schema()),
#{translate_to => ["prometheus"], default => #{}}
)}
].
fields(recommend_setting) ->
[
{enable_basic_auth,
?HOCON(
boolean(),
#{
default => false,
required => true,
importance => ?IMPORTANCE_HIGH,
desc => ?DESC(enable_basic_auth)
}
)},
{push_gateway,
?HOCON(
?R_REF(push_gateway),
#{
required => false,
importance => ?IMPORTANCE_MEDIUM,
desc => ?DESC(push_gateway)
}
)},
{collectors,
?HOCON(?R_REF(collector), #{
required => false,
importance => ?IMPORTANCE_LOW,
desc => ?DESC(collectors)
})}
];
fields(push_gateway) ->
[
{url,
?HOCON( ?HOCON(
string(), string(),
#{ #{
default => <<"http://127.0.0.1:9091">>, required => false,
required => true, default => <<"">>,
validator => fun ?MODULE:validate_push_gateway_server/1, validator => fun ?MODULE:validate_url/1,
desc => ?DESC(push_gateway_server) desc => ?DESC(push_gateway_url)
} }
)}, )},
{interval, {interval,
@ -51,7 +86,7 @@ fields("prometheus") ->
emqx_schema:timeout_duration_ms(), emqx_schema:timeout_duration_ms(),
#{ #{
default => <<"15s">>, default => <<"15s">>,
required => true, required => false,
desc => ?DESC(interval) desc => ?DESC(interval)
} }
)}, )},
@ -70,9 +105,113 @@ fields("prometheus") ->
binary(), binary(),
#{ #{
default => <<"${name}/instance/${name}~${host}">>, default => <<"${name}/instance/${name}~${host}">>,
required => true, required => false,
desc => ?DESC(job_name) desc => ?DESC(job_name)
} }
)}
];
fields(collector) ->
[
{vm_dist,
?HOCON(
hoconsc:enum([disabled, enabled]),
#{
default => disabled,
required => true,
desc => ?DESC(vm_dist_collector)
}
)},
%% Mnesia metrics mainly using mnesia:system_info/1
{mnesia,
?HOCON(
hoconsc:enum([enabled, disabled]),
#{
default => disabled,
required => true,
desc => ?DESC(mnesia_collector)
}
)},
%% Collects Erlang VM metrics using erlang:statistics/1.
{vm_statistics,
?HOCON(
hoconsc:enum([enabled, disabled]),
#{
default => disabled,
required => true,
desc => ?DESC(vm_statistics_collector)
}
)},
%% Collects Erlang VM metrics using erlang:system_info/1.
{vm_system_info,
?HOCON(
hoconsc:enum([enabled, disabled]),
#{
default => disabled,
required => true,
desc => ?DESC(vm_system_info_collector)
}
)},
%% Collects information about memory dynamically allocated by the Erlang VM using erlang:memory/0,
%% it also provides basic (D)ETS statistics.
{vm_memory,
?HOCON(
hoconsc:enum([enabled, disabled]),
#{
default => disabled,
required => true,
desc => ?DESC(vm_memory_collector)
}
)},
%% Collects microstate accounting metrics using erlang:statistics(microstate_accounting).
{vm_msacc,
?HOCON(
hoconsc:enum([enabled, disabled]),
#{
default => disabled,
required => true,
desc => ?DESC(vm_msacc_collector)
}
)}
];
fields(legacy_deprecated_setting) ->
[
{push_gateway_server,
?HOCON(
string(),
#{
default => <<"http://127.0.0.1:9091">>,
required => true,
validator => fun ?MODULE:validate_url/1,
desc => ?DESC(legacy_push_gateway_server)
}
)},
{interval,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
required => true,
desc => ?DESC(legacy_interval)
}
)},
{headers,
?HOCON(
list({string(), string()}),
#{
default => #{},
required => false,
converter => fun ?MODULE:convert_headers/2,
desc => ?DESC(legacy_headers)
}
)},
{job_name,
?HOCON(
binary(),
#{
default => <<"${name}/instance/${name}~${host}">>,
required => true,
desc => ?DESC(legacy_job_name)
}
)}, )},
{enable, {enable,
@ -81,7 +220,7 @@ fields("prometheus") ->
#{ #{
default => false, default => false,
required => true, required => true,
desc => ?DESC(enable) desc => ?DESC(legacy_enable)
} }
)}, )},
{vm_dist_collector, {vm_dist_collector,
@ -91,7 +230,7 @@ fields("prometheus") ->
default => disabled, default => disabled,
required => true, required => true,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC(vm_dist_collector) desc => ?DESC(legacy_vm_dist_collector)
} }
)}, )},
%% Mnesia metrics mainly using mnesia:system_info/1 %% Mnesia metrics mainly using mnesia:system_info/1
@ -102,7 +241,7 @@ fields("prometheus") ->
default => disabled, default => disabled,
required => true, required => true,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC(mnesia_collector) desc => ?DESC(legacy_mnesia_collector)
} }
)}, )},
%% Collects Erlang VM metrics using erlang:statistics/1. %% Collects Erlang VM metrics using erlang:statistics/1.
@ -113,7 +252,7 @@ fields("prometheus") ->
default => disabled, default => disabled,
required => true, required => true,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC(vm_statistics_collector) desc => ?DESC(legacy_vm_statistics_collector)
} }
)}, )},
%% Collects Erlang VM metrics using erlang:system_info/1. %% Collects Erlang VM metrics using erlang:system_info/1.
@ -124,7 +263,7 @@ fields("prometheus") ->
default => disabled, default => disabled,
required => true, required => true,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC(vm_system_info_collector) desc => ?DESC(legacy_vm_system_info_collector)
} }
)}, )},
%% Collects information about memory dynamically allocated by the Erlang VM using erlang:memory/0, %% Collects information about memory dynamically allocated by the Erlang VM using erlang:memory/0,
@ -136,7 +275,7 @@ fields("prometheus") ->
default => disabled, default => disabled,
required => true, required => true,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC(vm_memory_collector) desc => ?DESC(legacy_vm_memory_collector)
} }
)}, )},
%% Collects microstate accounting metrics using erlang:statistics(microstate_accounting). %% Collects microstate accounting metrics using erlang:statistics(microstate_accounting).
@ -147,14 +286,26 @@ fields("prometheus") ->
default => disabled, default => disabled,
required => true, required => true,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC(vm_msacc_collector) desc => ?DESC(legacy_vm_msacc_collector)
} }
)} )}
]. ].
desc("prometheus") -> ?DESC(prometheus); setting_union_schema() ->
RecommendSetting = ?R_REF(recommend_setting),
LegacySetting = ?R_REF(legacy_deprecated_setting),
fun
(all_union_members) -> [RecommendSetting, LegacySetting];
({value, #{<<"enable">> := _}}) -> [LegacySetting];
%% all other cases treat as new config, include init empty config.
({value, _}) -> [RecommendSetting]
end.
desc(prometheus) -> ?DESC(prometheus);
desc(_) -> undefined. desc(_) -> undefined.
convert_headers(undefined, _) ->
undefined;
convert_headers(Headers, #{make_serializable := true}) -> convert_headers(Headers, #{make_serializable := true}) ->
Headers; Headers;
convert_headers(<<>>, _Opts) -> convert_headers(<<>>, _Opts) ->
@ -170,10 +321,22 @@ convert_headers(Headers, _Opts) when is_map(Headers) ->
convert_headers(Headers, _Opts) when is_list(Headers) -> convert_headers(Headers, _Opts) when is_list(Headers) ->
Headers. Headers.
validate_push_gateway_server(Url) -> validate_url(Url) ->
case uri_string:parse(Url) of case uri_string:parse(Url) of
#{scheme := S} when S =:= "https" orelse S =:= "http" -> ok; #{scheme := S} when
_ -> {error, "Invalid url"} S =:= "https";
S =:= "http";
S =:= <<"https">>;
S =:= <<"http">>
->
ok;
%% default is ""
#{path := []} ->
ok;
#{path := <<>>} ->
ok;
_ ->
{error, "Invalid url"}
end. end.
%% for CI test, CI don't load the whole emqx_conf_schema. %% for CI test, CI don't load the whole emqx_conf_schema.

View File

@ -20,7 +20,8 @@
-export([ -export([
start_link/0, start_link/0,
start_child/1, start_child/2,
update_child/2,
stop_child/1 stop_child/1
]). ]).
@ -39,11 +40,13 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec() | atom()) -> ok. -spec start_child(atom(), map()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) -> start_child(Mod, Conf) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)); assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Conf))).
start_child(Mod) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). update_child(Pid, Conf) ->
erlang:send(Pid, {update, Conf}),
ok.
-spec stop_child(any()) -> ok | {error, term()}. -spec stop_child(any()) -> ok | {error, term()}.
stop_child(ChildId) -> stop_child(ChildId) ->
@ -54,10 +57,11 @@ stop_child(ChildId) ->
end. end.
init([]) -> init([]) ->
Conf = emqx_prometheus_config:conf(),
Children = Children =
case emqx_conf:get([prometheus, enable], false) of case emqx_prometheus_config:is_push_gateway_server_enabled(Conf) of
false -> []; false -> [];
true -> [?CHILD(emqx_prometheus, [])] true -> [?CHILD(emqx_prometheus, Conf)]
end, end,
{ok, {{one_for_one, 10, 3600}, Children}}. {ok, {{one_for_one, 10, 3600}, Children}}.

View File

@ -1,41 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
deprecated_since/0,
start/1,
stop/1
]).
-include_lib("emqx/include/bpapi.hrl").
deprecated_since() -> "5.0.10".
introduced_in() ->
"5.0.0".
-spec start([node()]) -> emqx_rpc:multicall_result().
start(Nodes) ->
rpc:multicall(Nodes, emqx_prometheus, do_start, [], 5000).
-spec stop([node()]) -> emqx_rpc:multicall_result().
stop(Nodes) ->
rpc:multicall(Nodes, emqx_prometheus, do_stop, [], 5000).

View File

@ -22,7 +22,7 @@
-compile(export_all). -compile(export_all).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, << -define(LEGACY_CONF_DEFAULT, <<
"\n" "\n"
"prometheus {\n" "prometheus {\n"
" push_gateway_server = \"http://127.0.0.1:9091\"\n" " push_gateway_server = \"http://127.0.0.1:9091\"\n"
@ -38,45 +38,120 @@
" vm_msacc_collector = disabled\n" " vm_msacc_collector = disabled\n"
"}\n" "}\n"
>>). >>).
-define(CONF_DEFAULT, #{
<<"prometheus">> =>
#{
<<"enable_basic_auth">> => false,
<<"collectors">> =>
#{
<<"mnesia">> => <<"disabled">>,
<<"vm_dist">> => <<"disabled">>,
<<"vm_memory">> => <<"disabled">>,
<<"vm_msacc">> => <<"disabled">>,
<<"vm_statistics">> => <<"disabled">>,
<<"vm_system_info">> => <<"disabled">>
},
<<"push_gateway">> =>
#{
<<"headers">> => #{<<"Authorization">> => <<"some-authz-tokens">>},
<<"interval">> => <<"1s">>,
<<"job_name">> => <<"${name}~${host}">>,
<<"url">> => <<"http://127.0.0.1:9091">>
}
}
}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
all() ->
[
{group, new_config},
{group, legacy_config}
].
all() -> emqx_common_test_helpers:all(?MODULE). groups() ->
[
{new_config, [sequence], common_tests()},
{legacy_config, [sequence], common_tests()}
].
init_per_suite(Cfg) -> suite() ->
[{timetrap, {seconds, 30}}].
common_tests() ->
emqx_common_test_helpers:all(?MODULE).
init_per_group(new_config, Config) ->
init_group(),
load_config(),
emqx_common_test_helpers:start_apps([emqx_prometheus]),
%% coverage olp metrics
{ok, _} = emqx:update_config([overload_protection, enable], true),
Config;
init_per_group(legacy_config, Config) ->
init_group(),
load_legacy_config(),
emqx_common_test_helpers:start_apps([emqx_prometheus]),
{ok, _} = emqx:update_config([overload_protection, enable], false),
Config.
init_group() ->
application:load(emqx_conf), application:load(emqx_conf),
ok = ekka:start(), ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok).
load_config(), end_group() ->
emqx_common_test_helpers:start_apps([emqx_prometheus]),
Cfg.
end_per_suite(_Cfg) ->
ekka:stop(), ekka:stop(),
mria:stop(), mria:stop(),
mria_mnesia:delete_schema(), mria_mnesia:delete_schema(),
meck:unload(emqx_alarm), meck:unload(emqx_alarm),
emqx_common_test_helpers:stop_apps([emqx_prometheus]). emqx_common_test_helpers:stop_apps([emqx_prometheus]).
end_per_group(_Group, Config) ->
end_group(),
Config.
init_per_testcase(t_assert_push, Config) ->
meck:new(httpc, [passthrough]),
Config;
init_per_testcase(t_push_gateway, Config) ->
start_mock_pushgateway(9091),
Config;
init_per_testcase(_Testcase, Config) ->
Config.
end_per_testcase(t_push_gateway, Config) ->
stop_mock_pushgateway(),
Config;
end_per_testcase(t_assert_push, _Config) ->
meck:unload(httpc),
ok;
end_per_testcase(_Testcase, _Config) ->
ok.
load_config() -> load_config() ->
ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?CONF_DEFAULT). ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?CONF_DEFAULT).
load_legacy_config() ->
ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?LEGACY_CONF_DEFAULT).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_start_stop(_) -> t_start_stop(_) ->
App = emqx_prometheus, App = emqx_prometheus,
?assertMatch(ok, emqx_prometheus_sup:start_child(App)), Conf = emqx_prometheus_config:conf(),
?assertMatch(ok, emqx_prometheus_sup:start_child(App, Conf)),
%% start twice return ok. %% start twice return ok.
?assertMatch(ok, emqx_prometheus_sup:start_child(App)), ?assertMatch(ok, emqx_prometheus_sup:start_child(App, Conf)),
ok = gen_server:call(emqx_prometheus, dump, 1000),
ok = gen_server:cast(emqx_prometheus, dump),
dump = erlang:send(emqx_prometheus, dump),
?assertMatch(ok, emqx_prometheus_sup:stop_child(App)), ?assertMatch(ok, emqx_prometheus_sup:stop_child(App)),
%% stop twice return ok. %% stop twice return ok.
?assertMatch(ok, emqx_prometheus_sup:stop_child(App)), ?assertMatch(ok, emqx_prometheus_sup:stop_child(App)),
@ -88,7 +163,6 @@ t_collector_no_crash_test(_) ->
ok. ok.
t_assert_push(_) -> t_assert_push(_) ->
meck:new(httpc, [passthrough]),
Self = self(), Self = self(),
AssertPush = fun(Method, Req = {Url, Headers, ContentType, _Data}, HttpOpts, Opts) -> AssertPush = fun(Method, Req = {Url, Headers, ContentType, _Data}, HttpOpts, Opts) ->
?assertEqual(post, Method), ?assertEqual(post, Method),
@ -99,13 +173,51 @@ t_assert_push(_) ->
meck:passthrough([Method, Req, HttpOpts, Opts]) meck:passthrough([Method, Req, HttpOpts, Opts])
end, end,
meck:expect(httpc, request, AssertPush), meck:expect(httpc, request, AssertPush),
?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus)), Conf = emqx_prometheus_config:conf(),
?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus, Conf)),
receive receive
pass -> ok pass -> ok
after 2000 -> after 2000 ->
ct:fail(assert_push_request_failed) ct:fail(assert_push_request_failed)
end. end.
t_only_for_coverage(_) -> t_push_gateway(_) ->
?assertEqual("5.0.0", emqx_prometheus_proto_v1:introduced_in()), Conf = emqx_prometheus_config:conf(),
?assertMatch(ok, emqx_prometheus_sup:stop_child(emqx_prometheus)),
?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus, Conf)),
?assertMatch(#{ok := 0, failed := 0}, emqx_prometheus:info()),
timer:sleep(1100),
?assertMatch(#{ok := 1, failed := 0}, emqx_prometheus:info()),
ok = emqx_prometheus_sup:update_child(emqx_prometheus, Conf),
?assertMatch(#{ok := 0, failed := 0}, emqx_prometheus:info()),
ok. ok.
start_mock_pushgateway(Port) ->
application:ensure_all_started(cowboy),
Dispatch = cowboy_router:compile([{'_', [{'_', ?MODULE, []}]}]),
{ok, _} = cowboy:start_clear(
mock_pushgateway_listener,
[{port, Port}],
#{env => #{dispatch => Dispatch}}
).
stop_mock_pushgateway() ->
cowboy:stop_listener(mock_pushgateway_listener).
init(Req0, Opts) ->
Method = cowboy_req:method(Req0),
Headers = cowboy_req:headers(Req0),
?assertEqual(<<"POST">>, Method),
?assertMatch(
#{
<<"authorization">> := <<"some-authz-tokens">>,
<<"content-length">> := _,
<<"content-type">> := <<"text/plain">>,
<<"host">> := <<"127.0.0.1:9091">>
},
Headers
),
RespHeader = #{<<"content-type">> => <<"text/plain; charset=utf-8">>},
Req = -cowboy_req:reply(200, RespHeader, <<"OK">>, Req0),
{ok, Req, Opts}.

View File

@ -28,40 +28,59 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). [
{group, new_config},
{group, legacy_config}
].
groups() ->
[
{new_config, [sequence], [t_stats_api, t_prometheus_api]},
{legacy_config, [sequence], [t_stats_api, t_legacy_prometheus_api]}
].
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_conf), emqx_prometheus_SUITE:init_group(),
ok = ekka:start(), emqx_mgmt_api_test_util:init_suite([emqx_conf]),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(mria_rlog, [non_strict, passthrough, no_link]),
emqx_prometheus_SUITE:load_config(),
emqx_mgmt_api_test_util:init_suite([emqx_prometheus]),
Config. Config.
end_per_suite(Config) -> end_per_suite(Config) ->
ekka:stop(), emqx_prometheus_SUITE:end_group(),
mria:stop(), emqx_mgmt_api_test_util:end_suite([emqx_conf]),
mria_mnesia:delete_schema(),
meck:unload(mria_rlog),
emqx_mgmt_api_test_util:end_suite([emqx_prometheus]),
Config. Config.
init_per_testcase(_, Config) -> init_per_group(new_config, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(), emqx_common_test_helpers:start_apps(
[emqx_prometheus],
fun(App) -> set_special_configs(App, new_config) end
),
Config;
init_per_group(legacy_config, Config) ->
emqx_common_test_helpers:start_apps(
[emqx_prometheus],
fun(App) -> set_special_configs(App, legacy_config) end
),
Config. Config.
end_per_group(_Group, Config) ->
_ = application:stop(emqx_prometheus),
Config.
set_special_configs(emqx_dashboard, _) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(emqx_prometheus, new_config) ->
emqx_prometheus_SUITE:load_config(),
ok;
set_special_configs(emqx_prometheus, legacy_config) ->
emqx_prometheus_SUITE:load_legacy_config(),
ok;
set_special_configs(_App, _) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Cases %% Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_prometheus_api(_) -> t_legacy_prometheus_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["prometheus"]), Path = emqx_mgmt_api_test_util:api_path(["prometheus"]),
Auth = emqx_mgmt_api_test_util:auth_header_(), Auth = emqx_mgmt_api_test_util:auth_header_(),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, "", Auth), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, "", Auth),
@ -145,6 +164,105 @@ t_prometheus_api(_) ->
), ),
ok. ok.
t_prometheus_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["prometheus"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, "", Auth),
Conf = emqx_utils_json:decode(Response, [return_maps]),
?assertMatch(
#{
<<"push_gateway">> := #{},
<<"collectors">> := _,
<<"enable_basic_auth">> := _
},
Conf
),
#{
<<"push_gateway">> :=
#{<<"url">> := Url} = PushGateway,
<<"collectors">> := Collector
} = Conf,
Pid = erlang:whereis(emqx_prometheus),
?assertEqual(Url =/= "", undefined =/= Pid, {Url, Pid}),
NewConf = Conf#{
<<"push_gateway">> => PushGateway#{
<<"interval">> => <<"2s">>,
<<"headers">> => #{
<<"test-str1">> => <<"test-value">>,
<<"test-str2">> => <<"42">>
}
},
<<"collectors">> => Collector#{
<<"vm_dist">> => <<"enabled">>,
<<"vm_system_info">> => <<"enabled">>,
<<"vm_memory">> => <<"enabled">>,
<<"vm_msacc">> => <<"enabled">>,
<<"mnesia">> => <<"enabled">>,
<<"vm_statistics">> => <<"enabled">>
}
},
{ok, Response2} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf),
Conf2 = emqx_utils_json:decode(Response2, [return_maps]),
?assertMatch(NewConf, Conf2),
EnvCollectors = application:get_env(prometheus, collectors, []),
PromCollectors = prometheus_registry:collectors(default),
?assertEqual(lists:sort(EnvCollectors), lists:sort(PromCollectors)),
?assert(lists:member(prometheus_vm_statistics_collector, EnvCollectors), EnvCollectors),
lists:foreach(
fun({C, Enabled}) ->
?assertEqual(Enabled, lists:member(C, EnvCollectors), EnvCollectors)
end,
[
{prometheus_vm_dist_collector, true},
{prometheus_vm_system_info_collector, true},
{prometheus_vm_memory_collector, true},
{prometheus_mnesia_collector, true},
{prometheus_vm_msacc_collector, true},
{prometheus_vm_statistics_collector, true}
]
),
?assertMatch(
#{
<<"push_gateway">> := #{
<<"headers">> := #{
<<"test-str1">> := <<"test-value">>,
<<"test-str2">> := <<"42">>
}
}
},
emqx_config:get_raw([prometheus])
),
?assertMatch(
#{
push_gateway := #{
headers := [
{"test-str2", "42"},
{"test-str1", "test-value"}
]
}
},
emqx_config:get([prometheus])
),
NewConf1 = Conf#{<<"push_gateway">> => PushGateway#{<<"url">> => <<"">>}},
{ok, _Response3} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf1),
?assertEqual(undefined, erlang:whereis(emqx_prometheus)),
ConfWithoutScheme = Conf#{
<<"push_gateway">> => PushGateway#{<<"url">> => <<"127.0.0.1:8081">>}
},
?assertMatch(
{error, {"HTTP/1.1", 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, ConfWithoutScheme)
),
ok.
t_stats_api(_) -> t_stats_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["prometheus", "stats"]), Path = emqx_mgmt_api_test_util:api_path(["prometheus", "stats"]),
Auth = emqx_mgmt_api_test_util:auth_header_(), Auth = emqx_mgmt_api_test_util:auth_header_(),

View File

@ -11,8 +11,8 @@ update_prom_conf_info.label:
"""Update Prometheus config""" """Update Prometheus config"""
get_prom_data.desc: get_prom_data.desc:
"""Get Prometheus Data""" """Get Prometheus Metrics"""
get_prom_data.label: get_prom_data.label:
"""Get Prometheus Data""" """Prometheus Metrics"""
} }

View File

@ -1,8 +1,5 @@
emqx_prometheus_schema { emqx_prometheus_schema {
enable.desc:
"""Turn Prometheus data pushing on or off"""
headers.desc: headers.desc:
"""An HTTP Headers when pushing to Push Gateway.<br/> """An HTTP Headers when pushing to Push Gateway.<br/>
For example, <code> { Authorization = "some-authz-tokens"}</code>""" For example, <code> { Authorization = "some-authz-tokens"}</code>"""
@ -17,19 +14,25 @@ job_name.desc:
For example, when the EMQX node name is <code>emqx@127.0.0.1</code> then the <code>name</code> variable takes value <code>emqx</code> and the <code>host</code> variable takes value <code>127.0.0.1</code>.<br/> For example, when the EMQX node name is <code>emqx@127.0.0.1</code> then the <code>name</code> variable takes value <code>emqx</code> and the <code>host</code> variable takes value <code>127.0.0.1</code>.<br/>
Default value is: <code>${name}/instance/${name}~${host}</code>""" Default value is: <code>${name}/instance/${name}~${host}</code>"""
mnesia_collector.desc:
"""Enable or disable Mnesia metrics collector"""
prometheus.desc: prometheus.desc:
"""EMQX's Prometheus scraping endpoint is enabled by default without authentication. """EMQX's Prometheus scraping endpoint is enabled by default without authentication.
You can inspect it with a `curl` command like this: `curl -f "127.0.0.1:18083/api/v5/prometheus/stats"`<br/> You can inspect it with a `curl` command like this: `curl -f "127.0.0.1:18083/api/v5/prometheus/stats"`<br/>"""
The 'enable' flag is used to turn on and off for the push-gateway integration."""
prometheus.label: prometheus.label:
"""Prometheus""" """Prometheus"""
push_gateway_server.desc: push_gateway.desc:
"""URL of Prometheus server. Pushgateway is optional, should not be configured if prometheus is to scrape EMQX.""" """Push Gateway is optional, should not be configured if prometheus is to scrape EMQX."""
collectors.desc:
"""Metrics collectors to be enabled."""
push_gateway_url.desc:
"""URL of Prometheus server. Pushgateway is optional, should not be configured if prometheus is to scrape EMQX.
Set url to "" to disable push gateway"""
mnesia_collector.desc:
"""Enable or disable Mnesia metrics collector"""
vm_dist_collector.desc: vm_dist_collector.desc:
"""Enable or disable VM distribution collector, collects information about the sockets and processes involved in the Erlang distribution mechanism.""" """Enable or disable VM distribution collector, collects information about the sockets and processes involved in the Erlang distribution mechanism."""
@ -46,4 +49,37 @@ vm_statistics_collector.desc:
vm_system_info_collector.desc: vm_system_info_collector.desc:
"""Enable or disable VM system info collector.""" """Enable or disable VM system info collector."""
legacy_enable.desc:
"""Deprecated, use prometheus.push_gateway.url instead"""
legacy_headers.desc:
"""Deprecated, use prometheus.push_gateway.headers instead"""
legacy_interval.desc:
"""Deprecated, use prometheus.push_gateway.interval instead"""
legacy_job_name.desc:
"""Deprecated, use prometheus.push_gateway.job_name instead"""
legacy_push_gateway_server.desc:
"""Deprecated, use prometheus.push_gateway.url instead"""
legacy_mnesia_collector.desc:
"""Deprecated, use prometheus.collectors.mnesia instead"""
legacy_vm_dist_collector.desc:
"""Deprecated, use prometheus.collectors.vm_dist instead"""
legacy_vm_memory_collector.desc:
"""Deprecated, use prometheus.collectors.vm_memory instead"""
legacy_vm_msacc_collector.desc:
"""Deprecated, use prometheus.collectors.vm_msacc instead"""
legacy_vm_statistics_collector.desc:
"""Deprecated, use prometheus.collectors.vm_statistics instead"""
legacy_vm_system_info_collector.desc:
"""Deprecated, use prometheus.collectors.vm_system_info instead"""
} }