Merge pull request #7823 from EMQ-YangM/authz_add_metrics

feat: new authz metrics
This commit is contained in:
Yang Miao 2022-04-29 11:27:59 +08:00 committed by GitHub
commit 7061d94cf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 448 additions and 90 deletions

View File

@ -393,4 +393,171 @@ Filter supports the following placeholders:
zh: """查询语句"""
}
}
#==== metrics field
metrics {
desc {
en: """The metrics of the resource."""
zh: """资源统计指标。"""
}
label: {
en: """Metrics"""
zh: """指标"""
}
}
node_metrics {
desc {
en: """The metrics of the resource for each node."""
zh: """每个节点上资源的统计指标。"""
}
label: {
en: """Resource Metrics in Node"""
zh: """节点资源指标"""
}
}
status {
desc {
en: """The status of the resource."""
zh: """资源状态。"""
}
label: {
en: """Status"""
zh: """状态"""
}
}
node_status {
desc {
en: """The status of the resource for each node."""
zh: """每个节点上资源的状态。"""
}
label: {
en: """Resource Status in Node"""
zh: """节点资源状态"""
}
}
node_error {
desc {
en: """The error of node."""
zh: """节点上产生的错误。"""
}
label: {
en: """Error in Node"""
zh: """节点产生的错误"""
}
}
matched {
desc {
en: """Count of this resource is queried."""
zh: """请求命中次数。"""
}
label: {
en: """Matched"""
zh: """已命中"""
}
}
success {
desc {
en: """Count of query success."""
zh: """请求成功次数。"""
}
label: {
en: """Success"""
zh: """成功"""
}
}
failed {
desc {
en: """Count of query failed."""
zh: """请求失败次数。"""
}
label: {
en: """Failed"""
zh: """失败"""
}
}
rate {
desc {
en: """The rate of matched, times/second."""
zh: """命中速率,单位:次/秒。"""
}
label: {
en: """Rate"""
zh: """速率"""
}
}
rate_max {
desc {
en: """The max rate of matched, times/second."""
zh: """最大命中速率,单位:次/秒。"""
}
label: {
en: """Max Rate"""
zh: """最大速率"""
}
}
rate_last5m {
desc {
en: """The average rate of matched in the last 5 minutes, times/second."""
zh: """5分钟内平均命中速率单位次/秒。"""
}
label: {
en: """Rate in Last 5min"""
zh: """5分钟内速率"""
}
}
node {
desc {
en: """Node name."""
zh: """节点名称。"""
}
label: {
en: """Node Name."""
zh: """节点名称。"""
}
}
ignore {
desc {
en: """Not match any rules."""
zh: """没有匹配到任何规则。"""
}
label: {
en: """Not Match Any Rules"""
zh: """没有匹配到任何规则。"""
}
}
allow {
desc {
en: """Authorize allow."""
zh: """鉴权成功。"""
}
label: {
en: """Authorize Allow"""
zh: """鉴权成功"""
}
}
deny {
desc {
en: """Authorize Deny."""
zh: """鉴权失败"""
}
label: {
en: """Authorize Deny"""
zh: """鉴权失败"""
}
}
}

View File

@ -171,6 +171,14 @@ do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
do_move(Cmd, InitedSources);
do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)),
%% create metrics
TypeName = type(RawNewSource),
ok = emqx_plugin_libs_metrics:create_metrics(
authz_metrics,
TypeName,
[matched, allow, deny, ignore],
[matched]
),
[InitedNewSource] ++ lookup();
do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) ->
InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)),
@ -185,6 +193,8 @@ do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
%% delete metrics
ok = emqx_plugin_libs_metrics:clear_metrics(authz_metrics, Type),
ok = ensure_resource_deleted(OldSource),
clear_certs(OldSource),
Front ++ Rear;
@ -247,6 +257,7 @@ init_sources(Sources) ->
true -> ?SLOG(info, #{msg => "disabled_sources_ignored", sources => Disabled});
false -> ok
end,
ok = lists:foreach(fun init_metrics/1, Sources),
lists:map(fun init_source/1, Sources).
init_source(#{enable := false} = Source) ->
@ -255,6 +266,15 @@ init_source(#{type := Type} = Source) ->
Module = authz_module(Type),
Module:init(Source).
init_metrics(Source) ->
TypeName = type(Source),
emqx_plugin_libs_metrics:create_metrics(
authz_metrics,
TypeName,
[matched, allow, deny, ignore],
[matched]
).
%%--------------------------------------------------------------------
%% AuthZ callbacks
%%--------------------------------------------------------------------
@ -290,6 +310,7 @@ authorize(
ipaddr => IpAddress,
topic => Topic
}),
emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, allow),
emqx_metrics:inc(?METRIC_ALLOW),
{stop, allow};
{{matched, deny}, AuthzSource} ->
@ -303,6 +324,7 @@ authorize(
ipaddr => IpAddress,
topic => Topic
}),
emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, deny),
emqx_metrics:inc(?METRIC_DENY),
{stop, deny};
nomatch ->
@ -332,9 +354,13 @@ do_authorize(
[Connector = #{type := Type} | Tail]
) ->
Module = authz_module(Type),
emqx_plugin_libs_metrics:inc(authz_metrics, Type, matched),
case Module:authorize(Client, PubSub, Topic, Connector) of
nomatch -> do_authorize(Client, PubSub, Topic, Tail);
Matched -> {Matched, Type}
nomatch ->
emqx_plugin_libs_metrics:inc(authz_metrics, Type, ignore),
do_authorize(Client, PubSub, Topic, Tail);
Matched ->
{Matched, Type}
end.
get_enabled_authzs() ->

View File

@ -143,7 +143,7 @@ schema("/authorization/sources/:type/status") ->
responses =>
#{
200 => emqx_dashboard_swagger:schema_with_examples(
hoconsc:ref(emqx_authn_schema, "metrics_status_fields"),
hoconsc:ref(emqx_authz_schema, "metrics_status_fields"),
status_metrics_example()
),
400 => emqx_dashboard_swagger:error_codes(
@ -258,24 +258,7 @@ source(delete, #{bindings := #{type := Type}}) ->
update_config({?CMD_DELETE, Type}, #{}).
source_status(get, #{bindings := #{type := Type}}) ->
BinType = atom_to_binary(Type, utf8),
case get_raw_source(BinType) of
[] ->
{404, #{
code => <<"NOT_FOUND">>,
message => <<"Not found", BinType/binary>>
}};
[#{<<"type">> := <<"file">>}] ->
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Not Support Status">>
}};
[_] ->
case emqx_authz:lookup(Type) of
#{annotations := #{id := ResourceId}} -> lookup_from_all_nodes(ResourceId);
_ -> {400, #{code => <<"BAD_REQUEST">>, message => <<"Resource Disable">>}}
end
end.
lookup_from_all_nodes(Type).
move_source(Method, #{bindings := #{type := Type} = Bindings} = Req) when
is_atom(Type)
@ -321,39 +304,51 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos
%% Internal functions
%%--------------------------------------------------------------------
lookup_from_local_node(ResourceId) ->
lookup_from_local_node(Type) ->
NodeId = node(self()),
case emqx_resource:get_instance(ResourceId) of
{error, not_found} -> {error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := Metrics}} -> {ok, {NodeId, Status, Metrics}}
try emqx_authz:lookup(Type) of
#{annotations := #{id := ResourceId}} ->
Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type),
case emqx_resource:get_instance(ResourceId) of
{error, not_found} ->
{error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := ResourceMetrics}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}}
end;
_ ->
Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type),
{ok, {NodeId, connected, Metrics, #{}}}
catch
_:Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
end.
lookup_from_all_nodes(ResourceId) ->
lookup_from_all_nodes(Type) ->
Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, ResourceId)) of
case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, Type)) of
{ok, ResList} ->
{StatusMap, MetricsMap, _} = make_result_map(ResList),
{StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)),
AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
AggregateResourceMetrics = aggregate_metrics(maps:values(ResourceMetricsMap)),
Fun = fun(_, V1) -> restructure_map(V1) end,
MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end,
HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end,
case AggregateStatus of
empty_metrics_and_status ->
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Resource Not Support Status">>
}};
_ ->
{200, #{
node_status => HelpFun(StatusMap, status),
node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
status => AggregateStatus,
metrics => restructure_map(AggregateMetrics)
}}
end;
{200, #{
node_resource_metrics => HelpFun(maps:map(Fun, ResourceMetricsMap), metrics),
resource_metrics =>
case maps:size(AggregateResourceMetrics) of
0 -> #{};
_ -> restructure_map(AggregateResourceMetrics)
end,
node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
metrics => restructure_map(AggregateMetrics),
node_status => HelpFun(StatusMap, status),
status => AggregateStatus,
node_error => HelpFun(maps:map(Fun, ErrorMap), reason)
}};
{error, ErrL} ->
{500, #{
{400, #{
code => <<"INTERNAL_ERROR">>,
message => bin_t(io_lib:format("~p", [ErrL]))
}}
@ -371,7 +366,7 @@ aggregate_status(AllStatus) ->
end.
aggregate_metrics([]) ->
empty_metrics_and_status;
#{};
aggregate_metrics([HeadMetrics | AllMetrics]) ->
CombinerFun =
fun ComFun(_Key, Val1, Val2) ->
@ -387,20 +382,34 @@ aggregate_metrics([HeadMetrics | AllMetrics]) ->
make_result_map(ResList) ->
Fun =
fun(Elem, {StatusMap, MetricsMap, ErrorMap}) ->
fun(Elem, {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap}) ->
case Elem of
{ok, {NodeId, Status, Metrics}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}} ->
{
maps:put(NodeId, Status, StatusMap),
maps:put(NodeId, Metrics, MetricsMap),
maps:put(NodeId, ResourceMetrics, ResourceMetricsMap),
ErrorMap
};
{error, {NodeId, Reason}} ->
{StatusMap, MetricsMap, maps:put(NodeId, Reason, ErrorMap)}
{StatusMap, MetricsMap, ResourceMetricsMap, maps:put(NodeId, Reason, ErrorMap)}
end
end,
lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList).
restructure_map(#{
counters := #{deny := Failed, matched := Match, allow := Succ, ignore := Ignore},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) ->
#{
matched => Match,
allow => Succ,
deny => Failed,
ignore => Ignore,
rate => Rate,
rate_last5m => Rate5m,
rate_max => RateMax
};
restructure_map(#{
counters := #{failed := Failed, matched := Match, success := Succ},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
@ -561,7 +570,7 @@ bin(Term) -> erlang:iolist_to_binary(io_lib:format("~p", [Term])).
status_metrics_example() ->
#{
metrics => #{
resource_metrics => #{
matched => 0,
success => 0,
failed => 0,
@ -569,7 +578,7 @@ status_metrics_example() ->
rate_last5m => 0.0,
rate_max => 0.0
},
node_metrics => [
node_resource_metrics => [
#{
node => node(),
metrics => #{
@ -582,6 +591,30 @@ status_metrics_example() ->
}
}
],
metrics => #{
matched => 0,
allow => 0,
deny => 0,
ignore => 0,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.0
},
node_metrics => [
#{
node => node(),
metrics => #{
matched => 0,
allow => 0,
deny => 0,
ignore => 0,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.0
}
}
],
status => connected,
node_status => [
#{

View File

@ -19,6 +19,7 @@
-include("emqx_authz.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-import(hoconsc, [mk/2, ref/2]).
-reflect_type([
permission/0,
@ -134,7 +135,84 @@ fields(jwt) ->
default => <<"acl">>,
desc => ?DESC(acl_claim_name)
}}
].
];
fields("metrics_status_fields") ->
[
{"resource_metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})},
{"node_resource_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_resource_metrics")),
#{desc => ?DESC("node_metrics")}
)},
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})},
{"node_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_metrics")),
#{desc => ?DESC("node_metrics")}
)},
{"status", mk(cluster_status(), #{desc => ?DESC("status")})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("node_status")}
)},
{"node_error",
mk(
hoconsc:array(ref(?MODULE, "node_error")),
#{desc => ?DESC("node_error")}
)}
];
fields("metrics") ->
[
{"matched", mk(integer(), #{desc => ?DESC("matched")})},
{"allow", mk(integer(), #{desc => ?DESC("allow")})},
{"deny", mk(integer(), #{desc => ?DESC("deny")})},
{"ignore", mk(float(), #{desc => ?DESC("ignore")})},
{"rate", mk(float(), #{desc => ?DESC("rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("rate_max")})},
{"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})}
];
fields("node_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})}
];
fields("resource_metrics") ->
common_field();
fields("node_resource_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})}
];
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{desc => ?DESC("node_status")})}
];
fields("node_error") ->
[
node_name(),
{"error", mk(string(), #{desc => ?DESC("node_error")})}
].
common_field() ->
[
{"matched", mk(integer(), #{desc => ?DESC("matched")})},
{"success", mk(integer(), #{desc => ?DESC("success")})},
{"failed", mk(integer(), #{desc => ?DESC("failed")})},
{"rate", mk(float(), #{desc => ?DESC("rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("rate_max")})},
{"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})}
].
status() ->
hoconsc:enum([connected, disconnected, connecting]).
cluster_status() ->
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("node"), example => "emqx@127.0.0.1"})}.
desc(?CONF_NS) ->
?DESC(?CONF_NS);

View File

@ -30,7 +30,7 @@
introduced_in() ->
"5.0.0".
-spec lookup_from_all_nodes([node()], binary()) ->
-spec lookup_from_all_nodes([node()], atom()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, ResourceId) ->
erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [ResourceId], ?TIMEOUT).
lookup_from_all_nodes(Nodes, Type) ->
erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [Type], ?TIMEOUT).

View File

@ -188,38 +188,6 @@ t_api(_) ->
],
{ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1),
Snd = fun({_, Val}) -> Val end,
LookupVal = fun LookupV(List, RestJson) ->
case List of
[Name] -> Snd(lists:keyfind(Name, 1, RestJson));
[Name | NS] -> LookupV(NS, Snd(lists:keyfind(Name, 1, RestJson)))
end
end,
EqualFun = fun(RList) ->
fun({M, V}) ->
?assertEqual(
V,
LookupVal(
[<<"metrics">>, M],
RList
)
)
end
end,
AssertFun =
fun(ResultJson) ->
{ok, RList} = emqx_json:safe_decode(ResultJson),
MetricsList = [
{<<"failed">>, 0},
{<<"matched">>, 0},
{<<"rate">>, 0.0},
{<<"rate_last5m">>, 0.0},
{<<"rate_max">>, 0.0},
{<<"success">>, 0}
],
lists:map(EqualFun(RList), MetricsList)
end,
{ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []),
Sources = get_sources(Result2),
?assertMatch(
@ -271,7 +239,14 @@ t_api(_) ->
),
{ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []),
{ok, 200, Status4} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
AssertFun(Status4),
#{
<<"metrics">> := #{
<<"allow">> := 0,
<<"deny">> := 0,
<<"matched">> := 0,
<<"ignore">> := 0
}
} = jiffy:decode(Status4, [return_maps]),
?assertMatch(
#{
<<"type">> := <<"mongodb">>,
@ -304,8 +279,6 @@ t_api(_) ->
}
),
{ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []),
{ok, 200, Status5} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
AssertFun(Status5),
?assertMatch(
#{
<<"type">> := <<"mongodb">>,
@ -320,6 +293,16 @@ t_api(_) ->
jsx:decode(Result5)
),
{ok, 200, Status5_1} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 0,
<<"deny">> := 0,
<<"matched">> := 0,
<<"ignore">> := 0
}
} = jiffy:decode(Status5_1, [return_maps]),
#{
ssl := #{
cacertfile := SavedCacertfile,
@ -367,6 +350,77 @@ t_api(_) ->
{ok, 200, Result6} = request(get, uri(["authorization", "sources"]), []),
?assertEqual([], get_sources(Result6)),
?assertEqual([], emqx:get_config([authorization, sources])),
{ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6),
{ok, Client} = emqtt:start_link(
[
{username, <<"u_event3">>},
{clientid, <<"c_event3">>},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 60}}
]
),
emqtt:connect(Client),
timer:sleep(50),
emqtt:publish(
Client,
<<"t1">>,
#{'Message-Expiry-Interval' => 60},
<<"{\"id\": 1, \"name\": \"ha\"}">>,
[{qos, 1}]
),
{ok, 200, Status5} = request(get, uri(["authorization", "sources", "file", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 1,
<<"deny">> := 0,
<<"matched">> := 1,
<<"ignore">> := 0
}
} = jiffy:decode(Status5, [return_maps]),
timer:sleep(50),
emqtt:publish(
Client,
<<"t2">>,
#{'Message-Expiry-Interval' => 60},
<<"{\"id\": 1, \"name\": \"ha\"}">>,
[{qos, 1}]
),
{ok, 200, Status6} = request(get, uri(["authorization", "sources", "file", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 2,
<<"deny">> := 0,
<<"matched">> := 2,
<<"ignore">> := 0
}
} = jiffy:decode(Status6, [return_maps]),
timer:sleep(50),
emqtt:publish(
Client,
<<"t3">>,
#{'Message-Expiry-Interval' => 60},
<<"{\"id\": 1, \"name\": \"ha\"}">>,
[{qos, 1}]
),
timer:sleep(50),
{ok, 200, Status7} = request(get, uri(["authorization", "sources", "file", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 3,
<<"deny">> := 0,
<<"matched">> := 3,
<<"ignore">> := 0
}
} = jiffy:decode(Status7, [return_maps]),
ok.
t_move_source(_) ->

View File

@ -72,7 +72,7 @@
rate := #{atom() => rate()}
}.
-type handler_name() :: atom().
-type metric_id() :: binary().
-type metric_id() :: binary() | atom().
-define(CntrRef(Name), {?MODULE, Name}).
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).