Merge pull request #7809 from EMQ-YangM/authn_add_metrics

feat: authn new metrics
This commit is contained in:
Xinyu Liu 2022-04-28 19:24:09 +08:00 committed by GitHub
commit 8380efef16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 254 additions and 60 deletions

View File

@ -579,8 +579,11 @@ handle_delete_authenticator(Chain, AuthenticatorID) ->
ID =:= AuthenticatorID ID =:= AuthenticatorID
end, end,
case do_delete_authenticators(MatchFun, Chain) of case do_delete_authenticators(MatchFun, Chain) of
[] -> {error, {not_found, {authenticator, AuthenticatorID}}}; [] ->
[AuthenticatorID] -> ok {error, {not_found, {authenticator, AuthenticatorID}}};
[AuthenticatorID] ->
emqx_plugin_libs_metrics:clear_metrics(authn_metrics, AuthenticatorID),
ok
end. end.
handle_move_authenticator(Chain, AuthenticatorID, Position) -> handle_move_authenticator(Chain, AuthenticatorID, Position) ->
@ -609,6 +612,13 @@ handle_create_authenticator(Chain, Config, Providers) ->
?CHAINS_TAB, ?CHAINS_TAB,
Chain#chain{authenticators = NAuthenticators} Chain#chain{authenticators = NAuthenticators}
), ),
ok = emqx_plugin_libs_metrics:create_metrics(
authn_metrics,
AuthenticatorID,
[matched, success, failed, ignore],
[matched]
),
{ok, serialize_authenticator(Authenticator)}; {ok, serialize_authenticator(Authenticator)};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -618,8 +628,10 @@ handle_create_authenticator(Chain, Config, Providers) ->
do_authenticate([], _) -> do_authenticate([], _) ->
{stop, {error, not_authorized}}; {stop, {error, not_authorized}};
do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) -> do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
emqx_plugin_libs_metrics:inc(authn_metrics, ID, matched),
try Provider:authenticate(Credential, State) of try Provider:authenticate(Credential, State) of
ignore -> ignore ->
ok = emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore),
do_authenticate(More, Credential); do_authenticate(More, Credential);
Result -> Result ->
%% {ok, Extra} %% {ok, Extra}
@ -627,6 +639,14 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
%% {continue, AuthCache} %% {continue, AuthCache}
%% {continue, AuthData, AuthCache} %% {continue, AuthData, AuthCache}
%% {error, Reason} %% {error, Reason}
case Result of
{ok, _} ->
emqx_plugin_libs_metrics:inc(authn_metrics, ID, success);
{error, _} ->
emqx_plugin_libs_metrics:inc(authn_metrics, ID, failed);
_ ->
ok
end,
{stop, Result} {stop, Result}
catch catch
Class:Reason:Stacktrace -> Class:Reason:Stacktrace ->
@ -637,6 +657,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
stacktrace => Stacktrace, stacktrace => Stacktrace,
authenticator => ID authenticator => ID
}), }),
emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore),
do_authenticate(More, Credential) do_authenticate(More, Credential)
end. end.

View File

@ -0,0 +1,38 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authn_authz_metrics_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
AuthnMetrics = emqx_plugin_libs_metrics:child_spec(emqx_authn_metrics, authn_metrics),
AuthzMetrics = emqx_plugin_libs_metrics:child_spec(eqmx_authz_metrics, authz_metrics),
{ok,
{
{one_for_one, 10, 100},
[
AuthnMetrics,
AuthzMetrics
]
}}.

View File

@ -35,7 +35,8 @@ init([]) ->
child_spec(emqx_hooks, worker), child_spec(emqx_hooks, worker),
child_spec(emqx_stats, worker), child_spec(emqx_stats, worker),
child_spec(emqx_metrics, worker), child_spec(emqx_metrics, worker),
child_spec(emqx_ctl, worker) child_spec(emqx_ctl, worker),
child_spec(emqx_authn_authz_metrics_sup, supervisor)
] ]
}}. }}.

View File

@ -76,6 +76,17 @@ emqx_authn_schema {
} }
} }
node_error {
desc {
en: """The error of node."""
zh: """节点上产生的错误。"""
}
label: {
en: """Error in Node"""
zh: """节点产生的错误"""
}
}
matched { matched {
desc { desc {
en: """Count of this resource is queried.""" en: """Count of this resource is queried."""

View File

@ -930,26 +930,6 @@ list_authenticator(_, ConfKeyPath, AuthenticatorID) ->
serialize_error(Reason) serialize_error(Reason)
end. end.
lookup_from_local_node(ChainName, AuthenticatorID) ->
NodeId = node(self()),
case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
{ok, #{provider := Provider, state := State}} ->
case lists:member(Provider, resource_provider()) of
false ->
{error, {NodeId, resource_unsupport_metrics_and_status}};
true ->
#{resource_id := ResourceId} = State,
case emqx_resource:get_instance(ResourceId) of
{error, not_found} ->
{error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := Metrics}} ->
{ok, {NodeId, Status, Metrics}}
end
end;
{error, Reason} ->
{error, {NodeId, Reason}}
end.
resource_provider() -> resource_provider() ->
[ [
emqx_authn_mysql, emqx_authn_mysql,
@ -959,32 +939,54 @@ resource_provider() ->
emqx_authn_http emqx_authn_http
]. ].
lookup_from_local_node(ChainName, AuthenticatorID) ->
NodeId = node(self()),
case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
{ok, #{provider := Provider, state := State}} ->
Metrics = emqx_plugin_libs_metrics:get_metrics(authn_metrics, AuthenticatorID),
case lists:member(Provider, resource_provider()) of
false ->
{ok, {NodeId, connected, Metrics, #{}}};
true ->
#{resource_id := ResourceId} = State,
case emqx_resource:get_instance(ResourceId) of
{error, not_found} ->
{error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := ResourceMetrics}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}}
end
end;
{error, Reason} ->
{error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
end.
lookup_from_all_nodes(ChainName, AuthenticatorID) -> lookup_from_all_nodes(ChainName, AuthenticatorID) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of
{ok, ResList} -> {ok, ResList} ->
{StatusMap, MetricsMap, _} = make_result_map(ResList), {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)), AggregateStatus = aggregate_status(maps:values(StatusMap)),
AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)), AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
AggregateResourceMetrics = aggregate_metrics(maps:values(ResourceMetricsMap)),
Fun = fun(_, V1) -> restructure_map(V1) end, Fun = fun(_, V1) -> restructure_map(V1) end,
MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end, MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end,
HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end, HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end,
case AggregateStatus of {200, #{
empty_metrics_and_status -> node_resource_metrics => HelpFun(maps:map(Fun, ResourceMetricsMap), metrics),
{400, #{ resource_metrics =>
code => <<"BAD_REQUEST">>, case maps:size(AggregateResourceMetrics) of
message => <<"Resource Not Support Status">> 0 -> #{};
}}; _ -> restructure_map(AggregateResourceMetrics)
_ -> end,
{200, #{ node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
node_status => HelpFun(StatusMap, status), metrics => restructure_map(AggregateMetrics),
node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
status => AggregateStatus, node_status => HelpFun(StatusMap, status),
metrics => restructure_map(AggregateMetrics) status => AggregateStatus,
}} node_error => HelpFun(maps:map(Fun, ErrorMap), reason)
end; }};
{error, ErrL} -> {error, ErrL} ->
{500, #{ {400, #{
code => <<"INTERNAL_ERROR">>, code => <<"INTERNAL_ERROR">>,
message => list_to_binary(io_lib:format("~p", [ErrL])) message => list_to_binary(io_lib:format("~p", [ErrL]))
}} }}
@ -1002,7 +1004,7 @@ aggregate_status(AllStatus) ->
end. end.
aggregate_metrics([]) -> aggregate_metrics([]) ->
empty_metrics_and_status; #{};
aggregate_metrics([HeadMetrics | AllMetrics]) -> aggregate_metrics([HeadMetrics | AllMetrics]) ->
CombinerFun = CombinerFun =
fun ComFun(_Key, Val1, Val2) -> fun ComFun(_Key, Val1, Val2) ->
@ -1018,20 +1020,34 @@ aggregate_metrics([HeadMetrics | AllMetrics]) ->
make_result_map(ResList) -> make_result_map(ResList) ->
Fun = Fun =
fun(Elem, {StatusMap, MetricsMap, ErrorMap}) -> fun(Elem, {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap}) ->
case Elem of case Elem of
{ok, {NodeId, Status, Metrics}} -> {ok, {NodeId, Status, Metrics, ResourceMetrics}} ->
{ {
maps:put(NodeId, Status, StatusMap), maps:put(NodeId, Status, StatusMap),
maps:put(NodeId, Metrics, MetricsMap), maps:put(NodeId, Metrics, MetricsMap),
maps:put(NodeId, ResourceMetrics, ResourceMetricsMap),
ErrorMap ErrorMap
}; };
{error, {NodeId, Reason}} -> {error, {NodeId, Reason}} ->
{StatusMap, MetricsMap, maps:put(NodeId, Reason, ErrorMap)} {StatusMap, MetricsMap, ResourceMetricsMap, maps:put(NodeId, Reason, ErrorMap)}
end end
end, end,
lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList). lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList).
restructure_map(#{
counters := #{failed := Failed, matched := Match, success := Succ, ignore := Ignore},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) ->
#{
matched => Match,
success => Succ,
failed => Failed,
ignore => Ignore,
rate => Rate,
rate_last5m => Rate5m,
rate_max => RateMax
};
restructure_map(#{ restructure_map(#{
counters := #{failed := Failed, matched := Match, success := Succ}, counters := #{failed := Failed, matched := Match, success := Succ},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}} rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
@ -1439,10 +1455,12 @@ status_metrics_example() ->
matched => 0, matched => 0,
success => 0, success => 0,
failed => 0, failed => 0,
ignore => 0,
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.0 rate_max => 0.0
}, },
node_error => [],
node_metrics => [ node_metrics => [
#{ #{
node => node(), node => node(),
@ -1450,6 +1468,7 @@ status_metrics_example() ->
matched => 0, matched => 0,
success => 0, success => 0,
failed => 0, failed => 0,
ignore => 0,
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.0 rate_max => 0.0

View File

@ -89,20 +89,58 @@ backend(Name) ->
fields("metrics_status_fields") -> fields("metrics_status_fields") ->
[ [
{"resource_metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})},
{"node_resource_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_resource_metrics")),
#{desc => ?DESC("node_metrics")}
)},
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})}, {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})},
{"node_metrics", {"node_metrics",
mk( mk(
hoconsc:array(ref(?MODULE, "node_metrics")), hoconsc:array(ref(?MODULE, "node_metrics")),
#{desc => ?DESC("node_metrics")} #{desc => ?DESC("node_metrics")}
)}, )},
{"status", mk(status(), #{desc => ?DESC("status")})}, {"status", mk(cluster_status(), #{desc => ?DESC("status")})},
{"node_status", {"node_status",
mk( mk(
hoconsc:array(ref(?MODULE, "node_status")), hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("node_status")} #{desc => ?DESC("node_status")}
)},
{"node_error",
mk(
hoconsc:array(ref(?MODULE, "node_error")),
#{desc => ?DESC("node_error")}
)} )}
]; ];
fields("metrics") -> fields("metrics") ->
[
{"ignore", mk(integer(), #{desc => ?DESC("failed")})}
] ++ common_field();
fields("resource_metrics") ->
common_field();
fields("node_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})}
];
fields("node_resource_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})}
];
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{desc => ?DESC("node_status")})}
];
fields("node_error") ->
[
node_name(),
{"error", mk(string(), #{desc => ?DESC("node_error")})}
].
common_field() ->
[ [
{"matched", mk(integer(), #{desc => ?DESC("matched")})}, {"matched", mk(integer(), #{desc => ?DESC("matched")})},
{"success", mk(integer(), #{desc => ?DESC("success")})}, {"success", mk(integer(), #{desc => ?DESC("success")})},
@ -110,20 +148,13 @@ fields("metrics") ->
{"rate", mk(float(), #{desc => ?DESC("rate")})}, {"rate", mk(float(), #{desc => ?DESC("rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("rate_max")})}, {"rate_max", mk(float(), #{desc => ?DESC("rate_max")})},
{"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})} {"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})}
];
fields("node_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})}
];
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{desc => ?DESC("node_status")})}
]. ].
status() -> status() ->
hoconsc:enum([connected, disconnected, connecting]). hoconsc:enum([connected, disconnected, connecting]).
cluster_status() ->
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
node_name() -> node_name() ->
{"node", mk(binary(), #{desc => ?DESC("node"), example => "emqx@127.0.0.1"})}. {"node", mk(binary(), #{desc => ?DESC("node"), example => "emqx@127.0.0.1"})}.

View File

@ -129,7 +129,8 @@ t_aggregate_metrics(_) ->
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.1, rate_max => 0.1,
success => 1 success => 1,
ignore => 1
} }
}, },
'emqx@node2.emqx.io' => #{ 'emqx@node2.emqx.io' => #{
@ -140,7 +141,8 @@ t_aggregate_metrics(_) ->
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.1, rate_max => 0.1,
success => 1 success => 1,
ignore => 2
} }
} }
}, },
@ -154,7 +156,8 @@ t_aggregate_metrics(_) ->
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.2, rate_max => 0.2,
success => 2 success => 2,
ignore => 3
} }
}, },
Res Res
@ -299,6 +302,40 @@ test_authenticator_users(PathPrefix) ->
emqx_authn_test_lib:built_in_database_example() emqx_authn_test_lib:built_in_database_example()
), ),
{ok, Client} = emqtt:start_link(
[
{username, <<"u_event">>},
{clientid, <<"c_event">>},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 60}}
]
),
process_flag(trap_exit, true),
?assertMatch({error, _}, emqtt:connect(Client)),
timer:sleep(300),
UsersUri0 = uri(PathPrefix ++ [?CONF_NS, "password_based:built_in_database", "status"]),
{ok, 200, PageData0} = request(get, UsersUri0),
case PathPrefix of
[] ->
#{
<<"metrics">> := #{
<<"matched">> := 1,
<<"success">> := 0,
<<"ignore">> := 1
}
} = jiffy:decode(PageData0, [return_maps]);
["listeners", 'tcp:default'] ->
#{
<<"metrics">> := #{
<<"matched">> := 1,
<<"success">> := 0,
<<"ignore">> := 1
}
} = jiffy:decode(PageData0, [return_maps])
end,
InvalidUsers = [ InvalidUsers = [
#{clientid => <<"u1">>, password => <<"p1">>}, #{clientid => <<"u1">>, password => <<"p1">>},
#{user_id => <<"u2">>}, #{user_id => <<"u2">>},
@ -325,6 +362,38 @@ test_authenticator_users(PathPrefix) ->
ValidUsers ValidUsers
), ),
{ok, Client1} = emqtt:start_link(
[
{username, <<"u1">>},
{password, <<"p1">>},
{clientid, <<"c_event">>},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 60}}
]
),
{ok, _} = emqtt:connect(Client1),
timer:sleep(300),
UsersUri01 = uri(PathPrefix ++ [?CONF_NS, "password_based:built_in_database", "status"]),
{ok, 200, PageData01} = request(get, UsersUri01),
case PathPrefix of
[] ->
#{
<<"metrics">> := #{
<<"matched">> := 2,
<<"success">> := 1,
<<"ignore">> := 1
}
} = jiffy:decode(PageData01, [return_maps]);
["listeners", 'tcp:default'] ->
#{
<<"metrics">> := #{
<<"matched">> := 2,
<<"success">> := 1,
<<"ignore">> := 1
}
} = jiffy:decode(PageData01, [return_maps])
end,
{ok, 200, Page1Data} = request(get, UsersUri ++ "?page=1&limit=2"), {ok, 200, Page1Data} = request(get, UsersUri ++ "?page=1&limit=2"),
#{ #{

View File

@ -22,7 +22,8 @@
-export([ -export([
start_link/1, start_link/1,
stop/1, stop/1,
child_spec/1 child_spec/1,
child_spec/2
]). ]).
-export([ -export([
@ -99,8 +100,11 @@
-spec child_spec(handler_name()) -> supervisor:child_spec(). -spec child_spec(handler_name()) -> supervisor:child_spec().
child_spec(Name) -> child_spec(Name) ->
child_spec(emqx_plugin_libs_metrics, Name).
child_spec(ChldName, Name) ->
#{ #{
id => emqx_plugin_libs_metrics, id => ChldName,
start => {emqx_plugin_libs_metrics, start_link, [Name]}, start => {emqx_plugin_libs_metrics, start_link, [Name]},
restart => permanent, restart => permanent,
shutdown => 5000, shutdown => 5000,