Merge pull request #5785 from Rory-Z/feat/add-authz-mnesia

add authz mnesia
This commit is contained in:
Zaiming (Stone) Shi 2021-09-26 13:46:51 +02:00 committed by GitHub
commit 31d6c34a04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1263 additions and 248 deletions

View File

@ -55,8 +55,12 @@ authorization {
# collection: mqtt_authz
# selector: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
# },
{
type: built-in-database
}
{
type: file
# file is loaded into cache
path: "{{ platform_etc_dir }}/acl.conf"
}
]

View File

@ -29,12 +29,32 @@
(A =:= all) orelse (A =:= <<"all">>)
)).
-define(ACL_SHARDED, emqx_acl_sharded).
-define(ACL_TABLE, emqx_acl).
%% To save some space, use an integer for label, 0 for 'all', {1, Username} and {2, ClientId}.
-define(ACL_TABLE_ALL, 0).
-define(ACL_TABLE_USERNAME, 1).
-define(ACL_TABLE_CLIENTID, 2).
-record(emqx_acl, {
who :: ?ACL_TABLE_ALL| {?ACL_TABLE_USERNAME, binary()} | {?ACL_TABLE_CLIENTID, binary()},
rules :: [ {permission(), action(), emqx_topic:topic()} ]
}).
-record(authz_metrics, {
allow = 'client.authorize.allow',
deny = 'client.authorize.deny',
ignore = 'client.authorize.ignore'
}).
-define(CMD_REPLCAE, replace).
-define(CMD_DELETE, delete).
-define(CMD_PREPEND, prepend).
-define(CMD_APPEND, append).
-define(CMD_MOVE, move).
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
-define(METRICS(Type, K), #Type{}#Type.K).

View File

@ -39,7 +39,6 @@
-export([post_config_update/4, pre_config_update/2]).
-define(CONF_KEY_PATH, [authorization, sources]).
-define(SOURCE_TYPES, [file, http, mongodb, mysql, postgresql, redis]).
-spec(register_metrics() -> ok).
register_metrics() ->
@ -50,228 +49,151 @@ init() ->
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
Sources = emqx:get_config(?CONF_KEY_PATH, []),
ok = check_dup_types(Sources),
NSources = [init_source(Source) || Source <- Sources],
NSources = init_sources(Sources),
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1).
lookup() ->
{_M, _F, [A]}= find_action_in_hooks(),
A.
lookup(Type) ->
try find_source_by_type(atom(Type), lookup()) of
{_, Source} -> Source
catch
error:Reason -> {error, Reason}
end.
{Source, _Front, _Rear} = take(Type),
Source.
move(Type, Cmd) ->
move(Type, Cmd, #{}).
move(Type, #{<<"before">> := Before}, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), #{<<"before">> => atom(Before)}}, Opts);
emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), #{<<"before">> => type(Before)}}, Opts);
move(Type, #{<<"after">> := After}, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), #{<<"after">> => atom(After)}}, Opts);
emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), #{<<"after">> => type(After)}}, Opts);
move(Type, Position, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), Position}, Opts).
emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), Position}, Opts).
update(Cmd, Sources) ->
update(Cmd, Sources, #{}).
update({replace_once, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{replace_once, atom(Type)}, Sources}, Opts);
update({delete_once, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{delete_once, atom(Type)}, Sources}, Opts);
update({replace, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{replace, type(Type)}, Sources}, Opts);
update({delete, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{delete, type(Type)}, Sources}, Opts);
update(Cmd, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {Cmd, Sources}, Opts).
pre_config_update({move, Type, <<"top">>}, Conf) when is_list(Conf) ->
{Index, _} = find_source_by_type(Type),
{List1, List2} = lists:split(Index, Conf),
NConf = [lists:nth(Index, Conf)] ++ lists:droplast(List1) ++ List2,
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({move, Type, <<"bottom">>}, Conf) when is_list(Conf) ->
{Index, _} = find_source_by_type(Type),
{List1, List2} = lists:split(Index, Conf),
NConf = lists:droplast(List1) ++ List2 ++ [lists:nth(Index, Conf)],
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({move, Type, #{<<"before">> := Before}}, Conf) when is_list(Conf) ->
{Index1, _} = find_source_by_type(Type),
Conf1 = lists:nth(Index1, Conf),
{Index2, _} = find_source_by_type(Before),
Conf2 = lists:nth(Index2, Conf),
{List1, List2} = lists:split(Index2, Conf),
NConf = lists:delete(Conf1, lists:droplast(List1))
++ [Conf1] ++ [Conf2]
++ lists:delete(Conf1, List2),
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({move, Type, #{<<"after">> := After}}, Conf) when is_list(Conf) ->
{Index1, _} = find_source_by_type(Type),
Conf1 = lists:nth(Index1, Conf),
{Index2, _} = find_source_by_type(After),
{List1, List2} = lists:split(Index2, Conf),
NConf = lists:delete(Conf1, List1)
++ [Conf1]
++ lists:delete(Conf1, List2),
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({head, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
do_update({?CMD_MOVE, Type, <<"top">>}, Conf) when is_list(Conf) ->
{Source, Front, Rear} = take(Type, Conf),
[Source | Front] ++ Rear;
do_update({?CMD_MOVE, Type, <<"bottom">>}, Conf) when is_list(Conf) ->
{Source, Front, Rear} = take(Type, Conf),
Front ++ Rear ++ [Source];
do_update({?CMD_MOVE, Type, #{<<"before">> := Before}}, Conf) when is_list(Conf) ->
{S1, Front1, Rear1} = take(Type, Conf),
{S2, Front2, Rear2} = take(Before, Front1 ++ Rear1),
Front2 ++ [S1, S2] ++ Rear2;
do_update({?CMD_MOVE, Type, #{<<"after">> := After}}, Conf) when is_list(Conf) ->
{S1, Front1, Rear1} = take(Type, Conf),
{S2, Front2, Rear2} = take(After, Front1 ++ Rear1),
Front2 ++ [S2, S1] ++ Rear2;
do_update({?CMD_PREPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
NConf = Sources ++ Conf,
case check_dup_types(NConf) of
ok -> {ok, Sources ++ Conf};
Error -> Error
end;
pre_config_update({tail, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
ok = check_dup_types(NConf),
NConf;
do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
NConf = Conf ++ Sources,
case check_dup_types(NConf) of
ok -> {ok, Conf ++ Sources};
Error -> Error
end;
pre_config_update({{replace_once, Type}, Source}, Conf) when is_map(Source), is_list(Conf) ->
{Index, _} = find_source_by_type(Type),
{List1, List2} = lists:split(Index, Conf),
NConf = lists:droplast(List1) ++ [Source] ++ List2,
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({{delete_once, Type}, _Source}, Conf) when is_list(Conf) ->
{Index, _} = find_source_by_type(Type),
{List1, List2} = lists:split(Index, Conf),
NConf = lists:droplast(List1) ++ List2,
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({_, Sources}, _Conf) when is_list(Sources)->
ok = check_dup_types(NConf),
NConf;
do_update({{replace, Type}, Source}, Conf) when is_map(Source), is_list(Conf) ->
{_Old, Front, Rear} = take(Type, Conf),
NConf = Front ++ [Source | Rear],
ok = check_dup_types(NConf),
NConf;
do_update({{delete, Type}, _Source}, Conf) when is_list(Conf) ->
{_Old, Front, Rear} = take(Type, Conf),
NConf = Front ++ Rear,
NConf;
do_update({_, Sources}, _Conf) when is_list(Sources)->
%% overwrite the entire config!
{ok, Sources}.
Sources.
pre_config_update(Cmd, Conf) ->
{ok, do_update(Cmd, Conf)}.
post_config_update(_, undefined, _Conf, _AppEnvs) ->
ok;
post_config_update({move, Type, <<"top">>}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{Index, Source} = find_source_by_type(Type, InitedSources),
{Sources1, Sources2 } = lists:split(Index, InitedSources),
Sources3 = [Source] ++ lists:droplast(Sources1) ++ Sources2,
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Type, <<"bottom">>}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{Index, Source} = find_source_by_type(Type, InitedSources),
{Sources1, Sources2 } = lists:split(Index, InitedSources),
Sources3 = lists:droplast(Sources1) ++ Sources2 ++ [Source],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Type, #{<<"before">> := Before}}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{_, Source0} = find_source_by_type(Type, InitedSources),
{Index, Source1} = find_source_by_type(Before, InitedSources),
{Sources1, Sources2} = lists:split(Index, InitedSources),
Sources3 = lists:delete(Source0, lists:droplast(Sources1))
++ [Source0] ++ [Source1]
++ lists:delete(Source0, Sources2),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Type, #{<<"after">> := After}}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{_, Source} = find_source_by_type(Type, InitedSources),
{Index, _} = find_source_by_type(After, InitedSources),
{Sources1, Sources2} = lists:split(Index, InitedSources),
Sources3 = lists:delete(Source, Sources1)
++ [Source]
++ lists:delete(Source, Sources2),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({head, Sources}, _NewSources, _OldConf, _AppEnvs) ->
InitedSources = [init_source(R) || R <- check_sources(Sources)],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources ++ lookup()]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({tail, Sources}, _NewSources, _OldConf, _AppEnvs) ->
InitedSources = [init_source(R) || R <- check_sources(Sources)],
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({{replace_once, Type}, #{type := Type} = Source}, _NewSources, _OldConf, _AppEnvs) when is_map(Source) ->
OldInitedSources = lookup(),
{Index, OldSource} = find_source_by_type(Type, OldInitedSources),
case maps:get(type, OldSource, undefined) of
undefined -> ok;
file -> ok;
_ ->
#{annotations := #{id := Id}} = OldSource,
ok = emqx_resource:remove(Id)
end,
{OldSources1, OldSources2 } = lists:split(Index, OldInitedSources),
InitedSources = [init_source(R) || R <- check_sources([Source])],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldSources1) ++ InitedSources ++ OldSources2]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({{delete_once, Type}, _Source}, _NewSources, _OldConf, _AppEnvs) ->
OldInitedSources = lookup(),
{_, OldSource} = find_source_by_type(Type, OldInitedSources),
case OldSource of
#{annotations := #{id := Id}} ->
ok = emqx_resource:remove(Id);
_ -> ok
end,
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:delete(OldSource, OldInitedSources)]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update(_, NewSources, _OldConf, _AppEnvs) ->
%% overwrite the entire config!
OldInitedSources = lookup(),
InitedSources = [init_source(Source) || Source <- NewSources],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1),
lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) ->
ok = emqx_resource:remove(Id);
(_) -> ok
end, OldInitedSources),
post_config_update(Cmd, NewSources, _OldSource, _AppEnvs) ->
ok = do_post_update(Cmd, NewSources),
ok = emqx_authz_cache:drain_cache().
%%--------------------------------------------------------------------
%% Initialize source
%%--------------------------------------------------------------------
do_post_update({?CMD_MOVE, _Type, _Where} = Cmd, _NewSources) ->
InitedSources = lookup(),
MovedSources = do_update(Cmd, InitedSources),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [MovedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({?CMD_PREPEND, Sources}, _NewSources) ->
InitedSources = init_sources(check_sources(Sources)),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources ++ lookup()]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({?CMD_APPEND, Sources}, _NewSources) ->
InitedSources = init_sources(check_sources(Sources)),
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({{replace, Type}, #{type := Type} = Source}, _NewSources) when is_map(Source) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
ok = ensure_resource_deleted(OldSource),
InitedSources = init_sources(check_sources([Source])),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Front ++ InitedSources ++ Rear]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({{delete, Type}, _Source}, _NewSources) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
ok = ensure_resource_deleted(OldSource),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, Front ++ Rear}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update(_, NewSources) ->
%% overwrite the entire config!
OldInitedSources = lookup(),
InitedSources = init_sources(NewSources),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1),
lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
ok = emqx_authz_cache:drain_cache().
ensure_resource_deleted(#{type := file}) -> ok;
ensure_resource_deleted(#{type := 'built-in-database'}) -> ok;
ensure_resource_deleted(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove(Id).
check_dup_types(Sources) ->
check_dup_types(Sources, ?SOURCE_TYPES).
check_dup_types(_Sources, []) -> ok;
check_dup_types(Sources, [T0 | Tail]) ->
case lists:foldl(fun (#{type := T1}, AccIn) ->
case T0 =:= T1 of
true -> AccIn + 1;
false -> AccIn
end;
(#{<<"type">> := T1}, AccIn) ->
case T0 =:= atom(T1) of
true -> AccIn + 1;
false -> AccIn
end
end, 0, Sources) > 1 of
check_dup_types(Sources, []).
check_dup_types([], _Checked) -> ok;
check_dup_types([Source | Sources], Checked) ->
%% the input might be raw or type-checked result, so lookup both 'type' and <<"type">>
%% TODO: check: really?
Type = case maps:get(<<"type">>, Source, maps:get(type, Source, undefined)) of
undefined ->
%% this should never happen if the value is type checked by honcon schema
error({bad_source_input, Source});
Type0 ->
type(Type0)
end,
case lists:member(Type, Checked) of
true ->
?LOG(error, "The type is duplicated in the Authorization source"),
{error, 'The type is duplicated in the Authorization source'};
false -> check_dup_types(Sources, Tail)
%% we have made it clear not to support more than one authz instance for each type
error({duplicated_authz_source_type, Type});
false ->
check_dup_types(Sources, [Type | Checked])
end.
init_source(#{enable := true,
type := file,
init_sources(Sources) ->
{Enabled, Disabled} = lists:partition(fun(#{enable := Enable}) -> Enable end, Sources),
case Disabled =/= [] of
true -> ?SLOG(info, #{msg => "disabled_sources_ignored", sources => Disabled});
false -> ok
end,
lists:map(fun init_source/1, Enabled).
init_source(#{type := file,
path := Path
} = Source) ->
Rules = case file:consult(Path) of
@ -288,8 +210,7 @@ init_source(#{enable := true,
error(Reason)
end,
Source#{annotations => #{rules => Rules}};
init_source(#{enable := true,
type := http,
init_source(#{type := http,
url := Url
} = Source) ->
NSource= maps:put(base_url, maps:remove(query, Url), Source),
@ -297,16 +218,17 @@ init_source(#{enable := true,
{error, Reason} -> error({load_config_error, Reason});
Id -> Source#{annotations => #{id => Id}}
end;
init_source(#{enable := true,
type := DB
init_source(#{type := 'built-in-database'
} = Source) ->
Source;
init_source(#{type := DB
} = Source) when DB =:= redis;
DB =:= mongodb ->
case create_resource(Source) of
{error, Reason} -> error({load_config_error, Reason});
Id -> Source#{annotations => #{id => Id}}
end;
init_source(#{enable := true,
type := DB,
init_source(#{type := DB,
query := SQL
} = Source) when DB =:= mysql;
DB =:= postgresql ->
@ -318,8 +240,7 @@ init_source(#{enable := true,
query => Mod:parse_query(SQL)
}
}
end;
init_source(#{enable := false} = Source) ->Source.
end.
%%--------------------------------------------------------------------
%% AuthZ callbacks
@ -373,13 +294,17 @@ check_sources(RawSources) ->
#{sources := Sources} = hocon_schema:check_plain(Schema, Conf, #{atom_key => true}),
Sources.
find_source_by_type(Type) -> find_source_by_type(Type, lookup()).
find_source_by_type(Type, Sources) -> find_source_by_type(Type, Sources, 1).
find_source_by_type(_, [], _N) -> error(not_found_source);
find_source_by_type(Type, [ Source = #{type := T} | Tail], N) ->
case Type =:= T of
true -> {N, Source};
false -> find_source_by_type(Type, Tail, N + 1)
take(Type) -> take(Type, lookup()).
%% Take the source of give type, the sources list is split into two parts
%% front part and rear part.
take(Type, Sources) ->
{Front, Rear} = lists:splitwith(fun(T) -> type(T) =/= type(Type) end, Sources),
case Rear =:= [] of
true ->
error({authz_source_of_type_not_found, Type});
_ ->
{hd(Rear), Front, tl(Rear)}
end.
find_action_in_hooks() ->
@ -404,6 +329,8 @@ create_resource(#{type := DB} = Source) ->
{error, Reason} -> {error, Reason}
end.
authz_module('built-in-database') ->
emqx_authz_mnesia;
authz_module(Type) ->
list_to_existing_atom("emqx_authz_" ++ atom_to_list(Type)).
@ -414,9 +341,20 @@ connector_module(postgresql) ->
connector_module(Type) ->
list_to_existing_atom("emqx_connector_" ++ atom_to_list(Type)).
atom(B) when is_binary(B) ->
try binary_to_existing_atom(B, utf8)
catch
_ -> binary_to_atom(B)
end;
atom(A) when is_atom(A) -> A.
type(#{type := Type}) -> type(Type);
type(#{<<"type">> := Type}) -> type(Type);
type(file) -> file;
type(<<"file">>) -> file;
type(http) -> http;
type(<<"http">>) -> http;
type(mongodb) -> mongodb;
type(<<"mongodb">>) -> mongodb;
type(mysql) -> mysql;
type(<<"mysql">>) -> mysql;
type(redis) -> redis;
type(<<"redis">>) -> redis;
type(postgresql) -> postgresql;
type(<<"postgresql">>) -> postgresql;
type('built-in-database') -> 'built-in-database';
type(<<"built-in-database">>) -> 'built-in-database';
type(Unknown) -> error({unknown_authz_source_type, Unknown}). % should never happend if the input is type-checked by hocon schema

View File

@ -0,0 +1,562 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_api_mnesia).
-behavior(minirest_api).
-include("emqx_authz.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-define(EXAMPLE_USERNAME, #{type => username,
key => user1,
rules => [ #{topic => <<"test/toopic/1">>,
permission => <<"allow">>,
action => <<"publish">>
}
, #{topic => <<"test/toopic/2">>,
permission => <<"allow">>,
action => <<"subscribe">>
}
, #{topic => <<"eq test/#">>,
permission => <<"deny">>,
action => <<"all">>
}
]
}).
-define(EXAMPLE_CLIENTID, #{type => clientid,
key => client1,
rules => [ #{topic => <<"test/toopic/1">>,
permission => <<"allow">>,
action => <<"publish">>
}
, #{topic => <<"test/toopic/2">>,
permission => <<"allow">>,
action => <<"subscribe">>
}
, #{topic => <<"eq test/#">>,
permission => <<"deny">>,
action => <<"all">>
}
]
}).
-define(EXAMPLE_ALL , #{type => all,
rules => [ #{topic => <<"test/toopic/1">>,
permission => <<"allow">>,
action => <<"publish">>
}
, #{topic => <<"test/toopic/2">>,
permission => <<"allow">>,
action => <<"subscribe">>
}
, #{topic => <<"eq test/#">>,
permission => <<"deny">>,
action => <<"all">>
}
]
}).
-export([ api_spec/0
, purge/2
, records/2
, record/2
]).
api_spec() ->
{[ purge_api()
, records_api()
, record_api()
], definitions()}.
definitions() ->
Rules = #{
type => array,
items => #{
type => object,
required => [topic, permission, action],
properties => #{
topic => #{
type => string,
example => <<"test/topic/1">>
},
permission => #{
type => string,
enum => [<<"allow">>, <<"deny">>],
example => <<"allow">>
},
action => #{
type => string,
enum => [<<"publish">>, <<"subscribe">>, <<"all">>],
example => <<"publish">>
}
}
}
},
Record = #{
oneOf => [ #{type => object,
required => [username, rules],
properties => #{
username => #{
type => string,
example => <<"username">>
},
rules => minirest:ref(<<"rules">>)
}
}
, #{type => object,
required => [clientid, rules],
properties => #{
username => #{
type => string,
example => <<"clientid">>
},
rules => minirest:ref(<<"rules">>)
}
}
, #{type => object,
required => [rules],
properties => #{
rules => minirest:ref(<<"rules">>)
}
}
]
},
[ #{<<"rules">> => Rules}
, #{<<"record">> => Record}
].
purge_api() ->
Metadata = #{
delete => #{
description => "Purge all records",
responses => #{
<<"204">> => #{description => <<"No Content">>},
<<"400">> => emqx_mgmt_util:bad_request()
}
}
},
{"/authorization/sources/built-in-database/purge-all", Metadata, purge}.
records_api() ->
Metadata = #{
get => #{
description => "List records",
parameters => [
#{
name => type,
in => path,
schema => #{
type => string,
enum => [<<"username">>, <<"clientid">>, <<"all">>]
},
required => true
},
#{
name => page,
in => query,
required => false,
description => <<"Page Index">>,
schema => #{type => integer}
},
#{
name => limit,
in => query,
required => false,
description => <<"Page limit">>,
schema => #{type => integer}
}
],
responses => #{
<<"200">> => #{
description => <<"OK">>,
content => #{
'application/json' => #{
schema => #{
type => array,
items => minirest:ref(<<"record">>)
},
examples => #{
username => #{
summary => <<"Username">>,
value => jsx:encode([?EXAMPLE_USERNAME])
},
clientid => #{
summary => <<"Clientid">>,
value => jsx:encode([?EXAMPLE_CLIENTID])
},
all => #{
summary => <<"All">>,
value => jsx:encode([?EXAMPLE_ALL])
}
}
}
}
}
}
},
post => #{
description => "Add new records",
parameters => [
#{
name => type,
in => path,
schema => #{
type => string,
enum => [<<"username">>, <<"clientid">>]
},
required => true
}
],
requestBody => #{
content => #{
'application/json' => #{
schema => #{
type => array,
items => minirest:ref(<<"record">>)
},
examples => #{
username => #{
summary => <<"Username">>,
value => jsx:encode([?EXAMPLE_USERNAME])
},
clientid => #{
summary => <<"Clientid">>,
value => jsx:encode([?EXAMPLE_CLIENTID])
}
}
}
}
},
responses => #{
<<"204">> => #{description => <<"Created">>},
<<"400">> => emqx_mgmt_util:bad_request()
}
},
put => #{
description => "Set the list of rules for all",
parameters => [
#{
name => type,
in => path,
schema => #{
type => string,
enum => [<<"all">>]
},
required => true
}
],
requestBody => #{
content => #{
'application/json' => #{
schema => minirest:ref(<<"record">>),
examples => #{
all => #{
summary => <<"All">>,
value => jsx:encode(?EXAMPLE_ALL)
}
}
}
}
},
responses => #{
<<"204">> => #{description => <<"Created">>},
<<"400">> => emqx_mgmt_util:bad_request()
}
}
},
{"/authorization/sources/built-in-database/:type", Metadata, records}.
record_api() ->
Metadata = #{
get => #{
description => "Get record info",
parameters => [
#{
name => type,
in => path,
schema => #{
type => string,
enum => [<<"username">>, <<"clientid">>]
},
required => true
},
#{
name => key,
in => path,
schema => #{
type => string
},
required => true
}
],
responses => #{
<<"200">> => #{
description => <<"OK">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"record">>),
examples => #{
username => #{
summary => <<"Username">>,
value => jsx:encode(?EXAMPLE_USERNAME)
},
clientid => #{
summary => <<"Clientid">>,
value => jsx:encode(?EXAMPLE_CLIENTID)
},
all => #{
summary => <<"All">>,
value => jsx:encode(?EXAMPLE_ALL)
}
}
}
}
},
<<"404">> => emqx_mgmt_util:bad_request(<<"Not Found">>)
}
},
put => #{
description => "Update one record",
parameters => [
#{
name => type,
in => path,
schema => #{
type => string,
enum => [<<"username">>, <<"clientid">>]
},
required => true
},
#{
name => key,
in => path,
schema => #{
type => string
},
required => true
}
],
requestBody => #{
content => #{
'application/json' => #{
schema => minirest:ref(<<"record">>),
examples => #{
username => #{
summary => <<"Username">>,
value => jsx:encode(?EXAMPLE_USERNAME)
},
clientid => #{
summary => <<"Clientid">>,
value => jsx:encode(?EXAMPLE_CLIENTID)
}
}
}
}
},
responses => #{
<<"204">> => #{description => <<"Updated">>},
<<"400">> => emqx_mgmt_util:bad_request()
}
},
delete => #{
description => "Delete one record",
parameters => [
#{
name => type,
in => path,
schema => #{
type => string,
enum => [<<"username">>, <<"clientid">>]
},
required => true
},
#{
name => key,
in => path,
schema => #{
type => string
},
required => true
}
],
responses => #{
<<"204">> => #{description => <<"No Content">>},
<<"400">> => emqx_mgmt_util:bad_request()
}
}
},
{"/authorization/sources/built-in-database/:type/:key", Metadata, record}.
purge(delete, _) ->
case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of
[#{enable := false}] ->
ok = lists:foreach(fun(Key) ->
ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key)
end, mnesia:dirty_all_keys(?ACL_TABLE)),
{204};
_ ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"'built-in-database' type source must be disabled before purge.">>}}
end.
records(get, #{bindings := #{type := <<"username">>},
query_string := Qs
}) ->
MatchSpec = ets:fun2ms(
fun({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}, Rules}) ->
[{username, Username}, {rules, Rules}]
end),
Format = fun ([{username, Username}, {rules, Rules}]) ->
#{username => Username,
rules => [ #{topic => Topic,
action => Action,
permission => Permission
} || {Permission, Action, Topic} <- Rules]
}
end,
case Qs of
#{<<"limit">> := _, <<"page">> := _} = Page ->
{200, emqx_mgmt_api:paginate(?ACL_TABLE, MatchSpec, Page, Format)};
#{<<"limit">> := Limit} ->
case ets:select(?ACL_TABLE, MatchSpec, binary_to_integer(Limit)) of
{Rows, _Continuation} -> {200, [Format(Row) || Row <- Rows ]};
'$end_of_table' -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}}
end;
_ ->
{200, [Format(Row) || Row <- ets:select(?ACL_TABLE, MatchSpec)]}
end;
records(get, #{bindings := #{type := <<"clientid">>},
query_string := Qs
}) ->
MatchSpec = ets:fun2ms(
fun({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}, Rules}) ->
[{clientid, Clientid}, {rules, Rules}]
end),
Format = fun ([{clientid, Clientid}, {rules, Rules}]) ->
#{clientid => Clientid,
rules => [ #{topic => Topic,
action => Action,
permission => Permission
} || {Permission, Action, Topic} <- Rules]
}
end,
case Qs of
#{<<"limit">> := _, <<"page">> := _} = Page ->
{200, emqx_mgmt_api:paginate(?ACL_TABLE, MatchSpec, Page, Format)};
#{<<"limit">> := Limit} ->
case ets:select(?ACL_TABLE, MatchSpec, binary_to_integer(Limit)) of
{Rows, _Continuation} -> {200, [Format(Row) || Row <- Rows ]};
'$end_of_table' -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}}
end;
_ ->
{200, [Format(Row) || Row <- ets:select(?ACL_TABLE, MatchSpec)]}
end;
records(get, #{bindings := #{type := <<"all">>}}) ->
MatchSpec = ets:fun2ms(
fun({?ACL_TABLE, ?ACL_TABLE_ALL, Rules}) ->
[{rules, Rules}]
end),
{200, [ #{rules => [ #{topic => Topic,
action => Action,
permission => Permission
} || {Permission, Action, Topic} <- Rules]
} || [{rules, Rules}] <- ets:select(?ACL_TABLE, MatchSpec)]};
records(post, #{bindings := #{type := <<"username">>},
body := Body}) when is_list(Body) ->
lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules)
})
end, Body),
{204};
records(post, #{bindings := #{type := <<"clientid">>},
body := Body}) when is_list(Body) ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules)
})
end, Body),
{204};
records(put, #{bindings := #{type := <<"all">>},
body := #{<<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = ?ACL_TABLE_ALL,
rules = format_rules(Rules)
}),
{204}.
record(get, #{bindings := #{type := <<"username">>, key := Key}}) ->
case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_USERNAME, Key}) of
[] -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}};
[#emqx_acl{who = {?ACL_TABLE_USERNAME, Username}, rules = Rules}] ->
{200, #{username => Username,
rules => [ #{topic => Topic,
action => Action,
permission => Permission
} || {Permission, Action, Topic} <- Rules]}
}
end;
record(get, #{bindings := #{type := <<"clientid">>, key := Key}}) ->
case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_CLIENTID, Key}) of
[] -> {404, #{code => <<"NOT_FOUND">>, message => <<"Not Found">>}};
[#emqx_acl{who = {?ACL_TABLE_CLIENTID, Clientid}, rules = Rules}] ->
{200, #{clientid => Clientid,
rules => [ #{topic => Topic,
action => Action,
permission => Permission
} || {Permission, Action, Topic} <- Rules]}
}
end;
record(put, #{bindings := #{type := <<"username">>, key := Username},
body := #{<<"username">> := Username, <<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules)
}),
{204};
record(put, #{bindings := #{type := <<"clientid">>, key := Clientid},
body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules)
}),
{204};
record(delete, #{bindings := #{type := <<"username">>, key := Key}}) ->
ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Key}}),
{204};
record(delete, #{bindings := #{type := <<"clientid">>, key := Key}}) ->
ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Key}}),
{204}.
format_rules(Rules) when is_list(Rules) ->
lists:foldl(fun(#{<<"topic">> := Topic,
<<"action">> := Action,
<<"permission">> := Permission
}, AccIn) when ?PUBSUB(Action)
andalso ?ALLOW_DENY(Permission) ->
AccIn ++ [{ atom(Permission), atom(Action), Topic }]
end, [], Rules).
atom(B) when is_binary(B) ->
try binary_to_existing_atom(B, utf8)
catch
_ -> binary_to_atom(B)
end;
atom(A) when is_atom(A) -> A.

View File

@ -41,6 +41,10 @@
]
}).
-export([ get_raw_sources/0
, get_raw_source/1
]).
-export([ api_spec/0
, sources/2
, source/2
@ -147,7 +151,15 @@ source_api() ->
name => type,
in => path,
schema => #{
type => string
type => string,
enum => [ <<"file">>
, <<"http">>
, <<"mongodb">>
, <<"mysql">>
, <<"postgresql">>
, <<"redis">>
, <<"built-in-database">>
]
},
required => true
}
@ -181,7 +193,15 @@ source_api() ->
name => type,
in => path,
schema => #{
type => string
type => string,
enum => [ <<"file">>
, <<"http">>
, <<"mongodb">>
, <<"mysql">>
, <<"postgresql">>
, <<"redis">>
, <<"built-in-database">>
]
},
required => true
}
@ -216,7 +236,15 @@ source_api() ->
name => type,
in => path,
schema => #{
type => string
type => string,
enum => [ <<"file">>
, <<"http">>
, <<"mongodb">>
, <<"mysql">>
, <<"postgresql">>
, <<"redis">>
, <<"built-in-database">>
]
},
required => true
}
@ -238,7 +266,15 @@ move_source_api() ->
name => type,
in => path,
schema => #{
type => string
type => string,
enum => [ <<"file">>
, <<"http">>
, <<"mongodb">>
, <<"mysql">>
, <<"postgresql">>
, <<"redis">>
, <<"built-in-database">>
]
},
required => true
}
@ -321,7 +357,7 @@ sources(put, #{body := Body}) when is_list(Body) ->
_ -> write_cert(Source)
end
end || Source <- Body],
update_config(replace, NBody).
update_config(?CMD_REPLCAE, NBody).
source(get, #{bindings := #{type := Type}}) ->
case get_raw_source(Type) of
@ -343,16 +379,16 @@ source(get, #{bindings := #{type := Type}}) ->
end;
source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable}}) ->
{ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""), Rules),
case emqx_authz:update({replace_once, file}, #{type => file, enable => Enable, path => Filename}) of
case emqx_authz:update({?CMD_REPLCAE, file}, #{type => file, enable => Enable, path => Filename}) of
{ok, _} -> {204};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
message => bin(Reason)}}
end;
source(put, #{bindings := #{type := Type}, body := Body}) when is_map(Body) ->
update_config({replace_once, Type}, write_cert(Body));
update_config({?CMD_REPLCAE, Type}, write_cert(Body));
source(delete, #{bindings := #{type := Type}}) ->
update_config({delete_once, Type}, #{}).
update_config({?CMD_DELETE, Type}, #{}).
move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Position}}) ->
case emqx_authz:move(Type, Position) of
@ -374,7 +410,7 @@ get_raw_sources() ->
get_raw_source(Type) ->
lists:filter(fun (#{type := T}) ->
bin(T) =:= Type
erlang:atom_to_binary(T) =:= Type
end, get_raw_sources()).
update_config(Cmd, Sources) ->
@ -382,13 +418,13 @@ update_config(Cmd, Sources) ->
{ok, _} -> {204};
{error, {pre_config_update, emqx_authz, Reason}} ->
{400, #{code => <<"BAD_REQUEST">>,
message => bin(Reason)}};
message => erlang:atom_to_binary(Reason)}};
{error, {post_config_update, emqx_authz, Reason}} ->
{400, #{code => <<"BAD_REQUEST">>,
message => bin(Reason)}};
message => erlang:atom_to_binary(Reason)}};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
message => bin(Reason)}}
message => erlang:atom_to_binary(Reason)}}
end.
read_cert(#{ssl := #{enable := true} = SSL} = Source) ->

View File

@ -7,9 +7,12 @@
-behaviour(application).
-include("emqx_authz.hrl").
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
ok = ekka_rlog:wait_for_shards([?ACL_SHARDED], infinity),
{ok, Sup} = emqx_authz_sup:start_link(),
ok = emqx_authz:init(),
{ok, Sup}.

View File

@ -0,0 +1,76 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_mnesia).
-include("emqx_authz.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
%% AuthZ Callbacks
-export([ mnesia/1
, authorize/4
, description/0
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
ok = ekka_mnesia:create_table(?ACL_TABLE, [
{type, ordered_set},
{rlog_shard, ?ACL_SHARDED},
{disc_copies, [node()]},
{attributes, record_info(fields, ?ACL_TABLE)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies).
description() ->
"AuthZ with Mnesia".
authorize(#{username := Username,
clientid := Clientid
} = Client, PubSub, Topic, #{type := 'built-in-database'}) ->
Rules = case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}) of
[] -> [];
[#emqx_acl{rules = Rules0}] when is_list(Rules0) -> Rules0
end
++ case mnesia:dirty_read(?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}) of
[] -> [];
[#emqx_acl{rules = Rules1}] when is_list(Rules1) -> Rules1
end
++ case mnesia:dirty_read(?ACL_TABLE, ?ACL_TABLE_ALL) of
[] -> [];
[#emqx_acl{rules = Rules2}] when is_list(Rules2) -> Rules2
end,
do_authorize(Client, PubSub, Topic, Rules).
do_authorize(_Client, _PubSub, _Topic, []) -> nomatch;
do_authorize(Client, PubSub, Topic, [ {Permission, Action, TopicFilter} | Tail]) ->
case emqx_authz_rule:match(Client, PubSub, Topic,
emqx_authz_rule:compile({Permission, all, Action, [TopicFilter]})
) of
{matched, Permission} -> {matched, Permission};
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
end.

View File

@ -58,9 +58,9 @@ do_authorize(Client, PubSub, Topic, [Rule | Tail]) ->
end.
replvar(Selector, #{clientid := Clientid,
username := Username,
peerhost := IpAddress
}) ->
username := Username,
peerhost := IpAddress
}) ->
Fun = fun
_Fun(K, V, AccIn) when is_map(V) -> maps:put(K, maps:fold(_Fun, AccIn, V), AccIn);
_Fun(K, V, AccIn) when is_list(V) ->

View File

@ -69,7 +69,6 @@ do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) ->
nomatch -> do_authorize(Client, PubSub, Topic, Columns, Tail)
end.
format_result(Columns, Row) ->
Permission = lists:nth(index(<<"permission">>, Columns), Row),
Action = lists:nth(index(<<"action">>, Columns), Row),

View File

@ -31,6 +31,7 @@ fields("authorization") ->
[ hoconsc:ref(?MODULE, file)
, hoconsc:ref(?MODULE, http_get)
, hoconsc:ref(?MODULE, http_post)
, hoconsc:ref(?MODULE, mnesia)
, hoconsc:ref(?MODULE, mongo_single)
, hoconsc:ref(?MODULE, mongo_rs)
, hoconsc:ref(?MODULE, mongo_sharded)
@ -51,7 +52,8 @@ fields(file) ->
true -> ok;
_ -> {error, "File does not exist"}
end
end
end,
desc => "Path to the file which contains the ACL rules."
}}
];
fields(http_get) ->
@ -115,6 +117,11 @@ fields(http_post) ->
}
}
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
fields(mnesia) ->
[ {type, #{type => 'built-in-database'}}
, {enable, #{type => boolean(),
default => true}}
];
fields(mongo_single) ->
[ {collection, #{type => atom()}}
, {selector, #{type => map()}}

View File

@ -50,14 +50,14 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx_authz:update(?CMD_REPLCAE, []),
emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource),
meck:unload(emqx_schema),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx_authz:update(?CMD_REPLCAE, []),
Config.
-define(SOURCE1, #{<<"type">> => <<"http">>,
@ -120,12 +120,12 @@ init_per_testcase(_, Config) ->
%%------------------------------------------------------------------------------
t_update_source(_) ->
{ok, _} = emqx_authz:update(replace, [?SOURCE3]),
{ok, _} = emqx_authz:update(head, [?SOURCE2]),
{ok, _} = emqx_authz:update(head, [?SOURCE1]),
{ok, _} = emqx_authz:update(tail, [?SOURCE4]),
{ok, _} = emqx_authz:update(tail, [?SOURCE5]),
{ok, _} = emqx_authz:update(tail, [?SOURCE6]),
{ok, _} = emqx_authz:update(?CMD_REPLCAE, [?SOURCE3]),
{ok, _} = emqx_authz:update(?CMD_PREPEND, [?SOURCE2]),
{ok, _} = emqx_authz:update(?CMD_PREPEND, [?SOURCE1]),
{ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE4]),
{ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE5]),
{ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE6]),
?assertMatch([ #{type := http, enable := true}
, #{type := mongodb, enable := true}
@ -135,12 +135,12 @@ t_update_source(_) ->
, #{type := file, enable := true}
], emqx:get_config([authorization, sources], [])),
{ok, _} = emqx_authz:update({replace_once, http}, ?SOURCE1#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({replace_once, mongodb}, ?SOURCE2#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({replace_once, mysql}, ?SOURCE3#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({replace_once, postgresql}, ?SOURCE4#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({replace_once, redis}, ?SOURCE5#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({replace_once, file}, ?SOURCE6#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, http}, ?SOURCE1#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, mongodb}, ?SOURCE2#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, mysql}, ?SOURCE3#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, postgresql}, ?SOURCE4#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, redis}, ?SOURCE5#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, file}, ?SOURCE6#{<<"enable">> := false}),
?assertMatch([ #{type := http, enable := false}
, #{type := mongodb, enable := false}
@ -150,10 +150,10 @@ t_update_source(_) ->
, #{type := file, enable := false}
], emqx:get_config([authorization, sources], [])),
{ok, _} = emqx_authz:update(replace, []).
{ok, _} = emqx_authz:update(?CMD_REPLCAE, []).
t_move_source(_) ->
{ok, _} = emqx_authz:update(replace, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]),
{ok, _} = emqx_authz:update(?CMD_REPLCAE, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]),
?assertMatch([ #{type := http}
, #{type := mongodb}
, #{type := mysql}

View File

@ -0,0 +1,224 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_api_mnesia_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
-import(emqx_ct_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
, auth_header/2
]).
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
-define(EXAMPLE_USERNAME, #{username => user1,
rules => [ #{topic => <<"test/toopic/1">>,
permission => <<"allow">>,
action => <<"publish">>
}
, #{topic => <<"test/toopic/2">>,
permission => <<"allow">>,
action => <<"subscribe">>
}
, #{topic => <<"eq test/#">>,
permission => <<"deny">>,
action => <<"all">>
}
]
}).
-define(EXAMPLE_CLIENTID, #{clientid => client1,
rules => [ #{topic => <<"test/toopic/1">>,
permission => <<"allow">>,
action => <<"publish">>
}
, #{topic => <<"test/toopic/2">>,
permission => <<"allow">>,
action => <<"subscribe">>
}
, #{topic => <<"eq test/#">>,
permission => <<"deny">>,
action => <<"all">>
}
]
}).
-define(EXAMPLE_ALL , #{rules => [ #{topic => <<"test/toopic/1">>,
permission => <<"allow">>,
action => <<"publish">>
}
, #{topic => <<"test/toopic/2">>,
permission => <<"allow">>,
action => <<"subscribe">>
}
, #{topic => <<"eq test/#">>,
permission => <<"deny">>,
action => <<"all">>
}
]
}).
all() ->
[]. %% Todo: Waiting for @terry-xiaoyu to fix the config_not_found error
% emqx_ct:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT),
ok = emqx_ct_helpers:start_apps([emqx_authz, emqx_dashboard], fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
emqx_ct_helpers:stop_apps([emqx_authz, emqx_dashboard]),
meck:unload(emqx_schema),
ok.
set_special_configs(emqx_dashboard) ->
Config = #{
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
},
emqx_config:put([emqx_dashboard], Config),
ok;
set_special_configs(emqx_authz) ->
emqx_config:put([authorization], #{sources => [#{type => 'built-in-database',
enable => true}
]}),
ok;
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_api(_) ->
{ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "username"]), [?EXAMPLE_USERNAME]),
{ok, 200, Request1} = request(get, uri(["authorization", "sources", "built-in-database", "username"]), []),
{ok, 200, Request2} = request(get, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []),
[#{<<"username">> := <<"user1">>, <<"rules">> := Rules1}] = jsx:decode(Request1),
#{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2),
?assertEqual(3, length(Rules1)),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "username", "user1"]), ?EXAMPLE_USERNAME#{rules => []}),
{ok, 200, Request3} = request(get, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []),
#{<<"username">> := <<"user1">>, <<"rules">> := Rules2} = jsx:decode(Request3),
?assertEqual(0, length(Rules2)),
{ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []),
{ok, 404, _} = request(get, uri(["authorization", "sources", "built-in-database", "username", "user1"]), []),
{ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "clientid"]), [?EXAMPLE_CLIENTID]),
{ok, 200, Request4} = request(get, uri(["authorization", "sources", "built-in-database", "clientid"]), []),
{ok, 200, Request5} = request(get, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []),
[#{<<"clientid">> := <<"client1">>, <<"rules">> := Rules3}] = jsx:decode(Request4),
#{<<"clientid">> := <<"client1">>, <<"rules">> := Rules3} = jsx:decode(Request5),
?assertEqual(3, length(Rules3)),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), ?EXAMPLE_CLIENTID#{rules => []}),
{ok, 200, Request6} = request(get, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []),
#{<<"clientid">> := <<"client1">>, <<"rules">> := Rules4} = jsx:decode(Request6),
?assertEqual(0, length(Rules4)),
{ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []),
{ok, 404, _} = request(get, uri(["authorization", "sources", "built-in-database", "clientid", "client1"]), []),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "all"]), ?EXAMPLE_ALL),
{ok, 200, Request7} = request(get, uri(["authorization", "sources", "built-in-database", "all"]), []),
[#{<<"rules">> := Rules5}] = jsx:decode(Request7),
?assertEqual(3, length(Rules5)),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "all"]), ?EXAMPLE_ALL#{rules => []}),
{ok, 200, Request8} = request(get, uri(["authorization", "sources", "built-in-database", "all"]), []),
[#{<<"rules">> := Rules6}] = jsx:decode(Request8),
?assertEqual(0, length(Rules6)),
{ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "username"]), [ #{username => N, rules => []} || N <- lists:seq(1, 20) ]),
{ok, 200, Request9} = request(get, uri(["authorization", "sources", "built-in-database", "username?page=2&limit=5"]), []),
#{<<"data">> := Data1} = jsx:decode(Request9),
?assertEqual(5, length(Data1)),
{ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "clientid"]), [ #{clientid => N, rules => []} || N <- lists:seq(1, 20) ]),
{ok, 200, Request10} = request(get, uri(["authorization", "sources", "built-in-database", "clientid?limit=5"]), []),
?assertEqual(5, length(jsx:decode(Request10))),
{ok, 400, _} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database"]), #{<<"enable">> => false}),
{ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []),
?assertEqual([], mnesia:dirty_all_keys(?ACL_TABLE)),
ok.
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
get_sources(Result) -> jsx:decode(Result).
auth_header_() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.

View File

@ -0,0 +1,109 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_mnesia_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
all() ->
emqx_ct:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT),
ok = emqx_ct_helpers:start_apps([emqx_authz]),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{<<"type">> => <<"built-in-database">>}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
emqx_ct_helpers:stop_apps([emqx_authz]),
meck:unload(emqx_schema),
ok.
init_per_testcase(t_authz, Config) ->
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>},
rules = [{allow, publish, <<"test/%u">>},
{allow, subscribe, <<"eq #">>}
]
}]),
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>},
rules = [{allow, publish, <<"test/%c">>},
{deny, subscribe, <<"eq #">>}
]
}]),
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL,
rules = [{deny, all, <<"#">>}]
}]),
Config;
init_per_testcase(_, Config) -> Config.
end_per_testcase(t_authz, Config) ->
[ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)],
Config;
end_per_testcase(_, Config) -> Config.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_authz(_) ->
ClientInfo1 = #{clientid => <<"test">>,
username => <<"test">>,
peerhost => {127,0,0,1},
listener => {tcp, default}
},
ClientInfo2 = #{clientid => <<"fake_clientid">>,
username => <<"test_username">>,
peerhost => {127,0,0,1},
listener => {tcp, default}
},
ClientInfo3 = #{clientid => <<"test_clientid">>,
username => <<"fake_username">>,
peerhost => {127,0,0,1},
listener => {tcp, default}
},
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, subscribe, <<"#">>)),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo3, publish, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)),
ok.

View File

@ -18,7 +18,9 @@
-include_lib("stdlib/include/qlc.hrl").
-export([paginate/3]).
-export([ paginate/3
, paginate/4
]).
%% first_next query APIs
-export([ params2qs/2
@ -47,6 +49,23 @@ paginate(Tables, Params, RowFun) ->
#{meta => #{page => Page, limit => Limit, count => Count},
data => [RowFun(Row) || Row <- Rows]}.
paginate(Tables, MatchSpec, Params, RowFun) ->
Qh = query_handle(Tables, MatchSpec),
Count = count(Tables, MatchSpec),
Page = b2i(page(Params)),
Limit = b2i(limit(Params)),
Cursor = qlc:cursor(Qh),
case Page > 1 of
true ->
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
ok;
false -> ok
end,
Rows = qlc:next_answers(Cursor, Limit),
qlc:delete_cursor(Cursor),
#{meta => #{page => Page, limit => Limit, count => Count},
data => [RowFun(Row) || Row <- Rows]}.
query_handle(Table) when is_atom(Table) ->
qlc:q([R|| R <- ets:table(Table)]);
query_handle([Table]) when is_atom(Table) ->
@ -54,6 +73,16 @@ query_handle([Table]) when is_atom(Table) ->
query_handle(Tables) ->
qlc:append([qlc:q([E || E <- ets:table(T)]) || T <- Tables]).
query_handle(Table, MatchSpec) when is_atom(Table) ->
Options = {traverse, {select, MatchSpec}},
qlc:q([R|| R <- ets:table(Table, Options)]);
query_handle([Table], MatchSpec) when is_atom(Table) ->
Options = {traverse, {select, MatchSpec}},
qlc:q([R|| R <- ets:table(Table, Options)]);
query_handle(Tables, MatchSpec) ->
Options = {traverse, {select, MatchSpec}},
qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]).
count(Table) when is_atom(Table) ->
ets:info(Table, size);
count([Table]) when is_atom(Table) ->
@ -61,8 +90,16 @@ count([Table]) when is_atom(Table) ->
count(Tables) ->
lists:sum([count(T) || T <- Tables]).
count(Table, Nodes) ->
lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]).
count(Table, MatchSpec) when is_atom(Table) ->
[{MatchPattern, Where, _Re}] = MatchSpec,
NMatchSpec = [{MatchPattern, Where, [true]}],
ets:select_count(Table, NMatchSpec);
count([Table], MatchSpec) when is_atom(Table) ->
[{MatchPattern, Where, _Re}] = MatchSpec,
NMatchSpec = [{MatchPattern, Where, [true]}],
ets:select_count(Table, NMatchSpec);
count(Tables, MatchSpec) ->
lists:sum([count(T, MatchSpec) || T <- Tables]).
page(Params) when is_map(Params) ->
maps:get(<<"page">>, Params, 1);
@ -122,7 +159,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) ->
Rows = do_cluster_query(Nodes, Tab, Qs, QueryFun, Start, Limit+1, []),
Meta = #{page => Page, limit => Limit},
NMeta = case CodCnt =:= 0 of
true -> Meta#{count => count(Tab, Nodes)};
true -> Meta#{count => lists:sum([rpc_call(Node, ets, info, [Tab, size], 5000) || Node <- Nodes])};
_ -> Meta#{count => length(Rows)}
end,
#{meta => NMeta, data => lists:sublist(Rows, Limit)}.