feat(emqx_authn_api): add metrics and status to authn

This commit is contained in:
EMQ-YangM 2022-03-10 18:28:57 +08:00
parent 34c3710d99
commit a03f324010
1 changed files with 137 additions and 0 deletions

View File

@ -30,6 +30,7 @@
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(CONFLICT, 'CONFLICT').
-define(TIMEOUT, 15000).
% Swagger
@ -59,6 +60,8 @@
, authenticator_user/2
, listener_authenticator_users/2
, listener_authenticator_user/2
, lookup_from_local_node/2
, lookup_from_all_node/2
]).
-export([ authenticator_examples/0
@ -759,6 +762,140 @@ list_authenticator(ConfKeyPath, AuthenticatorID) ->
serialize_error(Reason)
end.
%% example:
%%
%% emqx_authn_api:lookup_from_local_node('mqtt:global', <<"password-based:http">>)
%%
%% {ok,{'emqx@127.0.0.1',connecting,
%% #{counters =>
%% #{exception => 0,failed => 0,matched => 0,success => 0},
%% rate =>
%% #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}}}
lookup_from_local_node(ChainName, AuthenticatorID) ->
NodeId = node(self()),
case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
{ok, #{provider := Provider, state := State}} ->
case lists:member(Provider, resource_provider()) of
false -> {error, {NodeId, resource_unsupport_metrics_and_status}};
true ->
#{resource_id := ResourceId} = State,
case emqx_resource:get_instance(ResourceId) of
{error, not_found} -> {error, {NodeId, not_found_resource}};
{ok, _, #{ status := Status, metrics := Metrics }} ->
{ok, {NodeId, Status, Metrics}}
end
end;
{error, Reason} -> {error, {NodeId, Reason}}
end.
resource_provider() ->
[ emqx_authn_mysql,
emqx_authn_pgsql,
emqx_authn_mongodb,
emqx_authn_redis,
emqx_authn_http
].
%% example:
%%
%% emqx_authn_api:lookup_from_all_node('mqtt:global', <<"password-based:http">>)
%%
%% #{aggregate_metrics =>
%% #{counters =>
%% #{exception => 0,failed => 0,matched => 0,success => 0},
%% rate =>
%% #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}},
%% aggregate_status => connecting,all_node_error => #{},
%% all_node_metrics =>
%% #{'emqx@127.0.0.1' =>
%% #{counters =>
%% #{exception => 0,failed => 0,matched => 0,success => 0},
%% rate =>
%% #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}},
%% all_node_state => #{'emqx@127.0.0.1' => connecting}}
lookup_from_all_node(ChainName, AuthenticatorID) ->
Nodes = mria_mnesia:running_nodes(),
case is_ok(erpc:multicall(Nodes,
emqx_authn_api,
lookup_from_local_node,
[ChainName, AuthenticatorID],
?TIMEOUT)) of
{ok, ResList} ->
{StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)),
AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
#{all_node_state => StatusMap,
all_node_metrics => MetricsMap,
all_node_error => ErrorMap,
aggregate_status => AggregateStatus,
aggregate_metrics => AggregateMetrics
};
{error, ErrL} ->
{error_msg('INTERNAL_ERROR', ErrL)}
end.
aggregate_status([]) -> error_some_strange_happen;
aggregate_status(AllStatus) ->
Head = fun ([A | _]) -> A end,
HeadVal = Head(AllStatus),
AllRes = lists:all(fun (Val) -> Val == HeadVal end, AllStatus),
case AllRes of
true -> HeadVal;
false -> inconsistent
end.
aggregate_metrics([]) -> error_some_strange_happen;
aggregate_metrics([HeadMetrics | AllMetrics]) ->
CombinerFun =
%% use fixpoint reference self
fun (FixVal) ->
fun (_, Val1, Val2) ->
case erlang:is_map(Val1) of
true -> maps:merge_with(FixVal(FixVal), Val1, Val2);
false -> Val1 + Val2
end
end
end,
Fun = fun (ElemMap, AccMap) -> maps:merge_with(CombinerFun(CombinerFun), ElemMap, AccMap) end,
lists:foldl(Fun, HeadMetrics, AllMetrics).
make_result_map(ResList) ->
Fun =
fun(Elem, {StatusMap, MetricsMap, ErrorMap}) ->
case Elem of
{ok, {NodeId, Status, Metrics}} ->
{maps:put(NodeId, Status, StatusMap),
maps:put(NodeId, Metrics, MetricsMap),
ErrorMap
};
{error, {NodeId, Reason}} ->
{StatusMap,
MetricsMap,
maps:put(NodeId, Reason, ErrorMap)
}
end
end,
lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
bin(S) when is_list(S) ->
list_to_binary(S).
is_ok(ResL) ->
case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
[] -> {ok, [Res || {ok, Res} <- ResL]};
ErrL -> {error, ErrL}
end.
update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) ->
case update_config(ConfKeyPath, {update_authenticator, ChainName, AuthenticatorID, Config}) of
{ok, #{post_config_update := #{emqx_authentication := #{id := ID}},