diff --git a/apps/emqx_authz/etc/emqx_authz.conf b/apps/emqx_authz/etc/emqx_authz.conf index 1111a7819..9e65517ad 100644 --- a/apps/emqx_authz/etc/emqx_authz.conf +++ b/apps/emqx_authz/etc/emqx_authz.conf @@ -55,8 +55,12 @@ authorization { # collection: mqtt_authz # selector: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] } # }, + { + type: built-in-database + } { type: file + # file is loaded into cache path: "{{ platform_etc_dir }}/acl.conf" } ] diff --git a/apps/emqx_authz/include/emqx_authz.hrl b/apps/emqx_authz/include/emqx_authz.hrl index c83dfde0d..bf8371add 100644 --- a/apps/emqx_authz/include/emqx_authz.hrl +++ b/apps/emqx_authz/include/emqx_authz.hrl @@ -29,12 +29,32 @@ (A =:= all) orelse (A =:= <<"all">>) )). +-define(ACL_SHARDED, emqx_acl_sharded). + +-define(ACL_TABLE, emqx_acl). + +%% To save some space, use an integer for label, 0 for 'all', {1, Username} and {2, ClientId}. +-define(ACL_TABLE_ALL, 0). +-define(ACL_TABLE_USERNAME, 1). +-define(ACL_TABLE_CLIENTID, 2). + +-record(emqx_acl, { + who :: ?ACL_TABLE_ALL| {?ACL_TABLE_USERNAME, binary()} | {?ACL_TABLE_CLIENTID, binary()}, + rules :: [ {permission(), action(), emqx_topic:topic()} ] + }). + -record(authz_metrics, { allow = 'client.authorize.allow', deny = 'client.authorize.deny', ignore = 'client.authorize.ignore' }). +-define(CMD_REPLCAE, replace). +-define(CMD_DELETE, delete). +-define(CMD_PREPEND, prepend). +-define(CMD_APPEND, append). +-define(CMD_MOVE, move). + -define(METRICS(Type), tl(tuple_to_list(#Type{}))). -define(METRICS(Type, K), #Type{}#Type.K). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index e7ccbe5b0..7a417925e 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -39,7 +39,6 @@ -export([post_config_update/4, pre_config_update/2]). -define(CONF_KEY_PATH, [authorization, sources]). --define(SOURCE_TYPES, [file, http, mongodb, mysql, postgresql, redis]). -spec(register_metrics() -> ok). register_metrics() -> @@ -50,228 +49,151 @@ init() -> emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), Sources = emqx:get_config(?CONF_KEY_PATH, []), ok = check_dup_types(Sources), - NSources = [init_source(Source) || Source <- Sources], + NSources = init_sources(Sources), ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1). lookup() -> {_M, _F, [A]}= find_action_in_hooks(), A. + lookup(Type) -> - try find_source_by_type(atom(Type), lookup()) of - {_, Source} -> Source - catch - error:Reason -> {error, Reason} - end. + {Source, _Front, _Rear} = take(Type), + Source. move(Type, Cmd) -> move(Type, Cmd, #{}). move(Type, #{<<"before">> := Before}, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), #{<<"before">> => atom(Before)}}, Opts); + emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), #{<<"before">> => type(Before)}}, Opts); move(Type, #{<<"after">> := After}, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), #{<<"after">> => atom(After)}}, Opts); + emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), #{<<"after">> => type(After)}}, Opts); move(Type, Position, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), Position}, Opts). + emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), Position}, Opts). update(Cmd, Sources) -> update(Cmd, Sources, #{}). -update({replace_once, Type}, Sources, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {{replace_once, atom(Type)}, Sources}, Opts); -update({delete_once, Type}, Sources, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {{delete_once, atom(Type)}, Sources}, Opts); +update({replace, Type}, Sources, Opts) -> + emqx:update_config(?CONF_KEY_PATH, {{replace, type(Type)}, Sources}, Opts); +update({delete, Type}, Sources, Opts) -> + emqx:update_config(?CONF_KEY_PATH, {{delete, type(Type)}, Sources}, Opts); update(Cmd, Sources, Opts) -> emqx:update_config(?CONF_KEY_PATH, {Cmd, Sources}, Opts). -pre_config_update({move, Type, <<"top">>}, Conf) when is_list(Conf) -> - {Index, _} = find_source_by_type(Type), - {List1, List2} = lists:split(Index, Conf), - NConf = [lists:nth(Index, Conf)] ++ lists:droplast(List1) ++ List2, - case check_dup_types(NConf) of - ok -> {ok, NConf}; - Error -> Error - end; - -pre_config_update({move, Type, <<"bottom">>}, Conf) when is_list(Conf) -> - {Index, _} = find_source_by_type(Type), - {List1, List2} = lists:split(Index, Conf), - NConf = lists:droplast(List1) ++ List2 ++ [lists:nth(Index, Conf)], - case check_dup_types(NConf) of - ok -> {ok, NConf}; - Error -> Error - end; - -pre_config_update({move, Type, #{<<"before">> := Before}}, Conf) when is_list(Conf) -> - {Index1, _} = find_source_by_type(Type), - Conf1 = lists:nth(Index1, Conf), - {Index2, _} = find_source_by_type(Before), - Conf2 = lists:nth(Index2, Conf), - - {List1, List2} = lists:split(Index2, Conf), - NConf = lists:delete(Conf1, lists:droplast(List1)) - ++ [Conf1] ++ [Conf2] - ++ lists:delete(Conf1, List2), - case check_dup_types(NConf) of - ok -> {ok, NConf}; - Error -> Error - end; - -pre_config_update({move, Type, #{<<"after">> := After}}, Conf) when is_list(Conf) -> - {Index1, _} = find_source_by_type(Type), - Conf1 = lists:nth(Index1, Conf), - {Index2, _} = find_source_by_type(After), - - {List1, List2} = lists:split(Index2, Conf), - NConf = lists:delete(Conf1, List1) - ++ [Conf1] - ++ lists:delete(Conf1, List2), - case check_dup_types(NConf) of - ok -> {ok, NConf}; - Error -> Error - end; - -pre_config_update({head, Sources}, Conf) when is_list(Sources), is_list(Conf) -> +do_update({?CMD_MOVE, Type, <<"top">>}, Conf) when is_list(Conf) -> + {Source, Front, Rear} = take(Type, Conf), + [Source | Front] ++ Rear; +do_update({?CMD_MOVE, Type, <<"bottom">>}, Conf) when is_list(Conf) -> + {Source, Front, Rear} = take(Type, Conf), + Front ++ Rear ++ [Source]; +do_update({?CMD_MOVE, Type, #{<<"before">> := Before}}, Conf) when is_list(Conf) -> + {S1, Front1, Rear1} = take(Type, Conf), + {S2, Front2, Rear2} = take(Before, Front1 ++ Rear1), + Front2 ++ [S1, S2] ++ Rear2; +do_update({?CMD_MOVE, Type, #{<<"after">> := After}}, Conf) when is_list(Conf) -> + {S1, Front1, Rear1} = take(Type, Conf), + {S2, Front2, Rear2} = take(After, Front1 ++ Rear1), + Front2 ++ [S2, S1] ++ Rear2; +do_update({?CMD_PREPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) -> NConf = Sources ++ Conf, - case check_dup_types(NConf) of - ok -> {ok, Sources ++ Conf}; - Error -> Error - end; -pre_config_update({tail, Sources}, Conf) when is_list(Sources), is_list(Conf) -> + ok = check_dup_types(NConf), + NConf; +do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) -> NConf = Conf ++ Sources, - case check_dup_types(NConf) of - ok -> {ok, Conf ++ Sources}; - Error -> Error - end; -pre_config_update({{replace_once, Type}, Source}, Conf) when is_map(Source), is_list(Conf) -> - {Index, _} = find_source_by_type(Type), - {List1, List2} = lists:split(Index, Conf), - NConf = lists:droplast(List1) ++ [Source] ++ List2, - case check_dup_types(NConf) of - ok -> {ok, NConf}; - Error -> Error - end; -pre_config_update({{delete_once, Type}, _Source}, Conf) when is_list(Conf) -> - {Index, _} = find_source_by_type(Type), - {List1, List2} = lists:split(Index, Conf), - NConf = lists:droplast(List1) ++ List2, - case check_dup_types(NConf) of - ok -> {ok, NConf}; - Error -> Error - end; -pre_config_update({_, Sources}, _Conf) when is_list(Sources)-> + ok = check_dup_types(NConf), + NConf; +do_update({{replace, Type}, Source}, Conf) when is_map(Source), is_list(Conf) -> + {_Old, Front, Rear} = take(Type, Conf), + NConf = Front ++ [Source | Rear], + ok = check_dup_types(NConf), + NConf; +do_update({{delete, Type}, _Source}, Conf) when is_list(Conf) -> + {_Old, Front, Rear} = take(Type, Conf), + NConf = Front ++ Rear, + NConf; +do_update({_, Sources}, _Conf) when is_list(Sources)-> %% overwrite the entire config! - {ok, Sources}. + Sources. + +pre_config_update(Cmd, Conf) -> + {ok, do_update(Cmd, Conf)}. + post_config_update(_, undefined, _Conf, _AppEnvs) -> ok; -post_config_update({move, Type, <<"top">>}, _NewSources, _OldSources, _AppEnvs) -> - InitedSources = lookup(), - {Index, Source} = find_source_by_type(Type, InitedSources), - {Sources1, Sources2 } = lists:split(Index, InitedSources), - Sources3 = [Source] ++ lists:droplast(Sources1) ++ Sources2, - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1), - ok = emqx_authz_cache:drain_cache(); -post_config_update({move, Type, <<"bottom">>}, _NewSources, _OldSources, _AppEnvs) -> - InitedSources = lookup(), - {Index, Source} = find_source_by_type(Type, InitedSources), - {Sources1, Sources2 } = lists:split(Index, InitedSources), - Sources3 = lists:droplast(Sources1) ++ Sources2 ++ [Source], - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1), - ok = emqx_authz_cache:drain_cache(); -post_config_update({move, Type, #{<<"before">> := Before}}, _NewSources, _OldSources, _AppEnvs) -> - InitedSources = lookup(), - {_, Source0} = find_source_by_type(Type, InitedSources), - {Index, Source1} = find_source_by_type(Before, InitedSources), - {Sources1, Sources2} = lists:split(Index, InitedSources), - Sources3 = lists:delete(Source0, lists:droplast(Sources1)) - ++ [Source0] ++ [Source1] - ++ lists:delete(Source0, Sources2), - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1), - ok = emqx_authz_cache:drain_cache(); - -post_config_update({move, Type, #{<<"after">> := After}}, _NewSources, _OldSources, _AppEnvs) -> - InitedSources = lookup(), - {_, Source} = find_source_by_type(Type, InitedSources), - {Index, _} = find_source_by_type(After, InitedSources), - {Sources1, Sources2} = lists:split(Index, InitedSources), - Sources3 = lists:delete(Source, Sources1) - ++ [Source] - ++ lists:delete(Source, Sources2), - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1), - ok = emqx_authz_cache:drain_cache(); - -post_config_update({head, Sources}, _NewSources, _OldConf, _AppEnvs) -> - InitedSources = [init_source(R) || R <- check_sources(Sources)], - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources ++ lookup()]}, -1), - ok = emqx_authz_cache:drain_cache(); - -post_config_update({tail, Sources}, _NewSources, _OldConf, _AppEnvs) -> - InitedSources = [init_source(R) || R <- check_sources(Sources)], - emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1), - ok = emqx_authz_cache:drain_cache(); - -post_config_update({{replace_once, Type}, #{type := Type} = Source}, _NewSources, _OldConf, _AppEnvs) when is_map(Source) -> - OldInitedSources = lookup(), - {Index, OldSource} = find_source_by_type(Type, OldInitedSources), - case maps:get(type, OldSource, undefined) of - undefined -> ok; - file -> ok; - _ -> - #{annotations := #{id := Id}} = OldSource, - ok = emqx_resource:remove(Id) - end, - {OldSources1, OldSources2 } = lists:split(Index, OldInitedSources), - InitedSources = [init_source(R) || R <- check_sources([Source])], - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldSources1) ++ InitedSources ++ OldSources2]}, -1), - ok = emqx_authz_cache:drain_cache(); -post_config_update({{delete_once, Type}, _Source}, _NewSources, _OldConf, _AppEnvs) -> - OldInitedSources = lookup(), - {_, OldSource} = find_source_by_type(Type, OldInitedSources), - case OldSource of - #{annotations := #{id := Id}} -> - ok = emqx_resource:remove(Id); - _ -> ok - end, - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:delete(OldSource, OldInitedSources)]}, -1), - ok = emqx_authz_cache:drain_cache(); -post_config_update(_, NewSources, _OldConf, _AppEnvs) -> - %% overwrite the entire config! - OldInitedSources = lookup(), - InitedSources = [init_source(Source) || Source <- NewSources], - ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1), - lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) -> - ok = emqx_resource:remove(Id); - (_) -> ok - end, OldInitedSources), +post_config_update(Cmd, NewSources, _OldSource, _AppEnvs) -> + ok = do_post_update(Cmd, NewSources), ok = emqx_authz_cache:drain_cache(). -%%-------------------------------------------------------------------- -%% Initialize source -%%-------------------------------------------------------------------- +do_post_update({?CMD_MOVE, _Type, _Where} = Cmd, _NewSources) -> + InitedSources = lookup(), + MovedSources = do_update(Cmd, InitedSources), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [MovedSources]}, -1), + ok = emqx_authz_cache:drain_cache(); +do_post_update({?CMD_PREPEND, Sources}, _NewSources) -> + InitedSources = init_sources(check_sources(Sources)), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources ++ lookup()]}, -1), + ok = emqx_authz_cache:drain_cache(); +do_post_update({?CMD_APPEND, Sources}, _NewSources) -> + InitedSources = init_sources(check_sources(Sources)), + emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1), + ok = emqx_authz_cache:drain_cache(); +do_post_update({{replace, Type}, #{type := Type} = Source}, _NewSources) when is_map(Source) -> + OldInitedSources = lookup(), + {OldSource, Front, Rear} = take(Type, OldInitedSources), + ok = ensure_resource_deleted(OldSource), + InitedSources = init_sources(check_sources([Source])), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Front ++ InitedSources ++ Rear]}, -1), + ok = emqx_authz_cache:drain_cache(); +do_post_update({{delete, Type}, _Source}, _NewSources) -> + OldInitedSources = lookup(), + {OldSource, Front, Rear} = take(Type, OldInitedSources), + ok = ensure_resource_deleted(OldSource), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, Front ++ Rear}, -1), + ok = emqx_authz_cache:drain_cache(); +do_post_update(_, NewSources) -> + %% overwrite the entire config! + OldInitedSources = lookup(), + InitedSources = init_sources(NewSources), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1), + lists:foreach(fun ensure_resource_deleted/1, OldInitedSources), + ok = emqx_authz_cache:drain_cache(). + +ensure_resource_deleted(#{type := file}) -> ok; +ensure_resource_deleted(#{type := 'built-in-database'}) -> ok; +ensure_resource_deleted(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove(Id). check_dup_types(Sources) -> - check_dup_types(Sources, ?SOURCE_TYPES). -check_dup_types(_Sources, []) -> ok; -check_dup_types(Sources, [T0 | Tail]) -> - case lists:foldl(fun (#{type := T1}, AccIn) -> - case T0 =:= T1 of - true -> AccIn + 1; - false -> AccIn - end; - (#{<<"type">> := T1}, AccIn) -> - case T0 =:= atom(T1) of - true -> AccIn + 1; - false -> AccIn - end - end, 0, Sources) > 1 of + check_dup_types(Sources, []). + +check_dup_types([], _Checked) -> ok; +check_dup_types([Source | Sources], Checked) -> + %% the input might be raw or type-checked result, so lookup both 'type' and <<"type">> + %% TODO: check: really? + Type = case maps:get(<<"type">>, Source, maps:get(type, Source, undefined)) of + undefined -> + %% this should never happen if the value is type checked by honcon schema + error({bad_source_input, Source}); + Type0 -> + type(Type0) + end, + case lists:member(Type, Checked) of true -> - ?LOG(error, "The type is duplicated in the Authorization source"), - {error, 'The type is duplicated in the Authorization source'}; - false -> check_dup_types(Sources, Tail) + %% we have made it clear not to support more than one authz instance for each type + error({duplicated_authz_source_type, Type}); + false -> + check_dup_types(Sources, [Type | Checked]) end. -init_source(#{enable := true, - type := file, +init_sources(Sources) -> + {Enabled, Disabled} = lists:partition(fun(#{enable := Enable}) -> Enable end, Sources), + case Disabled =/= [] of + true -> ?SLOG(info, #{msg => "disabled_sources_ignored", sources => Disabled}); + false -> ok + end, + lists:map(fun init_source/1, Enabled). + +init_source(#{type := file, path := Path } = Source) -> Rules = case file:consult(Path) of @@ -288,8 +210,7 @@ init_source(#{enable := true, error(Reason) end, Source#{annotations => #{rules => Rules}}; -init_source(#{enable := true, - type := http, +init_source(#{type := http, url := Url } = Source) -> NSource= maps:put(base_url, maps:remove(query, Url), Source), @@ -297,16 +218,17 @@ init_source(#{enable := true, {error, Reason} -> error({load_config_error, Reason}); Id -> Source#{annotations => #{id => Id}} end; -init_source(#{enable := true, - type := DB +init_source(#{type := 'built-in-database' + } = Source) -> + Source; +init_source(#{type := DB } = Source) when DB =:= redis; DB =:= mongodb -> case create_resource(Source) of {error, Reason} -> error({load_config_error, Reason}); Id -> Source#{annotations => #{id => Id}} end; -init_source(#{enable := true, - type := DB, +init_source(#{type := DB, query := SQL } = Source) when DB =:= mysql; DB =:= postgresql -> @@ -318,8 +240,7 @@ init_source(#{enable := true, query => Mod:parse_query(SQL) } } - end; -init_source(#{enable := false} = Source) ->Source. + end. %%-------------------------------------------------------------------- %% AuthZ callbacks @@ -373,13 +294,17 @@ check_sources(RawSources) -> #{sources := Sources} = hocon_schema:check_plain(Schema, Conf, #{atom_key => true}), Sources. -find_source_by_type(Type) -> find_source_by_type(Type, lookup()). -find_source_by_type(Type, Sources) -> find_source_by_type(Type, Sources, 1). -find_source_by_type(_, [], _N) -> error(not_found_source); -find_source_by_type(Type, [ Source = #{type := T} | Tail], N) -> - case Type =:= T of - true -> {N, Source}; - false -> find_source_by_type(Type, Tail, N + 1) +take(Type) -> take(Type, lookup()). + +%% Take the source of give type, the sources list is split into two parts +%% front part and rear part. +take(Type, Sources) -> + {Front, Rear} = lists:splitwith(fun(T) -> type(T) =/= type(Type) end, Sources), + case Rear =:= [] of + true -> + error({authz_source_of_type_not_found, Type}); + _ -> + {hd(Rear), Front, tl(Rear)} end. find_action_in_hooks() -> @@ -404,6 +329,8 @@ create_resource(#{type := DB} = Source) -> {error, Reason} -> {error, Reason} end. +authz_module('built-in-database') -> + emqx_authz_mnesia; authz_module(Type) -> list_to_existing_atom("emqx_authz_" ++ atom_to_list(Type)). @@ -414,9 +341,20 @@ connector_module(postgresql) -> connector_module(Type) -> list_to_existing_atom("emqx_connector_" ++ atom_to_list(Type)). -atom(B) when is_binary(B) -> - try binary_to_existing_atom(B, utf8) - catch - _ -> binary_to_atom(B) - end; -atom(A) when is_atom(A) -> A. +type(#{type := Type}) -> type(Type); +type(#{<<"type">> := Type}) -> type(Type); +type(file) -> file; +type(<<"file">>) -> file; +type(http) -> http; +type(<<"http">>) -> http; +type(mongodb) -> mongodb; +type(<<"mongodb">>) -> mongodb; +type(mysql) -> mysql; +type(<<"mysql">>) -> mysql; +type(redis) -> redis; +type(<<"redis">>) -> redis; +type(postgresql) -> postgresql; +type(<<"postgresql">>) -> postgresql; +type('built-in-database') -> 'built-in-database'; +type(<<"built-in-database">>) -> 'built-in-database'; +type(Unknown) -> error({unknown_authz_source_type, Unknown}). % should never happend if the input is type-checked by hocon schema diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl new file mode 100644 index 000000000..6ae9a7b49 --- /dev/null +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -0,0 +1,562 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_api_mnesia). + +-behavior(minirest_api). + +-include("emqx_authz.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-define(EXAMPLE_USERNAME, #{type => username, + key => user1, + rules => [ #{topic => <<"test/toopic/1">>, + permission => <<"allow">>, + action => <<"publish">> + } + , #{topic => <<"test/toopic/2">>, + permission => <<"allow">>, + action => <<"subscribe">> + } + , #{topic => <<"eq test/#">>, + permission => <<"deny">>, + action => <<"all">> + } + ] + }). +-define(EXAMPLE_CLIENTID, #{type => clientid, + key => client1, + rules => [ #{topic => <<"test/toopic/1">>, + permission => <<"allow">>, + action => <<"publish">> + } + , #{topic => <<"test/toopic/2">>, + permission => <<"allow">>, + action => <<"subscribe">> + } + , #{topic => <<"eq test/#">>, + permission => <<"deny">>, + action => <<"all">> + } + ] + }). +-define(EXAMPLE_ALL , #{type => all, + rules => [ #{topic => <<"test/toopic/1">>, + permission => <<"allow">>, + action => <<"publish">> + } + , #{topic => <<"test/toopic/2">>, + permission => <<"allow">>, + action => <<"subscribe">> + } + , #{topic => <<"eq test/#">>, + permission => <<"deny">>, + action => <<"all">> + } + ] + }). + +-export([ api_spec/0 + , purge/2 + , records/2 + , record/2 + ]). + +api_spec() -> + {[ purge_api() + , records_api() + , record_api() + ], definitions()}. + +definitions() -> + Rules = #{ + type => array, + items => #{ + type => object, + required => [topic, permission, action], + properties => #{ + topic => #{ + type => string, + example => <<"test/topic/1">> + }, + permission => #{ + type => string, + enum => [<<"allow">>, <<"deny">>], + example => <<"allow">> + }, + action => #{ + type => string, + enum => [<<"publish">>, <<"subscribe">>, <<"all">>], + example => <<"publish">> + } + } + } + }, + Record = #{ + oneOf => [ #{type => object, + required => [username, rules], + properties => #{ + username => #{ + type => string, + example => <<"username">> + }, + rules => minirest:ref(<<"rules">>) + } + } + , #{type => object, + required => [clientid, rules], + properties => #{ + username => #{ + type => string, + example => <<"clientid">> + }, + rules => minirest:ref(<<"rules">>) + } + } + , #{type => object, + required => [rules], + properties => #{ + rules => minirest:ref(<<"rules">>) + } + } + ] + }, + [ #{<<"rules">> => Rules} + , #{<<"record">> => Record} + ]. + +purge_api() -> + Metadata = #{ + delete => #{ + description => "Purge all records", + responses => #{ + <<"204">> => #{description => <<"No Content">>}, + <<"400">> => emqx_mgmt_util:bad_request() + } + } + }, + {"/authorization/sources/built-in-database/purge-all", Metadata, purge}. + +records_api() -> + Metadata = #{ + get => #{ + description => "List records", + parameters => [ + #{ + name => type, + in => path, + schema => #{ + type => string, + enum => [<<"username">>, <<"clientid">>, <<"all">>] + }, + required => true + }, + #{ + name => page, + in => query, + required => false, + description => <<"Page Index">>, + schema => #{type => integer} + }, + #{ + name => limit, + in => query, + required => false, + description => <<"Page limit">>, + schema => #{type => integer} + } + ], + responses => #{ + <<"200">> => #{ + description => <<"OK">>, + content => #{ + 'application/json' => #{ + schema => #{ + type => array, + items => minirest:ref(<<"record">>) + }, + examples => #{ + username => #{ + summary => <<"Username">>, + value => jsx:encode([?EXAMPLE_USERNAME]) + }, + clientid => #{ + summary => <<"Clientid">>, + value => jsx:encode([?EXAMPLE_CLIENTID]) + }, + all => #{ + summary => <<"All">>, + value => jsx:encode([?EXAMPLE_ALL]) + } + } + } + } + } + } + }, + post => #{ + description => "Add new records", + parameters => [ + #{ + name => type, + in => path, + schema => #{ + type => string, + enum => [<<"username">>, <<"clientid">>] + }, + required => true + } + ], + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => #{ + type => array, + items => minirest:ref(<<"record">>) + }, + examples => #{ + username => #{ + summary => <<"Username">>, + value => jsx:encode([?EXAMPLE_USERNAME]) + }, + clientid => #{ + summary => <<"Clientid">>, + value => jsx:encode([?EXAMPLE_CLIENTID]) + } + } + } + } + }, + responses => #{ + <<"204">> => #{description => <<"Created">>}, + <<"400">> => emqx_mgmt_util:bad_request() + } + }, + put => #{ + description => "Set the list of rules for all", + parameters => [ + #{ + name => type, + in => path, + schema => #{ + type => string, + enum => [<<"all">>] + }, + required => true + } + ], + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"record">>), + examples => #{ + all => #{ + summary => <<"All">>, + value => jsx:encode(?EXAMPLE_ALL) + } + } + } + } + }, + responses => #{ + <<"204">> => #{description => <<"Created">>}, + <<"400">> => emqx_mgmt_util:bad_request() + } + } + }, + {"/authorization/sources/built-in-database/:type", Metadata, records}. + +record_api() -> + Metadata = #{ + get => #{ + description => "Get record info", + parameters => [ + #{ + name => type, + in => path, + schema => #{ + type => string, + enum => [<<"username">>, <<"clientid">>] + }, + required => true + }, + #{ + name => key, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + responses => #{ + <<"200">> => #{ + description => <<"OK">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"record">>), + examples => #{ + username => #{ + summary => <<"Username">>, + value => jsx:encode(?EXAMPLE_USERNAME) + }, + clientid => #{ + summary => <<"Clientid">>, + value => jsx:encode(?EXAMPLE_CLIENTID) + }, + all => #{ + summary => <<"All">>, + value => jsx:encode(?EXAMPLE_ALL) + } + } + } + } + }, + <<"404">> => emqx_mgmt_util:bad_request(<<"Not Found">>) + } + }, + put => #{ + description => "Update one record", + parameters => [ + #{ + name => type, + in => path, + schema => #{ + type => string, + enum => [<<"username">>, <<"clientid">>] + }, + required => true + }, + #{ + name => key, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"record">>), + examples => #{ + username => #{ + summary => <<"Username">>, + value => jsx:encode(?EXAMPLE_USERNAME) + }, + clientid => #{ + summary => <<"Clientid">>, + value => jsx:encode(?EXAMPLE_CLIENTID) + } + } + } + } + }, + responses => #{ + <<"204">> => #{description => <<"Updated">>}, + <<"400">> => emqx_mgmt_util:bad_request() + } + }, + delete => #{ + description => "Delete one record", + parameters => [ + #{ + name => type, + in => path, + schema => #{ + type => string, + enum => [<<"username">>, <<"clientid">>] + }, + required => true + }, + #{ + name => key, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + responses => #{ + <<"204">> => #{description => <<"No Content">>}, + <<"400">> => emqx_mgmt_util:bad_request() + } + } + }, + {"/authorization/sources/built-in-database/:type/:key", Metadata, record}. + +purge(delete, _) -> + case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of + [#{enable := false}] -> + ok = lists:foreach(fun(Key) -> + ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key) + end, mnesia:dirty_all_keys(?ACL_TABLE)), + {204}; + _ -> + {400, #{code => <<"BAD_REQUEST">>, + message => <<"'built-in-database' type source must be disabled before purge.">>}} + end. + +records(get, #{bindings := #{type := <<"username">>}, + query_string := Qs + }) -> + MatchSpec = ets:fun2ms( + fun({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}, Rules}) -> + [{username, Username}, {rules, Rules}] + end), + Format = fun ([{username, Username}, {rules, Rules}]) -> + #{username => Username, + rules => [ #{topic => Topic, + action => Action, + permission => Permission + } || {Permission, Action, Topic} <- Rules] + } + end, + case Qs of + #{<<"limit">> := _, <<"page">> := _} = Page -> + {200, emqx_mgmt_api:paginate(?ACL_TABLE, MatchSpec, Page, Format)}; + #{<<"limit">> := Limit} -> + case ets:select(?ACL_TABLE, MatchSpec, binary_to_integer(Limit)) of + {Rows, _Continuation} -> {200, [Format(Row) || Row <- Rows ]}; + '$end_of_table' -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}} + end; + _ -> + {200, [Format(Row) || Row <- ets:select(?ACL_TABLE, MatchSpec)]} + end; + +records(get, #{bindings := #{type := <<"clientid">>}, + query_string := Qs + }) -> + MatchSpec = ets:fun2ms( + fun({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}, Rules}) -> + [{clientid, Clientid}, {rules, Rules}] + end), + Format = fun ([{clientid, Clientid}, {rules, Rules}]) -> + #{clientid => Clientid, + rules => [ #{topic => Topic, + action => Action, + permission => Permission + } || {Permission, Action, Topic} <- Rules] + } + end, + case Qs of + #{<<"limit">> := _, <<"page">> := _} = Page -> + {200, emqx_mgmt_api:paginate(?ACL_TABLE, MatchSpec, Page, Format)}; + #{<<"limit">> := Limit} -> + case ets:select(?ACL_TABLE, MatchSpec, binary_to_integer(Limit)) of + {Rows, _Continuation} -> {200, [Format(Row) || Row <- Rows ]}; + '$end_of_table' -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}} + end; + _ -> + {200, [Format(Row) || Row <- ets:select(?ACL_TABLE, MatchSpec)]} + end; +records(get, #{bindings := #{type := <<"all">>}}) -> + MatchSpec = ets:fun2ms( + fun({?ACL_TABLE, ?ACL_TABLE_ALL, Rules}) -> + [{rules, Rules}] + end), + {200, [ #{rules => [ #{topic => Topic, + action => Action, + permission => Permission + } || {Permission, Action, Topic} <- Rules] + } || [{rules, Rules}] <- ets:select(?ACL_TABLE, MatchSpec)]}; +records(post, #{bindings := #{type := <<"username">>}, + body := Body}) when is_list(Body) -> + lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> + ekka_mnesia:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }) + end, Body), + {204}; +records(post, #{bindings := #{type := <<"clientid">>}, + body := Body}) when is_list(Body) -> + lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) -> + ekka_mnesia:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }) + end, Body), + {204}; +records(put, #{bindings := #{type := <<"all">>}, + body := #{<<"rules">> := Rules}}) -> + ekka_mnesia:dirty_write(#emqx_acl{ + who = ?ACL_TABLE_ALL, + rules = format_rules(Rules) + }), + {204}. + +record(get, #{bindings := #{type := <<"username">>, key := Key}}) -> + case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_USERNAME, Key}) of + [] -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}}; + [#emqx_acl{who = {?ACL_TABLE_USERNAME, Username}, rules = Rules}] -> + {200, #{username => Username, + rules => [ #{topic => Topic, + action => Action, + permission => Permission + } || {Permission, Action, Topic} <- Rules]} + } + end; +record(get, #{bindings := #{type := <<"clientid">>, key := Key}}) -> + case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_CLIENTID, Key}) of + [] -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}}; + [#emqx_acl{who = {?ACL_TABLE_CLIENTID, Clientid}, rules = Rules}] -> + {200, #{clientid => Clientid, + rules => [ #{topic => Topic, + action => Action, + permission => Permission + } || {Permission, Action, Topic} <- Rules]} + } + end; +record(put, #{bindings := #{type := <<"username">>, key := Username}, + body := #{<<"username">> := Username, <<"rules">> := Rules}}) -> + ekka_mnesia:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }), + {204}; +record(put, #{bindings := #{type := <<"clientid">>, key := Clientid}, + body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) -> + ekka_mnesia:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }), + {204}; +record(delete, #{bindings := #{type := <<"username">>, key := Key}}) -> + ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Key}}), + {204}; +record(delete, #{bindings := #{type := <<"clientid">>, key := Key}}) -> + ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Key}}), + {204}. + +format_rules(Rules) when is_list(Rules) -> + lists:foldl(fun(#{<<"topic">> := Topic, + <<"action">> := Action, + <<"permission">> := Permission + }, AccIn) when ?PUBSUB(Action) + andalso ?ALLOW_DENY(Permission) -> + AccIn ++ [{ atom(Permission), atom(Action), Topic }] + end, [], Rules). + +atom(B) when is_binary(B) -> + try binary_to_existing_atom(B, utf8) + catch + _ -> binary_to_atom(B) + end; +atom(A) when is_atom(A) -> A. diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 37df924be..87e5cb71a 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -41,6 +41,10 @@ ] }). +-export([ get_raw_sources/0 + , get_raw_source/1 + ]). + -export([ api_spec/0 , sources/2 , source/2 @@ -147,7 +151,15 @@ source_api() -> name => type, in => path, schema => #{ - type => string + type => string, + enum => [ <<"file">> + , <<"http">> + , <<"mongodb">> + , <<"mysql">> + , <<"postgresql">> + , <<"redis">> + , <<"built-in-database">> + ] }, required => true } @@ -181,7 +193,15 @@ source_api() -> name => type, in => path, schema => #{ - type => string + type => string, + enum => [ <<"file">> + , <<"http">> + , <<"mongodb">> + , <<"mysql">> + , <<"postgresql">> + , <<"redis">> + , <<"built-in-database">> + ] }, required => true } @@ -216,7 +236,15 @@ source_api() -> name => type, in => path, schema => #{ - type => string + type => string, + enum => [ <<"file">> + , <<"http">> + , <<"mongodb">> + , <<"mysql">> + , <<"postgresql">> + , <<"redis">> + , <<"built-in-database">> + ] }, required => true } @@ -238,7 +266,15 @@ move_source_api() -> name => type, in => path, schema => #{ - type => string + type => string, + enum => [ <<"file">> + , <<"http">> + , <<"mongodb">> + , <<"mysql">> + , <<"postgresql">> + , <<"redis">> + , <<"built-in-database">> + ] }, required => true } @@ -321,7 +357,7 @@ sources(put, #{body := Body}) when is_list(Body) -> _ -> write_cert(Source) end end || Source <- Body], - update_config(replace, NBody). + update_config(?CMD_REPLCAE, NBody). source(get, #{bindings := #{type := Type}}) -> case get_raw_source(Type) of @@ -343,16 +379,16 @@ source(get, #{bindings := #{type := Type}}) -> end; source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable}}) -> {ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""), Rules), - case emqx_authz:update({replace_once, file}, #{type => file, enable => Enable, path => Filename}) of + case emqx_authz:update({?CMD_REPLCAE, file}, #{type => file, enable => Enable, path => Filename}) of {ok, _} -> {204}; {error, Reason} -> {400, #{code => <<"BAD_REQUEST">>, message => bin(Reason)}} end; source(put, #{bindings := #{type := Type}, body := Body}) when is_map(Body) -> - update_config({replace_once, Type}, write_cert(Body)); + update_config({?CMD_REPLCAE, Type}, write_cert(Body)); source(delete, #{bindings := #{type := Type}}) -> - update_config({delete_once, Type}, #{}). + update_config({?CMD_DELETE, Type}, #{}). move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Position}}) -> case emqx_authz:move(Type, Position) of @@ -374,7 +410,7 @@ get_raw_sources() -> get_raw_source(Type) -> lists:filter(fun (#{type := T}) -> - bin(T) =:= Type + erlang:atom_to_binary(T) =:= Type end, get_raw_sources()). update_config(Cmd, Sources) -> @@ -382,13 +418,13 @@ update_config(Cmd, Sources) -> {ok, _} -> {204}; {error, {pre_config_update, emqx_authz, Reason}} -> {400, #{code => <<"BAD_REQUEST">>, - message => bin(Reason)}}; + message => erlang:atom_to_binary(Reason)}}; {error, {post_config_update, emqx_authz, Reason}} -> {400, #{code => <<"BAD_REQUEST">>, - message => bin(Reason)}}; + message => erlang:atom_to_binary(Reason)}}; {error, Reason} -> {400, #{code => <<"BAD_REQUEST">>, - message => bin(Reason)}} + message => erlang:atom_to_binary(Reason)}} end. read_cert(#{ssl := #{enable := true} = SSL} = Source) -> diff --git a/apps/emqx_authz/src/emqx_authz_app.erl b/apps/emqx_authz/src/emqx_authz_app.erl index 460d7cbf9..f868ac342 100644 --- a/apps/emqx_authz/src/emqx_authz_app.erl +++ b/apps/emqx_authz/src/emqx_authz_app.erl @@ -7,9 +7,12 @@ -behaviour(application). +-include("emqx_authz.hrl"). + -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + ok = ekka_rlog:wait_for_shards([?ACL_SHARDED], infinity), {ok, Sup} = emqx_authz_sup:start_link(), ok = emqx_authz:init(), {ok, Sup}. diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl new file mode 100644 index 000000000..ab755403e --- /dev/null +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -0,0 +1,76 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_mnesia). + +-include("emqx_authz.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% AuthZ Callbacks +-export([ mnesia/1 + , authorize/4 + , description/0 + ]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-spec(mnesia(boot | copy) -> ok). +mnesia(boot) -> + ok = ekka_mnesia:create_table(?ACL_TABLE, [ + {type, ordered_set}, + {rlog_shard, ?ACL_SHARDED}, + {disc_copies, [node()]}, + {attributes, record_info(fields, ?ACL_TABLE)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies). + +description() -> + "AuthZ with Mnesia". + +authorize(#{username := Username, + clientid := Clientid + } = Client, PubSub, Topic, #{type := 'built-in-database'}) -> + + Rules = case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}) of + [] -> []; + [#emqx_acl{rules = Rules0}] when is_list(Rules0) -> Rules0 + end + ++ case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}) of + [] -> []; + [#emqx_acl{rules = Rules1}] when is_list(Rules1) -> Rules1 + end + ++ case mnesia:dirty_read(?ACL_TABLE, ?ACL_TABLE_ALL) of + [] -> []; + [#emqx_acl{rules = Rules2}] when is_list(Rules2) -> Rules2 + end, + do_authorize(Client, PubSub, Topic, Rules). + +do_authorize(_Client, _PubSub, _Topic, []) -> nomatch; +do_authorize(Client, PubSub, Topic, [ {Permission, Action, TopicFilter} | Tail]) -> + case emqx_authz_rule:match(Client, PubSub, Topic, + emqx_authz_rule:compile({Permission, all, Action, [TopicFilter]}) + ) of + {matched, Permission} -> {matched, Permission}; + nomatch -> do_authorize(Client, PubSub, Topic, Tail) + end. diff --git a/apps/emqx_authz/src/emqx_authz_mongodb.erl b/apps/emqx_authz/src/emqx_authz_mongodb.erl index 6c0fb126a..ca65e6f53 100644 --- a/apps/emqx_authz/src/emqx_authz_mongodb.erl +++ b/apps/emqx_authz/src/emqx_authz_mongodb.erl @@ -58,9 +58,9 @@ do_authorize(Client, PubSub, Topic, [Rule | Tail]) -> end. replvar(Selector, #{clientid := Clientid, - username := Username, - peerhost := IpAddress - }) -> + username := Username, + peerhost := IpAddress + }) -> Fun = fun _Fun(K, V, AccIn) when is_map(V) -> maps:put(K, maps:fold(_Fun, AccIn, V), AccIn); _Fun(K, V, AccIn) when is_list(V) -> diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index ac8f04f32..6ad206d90 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -69,7 +69,6 @@ do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) -> nomatch -> do_authorize(Client, PubSub, Topic, Columns, Tail) end. - format_result(Columns, Row) -> Permission = lists:nth(index(<<"permission">>, Columns), Row), Action = lists:nth(index(<<"action">>, Columns), Row), diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 2838dcb2e..af1c59fdc 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -31,6 +31,7 @@ fields("authorization") -> [ hoconsc:ref(?MODULE, file) , hoconsc:ref(?MODULE, http_get) , hoconsc:ref(?MODULE, http_post) + , hoconsc:ref(?MODULE, mnesia) , hoconsc:ref(?MODULE, mongo_single) , hoconsc:ref(?MODULE, mongo_rs) , hoconsc:ref(?MODULE, mongo_sharded) @@ -51,7 +52,8 @@ fields(file) -> true -> ok; _ -> {error, "File does not exist"} end - end + end, + desc => "Path to the file which contains the ACL rules." }} ]; fields(http_get) -> @@ -115,6 +117,11 @@ fields(http_post) -> } } ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)); +fields(mnesia) -> + [ {type, #{type => 'built-in-database'}} + , {enable, #{type => boolean(), + default => true}} + ]; fields(mongo_single) -> [ {collection, #{type => atom()}} , {selector, #{type => map()}} diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index bfdc131a0..16bf39d49 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -50,14 +50,14 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx_authz:update(?CMD_REPLCAE, []), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), meck:unload(emqx_resource), meck:unload(emqx_schema), ok. init_per_testcase(_, Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx_authz:update(?CMD_REPLCAE, []), Config. -define(SOURCE1, #{<<"type">> => <<"http">>, @@ -120,12 +120,12 @@ init_per_testcase(_, Config) -> %%------------------------------------------------------------------------------ t_update_source(_) -> - {ok, _} = emqx_authz:update(replace, [?SOURCE3]), - {ok, _} = emqx_authz:update(head, [?SOURCE2]), - {ok, _} = emqx_authz:update(head, [?SOURCE1]), - {ok, _} = emqx_authz:update(tail, [?SOURCE4]), - {ok, _} = emqx_authz:update(tail, [?SOURCE5]), - {ok, _} = emqx_authz:update(tail, [?SOURCE6]), + {ok, _} = emqx_authz:update(?CMD_REPLCAE, [?SOURCE3]), + {ok, _} = emqx_authz:update(?CMD_PREPEND, [?SOURCE2]), + {ok, _} = emqx_authz:update(?CMD_PREPEND, [?SOURCE1]), + {ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE4]), + {ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE5]), + {ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE6]), ?assertMatch([ #{type := http, enable := true} , #{type := mongodb, enable := true} @@ -135,12 +135,12 @@ t_update_source(_) -> , #{type := file, enable := true} ], emqx:get_config([authorization, sources], [])), - {ok, _} = emqx_authz:update({replace_once, http}, ?SOURCE1#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({replace_once, mongodb}, ?SOURCE2#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({replace_once, mysql}, ?SOURCE3#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({replace_once, postgresql}, ?SOURCE4#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({replace_once, redis}, ?SOURCE5#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({replace_once, file}, ?SOURCE6#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLCAE, http}, ?SOURCE1#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLCAE, mongodb}, ?SOURCE2#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLCAE, mysql}, ?SOURCE3#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLCAE, postgresql}, ?SOURCE4#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLCAE, redis}, ?SOURCE5#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLCAE, file}, ?SOURCE6#{<<"enable">> := false}), ?assertMatch([ #{type := http, enable := false} , #{type := mongodb, enable := false} @@ -150,10 +150,10 @@ t_update_source(_) -> , #{type := file, enable := false} ], emqx:get_config([authorization, sources], [])), - {ok, _} = emqx_authz:update(replace, []). + {ok, _} = emqx_authz:update(?CMD_REPLCAE, []). t_move_source(_) -> - {ok, _} = emqx_authz:update(replace, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]), + {ok, _} = emqx_authz:update(?CMD_REPLCAE, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]), ?assertMatch([ #{type := http} , #{type := mongodb} , #{type := mysql} diff --git a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl new file mode 100644 index 000000000..1ea942c10 --- /dev/null +++ b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl @@ -0,0 +1,224 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_api_mnesia_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("emqx_authz.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(CONF_DEFAULT, <<"authorization: {sources: []}">>). + +-import(emqx_ct_http, [ request_api/3 + , request_api/5 + , get_http_data/1 + , create_default_app/0 + , delete_default_app/0 + , default_auth_header/0 + , auth_header/2 + ]). + +-define(HOST, "http://127.0.0.1:18083/"). +-define(API_VERSION, "v5"). +-define(BASE_PATH, "api"). + +-define(EXAMPLE_USERNAME, #{username => user1, + rules => [ #{topic => <<"test/toopic/1">>, + permission => <<"allow">>, + action => <<"publish">> + } + , #{topic => <<"test/toopic/2">>, + permission => <<"allow">>, + action => <<"subscribe">> + } + , #{topic => <<"eq test/#">>, + permission => <<"deny">>, + action => <<"all">> + } + ] + }). +-define(EXAMPLE_CLIENTID, #{clientid => client1, + rules => [ #{topic => <<"test/toopic/1">>, + permission => <<"allow">>, + action => <<"publish">> + } + , #{topic => <<"test/toopic/2">>, + permission => <<"allow">>, + action => <<"subscribe">> + } + , #{topic => <<"eq test/#">>, + permission => <<"deny">>, + action => <<"all">> + } + ] + }). +-define(EXAMPLE_ALL , #{rules => [ #{topic => <<"test/toopic/1">>, + permission => <<"allow">>, + action => <<"publish">> + } + , #{topic => <<"test/toopic/2">>, + permission => <<"allow">>, + action => <<"subscribe">> + } + , #{topic => <<"eq test/#">>, + permission => <<"deny">>, + action => <<"all">> + } + ] + }). + +all() -> + []. %% Todo: Waiting for @terry-xiaoyu to fix the config_not_found error + % emqx_ct:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_schema, fields, fun("authorization") -> + meck:passthrough(["authorization"]) ++ + emqx_authz_schema:fields("authorization"); + (F) -> meck:passthrough([F]) + end), + + ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), + + ok = emqx_ct_helpers:start_apps([emqx_authz, emqx_dashboard], fun set_special_configs/1), + {ok, _} = emqx:update_config([authorization, cache, enable], false), + {ok, _} = emqx:update_config([authorization, no_match], deny), + + Config. + +end_per_suite(_Config) -> + {ok, _} = emqx_authz:update(replace, []), + emqx_ct_helpers:stop_apps([emqx_authz, emqx_dashboard]), + meck:unload(emqx_schema), + ok. + +set_special_configs(emqx_dashboard) -> + Config = #{ + default_username => <<"admin">>, + default_password => <<"public">>, + listeners => [#{ + protocol => http, + port => 18083 + }] + }, + emqx_config:put([emqx_dashboard], Config), + ok; +set_special_configs(emqx_authz) -> + emqx_config:put([authorization], #{sources => [#{type => 'built-in-database', + enable => true} + ]}), + ok; +set_special_configs(_App) -> + ok. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_api(_) -> + {ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "username"]), [?EXAMPLE_USERNAME]), + {ok, 200, Request1} = request(get, uri(["authorization", "sources", "built-in-database", "username"]), []), + {ok, 200, Request2} = request(get, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []), + [#{<<"username">> := <<"user1">>, <<"rules">> := Rules1}] = jsx:decode(Request1), + #{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2), + ?assertEqual(3, length(Rules1)), + + {ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "username", "user1"]), ?EXAMPLE_USERNAME#{rules => []}), + {ok, 200, Request3} = request(get, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []), + #{<<"username">> := <<"user1">>, <<"rules">> := Rules2} = jsx:decode(Request3), + ?assertEqual(0, length(Rules2)), + + {ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []), + {ok, 404, _} = request(get, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []), + + {ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "clientid"]), [?EXAMPLE_CLIENTID]), + {ok, 200, Request4} = request(get, uri(["authorization", "sources", "built-in-database", "clientid"]), []), + {ok, 200, Request5} = request(get, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []), + [#{<<"clientid">> := <<"client1">>, <<"rules">> := Rules3}] = jsx:decode(Request4), + #{<<"clientid">> := <<"client1">>, <<"rules">> := Rules3} = jsx:decode(Request5), + ?assertEqual(3, length(Rules3)), + + {ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), ?EXAMPLE_CLIENTID#{rules => []}), + {ok, 200, Request6} = request(get, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []), + #{<<"clientid">> := <<"client1">>, <<"rules">> := Rules4} = jsx:decode(Request6), + ?assertEqual(0, length(Rules4)), + + {ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []), + {ok, 404, _} = request(get, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []), + + {ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "all"]), ?EXAMPLE_ALL), + {ok, 200, Request7} = request(get, uri(["authorization", "sources", "built-in-database", "all"]), []), + [#{<<"rules">> := Rules5}] = jsx:decode(Request7), + ?assertEqual(3, length(Rules5)), + + {ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "all"]), ?EXAMPLE_ALL#{rules => []}), + {ok, 200, Request8} = request(get, uri(["authorization", "sources", "built-in-database", "all"]), []), + [#{<<"rules">> := Rules6}] = jsx:decode(Request8), + ?assertEqual(0, length(Rules6)), + + {ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "username"]), [ #{username => N, rules => []} || N <- lists:seq(1, 20) ]), + {ok, 200, Request9} = request(get, uri(["authorization", "sources", "built-in-database", "username?page=2&limit=5"]), []), + #{<<"data">> := Data1} = jsx:decode(Request9), + ?assertEqual(5, length(Data1)), + + {ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "clientid"]), [ #{clientid => N, rules => []} || N <- lists:seq(1, 20) ]), + {ok, 200, Request10} = request(get, uri(["authorization", "sources", "built-in-database", "clientid?limit=5"]), []), + ?assertEqual(5, length(jsx:decode(Request10))), + + {ok, 400, _} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []), + {ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database"]), #{<<"enable">> => false}), + {ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []), + ?assertEqual([], mnesia:dirty_all_keys(?ACL_TABLE)), + + ok. + +%%-------------------------------------------------------------------- +%% HTTP Request +%%-------------------------------------------------------------------- + +request(Method, Url, Body) -> + Request = case Body of + [] -> {Url, [auth_header_()]}; + _ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)} + end, + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], [{body_format, binary}]) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } -> + {ok, Code, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +uri() -> uri([]). +uri(Parts) when is_list(Parts) -> + NParts = [E || E <- Parts], + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + +get_sources(Result) -> jsx:decode(Result). + +auth_header_() -> + Username = <<"admin">>, + Password = <<"public">>, + {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), + {"Authorization", "Bearer " ++ binary_to_list(Token)}. diff --git a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl new file mode 100644 index 000000000..8b221d3e7 --- /dev/null +++ b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl @@ -0,0 +1,109 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_mnesia_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("emqx_authz.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(CONF_DEFAULT, <<"authorization: {sources: []}">>). + +all() -> + emqx_ct:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_schema, fields, fun("authorization") -> + meck:passthrough(["authorization"]) ++ + emqx_authz_schema:fields("authorization"); + (F) -> meck:passthrough([F]) + end), + + ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), + ok = emqx_ct_helpers:start_apps([emqx_authz]), + + {ok, _} = emqx:update_config([authorization, cache, enable], false), + {ok, _} = emqx:update_config([authorization, no_match], deny), + Rules = [#{<<"type">> => <<"built-in-database">>}], + {ok, _} = emqx_authz:update(replace, Rules), + Config. + +end_per_suite(_Config) -> + {ok, _} = emqx_authz:update(replace, []), + emqx_ct_helpers:stop_apps([emqx_authz]), + meck:unload(emqx_schema), + ok. + +init_per_testcase(t_authz, Config) -> + mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, + rules = [{allow, publish, <<"test/%u">>}, + {allow, subscribe, <<"eq #">>} + ] + }]), + mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, + rules = [{allow, publish, <<"test/%c">>}, + {deny, subscribe, <<"eq #">>} + ] + }]), + mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, + rules = [{deny, all, <<"#">>}] + }]), + Config; +init_per_testcase(_, Config) -> Config. + +end_per_testcase(t_authz, Config) -> + [ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], + Config; +end_per_testcase(_, Config) -> Config. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_authz(_) -> + ClientInfo1 = #{clientid => <<"test">>, + username => <<"test">>, + peerhost => {127,0,0,1}, + listener => {tcp, default} + }, + ClientInfo2 = #{clientid => <<"fake_clientid">>, + username => <<"test_username">>, + peerhost => {127,0,0,1}, + listener => {tcp, default} + }, + ClientInfo3 = #{clientid => <<"test_clientid">>, + username => <<"fake_username">>, + peerhost => {127,0,0,1}, + listener => {tcp, default} + }, + + ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), + ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), + + ?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), + ?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, subscribe, <<"#">>)), + + ?assertEqual(allow, emqx_access_control:authorize(ClientInfo3, publish, <<"test/test_clientid">>)), + ?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)), + + ok. + diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 8cf2fa1cb..e13f52691 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -18,7 +18,9 @@ -include_lib("stdlib/include/qlc.hrl"). --export([paginate/3]). +-export([ paginate/3 + , paginate/4 + ]). %% first_next query APIs -export([ params2qs/2 @@ -47,6 +49,23 @@ paginate(Tables, Params, RowFun) -> #{meta => #{page => Page, limit => Limit, count => Count}, data => [RowFun(Row) || Row <- Rows]}. +paginate(Tables, MatchSpec, Params, RowFun) -> + Qh = query_handle(Tables, MatchSpec), + Count = count(Tables, MatchSpec), + Page = b2i(page(Params)), + Limit = b2i(limit(Params)), + Cursor = qlc:cursor(Qh), + case Page > 1 of + true -> + _ = qlc:next_answers(Cursor, (Page - 1) * Limit), + ok; + false -> ok + end, + Rows = qlc:next_answers(Cursor, Limit), + qlc:delete_cursor(Cursor), + #{meta => #{page => Page, limit => Limit, count => Count}, + data => [RowFun(Row) || Row <- Rows]}. + query_handle(Table) when is_atom(Table) -> qlc:q([R|| R <- ets:table(Table)]); query_handle([Table]) when is_atom(Table) -> @@ -54,6 +73,16 @@ query_handle([Table]) when is_atom(Table) -> query_handle(Tables) -> qlc:append([qlc:q([E || E <- ets:table(T)]) || T <- Tables]). +query_handle(Table, MatchSpec) when is_atom(Table) -> + Options = {traverse, {select, MatchSpec}}, + qlc:q([R|| R <- ets:table(Table, Options)]); +query_handle([Table], MatchSpec) when is_atom(Table) -> + Options = {traverse, {select, MatchSpec}}, + qlc:q([R|| R <- ets:table(Table, Options)]); +query_handle(Tables, MatchSpec) -> + Options = {traverse, {select, MatchSpec}}, + qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]). + count(Table) when is_atom(Table) -> ets:info(Table, size); count([Table]) when is_atom(Table) -> @@ -61,8 +90,16 @@ count([Table]) when is_atom(Table) -> count(Tables) -> lists:sum([count(T) || T <- Tables]). -count(Table, Nodes) -> - lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]). +count(Table, MatchSpec) when is_atom(Table) -> + [{MatchPattern, Where, _Re}] = MatchSpec, + NMatchSpec = [{MatchPattern, Where, [true]}], + ets:select_count(Table, NMatchSpec); +count([Table], MatchSpec) when is_atom(Table) -> + [{MatchPattern, Where, _Re}] = MatchSpec, + NMatchSpec = [{MatchPattern, Where, [true]}], + ets:select_count(Table, NMatchSpec); +count(Tables, MatchSpec) -> + lists:sum([count(T, MatchSpec) || T <- Tables]). page(Params) when is_map(Params) -> maps:get(<<"page">>, Params, 1); @@ -122,7 +159,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) -> Rows = do_cluster_query(Nodes, Tab, Qs, QueryFun, Start, Limit+1, []), Meta = #{page => Page, limit => Limit}, NMeta = case CodCnt =:= 0 of - true -> Meta#{count => count(Tab, Nodes)}; + true -> Meta#{count => lists:sum([rpc_call(Node, ets, info, [Tab, size], 5000) || Node <- Nodes])}; _ -> Meta#{count => length(Rows)} end, #{meta => NMeta, data => lists:sublist(Rows, Limit)}.