From a6920ac11bcb82cd8afe4ecf64f1e23a298f6485 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Thu, 28 Apr 2022 17:19:41 +0800 Subject: [PATCH] feat: new authz metrics --- .../i18n/emqx_authn_schema_i18n.conf | 33 +++++ apps/emqx_authn/src/emqx_authn_schema.erl | 65 ++++++--- apps/emqx_authz/src/emqx_authz.erl | 30 ++++- .../emqx_authz/src/emqx_authz_api_sources.erl | 101 +++++++------- .../src/proto/emqx_authz_proto_v1.erl | 6 +- .../test/emqx_authz_api_sources_SUITE.erl | 124 +++++++++++++----- 6 files changed, 253 insertions(+), 106 deletions(-) diff --git a/apps/emqx_authn/i18n/emqx_authn_schema_i18n.conf b/apps/emqx_authn/i18n/emqx_authn_schema_i18n.conf index 371939daa..2dc4999f2 100644 --- a/apps/emqx_authn/i18n/emqx_authn_schema_i18n.conf +++ b/apps/emqx_authn/i18n/emqx_authn_schema_i18n.conf @@ -163,4 +163,37 @@ emqx_authn_schema { zh: """节点名称。""" } } + + ignore { + desc { + en: """Not match any rules.""" + zh: """没有匹配到任何规则。""" + } + label: { + en: """Not Match Any Rules""" + zh: """没有匹配到任何规则。""" + } + } + + allow { + desc { + en: """Authorize allow.""" + zh: """鉴权成功。""" + } + label: { + en: """Authorize Allow""" + zh: """鉴权成功""" + } + } + + deny { + desc { + en: """Authorize Deny.""" + zh: """鉴权失败""" + } + label: { + en: """Authorize Deny""" + zh: """鉴权失败""" + } + } } diff --git a/apps/emqx_authn/src/emqx_authn_schema.erl b/apps/emqx_authn/src/emqx_authn_schema.erl index ffcd2eeff..99f81adb6 100644 --- a/apps/emqx_authn/src/emqx_authn_schema.erl +++ b/apps/emqx_authn/src/emqx_authn_schema.erl @@ -89,41 +89,45 @@ backend(Name) -> fields("metrics_status_fields") -> [ - {"resource_metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})}, - {"node_resource_metrics", - mk( - hoconsc:array(ref(?MODULE, "node_resource_metrics")), - #{desc => ?DESC("node_metrics")} - )}, {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})}, {"node_metrics", mk( hoconsc:array(ref(?MODULE, "node_metrics")), #{desc => ?DESC("node_metrics")} - )}, - {"status", mk(cluster_status(), #{desc => ?DESC("status")})}, - {"node_status", - mk( - hoconsc:array(ref(?MODULE, "node_status")), - #{desc => ?DESC("node_status")} - )}, - {"node_error", - mk( - hoconsc:array(ref(?MODULE, "node_error")), - #{desc => ?DESC("node_error")} )} - ]; + ] ++ common_metrics_field(); +fields("metrics_status_fields_authz") -> + [ + {"metrics", mk(ref(?MODULE, "metrics_authz"), #{desc => ?DESC("metrics")})}, + {"node_metrics", + mk( + hoconsc:array(ref(?MODULE, "node_metrics_authz")), + #{desc => ?DESC("node_metrics")} + )} + ] ++ common_metrics_field(); fields("metrics") -> [ {"ignore", mk(integer(), #{desc => ?DESC("failed")})} ] ++ common_field(); -fields("resource_metrics") -> - common_field(); +fields("metrics_authz") -> + [ + {"matched", mk(integer(), #{desc => ?DESC("matched")})}, + {"allow", mk(integer(), #{desc => ?DESC("allow")})}, + {"deny", mk(integer(), #{desc => ?DESC("deny")})}, + {"ignore", mk(float(), #{desc => ?DESC("ignore")})} + ]; fields("node_metrics") -> [ node_name(), {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})} ]; +fields("node_metrics_authz") -> + [ + node_name(), + {"metrics", mk(ref(?MODULE, "metrics_authz"), #{desc => ?DESC("metrics")})} + ]; +fields("resource_metrics") -> + common_field(); fields("node_resource_metrics") -> [ node_name(), @@ -150,6 +154,27 @@ common_field() -> {"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})} ]. +common_metrics_field() -> + [ + {"resource_metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})}, + {"node_resource_metrics", + mk( + hoconsc:array(ref(?MODULE, "node_resource_metrics")), + #{desc => ?DESC("node_metrics")} + )}, + {"status", mk(cluster_status(), #{desc => ?DESC("status")})}, + {"node_status", + mk( + hoconsc:array(ref(?MODULE, "node_status")), + #{desc => ?DESC("node_status")} + )}, + {"node_error", + mk( + hoconsc:array(ref(?MODULE, "node_error")), + #{desc => ?DESC("node_error")} + )} + ]. + status() -> hoconsc:enum([connected, disconnected, connecting]). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index e394f46f8..e8c1daf06 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -171,6 +171,14 @@ do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> do_move(Cmd, InitedSources); do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) -> InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)), + %% create metrics + TypeName = type(RawNewSource), + ok = emqx_plugin_libs_metrics:create_metrics( + authz_metrics, + TypeName, + [matched, allow, deny, ignore], + [matched] + ), [InitedNewSource] ++ lookup(); do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) -> InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)), @@ -185,6 +193,8 @@ do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> OldInitedSources = lookup(), {OldSource, Front, Rear} = take(Type, OldInitedSources), + %% delete metrics + ok = emqx_plugin_libs_metrics:clear_metrics(authz_metrics, Type), ok = ensure_resource_deleted(OldSource), clear_certs(OldSource), Front ++ Rear; @@ -247,6 +257,7 @@ init_sources(Sources) -> true -> ?SLOG(info, #{msg => "disabled_sources_ignored", sources => Disabled}); false -> ok end, + lists:map(fun init_metrics/1, Sources), lists:map(fun init_source/1, Sources). init_source(#{enable := false} = Source) -> @@ -255,6 +266,15 @@ init_source(#{type := Type} = Source) -> Module = authz_module(Type), Module:init(Source). +init_metrics(Source) -> + TypeName = type(Source), + ok = emqx_plugin_libs_metrics:create_metrics( + authz_metrics, + TypeName, + [matched, allow, deny, ignore], + [matched] + ). + %%-------------------------------------------------------------------- %% AuthZ callbacks %%-------------------------------------------------------------------- @@ -290,6 +310,7 @@ authorize( ipaddr => IpAddress, topic => Topic }), + emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, allow), emqx_metrics:inc(?METRIC_ALLOW), {stop, allow}; {{matched, deny}, AuthzSource} -> @@ -303,6 +324,7 @@ authorize( ipaddr => IpAddress, topic => Topic }), + emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, deny), emqx_metrics:inc(?METRIC_DENY), {stop, deny}; nomatch -> @@ -332,9 +354,13 @@ do_authorize( [Connector = #{type := Type} | Tail] ) -> Module = authz_module(Type), + emqx_plugin_libs_metrics:inc(authz_metrics, Type, matched), case Module:authorize(Client, PubSub, Topic, Connector) of - nomatch -> do_authorize(Client, PubSub, Topic, Tail); - Matched -> {Matched, Type} + nomatch -> + emqx_plugin_libs_metrics:inc(authz_metrics, Type, ignore), + do_authorize(Client, PubSub, Topic, Tail); + Matched -> + {Matched, Type} end. get_enabled_authzs() -> diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 22bfbac42..3c7e8e4a1 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -143,7 +143,7 @@ schema("/authorization/sources/:type/status") -> responses => #{ 200 => emqx_dashboard_swagger:schema_with_examples( - hoconsc:ref(emqx_authn_schema, "metrics_status_fields"), + hoconsc:ref(emqx_authn_schema, "metrics_status_fields_authz"), status_metrics_example() ), 400 => emqx_dashboard_swagger:error_codes( @@ -258,24 +258,7 @@ source(delete, #{bindings := #{type := Type}}) -> update_config({?CMD_DELETE, Type}, #{}). source_status(get, #{bindings := #{type := Type}}) -> - BinType = atom_to_binary(Type, utf8), - case get_raw_source(BinType) of - [] -> - {404, #{ - code => <<"NOT_FOUND">>, - message => <<"Not found", BinType/binary>> - }}; - [#{<<"type">> := <<"file">>}] -> - {400, #{ - code => <<"BAD_REQUEST">>, - message => <<"Not Support Status">> - }}; - [_] -> - case emqx_authz:lookup(Type) of - #{annotations := #{id := ResourceId}} -> lookup_from_all_nodes(ResourceId); - _ -> {400, #{code => <<"BAD_REQUEST">>, message => <<"Resource Disable">>}} - end - end. + lookup_from_all_nodes(Type). move_source(Method, #{bindings := #{type := Type} = Bindings} = Req) when is_atom(Type) @@ -321,39 +304,51 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos %% Internal functions %%-------------------------------------------------------------------- -lookup_from_local_node(ResourceId) -> +lookup_from_local_node(Type) -> NodeId = node(self()), - case emqx_resource:get_instance(ResourceId) of - {error, not_found} -> {error, {NodeId, not_found_resource}}; - {ok, _, #{status := Status, metrics := Metrics}} -> {ok, {NodeId, Status, Metrics}} + try emqx_authz:lookup(Type) of + #{annotations := #{id := ResourceId}} -> + Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type), + case emqx_resource:get_instance(ResourceId) of + {error, not_found} -> + {error, {NodeId, not_found_resource}}; + {ok, _, #{status := Status, metrics := ResourceMetrics}} -> + {ok, {NodeId, Status, Metrics, ResourceMetrics}} + end; + _ -> + Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type), + {ok, {NodeId, connected, Metrics, #{}}} + catch + Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}} end. -lookup_from_all_nodes(ResourceId) -> +lookup_from_all_nodes(Type) -> Nodes = mria_mnesia:running_nodes(), - case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, ResourceId)) of + case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, Type)) of {ok, ResList} -> - {StatusMap, MetricsMap, _} = make_result_map(ResList), + {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap} = make_result_map(ResList), AggregateStatus = aggregate_status(maps:values(StatusMap)), AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)), + AggregateResourceMetrics = aggregate_metrics(maps:values(ResourceMetricsMap)), Fun = fun(_, V1) -> restructure_map(V1) end, MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end, HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end, - case AggregateStatus of - empty_metrics_and_status -> - {400, #{ - code => <<"BAD_REQUEST">>, - message => <<"Resource Not Support Status">> - }}; - _ -> - {200, #{ - node_status => HelpFun(StatusMap, status), - node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics), - status => AggregateStatus, - metrics => restructure_map(AggregateMetrics) - }} - end; + {200, #{ + node_resource_metrics => HelpFun(maps:map(Fun, ResourceMetricsMap), metrics), + resource_metrics => + case maps:size(AggregateResourceMetrics) of + 0 -> #{}; + _ -> restructure_map(AggregateResourceMetrics) + end, + node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics), + metrics => restructure_map(AggregateMetrics), + + node_status => HelpFun(StatusMap, status), + status => AggregateStatus, + node_error => HelpFun(maps:map(Fun, ErrorMap), reason) + }}; {error, ErrL} -> - {500, #{ + {400, #{ code => <<"INTERNAL_ERROR">>, message => bin_t(io_lib:format("~p", [ErrL])) }} @@ -371,7 +366,7 @@ aggregate_status(AllStatus) -> end. aggregate_metrics([]) -> - empty_metrics_and_status; + #{}; aggregate_metrics([HeadMetrics | AllMetrics]) -> CombinerFun = fun ComFun(_Key, Val1, Val2) -> @@ -387,20 +382,34 @@ aggregate_metrics([HeadMetrics | AllMetrics]) -> make_result_map(ResList) -> Fun = - fun(Elem, {StatusMap, MetricsMap, ErrorMap}) -> + fun(Elem, {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap}) -> case Elem of - {ok, {NodeId, Status, Metrics}} -> + {ok, {NodeId, Status, Metrics, ResourceMetrics}} -> { maps:put(NodeId, Status, StatusMap), maps:put(NodeId, Metrics, MetricsMap), + maps:put(NodeId, ResourceMetrics, ResourceMetricsMap), ErrorMap }; {error, {NodeId, Reason}} -> - {StatusMap, MetricsMap, maps:put(NodeId, Reason, ErrorMap)} + {StatusMap, MetricsMap, ResourceMetricsMap, maps:put(NodeId, Reason, ErrorMap)} end end, - lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList). + lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList). +restructure_map(#{ + counters := #{deny := Failed, matched := Match, allow := Succ, ignore := Ignore}, + rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}} +}) -> + #{ + matched => Match, + allow => Succ, + deny => Failed, + ignore => Ignore, + rate => Rate, + rate_last5m => Rate5m, + rate_max => RateMax + }; restructure_map(#{ counters := #{failed := Failed, matched := Match, success := Succ}, rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}} diff --git a/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl b/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl index d3c2dc6f4..24e820c95 100644 --- a/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl +++ b/apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl @@ -30,7 +30,7 @@ introduced_in() -> "5.0.0". --spec lookup_from_all_nodes([node()], binary()) -> +-spec lookup_from_all_nodes([node()], atom()) -> emqx_rpc:erpc_multicall(). -lookup_from_all_nodes(Nodes, ResourceId) -> - erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [ResourceId], ?TIMEOUT). +lookup_from_all_nodes(Nodes, Type) -> + erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [Type], ?TIMEOUT). diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index 308e67d53..3f1b206be 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -188,38 +188,6 @@ t_api(_) -> ], {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1), - Snd = fun({_, Val}) -> Val end, - LookupVal = fun LookupV(List, RestJson) -> - case List of - [Name] -> Snd(lists:keyfind(Name, 1, RestJson)); - [Name | NS] -> LookupV(NS, Snd(lists:keyfind(Name, 1, RestJson))) - end - end, - EqualFun = fun(RList) -> - fun({M, V}) -> - ?assertEqual( - V, - LookupVal( - [<<"metrics">>, M], - RList - ) - ) - end - end, - AssertFun = - fun(ResultJson) -> - {ok, RList} = emqx_json:safe_decode(ResultJson), - MetricsList = [ - {<<"failed">>, 0}, - {<<"matched">>, 0}, - {<<"rate">>, 0.0}, - {<<"rate_last5m">>, 0.0}, - {<<"rate_max">>, 0.0}, - {<<"success">>, 0} - ], - lists:map(EqualFun(RList), MetricsList) - end, - {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []), Sources = get_sources(Result2), ?assertMatch( @@ -271,7 +239,14 @@ t_api(_) -> ), {ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []), {ok, 200, Status4} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []), - AssertFun(Status4), + #{ + <<"metrics">> := #{ + <<"allow">> := 0, + <<"deny">> := 0, + <<"matched">> := 0, + <<"ignore">> := 0 + } + } = jiffy:decode(Status4, [return_maps]), ?assertMatch( #{ <<"type">> := <<"mongodb">>, @@ -304,8 +279,6 @@ t_api(_) -> } ), {ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []), - {ok, 200, Status5} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []), - AssertFun(Status5), ?assertMatch( #{ <<"type">> := <<"mongodb">>, @@ -320,6 +293,16 @@ t_api(_) -> jsx:decode(Result5) ), + {ok, 200, Status5_1} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []), + #{ + <<"metrics">> := #{ + <<"allow">> := 0, + <<"deny">> := 0, + <<"matched">> := 0, + <<"ignore">> := 0 + } + } = jiffy:decode(Status5_1, [return_maps]), + #{ ssl := #{ cacertfile := SavedCacertfile, @@ -367,6 +350,77 @@ t_api(_) -> {ok, 200, Result6} = request(get, uri(["authorization", "sources"]), []), ?assertEqual([], get_sources(Result6)), ?assertEqual([], emqx:get_config([authorization, sources])), + + {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6), + + {ok, Client} = emqtt:start_link( + [ + {username, <<"u_event3">>}, + {clientid, <<"c_event3">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 60}} + ] + ), + emqtt:connect(Client), + timer:sleep(50), + + emqtt:publish( + Client, + <<"t1">>, + #{'Message-Expiry-Interval' => 60}, + <<"{\"id\": 1, \"name\": \"ha\"}">>, + [{qos, 1}] + ), + + {ok, 200, Status5} = request(get, uri(["authorization", "sources", "file", "status"]), []), + #{ + <<"metrics">> := #{ + <<"allow">> := 1, + <<"deny">> := 0, + <<"matched">> := 1, + <<"ignore">> := 0 + } + } = jiffy:decode(Status5, [return_maps]), + + timer:sleep(50), + emqtt:publish( + Client, + <<"t2">>, + #{'Message-Expiry-Interval' => 60}, + <<"{\"id\": 1, \"name\": \"ha\"}">>, + [{qos, 1}] + ), + + {ok, 200, Status6} = request(get, uri(["authorization", "sources", "file", "status"]), []), + #{ + <<"metrics">> := #{ + <<"allow">> := 2, + <<"deny">> := 0, + <<"matched">> := 2, + <<"ignore">> := 0 + } + } = jiffy:decode(Status6, [return_maps]), + + timer:sleep(50), + emqtt:publish( + Client, + <<"t3">>, + #{'Message-Expiry-Interval' => 60}, + <<"{\"id\": 1, \"name\": \"ha\"}">>, + [{qos, 1}] + ), + + timer:sleep(50), + {ok, 200, Status7} = request(get, uri(["authorization", "sources", "file", "status"]), []), + #{ + <<"metrics">> := #{ + <<"allow">> := 3, + <<"deny">> := 0, + <<"matched">> := 3, + <<"ignore">> := 0 + } + } = jiffy:decode(Status7, [return_maps]), + ok. t_move_source(_) ->