fix(authz): support config hot upgrading
This commit is contained in:
parent
cde4b9092a
commit
b82693cc0b
|
@ -45,9 +45,10 @@
|
|||
-define(CONF, ?MODULE).
|
||||
-define(RAW_CONF, {?MODULE, raw}).
|
||||
|
||||
-export_type([update_request/0, raw_config/0]).
|
||||
-export_type([update_request/0, raw_config/0, config/0]).
|
||||
-type update_request() :: term().
|
||||
-type raw_config() :: hocon:config() | undefined.
|
||||
-type raw_config() :: #{binary() => term()} | undefined.
|
||||
-type config() :: #{atom() => term()} | undefined.
|
||||
|
||||
-spec get() -> map().
|
||||
get() ->
|
||||
|
|
|
@ -45,11 +45,15 @@
|
|||
-type handler_name() :: module().
|
||||
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
|
||||
|
||||
-optional_callbacks([handle_update_config/2]).
|
||||
-optional_callbacks([ handle_update_config/2
|
||||
, post_update_config/2
|
||||
]).
|
||||
|
||||
-callback handle_update_config(emqx_config:update_request(), emqx_config:raw_config()) ->
|
||||
emqx_config:update_request().
|
||||
|
||||
-callback post_update_config(emqx_config:config(), emqx_config:config()) -> any().
|
||||
|
||||
-type state() :: #{
|
||||
handlers := handlers(),
|
||||
atom() => term()
|
||||
|
@ -83,11 +87,12 @@ handle_call({add_child, ConfKeyPath, HandlerName}, _From,
|
|||
|
||||
handle_call({update_config, ConfKeyPath, UpdateReq, RawConf}, _From,
|
||||
#{handlers := Handlers} = State) ->
|
||||
OldConf = emqx_config:get(),
|
||||
try {RootKeys, Conf} = do_update_config(ConfKeyPath, Handlers, RawConf, UpdateReq),
|
||||
{reply, save_configs(RootKeys, Conf), State}
|
||||
Result = save_configs(RootKeys, Conf),
|
||||
do_post_update_config(ConfKeyPath, Handlers, OldConf, emqx_config:get()),
|
||||
{reply, Result, State}
|
||||
catch
|
||||
throw: Reason ->
|
||||
{reply, {error, Reason}, State};
|
||||
Error : Reason : ST ->
|
||||
?LOG(error, "update config failed: ~p", [{Error, Reason, ST}]),
|
||||
{reply, {error, Reason}, State}
|
||||
|
@ -109,26 +114,40 @@ terminate(_Reason, _State) ->
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
do_update_config([], Handlers, OldConf, UpdateReq) ->
|
||||
call_handle_update_config(Handlers, OldConf, UpdateReq);
|
||||
do_update_config([ConfKey | ConfKeyPath], Handlers, OldConf, UpdateReq) ->
|
||||
SubOldConf = get_sub_config(ConfKey, OldConf),
|
||||
do_update_config([], Handlers, OldRawConf, UpdateReq) ->
|
||||
call_handle_update_config(Handlers, OldRawConf, UpdateReq);
|
||||
do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) ->
|
||||
SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
|
||||
SubHandlers = maps:get(ConfKey, Handlers, #{}),
|
||||
NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldConf, UpdateReq),
|
||||
call_handle_update_config(Handlers, OldConf, #{bin(ConfKey) => NewUpdateReq}).
|
||||
NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq),
|
||||
call_handle_update_config(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq}).
|
||||
|
||||
get_sub_config(_, undefined) ->
|
||||
undefined;
|
||||
get_sub_config(ConfKey, OldConf) when is_map(OldConf) ->
|
||||
maps:get(bin(ConfKey), OldConf, undefined);
|
||||
get_sub_config(_, OldConf) ->
|
||||
OldConf.
|
||||
do_post_update_config([], Handlers, OldConf, NewConf) ->
|
||||
call_post_update_config(Handlers, OldConf, NewConf);
|
||||
do_post_update_config([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf) ->
|
||||
SubOldConf = get_sub_config(ConfKey, OldConf),
|
||||
SubNewConf = get_sub_config(ConfKey, NewConf),
|
||||
SubHandlers = maps:get(ConfKey, Handlers, #{}),
|
||||
_ = do_post_update_config(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf),
|
||||
call_post_update_config(Handlers, OldConf, NewConf).
|
||||
|
||||
call_handle_update_config(Handlers, OldConf, UpdateReq) ->
|
||||
get_sub_config(ConfKey, Conf) when is_map(Conf) ->
|
||||
maps:get(ConfKey, Conf, undefined);
|
||||
get_sub_config(_, _Conf) -> %% the Conf is a primitive
|
||||
undefined.
|
||||
|
||||
call_handle_update_config(Handlers, OldRawConf, UpdateReq) ->
|
||||
HandlerName = maps:get(?MOD, Handlers, undefined),
|
||||
case erlang:function_exported(HandlerName, handle_update_config, 2) of
|
||||
true -> HandlerName:handle_update_config(UpdateReq, OldConf);
|
||||
false -> merge_to_old_config(UpdateReq, OldConf)
|
||||
true -> HandlerName:handle_update_config(UpdateReq, OldRawConf);
|
||||
false -> merge_to_old_config(UpdateReq, OldRawConf)
|
||||
end.
|
||||
|
||||
call_post_update_config(Handlers, OldConf, NewConf) ->
|
||||
HandlerName = maps:get(?MOD, Handlers, undefined),
|
||||
case erlang:function_exported(HandlerName, post_update_config, 2) of
|
||||
true -> _ = HandlerName:post_update_config(NewConf, OldConf);
|
||||
false -> ok
|
||||
end.
|
||||
|
||||
%% callbacks for the top-level handler
|
||||
|
|
|
@ -63,9 +63,6 @@ structs() -> ["cluster", "node", "rpc", "log", "lager",
|
|||
"plugins", "sysmon", "alarm"]
|
||||
++ includes().
|
||||
|
||||
-ifdef(TEST).
|
||||
includes() ->[].
|
||||
-else.
|
||||
includes() ->
|
||||
[ "emqx_data_bridge"
|
||||
, "emqx_telemetry"
|
||||
|
@ -78,7 +75,6 @@ includes() ->
|
|||
, "emqx_management"
|
||||
, "emqx_gateway"
|
||||
].
|
||||
-endif.
|
||||
|
||||
fields("cluster") ->
|
||||
[ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
|
||||
|
|
|
@ -47,7 +47,7 @@ emqx_authz:{
|
|||
# type: mongo
|
||||
# config: {
|
||||
# mongo_type: single
|
||||
# servers: "127.0.0.1:27017"
|
||||
# server: "127.0.0.1:27017"
|
||||
# pool_size: 1
|
||||
# database: mqtt
|
||||
# ssl: {enable: false}
|
||||
|
|
|
@ -26,12 +26,12 @@
|
|||
, init/0
|
||||
, init_rule/1
|
||||
, lookup/0
|
||||
, update/1
|
||||
, update/2
|
||||
, authorize/5
|
||||
, match/4
|
||||
]).
|
||||
|
||||
-export([handle_update_config/2]).
|
||||
-export([post_update_config/2, handle_update_config/2]).
|
||||
|
||||
-define(CONF_KEY_PATH, [emqx_authz, rules]).
|
||||
|
||||
|
@ -48,17 +48,28 @@ init() ->
|
|||
lookup() ->
|
||||
emqx_config:get(?CONF_KEY_PATH, []).
|
||||
|
||||
update(Rules) ->
|
||||
emqx_config:update_config(?CONF_KEY_PATH, Rules).
|
||||
update(Cmd, Rules) ->
|
||||
emqx_config:update_config(?CONF_KEY_PATH, {Cmd, Rules}).
|
||||
|
||||
%% For now we only support re-creating the entire rule list
|
||||
handle_update_config(Rules, _OldConf) ->
|
||||
InitedRules = [init_rule(Rule) || Rule <- Rules],
|
||||
handle_update_config({head, Rule}, OldConf) when is_map(Rule), is_list(OldConf) ->
|
||||
[Rule | OldConf];
|
||||
handle_update_config({tail, Rule}, OldConf) when is_map(Rule), is_list(OldConf) ->
|
||||
OldConf ++ [Rule];
|
||||
handle_update_config({_, NewConf}, _OldConf) ->
|
||||
%% overwrite the entire config!
|
||||
case is_list(NewConf) of
|
||||
true -> NewConf;
|
||||
false -> [NewConf]
|
||||
end.
|
||||
|
||||
post_update_config(NewRules, _OldConf) ->
|
||||
%_ = [release_rules(Rule) || Rule <- OldConf],
|
||||
InitedRules = [init_rule(Rule) || Rule <- NewRules],
|
||||
Action = find_action_in_hooks(),
|
||||
ok = emqx_hooks:del('client.authorize', Action),
|
||||
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [InitedRules]}, -1),
|
||||
ok = emqx_acl_cache:drain_cache(),
|
||||
Rules.
|
||||
ok = emqx_acl_cache:drain_cache().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
|
|
|
@ -56,28 +56,23 @@ lookup_authz(_Bindings, _Params) ->
|
|||
return({ok, emqx_authz:lookup()}).
|
||||
|
||||
update_authz(_Bindings, Params) ->
|
||||
Rules = get_rules(Params),
|
||||
return(emqx_authz:update(Rules)).
|
||||
Rules = form_rules(Params),
|
||||
return(emqx_authz:update(replace, Rules)).
|
||||
|
||||
append_authz(_Bindings, Params) ->
|
||||
Rules = get_rules(Params),
|
||||
NRules = lists:append(emqx_authz:lookup(), Rules),
|
||||
return(emqx_authz:update(NRules)).
|
||||
Rules = form_rules(Params),
|
||||
return(emqx_authz:update(tail, Rules)).
|
||||
|
||||
push_authz(_Bindings, Params) ->
|
||||
Rules = get_rules(Params),
|
||||
NRules = lists:append(Rules, emqx_authz:lookup()),
|
||||
return(emqx_authz:update(NRules)).
|
||||
Rules = form_rules(Params),
|
||||
return(emqx_authz:update(head, Rules)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Interval Funcs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
get_rules(Params) ->
|
||||
{ok, Conf} = hocon:binary(jsx:encode(#{<<"emqx_authz">> => Params}), #{format => richmap}),
|
||||
CheckConf = hocon_schema:check(emqx_authz_schema, Conf, #{atom_key => true}),
|
||||
#{emqx_authz := #{rules := Rules}} = hocon_schema:richmap_to_map(CheckConf),
|
||||
Rules.
|
||||
form_rules(Params) ->
|
||||
Params.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% EUnits
|
||||
|
|
|
@ -32,7 +32,7 @@ init_per_suite(Config) ->
|
|||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||
ok = emqx_config:update_config([zones, default, acl, cache, enable], false),
|
||||
ok = emqx_config:update_config([zones, default, acl, enable], true),
|
||||
emqx_authz:update([]),
|
||||
emqx_authz:update(replace, []),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
all() ->
|
||||
%% TODO: V5 API
|
||||
%% emqx_ct:all(?MODULE).
|
||||
[].
|
||||
[t_api_unit_test].
|
||||
|
||||
groups() ->
|
||||
[].
|
||||
|
@ -72,6 +72,23 @@ set_special_configs(_App) ->
|
|||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_api_unit_test(_Config) ->
|
||||
Rule1 = #{<<"principal">> =>
|
||||
#{<<"and">> => [#{<<"username">> => <<"^test?">>},
|
||||
#{<<"clientid">> => <<"^test?">>}
|
||||
]},
|
||||
<<"action">> => <<"subscribe">>,
|
||||
<<"topics">> => [<<"%u">>],
|
||||
<<"permission">> => <<"allow">>
|
||||
},
|
||||
ok = emqx_authz_api:push_authz(#{}, Rule1),
|
||||
[#{action := subscribe,
|
||||
permission := allow,
|
||||
principal :=
|
||||
#{'and' := [#{username := <<"^test?">>},
|
||||
#{clientid := <<"^test?">>}]},
|
||||
topics := [<<"%u">>]}] = emqx_config:get([emqx_authz, rules]).
|
||||
|
||||
t_api(_Config) ->
|
||||
Rule1 = #{<<"principal">> =>
|
||||
#{<<"and">> => [#{<<"username">> => <<"^test?">>},
|
||||
|
|
|
@ -35,13 +35,18 @@ init_per_suite(Config) ->
|
|||
ct:pal("---- emqx_hooks: ~p", [ets:tab2list(emqx_hooks)]),
|
||||
ok = emqx_config:update_config([zones, default, acl, cache, enable], false),
|
||||
ok = emqx_config:update_config([zones, default, acl, enable], true),
|
||||
Rules = [#{config =>#{},
|
||||
principal => all,
|
||||
collection => <<"fake">>,
|
||||
find => #{<<"a">> => <<"b">>},
|
||||
type => mongo}
|
||||
Rules = [#{ <<"config">> => #{
|
||||
<<"mongo_type">> => <<"single">>,
|
||||
<<"server">> => <<"127.0.0.1:27017">>,
|
||||
<<"pool_size">> => 1,
|
||||
<<"database">> => <<"mqtt">>,
|
||||
<<"ssl">> => #{<<"enable">> => false}},
|
||||
<<"principal">> => <<"all">>,
|
||||
<<"collection">> => <<"fake">>,
|
||||
<<"find">> => #{<<"a">> => <<"b">>},
|
||||
<<"type">> => <<"mongo">>}
|
||||
],
|
||||
emqx_authz:update(Rules),
|
||||
ok = emqx_authz:update(replace, Rules),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -34,12 +34,19 @@ init_per_suite(Config) ->
|
|||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||
ok = emqx_config:update_config([zones, default, acl, cache, enable], false),
|
||||
ok = emqx_config:update_config([zones, default, acl, enable], true),
|
||||
Rules = [#{config =>#{},
|
||||
principal => all,
|
||||
sql => <<"fake">>,
|
||||
type => mysql}
|
||||
],
|
||||
emqx_authz:update(Rules),
|
||||
Rules = [#{ <<"config">> => #{
|
||||
<<"server">> => <<"127.0.0.1:27017">>,
|
||||
<<"pool_size">> => 1,
|
||||
<<"database">> => <<"mqtt">>,
|
||||
<<"username">> => <<"xx">>,
|
||||
<<"password">> => <<"ee">>,
|
||||
<<"auto_reconnect">> => true,
|
||||
<<"ssl">> => #{<<"enable">> => false}
|
||||
},
|
||||
<<"principal">> => <<"all">>,
|
||||
<<"sql">> => <<"abcb">>,
|
||||
<<"type">> => <<"mysql">> }],
|
||||
emqx_authz:update(replace, Rules),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -34,12 +34,18 @@ init_per_suite(Config) ->
|
|||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||
ok = emqx_config:update_config([zones, default, acl, cache, enable], false),
|
||||
ok = emqx_config:update_config([zones, default, acl, enable], true),
|
||||
Rules = [#{config =>#{},
|
||||
principal => all,
|
||||
sql => <<"fake">>,
|
||||
type => pgsql}
|
||||
],
|
||||
emqx_authz:update(Rules),
|
||||
Rules = [#{ <<"config">> => #{
|
||||
<<"server">> => <<"127.0.0.1:27017">>,
|
||||
<<"pool_size">> => 1,
|
||||
<<"database">> => <<"mqtt">>,
|
||||
<<"username">> => <<"xx">>,
|
||||
<<"password">> => <<"ee">>,
|
||||
<<"auto_reconnect">> => true,
|
||||
<<"ssl">> => #{<<"enable">> => false}
|
||||
},
|
||||
<<"sql">> => <<"abcb">>,
|
||||
<<"type">> => <<"pgsql">> }],
|
||||
emqx_authz:update(replace, Rules),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -34,12 +34,17 @@ init_per_suite(Config) ->
|
|||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||
ok = emqx_config:update_config([zones, default, acl, cache, enable], false),
|
||||
ok = emqx_config:update_config([zones, default, acl, enable], true),
|
||||
Rules = [#{config =>#{},
|
||||
principal => all,
|
||||
cmd => <<"fake">>,
|
||||
type => redis}
|
||||
],
|
||||
emqx_authz:update(Rules),
|
||||
Rules = [#{ <<"config">> => #{
|
||||
<<"server">> => <<"127.0.0.1:27017">>,
|
||||
<<"pool_size">> => 1,
|
||||
<<"database">> => 0,
|
||||
<<"password">> => <<"ee">>,
|
||||
<<"auto_reconnect">> => true,
|
||||
<<"ssl">> => #{<<"enable">> => false}
|
||||
},
|
||||
<<"cmd">> => <<"HGETALL mqtt_acl:%u">>,
|
||||
<<"type">> => <<"redis">> }],
|
||||
emqx_authz:update(replace, Rules),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
Loading…
Reference in New Issue