499 lines
14 KiB
Erlang
499 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_authz_schema).
|
|
|
|
-include("emqx_authz.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
|
|
|
-reflect_type([
|
|
permission/0,
|
|
action/0
|
|
]).
|
|
|
|
-type action() :: publish | subscribe | all.
|
|
-type permission() :: allow | deny.
|
|
|
|
-import(emqx_schema, [mk_duration/2]).
|
|
|
|
-export([
|
|
namespace/0,
|
|
roots/0,
|
|
tags/0,
|
|
fields/1,
|
|
validations/0,
|
|
desc/1,
|
|
authz_fields/0
|
|
]).
|
|
|
|
-export([
|
|
headers_no_content_type/1,
|
|
headers/1
|
|
]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Hocon Schema
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Names of every authorization source struct declared in this module.
%% Each name has a matching `fields/1' clause and is offered as a member of
%% the `sources' union built by `authz_fields/0'. Order is significant for
%% `select_union_member_loop/2', so the groups are concatenated in the
%% original sequence.
type_names() ->
    Local = [file, http_get, http_post, mnesia],
    Mongo = [mongo_single, mongo_rs, mongo_sharded],
    Sql = [mysql, postgresql],
    Redis = [redis_single, redis_sentinel, redis_cluster],
    Local ++ Mongo ++ Sql ++ Redis.
|
|
|
|
%% Namespace atom under which the structs of this schema module live.
namespace() ->
    authz.
|
|
|
|
%% Tags attached to this schema module; a single binary label.
tags() ->
    Tag = <<"Authorization">>,
    [Tag].
|
|
|
|
%% @doc This module declares no roots of its own: the authorization schema
%% is not exported from here but is used directly by emqx_schema.
roots() ->
    [].
|
|
|
|
%% Field definitions for every struct known to this schema.
%%
%% Atom names (file, http_get, mysql, ...) describe one authorization source
%% each: the shared `type'/`enable' fields, then the backend-specific fields.
%% String names ("metrics", "node_status", ...) describe the metrics/status
%% structs referenced via ?R_REF elsewhere in this function.
fields("authorization") ->
    authz_fields();
fields(file) ->
    authz_common_fields(file) ++
        [{path, ?HOCON(string(), #{required => true, desc => ?DESC(path)})}];
fields(http_get) ->
    %% GET requests carry no body, so the content-type header is excluded.
    authz_common_fields(http) ++
        http_common_fields() ++
        [
            {method, method(get)},
            {headers, fun headers_no_content_type/1}
        ];
fields(http_post) ->
    authz_common_fields(http) ++
        http_common_fields() ++
        [
            {method, method(post)},
            {headers, fun headers/1}
        ];
fields(mnesia) ->
    %% The mnesia struct is exposed to users as type `built_in_database'.
    authz_common_fields(built_in_database);
fields(mongo_single) ->
    authz_common_fields(mongodb) ++
        mongo_common_fields() ++
        emqx_connector_mongo:fields(single);
fields(mongo_rs) ->
    authz_common_fields(mongodb) ++
        mongo_common_fields() ++
        emqx_connector_mongo:fields(rs);
fields(mongo_sharded) ->
    authz_common_fields(mongodb) ++
        mongo_common_fields() ++
        emqx_connector_mongo:fields(sharded);
fields(mysql) ->
    authz_common_fields(mysql) ++
        connector_fields(mysql) ++
        [{query, query()}];
fields(postgresql) ->
    authz_common_fields(postgresql) ++
        emqx_connector_pgsql:fields(config) ++
        [{query, query()}];
fields(redis_single) ->
    authz_common_fields(redis) ++
        connector_fields(redis, single) ++
        [{cmd, cmd()}];
fields(redis_sentinel) ->
    authz_common_fields(redis) ++
        connector_fields(redis, sentinel) ++
        [{cmd, cmd()}];
fields(redis_cluster) ->
    authz_common_fields(redis) ++
        connector_fields(redis, cluster) ++
        [{cmd, cmd()}];
fields("metrics_status_fields") ->
    [
        {"resource_metrics", ?HOCON(?R_REF("resource_metrics"), #{desc => ?DESC("metrics")})},
        {"node_resource_metrics", array("node_resource_metrics", "node_metrics")},
        {"metrics", ?HOCON(?R_REF("metrics"), #{desc => ?DESC("metrics")})},
        {"node_metrics", array("node_metrics")},
        {"status", ?HOCON(cluster_status(), #{desc => ?DESC("status")})},
        {"node_status", array("node_status")},
        {"node_error", array("node_error")}
    ];
fields("metrics") ->
    [
        {"total", ?HOCON(integer(), #{desc => ?DESC("metrics_total")})},
        {"allow", ?HOCON(integer(), #{desc => ?DESC("allow")})},
        {"deny", ?HOCON(integer(), #{desc => ?DESC("deny")})},
        %% Typed as integer() to agree with the sibling counters above;
        %% only the rate fields appended below are floats.
        {"nomatch", ?HOCON(integer(), #{desc => ?DESC("nomatch")})}
    ] ++ common_rate_field();
fields("node_metrics") ->
    [
        node_name(),
        {"metrics", ?HOCON(?R_REF("metrics"), #{desc => ?DESC("metrics")})}
    ];
fields("resource_metrics") ->
    common_field();
fields("node_resource_metrics") ->
    [
        node_name(),
        {"metrics", ?HOCON(?R_REF("resource_metrics"), #{desc => ?DESC("metrics")})}
    ];
fields("node_status") ->
    [
        node_name(),
        {"status", ?HOCON(status(), #{desc => ?DESC("node_status")})}
    ];
fields("node_error") ->
    [
        node_name(),
        {"error", ?HOCON(string(), #{desc => ?DESC("node_error")})}
    ].
|
|
|
|
%% Integer counters shared by the resource metrics structs, followed by the
%% common rate fields.
common_field() ->
    Counters = [
        {Name, ?HOCON(integer(), #{desc => ?DESC(Name)})}
     || Name <- ["matched", "success", "failed"]
    ],
    Counters ++ common_rate_field().
|
|
|
|
%% Enum type for the status reported by a single node.
status() ->
    States = [connected, disconnected, connecting],
    hoconsc:enum(States).
|
|
|
|
%% Enum type for the cluster-level status: the per-node states plus
%% `inconsistent'.
cluster_status() ->
    States = [connected, disconnected, connecting, inconsistent],
    hoconsc:enum(States).
|
|
|
|
%% The `"node"' field shared by all per-node structs.
node_name() ->
    Meta = #{desc => ?DESC("node"), example => "emqx@127.0.0.1"},
    {"node", ?HOCON(binary(), Meta)}.
|
|
|
|
%% Description reference for a struct name.
%% The config namespace and every authorization source struct resolve to a
%% ?DESC entry keyed by their own name; any other name has no description.
desc(?CONF_NS) ->
    ?DESC(?CONF_NS);
desc(Name) when
    Name =:= file;
    Name =:= http_get;
    Name =:= http_post;
    Name =:= mnesia;
    Name =:= mongo_single;
    Name =:= mongo_rs;
    Name =:= mongo_sharded;
    Name =:= mysql;
    Name =:= postgresql;
    Name =:= redis_single;
    Name =:= redis_sentinel;
    Name =:= redis_cluster
->
    ?DESC(Name);
desc(_) ->
    undefined.
|
|
|
|
%% Fields present on every authorization source: the mandatory `type'
%% discriminator (fixed to Type) and the `enable' switch, which defaults to
%% true.
authz_common_fields(Type) ->
    TypeField = {type, ?HOCON(Type, #{required => true, desc => ?DESC(type)})},
    EnableField = {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},
    [TypeField, EnableField].
|
|
|
|
%% Fields shared by the http_get and http_post sources: the fields declared
%% here followed by the HTTP connector's fields minus `base_url' and
%% `pool_type'.
http_common_fields() ->
    TimeoutMeta = #{
        required => false, default => <<"30s">>, desc => ?DESC(request_timeout)
    },
    OwnFields = [
        {url, fun url/1},
        {request_timeout, mk_duration("Request timeout", TimeoutMeta)},
        {body, ?HOCON(map(), #{required => false, desc => ?DESC(body)})}
    ],
    %% The connector fields go through a map so the excluded keys can be
    %% dropped with maps:without/2.
    ConnectorMap = maps:from_list(connector_fields(http)),
    Inherited = maps:to_list(maps:without([base_url, pool_type], ConnectorMap)),
    OwnFields ++ Inherited.
|
|
|
|
%% Fields shared by all MongoDB sources: the mandatory `collection' and an
%% optional `filter' map that defaults to the empty map.
mongo_common_fields() ->
    CollectionMeta = #{required => true, desc => ?DESC(collection)},
    FilterMeta = #{required => false, default => #{}, desc => ?DESC(filter)},
    [
        {collection, ?HOCON(binary(), CollectionMeta)},
        {filter, ?HOCON(map(), FilterMeta)}
    ].
|
|
|
|
%% Named validators for this schema; currently only the SSL option check.
validations() ->
    SslCheck = fun check_ssl_opts/1,
    [{check_ssl_opts, SslCheck}].
|
|
|
|
%% Field schema for the `headers' field of the http_post source, answering
%% one schema property per clause.
headers(type) ->
    list({binary(), binary()});
headers(desc) ->
    ?DESC(?FUNCTION_NAME);
headers(converter) ->
    %% Lower-case the given header names, lay them over the defaults (the
    %% given values win) and return the result as a key/value list.
    fun(GivenHeaders) ->
        Defaults = default_headers(),
        Normalized = transform_header_name(GivenHeaders),
        maps:to_list(maps:merge(Defaults, Normalized))
    end;
headers(default) ->
    default_headers();
headers(_) ->
    undefined.
|
|
|
|
%% Field schema for the `headers' field of the http_get source. Works like
%% headers/1 but never yields a content-type header.
headers_no_content_type(type) ->
    list({binary(), binary()});
headers_no_content_type(desc) ->
    ?DESC(?FUNCTION_NAME);
headers_no_content_type(converter) ->
    %% Lower-case the given header names, lay them over the defaults, then
    %% drop any content-type entry before returning a key/value list.
    fun(GivenHeaders) ->
        Defaults = default_headers_no_content_type(),
        Normalized = transform_header_name(GivenHeaders),
        Merged = maps:merge(Defaults, Normalized),
        maps:to_list(maps:without([<<"content-type">>], Merged))
    end;
headers_no_content_type(default) ->
    default_headers_no_content_type();
headers_no_content_type(validator) ->
    %% Reject a header list that contains a content-type entry.
    fun(HeaderList) ->
        HasContentType = lists:keyfind(<<"content-type">>, 1, HeaderList) =/= false,
        case HasContentType of
            true -> {error, do_not_include_content_type};
            false -> ok
        end
    end;
headers_no_content_type(_) ->
    undefined.
|
|
|
|
%% Field schema for the `url' field of the HTTP sources: a required,
%% non-empty binary.
url(type) ->
    binary();
url(desc) ->
    ?DESC(?FUNCTION_NAME);
url(validator) ->
    [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
url(required) ->
    true;
url(_) ->
    undefined.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Default request headers including a JSON content-type.
default_headers() ->
    Base = default_headers_no_content_type(),
    Base#{<<"content-type">> => <<"application/json">>}.
|
|
|
|
%% Default request headers without a content-type entry.
%% Keys are lower-case binaries, the same form transform_header_name/1
%% produces.
default_headers_no_content_type() ->
    Negotiation = #{
        <<"accept">> => <<"application/json">>,
        <<"cache-control">> => <<"no-cache">>
    },
    Negotiation#{
        <<"connection">> => <<"keep-alive">>,
        <<"keep-alive">> => <<"timeout=30, max=1000">>
    }.
|
|
|
|
%% Return a copy of the Headers map with every key converted to a
%% lower-case binary. Keys may be atoms or binaries (see to_list/1).
transform_header_name(Headers) ->
    LowerCaseKey = fun(Name, Value, Acc) ->
        LowerName = list_to_binary(string:to_lower(to_list(Name))),
        Acc#{LowerName => Value}
    end,
    maps:fold(LowerCaseKey, #{}, Headers).
|
|
|
|
%% Schema validator: every source under "authorization.sources" that has a
%% `url' must use an http(s) scheme, and an https URL must have `ssl.enable'
%% set to true. Violations are reported by throwing; sources without a
%% `url' are ignored.
check_ssl_opts(Conf) ->
    Sources = hocon_maps:get("authorization.sources", Conf, []),
    CheckSource = fun
        (#{<<"url">> := Url} = Source) ->
            case emqx_authz_http:parse_url(Url) of
                %% The https pattern must come first: the http pattern below
                %% is a prefix match and would accept https URLs as well.
                {<<"https", _/binary>>, _, _} ->
                    ensure_ssl_enabled(Url, Source);
                {<<"http", _/binary>>, _, _} ->
                    ok;
                Bad ->
                    throw({bad_scheme, Url, Bad})
            end;
        (_Source) ->
            ok
    end,
    lists:foreach(CheckSource, Sources).

%% Throw unless the source explicitly sets `ssl.enable' to true.
ensure_ssl_enabled(Url, Source) ->
    case emqx_map_lib:deep_find([<<"ssl">>, <<"enable">>], Source) of
        {ok, true} -> true;
        {ok, false} -> throw({ssl_not_enable, Url});
        _ -> throw({ssl_enable_not_found, Url})
    end.
|
|
|
|
%% Schema of the `query' field used by the SQL sources (mysql, postgresql):
%% a required binary that must not be empty.
query() ->
    ?HOCON(binary(), #{
        desc => ?DESC(query),
        required => true,
        validator => fun(S) ->
            %% byte_size/1 rather than the type-generic size/1: the value
            %% is a binary, and byte_size/1 states that intent.
            case byte_size(S) > 0 of
                true -> ok;
                _ -> {error, "Request query"}
            end
        end
    }).
|
|
|
|
%% Schema of the `cmd' field used by the Redis sources: a required binary
%% that must not be empty.
cmd() ->
    ?HOCON(binary(), #{
        desc => ?DESC(cmd),
        required => true,
        validator => fun(S) ->
            %% byte_size/1 rather than the type-generic size/1: the value
            %% is a binary, and byte_size/1 states that intent.
            case byte_size(S) > 0 of
                true -> ok;
                %% Name the field actually being validated; the previous
                %% text was copied from query() and pointed at `query'.
                _ -> {error, "Request cmd"}
            end
        end
    }).
|
|
|
|
%% Fields of the `config' struct of the connector module for DB.
connector_fields(DB) ->
    DefaultStruct = config,
    connector_fields(DB, DefaultStruct).
|
|
%% Fields of struct `Fields' as declared by the connector module for DB.
%% The module name is derived as `emqx_connector_<DB>' and its fields/1 is
%% called with Fields.
%%
%% DB must come from trusted code (the callers in this module pass literal
%% atoms): the fallback below may create a new atom.
connector_fields(DB, Fields) ->
    %% io_lib:format/2 returns chars(), which may be a deep list, while the
    %% list-to-atom BIFs require a flat string; flatten explicitly instead of
    %% relying on the formatter's output shape.
    ModName = lists:flatten(io_lib:format("~ts_~ts", [emqx_connector, DB])),
    Mod =
        try
            list_to_existing_atom(ModName)
        catch
            %% The atom does not exist yet, presumably because the connector
            %% module has not been loaded; create it so the call below can
            %% resolve the module.
            error:badarg ->
                list_to_atom(ModName)
        end,
    Mod:fields(Fields).
|
|
|
|
%% Convert an atom or a binary to a plain list (string).
%% Any other term fails with function_clause.
-spec to_list(atom() | binary()) -> string().
to_list(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
to_list(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).
|
|
|
|
%% Float-typed rate fields shared by the metrics structs.
common_rate_field() ->
    [
        {Name, ?HOCON(float(), #{desc => ?DESC(Name)})}
     || Name <- ["rate", "rate_max", "rate_last5m"]
    ].
|
|
|
|
%% Required `method' field whose type is fixed to the given Method atom.
method(Method) ->
    Meta = #{required => true, desc => ?DESC(method)},
    ?HOCON(Method, Meta).
|
|
|
|
%% Array of references to struct Ref, described by the entry named Ref.
array(Ref) ->
    DescId = Ref,
    array(Ref, DescId).
|
|
|
|
%% Array of references to struct Ref, described by the entry named DescId.
array(Ref, DescId) ->
    ElementType = ?R_REF(Ref),
    Meta = #{desc => ?DESC(DescId)},
    ?HOCON(?ARRAY(ElementType), Meta).
|
|
|
|
%% Pick the union member (struct reference) for a raw source config map.
%%
%% `mongodb', `redis' and `http' each cover several structs, so a second key
%% (`mongo_type', `redis_type', `method') selects among them.
%% `built_in_database' maps to the mnesia struct. Any other type is looked
%% up by name in type_names(). Unknown or missing values are reported by
%% throwing.
select_union_member(#{<<"type">> := <<"mongodb">>} = Value) ->
    case maps:get(<<"mongo_type">>, Value, undefined) of
        <<"single">> ->
            ?R_REF(mongo_single);
        <<"rs">> ->
            ?R_REF(mongo_rs);
        <<"sharded">> ->
            ?R_REF(mongo_sharded);
        Unknown ->
            throw(#{
                reason => "unknown_mongo_type",
                expected => "single | rs | sharded",
                got => Unknown
            })
    end;
select_union_member(#{<<"type">> := <<"redis">>} = Value) ->
    case maps:get(<<"redis_type">>, Value, undefined) of
        <<"single">> ->
            ?R_REF(redis_single);
        <<"cluster">> ->
            ?R_REF(redis_cluster);
        <<"sentinel">> ->
            ?R_REF(redis_sentinel);
        Unknown ->
            throw(#{
                reason => "unknown_redis_type",
                expected => "single | cluster | sentinel",
                got => Unknown
            })
    end;
select_union_member(#{<<"type">> := <<"http">>} = Value) ->
    case maps:get(<<"method">>, Value, undefined) of
        <<"get">> ->
            ?R_REF(http_get);
        <<"post">> ->
            ?R_REF(http_post);
        Unknown ->
            throw(#{
                reason => "unknown_http_method",
                expected => "get | post",
                got => Unknown
            })
    end;
select_union_member(#{<<"type">> := <<"built_in_database">>}) ->
    ?R_REF(mnesia);
select_union_member(#{<<"type">> := Type}) ->
    select_union_member_loop(Type, type_names());
select_union_member(_) ->
    throw("missing_type_field").
|
|
|
|
%% Walk the candidate struct names and return a reference to the first one
%% whose binary form equals TypeValue; throw when none matches.
select_union_member_loop(TypeValue, [Candidate | Rest]) ->
    case atom_to_binary(Candidate) of
        %% TypeValue is already bound, so this clause is an equality test.
        TypeValue ->
            ?R_REF(Candidate);
        _Other ->
            select_union_member_loop(TypeValue, Rest)
    end;
select_union_member_loop(TypeValue, []) ->
    throw(#{
        reason => "unknown_authz_type",
        got => TypeValue
    }).
|
|
|
|
%% Fields of the "authorization" struct: a `sources' array whose elements
%% are a union over every source struct named in type_names().
authz_fields() ->
    AllMembers = [?R_REF(Name) || Name <- type_names()],
    %% Union selector: either list every member, or narrow the union to the
    %% single member matching a concrete value.
    Selector = fun
        ({value, Value}) ->
            %% must return list
            [select_union_member(Value)];
        (all_union_members) ->
            AllMembers
    end,
    SourcesType = ?ARRAY(?UNION(Selector)),
    SourcesMeta = #{default => [], desc => ?DESC(sources)},
    [{sources, ?HOCON(SourcesType, SourcesMeta)}].
|