diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 7d4108908..4b54097fe 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -1,6 +1,7 @@ {emqx,1}. {emqx_bridge,1}. {emqx_authn,1}. +{emqx_authz,1}. {emqx_broker,1}. {emqx_cm,1}. {emqx_conf,1}. diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 51061e551..54d2f181c 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -54,6 +54,8 @@ -export([ get_raw_sources/0 , get_raw_source/1 + , lookup_from_local_node/1 + , lookup_from_all_nodes/1 ]). -export([ api_spec/0 @@ -226,7 +228,12 @@ source(get, #{bindings := #{type := Type}}) -> message => bin(Reason)}} end; [Source] -> - {200, read_certs(Source)} + case emqx_authz:lookup(Type) of + #{annotations := #{id := ResourceId }} -> + StatusAndMetrics = lookup_from_all_nodes(ResourceId), + {200, maps:put(status_and_metrics, StatusAndMetrics, read_certs(Source))}; + _ -> {200, maps:put(status_and_metrics, resource_not_found, read_certs(Source))} + end end; source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, @@ -269,6 +276,100 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos %% Internal functions %%-------------------------------------------------------------------- +lookup_from_local_node(ResourceId) -> + NodeId = node(self()), + case emqx_resource:get_instance(ResourceId) of + {error, not_found} -> {error, {NodeId, not_found_resource}}; + {ok, _, #{ status := Status, metrics := Metrics }} -> + {ok, {NodeId, Status, Metrics}} + end. + +lookup_from_all_nodes(ResourceId) -> + Nodes = mria_mnesia:running_nodes(), + case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, ResourceId)) of + {ok, ResList} -> + {StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList), + AggregateStatus = aggregate_status(maps:values(StatusMap)), + AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)), + Fun = fun(_, V1) -> restructure_map(V1) end, + #{node_status => StatusMap, + node_metrics => maps:map(Fun, MetricsMap), + node_error => ErrorMap, + status => AggregateStatus, + metrics => restructure_map(AggregateMetrics) + }; + {error, ErrL} -> + {error_msg('INTERNAL_ERROR', ErrL)} + end. + +aggregate_status([]) -> error_some_strange_happen; +aggregate_status(AllStatus) -> + Head = fun ([A | _]) -> A end, + HeadVal = Head(AllStatus), + AllRes = lists:all(fun (Val) -> Val == HeadVal end, AllStatus), + case AllRes of + true -> HeadVal; + false -> inconsistent + end. + +aggregate_metrics([]) -> error_some_strange_happen; +aggregate_metrics([HeadMetrics | AllMetrics]) -> + CombinerFun = + fun ComFun(Val1, Val2) -> + case erlang:is_map(Val1) of + true -> emqx_map_lib:merge_with(ComFun, Val1, Val2); + false -> Val1 + Val2 + end + end, + Fun = fun (ElemMap, AccMap) -> + emqx_map_lib:merge_with(CombinerFun, ElemMap, AccMap) end, + lists:foldl(Fun, HeadMetrics, AllMetrics). + +make_result_map(ResList) -> + Fun = + fun(Elem, {StatusMap, MetricsMap, ErrorMap}) -> + case Elem of + {ok, {NodeId, Status, Metrics}} -> + {maps:put(NodeId, Status, StatusMap), + maps:put(NodeId, Metrics, MetricsMap), + ErrorMap + }; + {error, {NodeId, Reason}} -> + {StatusMap, + MetricsMap, + maps:put(NodeId, Reason, ErrorMap) + } + end + end, + lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList). + +restructure_map(#{counters := #{failed := Failed, matched := Match, success := Succ}, + rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax} + } + } + ) -> + #{matched => Match, + success => Succ, + failed => Failed, + rate => Rate, + rate_last5m => Rate5m, + rate_max => RateMax + }; +restructure_map(Error) -> + Error. + +error_msg(Code, Msg) -> + #{code => Code, message => bin_t(io_lib:format("~p", [Msg]))}. + +bin_t(S) when is_list(S) -> + list_to_binary(S). + +is_ok(ResL) -> + case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of + [] -> {ok, [Res || {ok, Res} <- ResL]}; + ErrL -> {error, ErrL} + end. + get_raw_sources() -> RawSources = emqx:get_raw_config([authorization, sources], []), Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}}, diff --git a/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl b/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl new file mode 100644 index 000000000..66f303397 --- /dev/null +++ b/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + , lookup_from_all_nodes/2 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.0". + +-spec lookup_from_all_nodes([node()], binary()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, ResourceId) -> + erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [ResourceId], ?TIMEOUT). diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index b8da6b0d3..e3cb5254a 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -176,6 +176,34 @@ t_api(_) -> [?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]), {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1), + Snd = fun ({_, Val}) -> Val end, + LookupVal = fun LookupV(List, RestJson) -> + case List of + [Name] -> Snd(lists:keyfind(Name, 1, RestJson)); + [Name | NS] -> LookupV(NS, Snd(lists:keyfind(Name, 1, RestJson))) + end + end, + EqualFun = fun (RList) -> + fun ({M, V}) -> + ?assertEqual(V, + LookupVal([<<"status_and_metrics">>, + <<"metrics">>, M], + RList) + ) + end + end, + AssertFun = + fun (ResultJson) -> + {ok, RList} = emqx_json:safe_decode(ResultJson), + MetricsList = [{<<"failed">>, 0}, + {<<"matched">>, 0}, + {<<"rate">>, 0.0}, + {<<"rate_last5m">>, 0.0}, + {<<"rate_max">>, 0.0}, + {<<"success">>, 0}], + lists:map(EqualFun(RList), MetricsList) + end, + {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []), Sources = get_sources(Result2), ?assertMatch([ #{<<"type">> := <<"http">>} @@ -190,6 +218,10 @@ t_api(_) -> {ok, 204, _} = request(put, uri(["authorization", "sources", "http"]), ?SOURCE1#{<<"enable">> := false}), {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []), + {ok, RList3} = emqx_json:safe_decode(Result3), + ?assertEqual(<<"resource_not_found">>, + LookupVal([<<"status_and_metrics">> + ], RList3)), ?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result3)), Keyfile = emqx_common_test_helpers:app_path( @@ -211,6 +243,7 @@ t_api(_) -> <<"verify">> => <<"verify_none">> }}), {ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []), + AssertFun(Result4), ?assertMatch(#{<<"type">> := <<"mongodb">>, <<"ssl">> := #{<<"enable">> := <<"true">>, <<"cacertfile">> := ?MATCH_CERT, @@ -233,6 +266,7 @@ t_api(_) -> <<"verify">> => <<"verify_none">> }}), {ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []), + AssertFun(Result5), ?assertMatch(#{<<"type">> := <<"mongodb">>, <<"ssl">> := #{<<"enable">> := <<"true">>, <<"cacertfile">> := ?MATCH_CERT,