feat: new authz metrics

This commit is contained in:
EMQ-YangM 2022-04-28 17:19:41 +08:00
parent 638b7195d4
commit a6920ac11b
6 changed files with 253 additions and 106 deletions

View File

@ -163,4 +163,37 @@ emqx_authn_schema {
zh: """节点名称。"""
}
}
ignore {
desc {
en: """Not match any rules."""
zh: """没有匹配到任何规则。"""
}
label: {
en: """Not Match Any Rules"""
zh: """没有匹配到任何规则。"""
}
}
allow {
desc {
en: """Authorize allow."""
zh: """鉴权成功。"""
}
label: {
en: """Authorize Allow"""
zh: """鉴权成功"""
}
}
deny {
desc {
en: """Authorize Deny."""
zh: """鉴权失败"""
}
label: {
en: """Authorize Deny"""
zh: """鉴权失败"""
}
}
}

View File

@ -89,41 +89,45 @@ backend(Name) ->
fields("metrics_status_fields") ->
[
{"resource_metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})},
{"node_resource_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_resource_metrics")),
#{desc => ?DESC("node_metrics")}
)},
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})},
{"node_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_metrics")),
#{desc => ?DESC("node_metrics")}
)},
{"status", mk(cluster_status(), #{desc => ?DESC("status")})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("node_status")}
)},
{"node_error",
mk(
hoconsc:array(ref(?MODULE, "node_error")),
#{desc => ?DESC("node_error")}
)}
];
] ++ common_metrics_field();
fields("metrics_status_fields_authz") ->
[
{"metrics", mk(ref(?MODULE, "metrics_authz"), #{desc => ?DESC("metrics")})},
{"node_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_metrics_authz")),
#{desc => ?DESC("node_metrics")}
)}
] ++ common_metrics_field();
fields("metrics") ->
[
{"ignore", mk(integer(), #{desc => ?DESC("failed")})}
] ++ common_field();
fields("resource_metrics") ->
common_field();
fields("metrics_authz") ->
[
{"matched", mk(integer(), #{desc => ?DESC("matched")})},
{"allow", mk(integer(), #{desc => ?DESC("allow")})},
{"deny", mk(integer(), #{desc => ?DESC("deny")})},
{"ignore", mk(float(), #{desc => ?DESC("ignore")})}
];
fields("node_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("metrics")})}
];
fields("node_metrics_authz") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "metrics_authz"), #{desc => ?DESC("metrics")})}
];
fields("resource_metrics") ->
common_field();
fields("node_resource_metrics") ->
[
node_name(),
@ -150,6 +154,27 @@ common_field() ->
{"rate_last5m", mk(float(), #{desc => ?DESC("rate_last5m")})}
].
common_metrics_field() ->
[
{"resource_metrics", mk(ref(?MODULE, "resource_metrics"), #{desc => ?DESC("metrics")})},
{"node_resource_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_resource_metrics")),
#{desc => ?DESC("node_metrics")}
)},
{"status", mk(cluster_status(), #{desc => ?DESC("status")})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("node_status")}
)},
{"node_error",
mk(
hoconsc:array(ref(?MODULE, "node_error")),
#{desc => ?DESC("node_error")}
)}
].
status() ->
hoconsc:enum([connected, disconnected, connecting]).

View File

@ -171,6 +171,14 @@ do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
do_move(Cmd, InitedSources);
do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)),
%% create metrics
TypeName = type(RawNewSource),
ok = emqx_plugin_libs_metrics:create_metrics(
authz_metrics,
TypeName,
[matched, allow, deny, ignore],
[matched]
),
[InitedNewSource] ++ lookup();
do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) ->
InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)),
@ -185,6 +193,8 @@ do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
%% delete metrics
ok = emqx_plugin_libs_metrics:clear_metrics(authz_metrics, Type),
ok = ensure_resource_deleted(OldSource),
clear_certs(OldSource),
Front ++ Rear;
@ -247,6 +257,7 @@ init_sources(Sources) ->
true -> ?SLOG(info, #{msg => "disabled_sources_ignored", sources => Disabled});
false -> ok
end,
lists:map(fun init_metrics/1, Sources),
lists:map(fun init_source/1, Sources).
init_source(#{enable := false} = Source) ->
@ -255,6 +266,15 @@ init_source(#{type := Type} = Source) ->
Module = authz_module(Type),
Module:init(Source).
init_metrics(Source) ->
TypeName = type(Source),
ok = emqx_plugin_libs_metrics:create_metrics(
authz_metrics,
TypeName,
[matched, allow, deny, ignore],
[matched]
).
%%--------------------------------------------------------------------
%% AuthZ callbacks
%%--------------------------------------------------------------------
@ -290,6 +310,7 @@ authorize(
ipaddr => IpAddress,
topic => Topic
}),
emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, allow),
emqx_metrics:inc(?METRIC_ALLOW),
{stop, allow};
{{matched, deny}, AuthzSource} ->
@ -303,6 +324,7 @@ authorize(
ipaddr => IpAddress,
topic => Topic
}),
emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, deny),
emqx_metrics:inc(?METRIC_DENY),
{stop, deny};
nomatch ->
@ -332,9 +354,13 @@ do_authorize(
[Connector = #{type := Type} | Tail]
) ->
Module = authz_module(Type),
emqx_plugin_libs_metrics:inc(authz_metrics, Type, matched),
case Module:authorize(Client, PubSub, Topic, Connector) of
nomatch -> do_authorize(Client, PubSub, Topic, Tail);
Matched -> {Matched, Type}
nomatch ->
emqx_plugin_libs_metrics:inc(authz_metrics, Type, ignore),
do_authorize(Client, PubSub, Topic, Tail);
Matched ->
{Matched, Type}
end.
get_enabled_authzs() ->

View File

@ -143,7 +143,7 @@ schema("/authorization/sources/:type/status") ->
responses =>
#{
200 => emqx_dashboard_swagger:schema_with_examples(
hoconsc:ref(emqx_authn_schema, "metrics_status_fields"),
hoconsc:ref(emqx_authn_schema, "metrics_status_fields_authz"),
status_metrics_example()
),
400 => emqx_dashboard_swagger:error_codes(
@ -258,24 +258,7 @@ source(delete, #{bindings := #{type := Type}}) ->
update_config({?CMD_DELETE, Type}, #{}).
source_status(get, #{bindings := #{type := Type}}) ->
BinType = atom_to_binary(Type, utf8),
case get_raw_source(BinType) of
[] ->
{404, #{
code => <<"NOT_FOUND">>,
message => <<"Not found", BinType/binary>>
}};
[#{<<"type">> := <<"file">>}] ->
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Not Support Status">>
}};
[_] ->
case emqx_authz:lookup(Type) of
#{annotations := #{id := ResourceId}} -> lookup_from_all_nodes(ResourceId);
_ -> {400, #{code => <<"BAD_REQUEST">>, message => <<"Resource Disable">>}}
end
end.
lookup_from_all_nodes(Type).
move_source(Method, #{bindings := #{type := Type} = Bindings} = Req) when
is_atom(Type)
@ -321,39 +304,51 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos
%% Internal functions
%%--------------------------------------------------------------------
lookup_from_local_node(ResourceId) ->
lookup_from_local_node(Type) ->
NodeId = node(self()),
try emqx_authz:lookup(Type) of
#{annotations := #{id := ResourceId}} ->
Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type),
case emqx_resource:get_instance(ResourceId) of
{error, not_found} -> {error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := Metrics}} -> {ok, {NodeId, Status, Metrics}}
{error, not_found} ->
{error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := ResourceMetrics}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}}
end;
_ ->
Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type),
{ok, {NodeId, connected, Metrics, #{}}}
catch
Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
end.
lookup_from_all_nodes(ResourceId) ->
lookup_from_all_nodes(Type) ->
Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, ResourceId)) of
case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, Type)) of
{ok, ResList} ->
{StatusMap, MetricsMap, _} = make_result_map(ResList),
{StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)),
AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
AggregateResourceMetrics = aggregate_metrics(maps:values(ResourceMetricsMap)),
Fun = fun(_, V1) -> restructure_map(V1) end,
MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end,
HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end,
case AggregateStatus of
empty_metrics_and_status ->
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Resource Not Support Status">>
}};
_ ->
{200, #{
node_status => HelpFun(StatusMap, status),
node_resource_metrics => HelpFun(maps:map(Fun, ResourceMetricsMap), metrics),
resource_metrics =>
case maps:size(AggregateResourceMetrics) of
0 -> #{};
_ -> restructure_map(AggregateResourceMetrics)
end,
node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
metrics => restructure_map(AggregateMetrics),
node_status => HelpFun(StatusMap, status),
status => AggregateStatus,
metrics => restructure_map(AggregateMetrics)
}}
end;
node_error => HelpFun(maps:map(Fun, ErrorMap), reason)
}};
{error, ErrL} ->
{500, #{
{400, #{
code => <<"INTERNAL_ERROR">>,
message => bin_t(io_lib:format("~p", [ErrL]))
}}
@ -371,7 +366,7 @@ aggregate_status(AllStatus) ->
end.
aggregate_metrics([]) ->
empty_metrics_and_status;
#{};
aggregate_metrics([HeadMetrics | AllMetrics]) ->
CombinerFun =
fun ComFun(_Key, Val1, Val2) ->
@ -387,20 +382,34 @@ aggregate_metrics([HeadMetrics | AllMetrics]) ->
make_result_map(ResList) ->
Fun =
fun(Elem, {StatusMap, MetricsMap, ErrorMap}) ->
fun(Elem, {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap}) ->
case Elem of
{ok, {NodeId, Status, Metrics}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}} ->
{
maps:put(NodeId, Status, StatusMap),
maps:put(NodeId, Metrics, MetricsMap),
maps:put(NodeId, ResourceMetrics, ResourceMetricsMap),
ErrorMap
};
{error, {NodeId, Reason}} ->
{StatusMap, MetricsMap, maps:put(NodeId, Reason, ErrorMap)}
{StatusMap, MetricsMap, ResourceMetricsMap, maps:put(NodeId, Reason, ErrorMap)}
end
end,
lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList).
restructure_map(#{
counters := #{deny := Failed, matched := Match, allow := Succ, ignore := Ignore},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) ->
#{
matched => Match,
allow => Succ,
deny => Failed,
ignore => Ignore,
rate => Rate,
rate_last5m => Rate5m,
rate_max => RateMax
};
restructure_map(#{
counters := #{failed := Failed, matched := Match, success := Succ},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}

View File

@ -30,7 +30,7 @@
introduced_in() ->
"5.0.0".
-spec lookup_from_all_nodes([node()], binary()) ->
-spec lookup_from_all_nodes([node()], atom()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, ResourceId) ->
erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [ResourceId], ?TIMEOUT).
lookup_from_all_nodes(Nodes, Type) ->
erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [Type], ?TIMEOUT).

View File

@ -188,38 +188,6 @@ t_api(_) ->
],
{ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1),
Snd = fun({_, Val}) -> Val end,
LookupVal = fun LookupV(List, RestJson) ->
case List of
[Name] -> Snd(lists:keyfind(Name, 1, RestJson));
[Name | NS] -> LookupV(NS, Snd(lists:keyfind(Name, 1, RestJson)))
end
end,
EqualFun = fun(RList) ->
fun({M, V}) ->
?assertEqual(
V,
LookupVal(
[<<"metrics">>, M],
RList
)
)
end
end,
AssertFun =
fun(ResultJson) ->
{ok, RList} = emqx_json:safe_decode(ResultJson),
MetricsList = [
{<<"failed">>, 0},
{<<"matched">>, 0},
{<<"rate">>, 0.0},
{<<"rate_last5m">>, 0.0},
{<<"rate_max">>, 0.0},
{<<"success">>, 0}
],
lists:map(EqualFun(RList), MetricsList)
end,
{ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []),
Sources = get_sources(Result2),
?assertMatch(
@ -271,7 +239,14 @@ t_api(_) ->
),
{ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []),
{ok, 200, Status4} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
AssertFun(Status4),
#{
<<"metrics">> := #{
<<"allow">> := 0,
<<"deny">> := 0,
<<"matched">> := 0,
<<"ignore">> := 0
}
} = jiffy:decode(Status4, [return_maps]),
?assertMatch(
#{
<<"type">> := <<"mongodb">>,
@ -304,8 +279,6 @@ t_api(_) ->
}
),
{ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []),
{ok, 200, Status5} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
AssertFun(Status5),
?assertMatch(
#{
<<"type">> := <<"mongodb">>,
@ -320,6 +293,16 @@ t_api(_) ->
jsx:decode(Result5)
),
{ok, 200, Status5_1} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 0,
<<"deny">> := 0,
<<"matched">> := 0,
<<"ignore">> := 0
}
} = jiffy:decode(Status5_1, [return_maps]),
#{
ssl := #{
cacertfile := SavedCacertfile,
@ -367,6 +350,77 @@ t_api(_) ->
{ok, 200, Result6} = request(get, uri(["authorization", "sources"]), []),
?assertEqual([], get_sources(Result6)),
?assertEqual([], emqx:get_config([authorization, sources])),
{ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6),
{ok, Client} = emqtt:start_link(
[
{username, <<"u_event3">>},
{clientid, <<"c_event3">>},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 60}}
]
),
emqtt:connect(Client),
timer:sleep(50),
emqtt:publish(
Client,
<<"t1">>,
#{'Message-Expiry-Interval' => 60},
<<"{\"id\": 1, \"name\": \"ha\"}">>,
[{qos, 1}]
),
{ok, 200, Status5} = request(get, uri(["authorization", "sources", "file", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 1,
<<"deny">> := 0,
<<"matched">> := 1,
<<"ignore">> := 0
}
} = jiffy:decode(Status5, [return_maps]),
timer:sleep(50),
emqtt:publish(
Client,
<<"t2">>,
#{'Message-Expiry-Interval' => 60},
<<"{\"id\": 1, \"name\": \"ha\"}">>,
[{qos, 1}]
),
{ok, 200, Status6} = request(get, uri(["authorization", "sources", "file", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 2,
<<"deny">> := 0,
<<"matched">> := 2,
<<"ignore">> := 0
}
} = jiffy:decode(Status6, [return_maps]),
timer:sleep(50),
emqtt:publish(
Client,
<<"t3">>,
#{'Message-Expiry-Interval' => 60},
<<"{\"id\": 1, \"name\": \"ha\"}">>,
[{qos, 1}]
),
timer:sleep(50),
{ok, 200, Status7} = request(get, uri(["authorization", "sources", "file", "status"]), []),
#{
<<"metrics">> := #{
<<"allow">> := 3,
<<"deny">> := 0,
<<"matched">> := 3,
<<"ignore">> := 0
}
} = jiffy:decode(Status7, [return_maps]),
ok.
t_move_source(_) ->