feat: authn add new metrics

This commit is contained in:
EMQ-YangM 2022-04-28 11:08:44 +08:00
parent 2dded74584
commit 2a7bd74ef1
6 changed files with 142 additions and 43 deletions

View File

@ -609,6 +609,9 @@ handle_create_authenticator(Chain, Config, Providers) ->
?CHAINS_TAB, ?CHAINS_TAB,
Chain#chain{authenticators = NAuthenticators} Chain#chain{authenticators = NAuthenticators}
), ),
ok = emqx_plugin_libs_metrics:create_metrics(authn_metrics, AuthenticatorID,
[matched, success, failed, ignore], [matched]),
{ok, serialize_authenticator(Authenticator)}; {ok, serialize_authenticator(Authenticator)};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -618,8 +621,10 @@ handle_create_authenticator(Chain, Config, Providers) ->
do_authenticate([], _) -> do_authenticate([], _) ->
{stop, {error, not_authorized}}; {stop, {error, not_authorized}};
do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) -> do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
emqx_plugin_libs_metrics:inc(authn_metrics, ID, matched),
try Provider:authenticate(Credential, State) of try Provider:authenticate(Credential, State) of
ignore -> ignore ->
ok = emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore),
do_authenticate(More, Credential); do_authenticate(More, Credential);
Result -> Result ->
%% {ok, Extra} %% {ok, Extra}
@ -627,6 +632,12 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
%% {continue, AuthCache} %% {continue, AuthCache}
%% {continue, AuthData, AuthCache} %% {continue, AuthData, AuthCache}
%% {error, Reason} %% {error, Reason}
case Result of
{ok, _} ->
emqx_plugin_libs_metrics:inc(authn_metrics, ID, success);
{error, _} ->
emqx_plugin_libs_metrics:inc(authn_metrics, ID, failed)
end,
{stop, Result} {stop, Result}
catch catch
Class:Reason:Stacktrace -> Class:Reason:Stacktrace ->
@ -637,6 +648,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
stacktrace => Stacktrace, stacktrace => Stacktrace,
authenticator => ID authenticator => ID
}), }),
emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore),
do_authenticate(More, Credential) do_authenticate(More, Credential)
end. end.

View File

@ -76,6 +76,17 @@ emqx_authn_schema {
} }
} }
node_error {
desc {
en: """The error of node."""
zh: """节点上产生的错误。"""
}
label: {
en: """Error in Node"""
zh: """节点产生的错误"""
}
}
matched { matched {
desc { desc {
en: """Count of this resource is queried.""" en: """Count of this resource is queried."""

View File

@ -902,6 +902,8 @@ create_authenticator(ConfKeyPath, ChainName, Config) ->
raw_config := AuthenticatorsConfig raw_config := AuthenticatorsConfig
}} -> }} ->
{ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig), {ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig),
ok = emqx_plugin_libs_metrics:create_metrics(authn_metrics, ID,
[matched, success, failed, ignore], [matched]),
{200, maps:put(id, ID, convert_certs(fill_defaults(AuthenticatorConfig)))}; {200, maps:put(id, ID, convert_certs(fill_defaults(AuthenticatorConfig)))};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} -> {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
serialize_error(Reason); serialize_error(Reason);
@ -930,61 +932,55 @@ list_authenticator(_, ConfKeyPath, AuthenticatorID) ->
serialize_error(Reason) serialize_error(Reason)
end. end.
resource_provider() ->
[
emqx_authn_mysql,
emqx_authn_pgsql,
emqx_authn_mongodb,
emqx_authn_redis,
emqx_authn_http
].
lookup_from_local_node(ChainName, AuthenticatorID) -> lookup_from_local_node(ChainName, AuthenticatorID) ->
NodeId = node(self()), NodeId = node(self()),
case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
{ok, #{provider := Provider, state := State}} -> {ok, #{provider := Provider, state := State}} ->
Metrics = emqx_plugin_libs_metrics:get_metrics(authn_metrics, AuthenticatorID),
case lists:member(Provider, resource_provider()) of case lists:member(Provider, resource_provider()) of
false -> false ->
{error, {NodeId, resource_unsupport_metrics_and_status}}; {ok, {NodeId, connected, Metrics}};
true -> true ->
#{resource_id := ResourceId} = State, #{resource_id := ResourceId} = State,
case emqx_resource:get_instance(ResourceId) of case emqx_resource:get_instance(ResourceId) of
{error, not_found} -> {error, not_found} ->
{error, {NodeId, not_found_resource}}; {error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := Metrics}} -> {ok, _, #{status := Status}} ->
{ok, {NodeId, Status, Metrics}} {ok, {NodeId, Status, Metrics}}
end end
end; end;
{error, Reason} -> {error, Reason} ->
{error, {NodeId, Reason}} {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
end. end.
resource_provider() ->
[
emqx_authn_mysql,
emqx_authn_pgsql,
emqx_authn_mongodb,
emqx_authn_redis,
emqx_authn_http
].
lookup_from_all_nodes(ChainName, AuthenticatorID) -> lookup_from_all_nodes(ChainName, AuthenticatorID) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of
{ok, ResList} -> {ok, ResList} ->
{StatusMap, MetricsMap, _} = make_result_map(ResList), {StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)), AggregateStatus = aggregate_status(maps:values(StatusMap)),
AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)), AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
Fun = fun(_, V1) -> restructure_map(V1) end, Fun = fun(_, V1) -> restructure_map(V1) end,
MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end, MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end,
HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end, HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end,
case AggregateStatus of {200, #{
empty_metrics_and_status -> node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
{400, #{ metrics => restructure_map(AggregateMetrics),
code => <<"BAD_REQUEST">>, node_status => HelpFun(StatusMap, status),
message => <<"Resource Not Support Status">> status => AggregateStatus,
}}; node_error => HelpFun(maps:map(Fun, ErrorMap), reason)
_ -> }};
{200, #{
node_status => HelpFun(StatusMap, status),
node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
status => AggregateStatus,
metrics => restructure_map(AggregateMetrics)
}}
end;
{error, ErrL} -> {error, ErrL} ->
{500, #{ {400, #{
code => <<"INTERNAL_ERROR">>, code => <<"INTERNAL_ERROR">>,
message => list_to_binary(io_lib:format("~p", [ErrL])) message => list_to_binary(io_lib:format("~p", [ErrL]))
}} }}
@ -1019,27 +1015,28 @@ aggregate_metrics([HeadMetrics | AllMetrics]) ->
make_result_map(ResList) -> make_result_map(ResList) ->
Fun = Fun =
fun(Elem, {StatusMap, MetricsMap, ErrorMap}) -> fun(Elem, {StatusMap, MetricsMap, ErrorMap}) ->
case Elem of case Elem of
{ok, {NodeId, Status, Metrics}} -> {ok, {NodeId, Status, Metrics}} ->
{ {
maps:put(NodeId, Status, StatusMap), maps:put(NodeId, Status, StatusMap),
maps:put(NodeId, Metrics, MetricsMap), maps:put(NodeId, Metrics, MetricsMap),
ErrorMap ErrorMap
}; };
{error, {NodeId, Reason}} -> {error, {NodeId, Reason}} ->
{StatusMap, MetricsMap, maps:put(NodeId, Reason, ErrorMap)} {StatusMap, MetricsMap, maps:put(NodeId, Reason, ErrorMap)}
end end
end, end,
lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList). lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
restructure_map(#{ restructure_map(#{
counters := #{failed := Failed, matched := Match, success := Succ}, counters := #{failed := Failed, matched := Match, success := Succ, ignore := Ignore},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}} rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) -> }) ->
#{ #{
matched => Match, matched => Match,
success => Succ, success => Succ,
failed => Failed, failed => Failed,
ignore => Ignore,
rate => Rate, rate => Rate,
rate_last5m => Rate5m, rate_last5m => Rate5m,
rate_max => RateMax rate_max => RateMax
@ -1083,6 +1080,7 @@ update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) ->
delete_authenticator(ConfKeyPath, ChainName, AuthenticatorID) -> delete_authenticator(ConfKeyPath, ChainName, AuthenticatorID) ->
case update_config(ConfKeyPath, {delete_authenticator, ChainName, AuthenticatorID}) of case update_config(ConfKeyPath, {delete_authenticator, ChainName, AuthenticatorID}) of
{ok, _} -> {ok, _} ->
emqx_plugin_libs_metrics:clear_metrics(authn_metrics, AuthenticatorID),
{204}; {204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} -> {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
serialize_error(Reason); serialize_error(Reason);
@ -1439,10 +1437,12 @@ status_metrics_example() ->
matched => 0, matched => 0,
success => 0, success => 0,
failed => 0, failed => 0,
ignore => 0,
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.0 rate_max => 0.0
}, },
node_error => [],
node_metrics => [ node_metrics => [
#{ #{
node => node(), node => node(),
@ -1450,6 +1450,7 @@ status_metrics_example() ->
matched => 0, matched => 0,
success => 0, success => 0,
failed => 0, failed => 0,
ignore => 0,
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.0 rate_max => 0.0

View File

@ -100,6 +100,11 @@ fields("metrics_status_fields") ->
mk( mk(
hoconsc:array(ref(?MODULE, "node_status")), hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("node_status")} #{desc => ?DESC("node_status")}
)},
{"node_error",
mk(
hoconsc:array(ref(?MODULE, "node_error")),
#{desc => ?DESC("node_error")}
)} )}
]; ];
fields("metrics") -> fields("metrics") ->
@ -107,6 +112,7 @@ fields("metrics") ->
{"matched", mk(integer(), #{desc => ?DESC("matched")})}, {"matched", mk(integer(), #{desc => ?DESC("matched")})},
{"success", mk(integer(), #{desc => ?DESC("success")})}, {"success", mk(integer(), #{desc => ?DESC("success")})},
{"failed", mk(integer(), #{desc => ?DESC("failed")})}, {"failed", mk(integer(), #{desc => ?DESC("failed")})},
{"ignore", mk(integer(), #{desc => ?DESC("failed")})},
{"rate", mk(float(), #{desc => ?DESC("rate")})}, {"rate", mk(float(), #{desc => ?DESC("rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("rate_max")})}, {"rate_max", mk(float(), #{desc => ?DESC("rate_max")})},
{"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})} {"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})}
@ -120,6 +126,12 @@ fields("node_status") ->
[ [
node_name(), node_name(),
{"status", mk(status(), #{desc => ?DESC("node_status")})} {"status", mk(status(), #{desc => ?DESC("node_status")})}
];
fields("node_error") ->
[
node_name(),
{"error", mk(string(), #{desc => ?DESC("node_error")})}
]. ].
status() -> status() ->

View File

@ -27,5 +27,6 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
ChildSpecs = [], Metrics = emqx_plugin_libs_metrics:child_spec(authn_metrics),
ChildSpecs = [Metrics],
{ok, {{one_for_one, 10, 10}, ChildSpecs}}. {ok, {{one_for_one, 10, 10}, ChildSpecs}}.

View File

@ -129,7 +129,8 @@ t_aggregate_metrics(_) ->
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.1, rate_max => 0.1,
success => 1 success => 1,
ignore => 1
} }
}, },
'emqx@node2.emqx.io' => #{ 'emqx@node2.emqx.io' => #{
@ -140,7 +141,8 @@ t_aggregate_metrics(_) ->
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.1, rate_max => 0.1,
success => 1 success => 1,
ignore => 2
} }
} }
}, },
@ -154,7 +156,8 @@ t_aggregate_metrics(_) ->
rate => 0.0, rate => 0.0,
rate_last5m => 0.0, rate_last5m => 0.0,
rate_max => 0.2, rate_max => 0.2,
success => 2 success => 2,
ignore => 3
} }
}, },
Res Res
@ -299,6 +302,37 @@ test_authenticator_users(PathPrefix) ->
emqx_authn_test_lib:built_in_database_example() emqx_authn_test_lib:built_in_database_example()
), ),
{ok, Client} = emqtt:start_link(
[ {username, <<"u_event">>}
, {clientid, <<"c_event">>}
, {proto_ver, v5}
, {properties, #{'Session-Expiry-Interval' => 60}}
]),
process_flag(trap_exit, true),
?assertMatch({error, _}, emqtt:connect(Client)),
timer:sleep(300),
UsersUri0 = uri(PathPrefix ++ [?CONF_NS, "password_based:built_in_database", "status"]),
{ok, 200, PageData0} = request(get, UsersUri0),
case PathPrefix of
[] ->
#{ <<"metrics">> := #{
<<"matched">> := 1,
<<"success">> := 0,
<<"ignore">> := 1
}
} = jiffy:decode(PageData0, [return_maps]);
["listeners",'tcp:default'] ->
#{ <<"metrics">> := #{
<<"matched">> := 3,
<<"success">> := 1,
<<"ignore">> := 2
}
} = jiffy:decode(PageData0, [return_maps])
end,
InvalidUsers = [ InvalidUsers = [
#{clientid => <<"u1">>, password => <<"p1">>}, #{clientid => <<"u1">>, password => <<"p1">>},
#{user_id => <<"u2">>}, #{user_id => <<"u2">>},
@ -325,6 +359,34 @@ test_authenticator_users(PathPrefix) ->
ValidUsers ValidUsers
), ),
{ok, Client1} = emqtt:start_link(
[ {username, <<"u1">>}
, {password, <<"p1">>}
, {clientid, <<"c_event">>}
, {proto_ver, v5}
, {properties, #{'Session-Expiry-Interval' => 60}}
]),
{ok, _} = emqtt:connect(Client1),
timer:sleep(300),
UsersUri01 = uri(PathPrefix ++ [?CONF_NS, "password_based:built_in_database", "status"]),
{ok, 200, PageData01} = request(get, UsersUri01),
case PathPrefix of
[] ->
#{ <<"metrics">> := #{
<<"matched">> := 2,
<<"success">> := 1,
<<"ignore">> := 1
}
} = jiffy:decode(PageData01, [return_maps]);
["listeners",'tcp:default'] ->
#{ <<"metrics">> := #{
<<"matched">> := 4,
<<"success">> := 2,
<<"ignore">> := 2
}
} = jiffy:decode(PageData01, [return_maps])
end,
{ok, 200, Page1Data} = request(get, UsersUri ++ "?page=1&limit=2"), {ok, 200, Page1Data} = request(get, UsersUri ++ "?page=1&limit=2"),
#{ #{