emqx/apps/emqx_authz/src/emqx_authz_api_sources.erl

662 lines
22 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_api_sources).
-behaviour(minirest_api).
-include("emqx_authz.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/1, mk/2, ref/2, array/1, enum/1]).
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(API_SCHEMA_MODULE, emqx_authz_api_schema).
-export([
get_raw_sources/0,
get_raw_source/1,
source_status/2,
lookup_from_local_node/1,
lookup_from_all_nodes/1
]).
-export([
api_spec/0,
paths/0,
schema/1,
fields/1
]).
-export([
sources/2,
source/2,
source_move/2,
aggregate_metrics/1
]).
-define(TAGS, [<<"Authorization">>]).
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/authorization/sources",
"/authorization/sources/:type",
"/authorization/sources/:type/status",
"/authorization/sources/:type/move"
].
fields(sources) ->
[{sources, mk(array(hoconsc:union(authz_sources_type_refs())), #{desc => ?DESC(sources)})}].
%%--------------------------------------------------------------------
%% Schema for each URI
%%--------------------------------------------------------------------
schema("/authorization/sources") ->
#{
'operationId' => sources,
get =>
#{
description => ?DESC(authorization_sources_get),
tags => ?TAGS,
responses =>
#{
200 => ref(?MODULE, sources)
}
},
post =>
#{
description => ?DESC(authorization_sources_post),
tags => ?TAGS,
'requestBody' => mk(
hoconsc:union(authz_sources_type_refs()),
#{desc => ?DESC(source_config)}
),
responses =>
#{
204 => <<"Authorization source created successfully">>,
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST],
<<"Bad Request">>
)
}
}
};
schema("/authorization/sources/:type") ->
#{
'operationId' => source,
get =>
#{
description => ?DESC(authorization_sources_type_get),
tags => ?TAGS,
parameters => parameters_field(),
responses =>
#{
200 => mk(
hoconsc:union(authz_sources_type_refs()),
#{desc => ?DESC(source)}
),
404 => emqx_dashboard_swagger:error_codes([?NOT_FOUND], <<"Not Found">>)
}
},
put =>
#{
description => ?DESC(authorization_sources_type_put),
tags => ?TAGS,
parameters => parameters_field(),
'requestBody' => mk(hoconsc:union(authz_sources_type_refs())),
responses =>
#{
204 => <<"Authorization source updated successfully">>,
400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
}
},
delete =>
#{
description => ?DESC(authorization_sources_type_delete),
tags => ?TAGS,
parameters => parameters_field(),
responses =>
#{
204 => <<"Deleted successfully">>,
400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
}
}
};
schema("/authorization/sources/:type/status") ->
#{
'operationId' => source_status,
get =>
#{
description => ?DESC(authorization_sources_type_status_get),
tags => ?TAGS,
parameters => parameters_field(),
responses =>
#{
200 => emqx_dashboard_swagger:schema_with_examples(
hoconsc:ref(emqx_authz_schema, "metrics_status_fields"),
status_metrics_example()
),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad request">>
),
404 => emqx_dashboard_swagger:error_codes([?NOT_FOUND], <<"Not Found">>)
}
}
};
schema("/authorization/sources/:type/move") ->
#{
'operationId' => source_move,
post =>
#{
description => ?DESC(authorization_sources_type_move_post),
tags => ?TAGS,
parameters => parameters_field(),
'requestBody' =>
emqx_dashboard_swagger:schema_with_examples(
ref(?API_SCHEMA_MODULE, position),
position_example()
),
responses =>
#{
204 => <<"No Content">>,
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad Request">>
),
404 => emqx_dashboard_swagger:error_codes([?NOT_FOUND], <<"Not Found">>)
}
}
}.
%%--------------------------------------------------------------------
%% Operation functions
%%--------------------------------------------------------------------
sources(Method, #{bindings := #{type := Type} = Bindings} = Req) when
is_atom(Type)
->
sources(Method, Req#{bindings => Bindings#{type => atom_to_binary(Type, utf8)}});
sources(get, _) ->
Sources = lists:foldl(
fun
(
#{
<<"type">> := <<"file">>,
<<"enable">> := Enable,
<<"path">> := Path
},
AccIn
) ->
case emqx_authz_file:read_file(Path) of
{ok, Rules} ->
lists:append(AccIn, [
#{
type => file,
enable => Enable,
rules => Rules
}
]);
{error, _} ->
lists:append(AccIn, [
#{
type => file,
enable => Enable,
rules => <<"">>
}
])
end;
(Source, AccIn) ->
lists:append(AccIn, [Source])
end,
[],
get_raw_sources()
),
{200, #{sources => Sources}};
sources(post, #{body := Body}) ->
update_config(?CMD_PREPEND, Body).
source(Method, #{bindings := #{type := Type} = Bindings} = Req) when
is_atom(Type)
->
source(Method, Req#{bindings => Bindings#{type => atom_to_binary(Type, utf8)}});
source(get, #{bindings := #{type := Type}}) ->
with_source(
Type,
fun
(#{<<"type">> := <<"file">>, <<"enable">> := Enable, <<"path">> := Path}) ->
case emqx_authz_file:read_file(Path) of
{ok, Rules} ->
{200, #{
type => file,
enable => Enable,
rules => Rules
}};
{error, Reason} ->
{500, #{
code => <<"INTERNAL_ERROR">>,
message => bin(Reason)
}}
end;
(Source) ->
{200, Source}
end
);
source(put, #{bindings := #{type := Type}, body := #{<<"type">> := Type} = Body}) ->
with_source(
Type,
fun(_) ->
update_config({?CMD_REPLACE, Type}, Body)
end
);
source(put, #{bindings := #{type := Type}, body := #{<<"type">> := _OtherType}}) ->
with_source(
Type,
fun(_) ->
{400, #{code => <<"BAD_REQUEST">>, message => <<"Type mismatch">>}}
end
);
source(delete, #{bindings := #{type := Type}}) ->
with_source(
Type,
fun(_) ->
update_config({?CMD_DELETE, Type}, #{})
end
).
source_status(get, #{bindings := #{type := Type}}) ->
with_source(
atom_to_binary(Type, utf8),
fun(_) -> lookup_from_all_nodes(Type) end
).
source_move(Method, #{bindings := #{type := Type} = Bindings} = Req) when
is_atom(Type)
->
source_move(Method, Req#{bindings => Bindings#{type => atom_to_binary(Type, utf8)}});
source_move(post, #{bindings := #{type := Type}, body := #{<<"position">> := Position}}) ->
with_source(
Type,
fun(_Source) ->
case parse_position(Position) of
{ok, NPosition} ->
try emqx_authz:move(Type, NPosition) of
{ok, _} ->
{204};
{error, {not_found_source, _Type}} ->
{404, #{
code => <<"NOT_FOUND">>,
message => <<"source ", Type/binary, " not found">>
}};
{error, {emqx_conf_schema, _}} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"BAD_SCHEMA">>
}};
{error, Reason} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => bin(Reason)
}}
catch
error:{unknown_authz_source_type, Unknown} ->
NUnknown = bin(Unknown),
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Unknown authz Source Type: ", NUnknown/binary>>
}}
end;
{error, Reason} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => bin(Reason)
}}
end
end
).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
lookup_from_local_node(Type) ->
NodeId = node(self()),
try emqx_authz:lookup(Type) of
#{annotations := #{id := ResourceId}} ->
Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type),
case emqx_resource:get_instance(ResourceId) of
{error, not_found} ->
{error, {NodeId, not_found_resource}};
{ok, _, #{status := Status}} ->
{ok, {NodeId, Status, Metrics, emqx_resource:get_metrics(ResourceId)}}
end;
_ ->
Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type),
%% for authz file/authz mnesia
{ok, {NodeId, connected, Metrics, #{}}}
catch
_:Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
end.
lookup_from_all_nodes(Type) ->
Nodes = mria:running_nodes(),
case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, Type)) of
{ok, ResList} ->
{StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)),
AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
AggregateResourceMetrics = aggregate_metrics(maps:values(ResourceMetricsMap)),
Fun = fun(_, V1) -> restructure_map(V1) end,
MKMap = fun(Name) -> fun({Key, Val}) -> #{node => Key, Name => Val} end end,
HelpFun = fun(M, Name) -> lists:map(MKMap(Name), maps:to_list(M)) end,
{200, #{
node_resource_metrics => HelpFun(maps:map(Fun, ResourceMetricsMap), metrics),
resource_metrics =>
case maps:size(AggregateResourceMetrics) of
0 -> #{};
_ -> restructure_map(AggregateResourceMetrics)
end,
node_metrics => HelpFun(maps:map(Fun, MetricsMap), metrics),
metrics => restructure_map(AggregateMetrics),
node_status => HelpFun(StatusMap, status),
status => AggregateStatus,
node_error => HelpFun(maps:map(Fun, ErrorMap), reason)
}};
{error, ErrL} ->
{400, #{
code => <<"INTERNAL_ERROR">>,
message => bin_t(io_lib:format("~p", [ErrL]))
}}
end.
aggregate_status([]) ->
empty_metrics_and_status;
aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end,
HeadVal = Head(AllStatus),
AllRes = lists:all(fun(Val) -> Val == HeadVal end, AllStatus),
case AllRes of
true -> HeadVal;
false -> inconsistent
end.
aggregate_metrics([]) ->
#{};
aggregate_metrics([HeadMetrics | AllMetrics]) ->
ErrorLogger = fun(Reason) -> ?SLOG(info, #{msg => "bad_metrics_value", error => Reason}) end,
Fun = fun(ElemMap, AccMap) ->
emqx_utils_maps:best_effort_recursive_sum(AccMap, ElemMap, ErrorLogger)
end,
lists:foldl(Fun, HeadMetrics, AllMetrics).
make_result_map(ResList) ->
Fun =
fun(Elem, {StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap}) ->
case Elem of
{ok, {NodeId, Status, Metrics, ResourceMetrics}} ->
{
maps:put(NodeId, Status, StatusMap),
maps:put(NodeId, Metrics, MetricsMap),
maps:put(NodeId, ResourceMetrics, ResourceMetricsMap),
ErrorMap
};
{error, {NodeId, Reason}} ->
{StatusMap, MetricsMap, ResourceMetricsMap, maps:put(NodeId, Reason, ErrorMap)}
end
end,
lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList).
restructure_map(#{
counters := #{deny := Failed, total := Total, allow := Succ, nomatch := Nomatch},
rate := #{total := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) ->
#{
total => Total,
allow => Succ,
deny => Failed,
nomatch => Nomatch,
rate => Rate,
rate_last5m => Rate5m,
rate_max => RateMax
};
restructure_map(#{
counters := #{failed := Failed, matched := Match, success := Succ},
rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) ->
#{
matched => Match,
success => Succ,
failed => Failed,
rate => Rate,
rate_last5m => Rate5m,
rate_max => RateMax
};
restructure_map(Error) ->
Error.
bin_t(S) when is_list(S) ->
list_to_binary(S).
is_ok(ResL) ->
case
lists:filter(
fun
({ok, _}) -> false;
(_) -> true
end,
ResL
)
of
[] -> {ok, [Res || {ok, Res} <- ResL]};
ErrL -> {error, ErrL}
end.
get_raw_sources() ->
RawSources = emqx:get_raw_config([authorization, sources], []),
Schema = emqx_hocon:make_schema(emqx_authz_schema:authz_fields()),
Conf = #{<<"sources">> => RawSources},
#{<<"sources">> := Sources} = hocon_tconf:make_serializable(Schema, Conf, #{}),
merge_default_headers(Sources).
merge_default_headers(Sources) ->
lists:map(
fun(Source) ->
case maps:find(<<"headers">>, Source) of
{ok, Headers} ->
NewHeaders =
case Source of
#{<<"method">> := <<"get">>} ->
(emqx_authz_schema:headers_no_content_type(converter))(Headers);
#{<<"method">> := <<"post">>} ->
(emqx_authz_schema:headers(converter))(Headers);
_ ->
Headers
end,
Source#{<<"headers">> => NewHeaders};
error ->
Source
end
end,
Sources
).
get_raw_source(Type) ->
lists:filter(
fun(#{<<"type">> := T}) ->
T =:= Type
end,
get_raw_sources()
).
-spec with_source(binary(), fun((map()) -> term())) -> term().
with_source(Type, ContF) ->
case get_raw_source(Type) of
[] ->
{404, #{code => <<"NOT_FOUND">>, message => <<"Not found: ", Type/binary>>}};
[Source] ->
ContF(Source)
end.
update_config(Cmd, Sources) ->
case emqx_authz:update(Cmd, Sources) of
{ok, _} ->
{204};
{error, {pre_config_update, emqx_authz, Reason}} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => bin(Reason)
}};
{error, {post_config_update, emqx_authz, Reason}} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => bin(Reason)
}};
%% TODO: The `Reason` may cann't be trans to json term. (i.e. ecpool start failed)
{error, {emqx_conf_schema, _}} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"BAD_SCHEMA">>
}};
{error, Reason} ->
{400, #{
code => <<"BAD_REQUEST">>,
message => bin(Reason)
}}
end.
parameters_field() ->
[
{type,
mk(
enum(?API_SCHEMA_MODULE:authz_sources_types(simple)),
#{in => path, desc => ?DESC(source_type)}
)}
].
parse_position(<<"front">>) ->
{ok, ?CMD_MOVE_FRONT};
parse_position(<<"rear">>) ->
{ok, ?CMD_MOVE_REAR};
parse_position(<<"before:">>) ->
{error, <<"Invalid parameter. Cannot be placed before an empty target">>};
parse_position(<<"after:">>) ->
{error, <<"Invalid parameter. Cannot be placed after an empty target">>};
parse_position(<<"before:", Before/binary>>) ->
{ok, ?CMD_MOVE_BEFORE(Before)};
parse_position(<<"after:", After/binary>>) ->
{ok, ?CMD_MOVE_AFTER(After)};
parse_position(_) ->
{error, <<"Invalid parameter. Unknow position">>}.
position_example() ->
#{
front =>
#{
summary => <<"front example">>,
value => #{<<"position">> => <<"front">>}
},
rear =>
#{
summary => <<"rear example">>,
value => #{<<"position">> => <<"rear">>}
},
relative_before =>
#{
summary => <<"relative example">>,
value => #{<<"position">> => <<"before:file">>}
},
relative_after =>
#{
summary => <<"relative example">>,
value => #{<<"position">> => <<"after:file">>}
}
}.
authz_sources_type_refs() ->
[
ref(?API_SCHEMA_MODULE, Type)
|| Type <- emqx_authz_api_schema:authz_sources_types(detailed)
].
bin(Term) -> erlang:iolist_to_binary(io_lib:format("~p", [Term])).
status_metrics_example() ->
#{
'metrics_example' => #{
summary => <<"Showing a typical metrics example">>,
value =>
#{
resource_metrics => #{
matched => 0,
success => 0,
failed => 0,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.0
},
node_resource_metrics => [
#{
node => node(),
metrics => #{
matched => 0,
success => 0,
failed => 0,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.0
}
}
],
metrics => #{
total => 0,
allow => 0,
deny => 0,
nomatch => 0,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.0
},
node_metrics => [
#{
node => node(),
metrics => #{
total => 0,
allow => 0,
deny => 0,
nomatch => 0,
rate => 0.0,
rate_last5m => 0.0,
rate_max => 0.0
}
}
],
status => connected,
node_status => [
#{
node => node(),
status => connected
}
]
}
}
}.