feat(prom): support headers for pushing

This commit is contained in:
JianBo He 2023-01-03 12:55:07 +08:00
parent 6f0e228a9b
commit 50b7ac6a22
5 changed files with 56 additions and 5 deletions

View File

@ -24,6 +24,16 @@ emqx_prometheus_schema {
zh: """数据推送间隔"""
}
}
headers {
desc {
en: """A list of HTTP Headers when pushing to Push Gateway.<br/>
For example, <code> { Authorization = "some-authz-tokens"}</code>"""
zh: """推送到 Push Gateway 的 HTTP Headers 列表。<br/>
例如,<code> { Authorization = "some-authz-tokens"}</code>"""
}
}
enable {
desc {
en: """Turn Prometheus data pushing on or off"""

View File

@ -98,8 +98,12 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
#{interval := Interval, push_gateway_server := Server} = opts(),
PushRes = push_to_push_gateway(Server),
#{
interval := Interval,
headers := Headers,
push_gateway_server := Server
} = opts(),
PushRes = push_to_push_gateway(Server, Headers),
NewTimer = ensure_timer(Interval),
NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}),
%% Data is too big, hibernate for saving memory and stop system monitor warning.
@ -107,12 +111,19 @@ handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
handle_info(_Msg, State) ->
{noreply, State}.
push_to_push_gateway(Uri) ->
push_to_push_gateway(Uri, Headers0) when is_map(Headers0) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"),
Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/", Name, "~", Ip]),
Data = prometheus_text_format:format(),
case httpc:request(post, {Url, [], "text/plain", Data}, ?HTTP_OPTIONS, []) of
{ok, {{"HTTP/1.1", 200, "OK"}, _Headers, _Body}} ->
Headers = maps:fold(
fun(K, V, Acc) ->
[{atom_to_list(K), binary_to_list(V)} | Acc]
end,
[],
Headers0
),
case httpc:request(post, {Url, Headers, "text/plain", Data}, ?HTTP_OPTIONS, []) of
{ok, {{"HTTP/1.1", 200, _}, _Headers, _Body}} ->
ok;
Error ->
?SLOG(error, #{

View File

@ -121,6 +121,7 @@ prometheus_config_example() ->
enable => true,
interval => "15s",
push_gateway_server => <<"http://127.0.0.1:9091">>,
headers => #{'header-name' => 'header-value'},
vm_dist_collector => enabled,
mnesia_collector => enabled,
vm_statistics_collector => enabled,

View File

@ -52,6 +52,15 @@ fields("prometheus") ->
desc => ?DESC(interval)
}
)},
{headers,
?HOCON(
map(),
#{
default => #{},
required => false,
desc => ?DESC(headers)
}
)},
{enable,
?HOCON(
boolean(),

View File

@ -27,6 +27,7 @@
"prometheus {\n"
" push_gateway_server = \"http://127.0.0.1:9091\"\n"
" interval = \"1s\"\n"
" headers = { Authorization = \"some-authz-tokens\"}\n"
" enable = true\n"
" vm_dist_collector = enabled\n"
" mnesia_collector = enabled\n"
@ -85,6 +86,25 @@ t_collector_no_crash_test(_) ->
prometheus_text_format:format(),
ok.
t_assert_push(_) ->
meck:new(httpc, [passthrough]),
Self = self(),
AssertPush = fun(Method, Req = {Url, Headers, ContentType, _Data}, HttpOpts, Opts) ->
?assertEqual(post, Method),
?assertMatch("http://127.0.0.1:9091/metrics/job/" ++ _, Url),
?assertEqual([{"Authorization", "some-authz-tokens"}], Headers),
?assertEqual("text/plain", ContentType),
Self ! pass,
meck:passthrough([Method, Req, HttpOpts, Opts])
end,
meck:expect(httpc, request, AssertPush),
?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus)),
receive
pass -> ok
after 2000 ->
ct:fail(assert_push_request_failed)
end.
t_only_for_coverage(_) ->
?assertEqual("5.0.0", emqx_prometheus_proto_v1:introduced_in()),
ok.