diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index e23dfe05f..1ce8d3b77 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -30,6 +30,7 @@ -define(BAD_REQUEST, 'BAD_REQUEST'). -define(NOT_FOUND, 'NOT_FOUND'). -define(CONFLICT, 'CONFLICT'). +-define(TIMEOUT, 15000). % Swagger @@ -59,6 +60,8 @@ , authenticator_user/2 , listener_authenticator_users/2 , listener_authenticator_user/2 + , lookup_from_local_node/2 + , lookup_from_all_node/2 ]). -export([ authenticator_examples/0 @@ -759,6 +762,140 @@ list_authenticator(ConfKeyPath, AuthenticatorID) -> serialize_error(Reason) end. +%% example: +%% +%% emqx_authn_api:lookup_from_local_node('mqtt:global', <<"password-based:http">>) +%% +%% {ok,{'emqx@127.0.0.1',connecting, +%% #{counters => +%% #{exception => 0,failed => 0,matched => 0,success => 0}, +%% rate => +%% #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}}} +lookup_from_local_node(ChainName, AuthenticatorID) -> + NodeId = node(self()), + case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of + {ok, #{provider := Provider, state := State}} -> + case lists:member(Provider, resource_provider()) of + false -> {error, {NodeId, resource_unsupport_metrics_and_status}}; + true -> + #{resource_id := ResourceId} = State, + case emqx_resource:get_instance(ResourceId) of + {error, not_found} -> {error, {NodeId, not_found_resource}}; + {ok, _, #{ status := Status, metrics := Metrics }} -> + {ok, {NodeId, Status, Metrics}} + end + end; + {error, Reason} -> {error, {NodeId, Reason}} + end. + +resource_provider() -> + [ emqx_authn_mysql, + emqx_authn_pgsql, + emqx_authn_mongodb, + emqx_authn_redis, + emqx_authn_http + ]. + +%% example: +%% +%% emqx_authn_api:lookup_from_all_node('mqtt:global', <<"password-based:http">>) +%% +%% #{aggregate_metrics => +%% #{counters => +%% #{exception => 0,failed => 0,matched => 0,success => 0}, +%% rate => +%% #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}, +%% aggregate_status => connecting,all_node_error => #{}, +%% all_node_metrics => +%% #{'emqx@127.0.0.1' => +%% #{counters => +%% #{exception => 0,failed => 0,matched => 0,success => 0}, +%% rate => +%% #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}}, +%% all_node_state => #{'emqx@127.0.0.1' => connecting}} +lookup_from_all_node(ChainName, AuthenticatorID) -> + Nodes = mria_mnesia:running_nodes(), + case is_ok(erpc:multicall(Nodes, + emqx_authn_api, + lookup_from_local_node, + [ChainName, AuthenticatorID], + ?TIMEOUT)) of + {ok, ResList} -> + {StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList), + AggregateStatus = aggregate_status(maps:values(StatusMap)), + AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)), + #{all_node_state => StatusMap, + all_node_metrics => MetricsMap, + all_node_error => ErrorMap, + aggregate_status => AggregateStatus, + aggregate_metrics => AggregateMetrics + }; + {error, ErrL} -> + {error_msg('INTERNAL_ERROR', ErrL)} + end. + +aggregate_status([]) -> error_some_strange_happen; +aggregate_status(AllStatus) -> + Head = fun ([A | _]) -> A end, + HeadVal = Head(AllStatus), + AllRes = lists:all(fun (Val) -> Val == HeadVal end, AllStatus), + case AllRes of + true -> HeadVal; + false -> inconsistent + end. + +aggregate_metrics([]) -> error_some_strange_happen; +aggregate_metrics([HeadMetrics | AllMetrics]) -> + CombinerFun = + %% use fixpoint reference self + fun (FixVal) -> + fun (_, Val1, Val2) -> + case erlang:is_map(Val1) of + true -> maps:merge_with(FixVal(FixVal), Val1, Val2); + false -> Val1 + Val2 + end + end + end, + Fun = fun (ElemMap, AccMap) -> maps:merge_with(CombinerFun(CombinerFun), ElemMap, AccMap) end, + lists:foldl(Fun, HeadMetrics, AllMetrics). + +make_result_map(ResList) -> + Fun = + fun(Elem, {StatusMap, MetricsMap, ErrorMap}) -> + case Elem of + {ok, {NodeId, Status, Metrics}} -> + {maps:put(NodeId, Status, StatusMap), + maps:put(NodeId, Metrics, MetricsMap), + ErrorMap + }; + {error, {NodeId, Reason}} -> + {StatusMap, + MetricsMap, + maps:put(NodeId, Reason, ErrorMap) + } + end + end, + lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList). + + + + + + +error_msg(Code, Msg) when is_binary(Msg) -> + #{code => Code, message => Msg}; +error_msg(Code, Msg) -> + #{code => Code, message => bin(io_lib:format("~p", [Msg]))}. + +bin(S) when is_list(S) -> + list_to_binary(S). + +is_ok(ResL) -> + case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of + [] -> {ok, [Res || {ok, Res} <- ResL]}; + ErrL -> {error, ErrL} + end. + update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) -> case update_config(ConfKeyPath, {update_authenticator, ChainName, AuthenticatorID, Config}) of {ok, #{post_config_update := #{emqx_authentication := #{id := ID}},