%%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_authz_schema). -include("emqx_authz.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx_connector/include/emqx_connector.hrl"). -reflect_type([ permission/0, action/0 ]). -type action() :: publish | subscribe | all. -type permission() :: allow | deny. -import(emqx_schema, [mk_duration/2]). -export([ namespace/0, roots/0, tags/0, fields/1, validations/0, desc/1, authz_fields/0 ]). -export([ headers_no_content_type/1, headers/1 ]). %%-------------------------------------------------------------------- %% Hocon Schema %%-------------------------------------------------------------------- type_names() -> [ file, http_get, http_post, mnesia, mongo_single, mongo_rs, mongo_sharded, mysql, postgresql, redis_single, redis_sentinel, redis_cluster ]. namespace() -> authz. tags() -> [<<"Authorization">>]. %% @doc authorization schema is not exported %% but directly used by emqx_schema roots() -> []. fields("authorization") -> authz_fields(); fields(file) -> authz_common_fields(file) ++ [{path, ?HOCON(string(), #{required => true, desc => ?DESC(path)})}]; fields(http_get) -> authz_common_fields(http) ++ http_common_fields() ++ [ {method, method(get)}, {headers, fun headers_no_content_type/1} ]; fields(http_post) -> authz_common_fields(http) ++ http_common_fields() ++ [ {method, method(post)}, {headers, fun headers/1} ]; fields(mnesia) -> authz_common_fields(built_in_database); fields(mongo_single) -> authz_common_fields(mongodb) ++ mongo_common_fields() ++ emqx_connector_mongo:fields(single); fields(mongo_rs) -> authz_common_fields(mongodb) ++ mongo_common_fields() ++ emqx_connector_mongo:fields(rs); fields(mongo_sharded) -> authz_common_fields(mongodb) ++ mongo_common_fields() ++ emqx_connector_mongo:fields(sharded); fields(mysql) -> authz_common_fields(mysql) ++ connector_fields(mysql) ++ [{query, query()}]; fields(postgresql) -> authz_common_fields(postgresql) ++ emqx_connector_pgsql:fields(config) ++ [{query, query()}]; fields(redis_single) -> authz_common_fields(redis) ++ connector_fields(redis, single) ++ [{cmd, cmd()}]; fields(redis_sentinel) -> authz_common_fields(redis) ++ connector_fields(redis, sentinel) ++ [{cmd, cmd()}]; fields(redis_cluster) -> authz_common_fields(redis) ++ connector_fields(redis, cluster) ++ [{cmd, cmd()}]; fields("metrics_status_fields") -> [ {"resource_metrics", ?HOCON(?R_REF("resource_metrics"), #{desc => ?DESC("metrics")})}, {"node_resource_metrics", array("node_resource_metrics", "node_metrics")}, {"metrics", ?HOCON(?R_REF("metrics"), #{desc => ?DESC("metrics")})}, {"node_metrics", array("node_metrics")}, {"status", ?HOCON(cluster_status(), #{desc => ?DESC("status")})}, {"node_status", array("node_status")}, {"node_error", array("node_error")} ]; fields("metrics") -> [ {"total", ?HOCON(integer(), #{desc => ?DESC("metrics_total")})}, {"allow", ?HOCON(integer(), #{desc => ?DESC("allow")})}, {"deny", ?HOCON(integer(), #{desc => ?DESC("deny")})}, {"nomatch", ?HOCON(float(), #{desc => ?DESC("nomatch")})} ] ++ common_rate_field(); fields("node_metrics") -> [ node_name(), {"metrics", ?HOCON(?R_REF("metrics"), #{desc => ?DESC("metrics")})} ]; fields("resource_metrics") -> common_field(); fields("node_resource_metrics") -> [ node_name(), {"metrics", ?HOCON(?R_REF("resource_metrics"), #{desc => ?DESC("metrics")})} ]; fields("node_status") -> [ node_name(), {"status", ?HOCON(status(), #{desc => ?DESC("node_status")})} ]; fields("node_error") -> [ node_name(), {"error", ?HOCON(string(), #{desc => ?DESC("node_error")})} ]. common_field() -> [ {"matched", ?HOCON(integer(), #{desc => ?DESC("matched")})}, {"success", ?HOCON(integer(), #{desc => ?DESC("success")})}, {"failed", ?HOCON(integer(), #{desc => ?DESC("failed")})} ] ++ common_rate_field(). status() -> hoconsc:enum([connected, disconnected, connecting]). cluster_status() -> hoconsc:enum([connected, disconnected, connecting, inconsistent]). node_name() -> {"node", ?HOCON(binary(), #{desc => ?DESC("node"), example => "emqx@127.0.0.1"})}. desc(?CONF_NS) -> ?DESC(?CONF_NS); desc(file) -> ?DESC(file); desc(http_get) -> ?DESC(http_get); desc(http_post) -> ?DESC(http_post); desc(mnesia) -> ?DESC(mnesia); desc(mongo_single) -> ?DESC(mongo_single); desc(mongo_rs) -> ?DESC(mongo_rs); desc(mongo_sharded) -> ?DESC(mongo_sharded); desc(mysql) -> ?DESC(mysql); desc(postgresql) -> ?DESC(postgresql); desc(redis_single) -> ?DESC(redis_single); desc(redis_sentinel) -> ?DESC(redis_sentinel); desc(redis_cluster) -> ?DESC(redis_cluster); desc(_) -> undefined. authz_common_fields(Type) -> [ {type, ?HOCON(Type, #{required => true, desc => ?DESC(type)})}, {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})} ]. http_common_fields() -> [ {url, fun url/1}, {request_timeout, mk_duration("Request timeout", #{ required => false, default => <<"30s">>, desc => ?DESC(request_timeout) })}, {body, ?HOCON(map(), #{required => false, desc => ?DESC(body)})} ] ++ maps:to_list( maps:without( [ base_url, pool_type ], maps:from_list(connector_fields(http)) ) ). mongo_common_fields() -> [ {collection, ?HOCON(binary(), #{ required => true, desc => ?DESC(collection) })}, {filter, ?HOCON(map(), #{ required => false, default => #{}, desc => ?DESC(filter) })} ]. validations() -> [ {check_ssl_opts, fun check_ssl_opts/1} ]. headers(type) -> list({binary(), binary()}); headers(desc) -> ?DESC(?FUNCTION_NAME); headers(converter) -> fun(Headers) -> maps:to_list(maps:merge(default_headers(), transform_header_name(Headers))) end; headers(default) -> default_headers(); headers(_) -> undefined. headers_no_content_type(type) -> list({binary(), binary()}); headers_no_content_type(desc) -> ?DESC(?FUNCTION_NAME); headers_no_content_type(converter) -> fun(Headers) -> maps:to_list( maps:without( [<<"content-type">>], maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) ) ) end; headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(validator) -> fun(Headers) -> case lists:keyfind(<<"content-type">>, 1, Headers) of false -> ok; _ -> {error, do_not_include_content_type} end end; headers_no_content_type(_) -> undefined. url(type) -> binary(); url(desc) -> ?DESC(?FUNCTION_NAME); url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; url(required) -> true; url(_) -> undefined. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- default_headers() -> maps:put( <<"content-type">>, <<"application/json">>, default_headers_no_content_type() ). default_headers_no_content_type() -> #{ <<"accept">> => <<"application/json">>, <<"cache-control">> => <<"no-cache">>, <<"connection">> => <<"keep-alive">>, <<"keep-alive">> => <<"timeout=30, max=1000">> }. transform_header_name(Headers) -> maps:fold( fun(K0, V, Acc) -> K = list_to_binary(string:to_lower(to_list(K0))), maps:put(K, V, Acc) end, #{}, Headers ). check_ssl_opts(Conf) -> Sources = hocon_maps:get("authorization.sources", Conf, []), lists:foreach( fun (#{<<"url">> := Url} = Source) -> case emqx_authz_http:parse_url(Url) of {<<"https", _/binary>>, _, _} -> case emqx_map_lib:deep_find([<<"ssl">>, <<"enable">>], Source) of {ok, true} -> true; {ok, false} -> throw({ssl_not_enable, Url}); _ -> throw({ssl_enable_not_found, Url}) end; {<<"http", _/binary>>, _, _} -> ok; Bad -> throw({bad_scheme, Url, Bad}) end; (_Source) -> ok end, Sources ). query() -> ?HOCON(binary(), #{ desc => ?DESC(query), required => true, validator => fun(S) -> case size(S) > 0 of true -> ok; _ -> {error, "Request query"} end end }). cmd() -> ?HOCON(binary(), #{ desc => ?DESC(cmd), required => true, validator => fun(S) -> case size(S) > 0 of true -> ok; _ -> {error, "Request query"} end end }). connector_fields(DB) -> connector_fields(DB, config). connector_fields(DB, Fields) -> Mod0 = io_lib:format("~ts_~ts", [emqx_connector, DB]), Mod = try list_to_existing_atom(Mod0) catch error:badarg -> list_to_atom(Mod0); error:Reason -> erlang:error(Reason) end, erlang:apply(Mod, fields, [Fields]). to_list(A) when is_atom(A) -> atom_to_list(A); to_list(B) when is_binary(B) -> binary_to_list(B). common_rate_field() -> [ {"rate", ?HOCON(float(), #{desc => ?DESC("rate")})}, {"rate_max", ?HOCON(float(), #{desc => ?DESC("rate_max")})}, {"rate_last5m", ?HOCON(float(), #{desc => ?DESC("rate_last5m")})} ]. method(Method) -> ?HOCON(Method, #{required => true, desc => ?DESC(method)}). array(Ref) -> array(Ref, Ref). array(Ref, DescId) -> ?HOCON(?ARRAY(?R_REF(Ref)), #{desc => ?DESC(DescId)}). select_union_member(#{<<"type">> := <<"mongodb">>} = Value) -> MongoType = maps:get(<<"mongo_type">>, Value, undefined), case MongoType of <<"single">> -> ?R_REF(mongo_single); <<"rs">> -> ?R_REF(mongo_rs); <<"sharded">> -> ?R_REF(mongo_sharded); Else -> throw(#{ reason => "unknown_mongo_type", expected => "single | rs | sharded", got => Else }) end; select_union_member(#{<<"type">> := <<"redis">>} = Value) -> RedisType = maps:get(<<"redis_type">>, Value, undefined), case RedisType of <<"single">> -> ?R_REF(redis_single); <<"cluster">> -> ?R_REF(redis_cluster); <<"sentinel">> -> ?R_REF(redis_sentinel); Else -> throw(#{ reason => "unknown_redis_type", expected => "single | cluster | sentinel", got => Else }) end; select_union_member(#{<<"type">> := <<"http">>} = Value) -> RedisType = maps:get(<<"method">>, Value, undefined), case RedisType of <<"get">> -> ?R_REF(http_get); <<"post">> -> ?R_REF(http_post); Else -> throw(#{ reason => "unknown_http_method", expected => "get | post", got => Else }) end; select_union_member(#{<<"type">> := <<"built_in_database">>}) -> ?R_REF(mnesia); select_union_member(#{<<"type">> := Type}) -> select_union_member_loop(Type, type_names()); select_union_member(_) -> throw("missing_type_field"). select_union_member_loop(TypeValue, []) -> throw(#{ reason => "unknown_authz_type", got => TypeValue }); select_union_member_loop(TypeValue, [Type | Types]) -> case TypeValue =:= atom_to_binary(Type) of true -> ?R_REF(Type); false -> select_union_member_loop(TypeValue, Types) end. authz_fields() -> Types = [?R_REF(Type) || Type <- type_names()], UnionMemberSelector = fun (all_union_members) -> Types; %% must return list ({value, Value}) -> [select_union_member(Value)] end, [ {sources, ?HOCON( ?ARRAY(?UNION(UnionMemberSelector)), #{ default => [], desc => ?DESC(sources) } )} ].