refactor(authz): simplify config update impl

This commit is contained in:
Zaiming Shi 2021-09-24 22:10:30 +02:00 committed by zhanghongtong
parent f18d0c7167
commit 65d0b70ff6
2 changed files with 149 additions and 214 deletions

View File

@ -39,7 +39,6 @@
-export([post_config_update/4, pre_config_update/2]). -export([post_config_update/4, pre_config_update/2]).
-define(CONF_KEY_PATH, [authorization, sources]). -define(CONF_KEY_PATH, [authorization, sources]).
-define(SOURCE_TYPES, [file, http, mongodb, mysql, postgresql, redis, 'built-in-database']).
-spec(register_metrics() -> ok). -spec(register_metrics() -> ok).
register_metrics() -> register_metrics() ->
@ -50,228 +49,151 @@ init() ->
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
Sources = emqx:get_config(?CONF_KEY_PATH, []), Sources = emqx:get_config(?CONF_KEY_PATH, []),
ok = check_dup_types(Sources), ok = check_dup_types(Sources),
NSources = [init_source(Source) || Source <- Sources], NSources = init_sources(Sources),
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1). ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1).
lookup() -> lookup() ->
{_M, _F, [A]}= find_action_in_hooks(), {_M, _F, [A]}= find_action_in_hooks(),
A. A.
lookup(Type) -> lookup(Type) ->
try find_source_by_type(atom(Type), lookup()) of {Source, _Front, _Rear} = take(Type),
{_, Source} -> Source Source.
catch
error:Reason -> {error, Reason}
end.
move(Type, Cmd) -> move(Type, Cmd) ->
move(Type, Cmd, #{}). move(Type, Cmd, #{}).
move(Type, #{<<"before">> := Before}, Opts) -> move(Type, #{<<"before">> := Before}, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), #{<<"before">> => atom(Before)}}, Opts); emqx:update_config(?CONF_KEY_PATH, {move, type(Type), #{<<"before">> => type(Before)}}, Opts);
move(Type, #{<<"after">> := After}, Opts) -> move(Type, #{<<"after">> := After}, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), #{<<"after">> => atom(After)}}, Opts); emqx:update_config(?CONF_KEY_PATH, {move, type(Type), #{<<"after">> => type(After)}}, Opts);
move(Type, Position, Opts) -> move(Type, Position, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {move, atom(Type), Position}, Opts). emqx:update_config(?CONF_KEY_PATH, {move, type(Type), Position}, Opts).
update(Cmd, Sources) -> update(Cmd, Sources) ->
update(Cmd, Sources, #{}). update(Cmd, Sources, #{}).
update({replace_once, Type}, Sources, Opts) -> update({replace_once, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{replace_once, atom(Type)}, Sources}, Opts); emqx:update_config(?CONF_KEY_PATH, {{replace_once, type(Type)}, Sources}, Opts);
update({delete_once, Type}, Sources, Opts) -> update({delete_once, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{delete_once, atom(Type)}, Sources}, Opts); emqx:update_config(?CONF_KEY_PATH, {{delete_once, type(Type)}, Sources}, Opts);
update(Cmd, Sources, Opts) -> update(Cmd, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {Cmd, Sources}, Opts). emqx:update_config(?CONF_KEY_PATH, {Cmd, Sources}, Opts).
pre_config_update({move, Type, <<"top">>}, Conf) when is_list(Conf) -> do_update({move, Type, <<"top">>}, Conf) when is_list(Conf) ->
{Index, _} = find_source_by_type(Type), {Source, Front, Rear} = take(Type, Conf),
{List1, List2} = lists:split(Index, Conf), [Source | Front] ++ Rear;
NConf = [lists:nth(Index, Conf)] ++ lists:droplast(List1) ++ List2, do_update({move, Type, <<"bottom">>}, Conf) when is_list(Conf) ->
case check_dup_types(NConf) of {Source, Front, Rear} = take(Type, Conf),
ok -> {ok, NConf}; Front ++ Rear ++ [Source];
Error -> Error do_update({move, Type, #{<<"before">> := Before}}, Conf) when is_list(Conf) ->
end; {S1, Front1, Rear1} = take(Type, Conf),
{S2, Front2, Rear2} = take(Before, Front1 ++ Rear1),
pre_config_update({move, Type, <<"bottom">>}, Conf) when is_list(Conf) -> Front2 ++ [S1, S2] ++ Rear2;
{Index, _} = find_source_by_type(Type), do_update({move, Type, #{<<"after">> := After}}, Conf) when is_list(Conf) ->
{List1, List2} = lists:split(Index, Conf), {S1, Front1, Rear1} = take(Type, Conf),
NConf = lists:droplast(List1) ++ List2 ++ [lists:nth(Index, Conf)], {S2, Front2, Rear2} = take(After, Front1 ++ Rear1),
case check_dup_types(NConf) of Front2 ++ [S2, S1] ++ Rear2;
ok -> {ok, NConf}; do_update({head, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
Error -> Error
end;
pre_config_update({move, Type, #{<<"before">> := Before}}, Conf) when is_list(Conf) ->
{Index1, _} = find_source_by_type(Type),
Conf1 = lists:nth(Index1, Conf),
{Index2, _} = find_source_by_type(Before),
Conf2 = lists:nth(Index2, Conf),
{List1, List2} = lists:split(Index2, Conf),
NConf = lists:delete(Conf1, lists:droplast(List1))
++ [Conf1] ++ [Conf2]
++ lists:delete(Conf1, List2),
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({move, Type, #{<<"after">> := After}}, Conf) when is_list(Conf) ->
{Index1, _} = find_source_by_type(Type),
Conf1 = lists:nth(Index1, Conf),
{Index2, _} = find_source_by_type(After),
{List1, List2} = lists:split(Index2, Conf),
NConf = lists:delete(Conf1, List1)
++ [Conf1]
++ lists:delete(Conf1, List2),
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({head, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
NConf = Sources ++ Conf, NConf = Sources ++ Conf,
case check_dup_types(NConf) of ok = check_dup_types(NConf),
ok -> {ok, Sources ++ Conf}; NConf;
Error -> Error do_update({tail, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
end;
pre_config_update({tail, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
NConf = Conf ++ Sources, NConf = Conf ++ Sources,
case check_dup_types(NConf) of ok = check_dup_types(NConf),
ok -> {ok, Conf ++ Sources}; NConf;
Error -> Error do_update({{replace_once, Type}, Source}, Conf) when is_map(Source), is_list(Conf) ->
end; {_Old, Front, Rear} = take(Type, Conf),
pre_config_update({{replace_once, Type}, Source}, Conf) when is_map(Source), is_list(Conf) -> NConf = Front ++ [Source | Rear],
{Index, _} = find_source_by_type(Type), ok = check_dup_types(NConf),
{List1, List2} = lists:split(Index, Conf), NConf;
NConf = lists:droplast(List1) ++ [Source] ++ List2, do_update({{delete_once, Type}, _Source}, Conf) when is_list(Conf) ->
case check_dup_types(NConf) of {_Old, Front, Rear} = take(Type, Conf),
ok -> {ok, NConf}; NConf = Front ++ Rear,
Error -> Error NConf;
end; do_update({_, Sources}, _Conf) when is_list(Sources)->
pre_config_update({{delete_once, Type}, _Source}, Conf) when is_list(Conf) ->
{Index, _} = find_source_by_type(Type),
{List1, List2} = lists:split(Index, Conf),
NConf = lists:droplast(List1) ++ List2,
case check_dup_types(NConf) of
ok -> {ok, NConf};
Error -> Error
end;
pre_config_update({_, Sources}, _Conf) when is_list(Sources)->
%% overwrite the entire config! %% overwrite the entire config!
{ok, Sources}. Sources.
pre_config_update(Cmd, Conf) ->
{ok, do_update(Cmd, Conf)}.
post_config_update(_, undefined, _Conf, _AppEnvs) -> post_config_update(_, undefined, _Conf, _AppEnvs) ->
ok; ok;
post_config_update({move, Type, <<"top">>}, _NewSources, _OldSources, _AppEnvs) -> post_config_update(Cmd, NewSources, _OldSource, _AppEnvs) ->
InitedSources = lookup(), ok = do_post_update(Cmd, NewSources),
{Index, Source} = find_source_by_type(Type, InitedSources),
{Sources1, Sources2 } = lists:split(Index, InitedSources),
Sources3 = [Source] ++ lists:droplast(Sources1) ++ Sources2,
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Type, <<"bottom">>}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{Index, Source} = find_source_by_type(Type, InitedSources),
{Sources1, Sources2 } = lists:split(Index, InitedSources),
Sources3 = lists:droplast(Sources1) ++ Sources2 ++ [Source],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Type, #{<<"before">> := Before}}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{_, Source0} = find_source_by_type(Type, InitedSources),
{Index, Source1} = find_source_by_type(Before, InitedSources),
{Sources1, Sources2} = lists:split(Index, InitedSources),
Sources3 = lists:delete(Source0, lists:droplast(Sources1))
++ [Source0] ++ [Source1]
++ lists:delete(Source0, Sources2),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Type, #{<<"after">> := After}}, _NewSources, _OldSources, _AppEnvs) ->
InitedSources = lookup(),
{_, Source} = find_source_by_type(Type, InitedSources),
{Index, _} = find_source_by_type(After, InitedSources),
{Sources1, Sources2} = lists:split(Index, InitedSources),
Sources3 = lists:delete(Source, Sources1)
++ [Source]
++ lists:delete(Source, Sources2),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Sources3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({head, Sources}, _NewSources, _OldConf, _AppEnvs) ->
InitedSources = [init_source(R) || R <- check_sources(Sources)],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources ++ lookup()]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({tail, Sources}, _NewSources, _OldConf, _AppEnvs) ->
InitedSources = [init_source(R) || R <- check_sources(Sources)],
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({{replace_once, Type}, #{type := Type} = Source}, _NewSources, _OldConf, _AppEnvs) when is_map(Source) ->
OldInitedSources = lookup(),
{Index, OldSource} = find_source_by_type(Type, OldInitedSources),
case maps:get(type, OldSource, undefined) of
undefined -> ok;
file -> ok;
_ ->
#{annotations := #{id := Id}} = OldSource,
ok = emqx_resource:remove(Id)
end,
{OldSources1, OldSources2 } = lists:split(Index, OldInitedSources),
InitedSources = [init_source(R) || R <- check_sources([Source])],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldSources1) ++ InitedSources ++ OldSources2]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({{delete_once, Type}, _Source}, _NewSources, _OldConf, _AppEnvs) ->
OldInitedSources = lookup(),
{_, OldSource} = find_source_by_type(Type, OldInitedSources),
case OldSource of
#{annotations := #{id := Id}} ->
ok = emqx_resource:remove(Id);
_ -> ok
end,
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:delete(OldSource, OldInitedSources)]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update(_, NewSources, _OldConf, _AppEnvs) ->
%% overwrite the entire config!
OldInitedSources = lookup(),
InitedSources = [init_source(Source) || Source <- NewSources],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1),
lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) ->
ok = emqx_resource:remove(Id);
(_) -> ok
end, OldInitedSources),
ok = emqx_authz_cache:drain_cache(). ok = emqx_authz_cache:drain_cache().
%%-------------------------------------------------------------------- do_post_update({move, _Type, _Where} = Cmd, _NewSources) ->
%% Initialize source InitedSources = lookup(),
%%-------------------------------------------------------------------- MovedSources = do_update(Cmd, InitedSources),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [MovedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({head, Sources}, _NewSources) ->
InitedSources = init_sources(check_sources(Sources)),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources ++ lookup()]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({tail, Sources}, _NewSources) ->
InitedSources = init_sources(check_sources(Sources)),
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({{replace_once, Type}, #{type := Type} = Source}, _NewSources) when is_map(Source) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
ok = ensure_resource_deleted(OldSource),
InitedSources = init_sources(check_sources([Source])),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Front ++ InitedSources ++ Rear]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({{delete_once, Type}, _Source}, _NewSources) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
ok = ensure_resource_deleted(OldSource),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, Front ++ Rear}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update(_, NewSources) ->
%% overwrite the entire config!
OldInitedSources = lookup(),
InitedSources = init_sources(NewSources),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1),
lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
ok = emqx_authz_cache:drain_cache().
ensure_resource_deleted(#{type := file}) -> ok;
ensure_resource_deleted(#{type := 'built-in-database'}) -> ok;
ensure_resource_deleted(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove(Id).
check_dup_types(Sources) -> check_dup_types(Sources) ->
check_dup_types(Sources, ?SOURCE_TYPES). check_dup_types(Sources, []).
check_dup_types(_Sources, []) -> ok;
check_dup_types(Sources, [T0 | Tail]) -> check_dup_types([], _Checked) -> ok;
case lists:foldl(fun (#{type := T1}, AccIn) -> check_dup_types([Source | Sources], Checked) ->
case T0 =:= T1 of %% the input might be raw or type-checked result, so lookup both 'type' and <<"type">>
true -> AccIn + 1; %% TODO: check: really?
false -> AccIn Type = case maps:get(<<"type">>, Source, maps:get(type, Source, undefined)) of
end; undefined ->
(#{<<"type">> := T1}, AccIn) -> %% this should never happen if the value is type checked by honcon schema
case T0 =:= atom(T1) of error({bad_source_input, Source});
true -> AccIn + 1; Type0 ->
false -> AccIn type(Type0)
end end,
end, 0, Sources) > 1 of case lists:member(Type, Checked) of
true -> true ->
?LOG(error, "The type is duplicated in the Authorization source"), %% we have made it clear not to support more than one authz instance for each type
{error, 'The type is duplicated in the Authorization source'}; error({duplicated_authz_source_type, Type});
false -> check_dup_types(Sources, Tail) false ->
check_dup_types(Sources, [Type | Checked])
end. end.
init_source(#{enable := true, init_sources(Sources) ->
type := file, {Enabled, Disabled} = lists:partition(fun(#{enable := Enable}) -> Enable end, Sources),
case Disabled =/= [] of
true -> ?SLOG(info, #{msg => "disabled_sources_ignored", sources => Disabled});
false -> ok
end,
lists:map(fun init_source/1, Enabled).
init_source(#{type := file,
path := Path path := Path
} = Source) -> } = Source) ->
Rules = case file:consult(Path) of Rules = case file:consult(Path) of
@ -288,8 +210,7 @@ init_source(#{enable := true,
error(Reason) error(Reason)
end, end,
Source#{annotations => #{rules => Rules}}; Source#{annotations => #{rules => Rules}};
init_source(#{enable := true, init_source(#{type := http,
type := http,
url := Url url := Url
} = Source) -> } = Source) ->
NSource= maps:put(base_url, maps:remove(query, Url), Source), NSource= maps:put(base_url, maps:remove(query, Url), Source),
@ -297,19 +218,17 @@ init_source(#{enable := true,
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
Id -> Source#{annotations => #{id => Id}} Id -> Source#{annotations => #{id => Id}}
end; end;
init_source(#{enable := true, init_source(#{type := 'built-in-database'
type := 'built-in-database' } = Source) ->
} = Source) -> Source; Source;
init_source(#{enable := true, init_source(#{type := DB
type := DB
} = Source) when DB =:= redis; } = Source) when DB =:= redis;
DB =:= mongodb -> DB =:= mongodb ->
case create_resource(Source) of case create_resource(Source) of
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
Id -> Source#{annotations => #{id => Id}} Id -> Source#{annotations => #{id => Id}}
end; end;
init_source(#{enable := true, init_source(#{type := DB,
type := DB,
query := SQL query := SQL
} = Source) when DB =:= mysql; } = Source) when DB =:= mysql;
DB =:= postgresql -> DB =:= postgresql ->
@ -321,8 +240,7 @@ init_source(#{enable := true,
query => Mod:parse_query(SQL) query => Mod:parse_query(SQL)
} }
} }
end; end.
init_source(#{enable := false} = Source) ->Source.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% AuthZ callbacks %% AuthZ callbacks
@ -376,13 +294,17 @@ check_sources(RawSources) ->
#{sources := Sources} = hocon_schema:check_plain(Schema, Conf, #{atom_key => true}), #{sources := Sources} = hocon_schema:check_plain(Schema, Conf, #{atom_key => true}),
Sources. Sources.
find_source_by_type(Type) -> find_source_by_type(Type, lookup()). take(Type) -> take(Type, lookup()).
find_source_by_type(Type, Sources) -> find_source_by_type(Type, Sources, 1).
find_source_by_type(_, [], _N) -> error(not_found_source); %% Take the source of give type, the sources list is split into two parts
find_source_by_type(Type, [ Source = #{type := T} | Tail], N) -> %% front part and rear part.
case Type =:= T of take(Type, Sources) ->
true -> {N, Source}; {Front, Rear} = lists:splitwith(fun(T) -> type(T) =/= type(Type) end, Sources),
false -> find_source_by_type(Type, Tail, N + 1) case Rear =:= [] of
true ->
error({authz_source_of_type_not_found, Type});
_ ->
{hd(Rear), Front, tl(Rear)}
end. end.
find_action_in_hooks() -> find_action_in_hooks() ->
@ -407,7 +329,8 @@ create_resource(#{type := DB} = Source) ->
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
authz_module('built-in-database') ->emqx_authz_mnesia; authz_module('built-in-database') ->
emqx_authz_mnesia;
authz_module(Type) -> authz_module(Type) ->
list_to_existing_atom("emqx_authz_" ++ atom_to_list(Type)). list_to_existing_atom("emqx_authz_" ++ atom_to_list(Type)).
@ -418,9 +341,20 @@ connector_module(postgresql) ->
connector_module(Type) -> connector_module(Type) ->
list_to_existing_atom("emqx_connector_" ++ atom_to_list(Type)). list_to_existing_atom("emqx_connector_" ++ atom_to_list(Type)).
atom(B) when is_binary(B) -> type(#{type := Type}) -> type(Type);
try binary_to_existing_atom(B, utf8) type(#{<<"type">> := Type}) -> type(Type);
catch type(file) -> file;
_ -> binary_to_atom(B) type(<<"file">>) -> file;
end; type(http) -> http;
atom(A) when is_atom(A) -> A. type(<<"http">>) -> http;
type(mongodb) -> mongodb;
type(<<"mongodb">>) -> mongodb;
type(mysql) -> mysql;
type(<<"mysql">>) -> mysql;
type(redis) -> redis;
type(<<"redis">>) -> redis;
type(postgresql) -> postgresql;
type(<<"postgresql">>) -> postgresql;
type('built-in-database') -> 'built-in-database';
type(<<"built-in-database">>) -> 'built-in-database';
type(Unknown) -> error({unknown_authz_source_type, Unknown}). % should never happend if the input is type-checked by hocon schema

View File

@ -52,7 +52,8 @@ fields(file) ->
true -> ok; true -> ok;
_ -> {error, "File does not exist"} _ -> {error, "File does not exist"}
end end
end end,
desc => "Path to the file which contains the ACL rules."
}} }}
]; ];
fields(http_get) -> fields(http_get) ->