Merge pull request #5342 from zmstone/refactor-config-store-per-root
refactor(emqx_config): store configs per root name
This commit is contained in:
commit
dcfc705811
|
@ -155,9 +155,12 @@ get_alarms(deactivated) ->
|
||||||
|
|
||||||
pre_config_update(#{<<"validity_period">> := Period0} = NewConf, OldConf) ->
|
pre_config_update(#{<<"validity_period">> := Period0} = NewConf, OldConf) ->
|
||||||
?MODULE ! {update_timer, hocon_postprocess:duration(Period0)},
|
?MODULE ! {update_timer, hocon_postprocess:duration(Period0)},
|
||||||
maps:merge(OldConf, NewConf);
|
merge(OldConf, NewConf);
|
||||||
pre_config_update(NewConf, OldConf) ->
|
pre_config_update(NewConf, OldConf) ->
|
||||||
maps:merge(OldConf, NewConf).
|
merge(OldConf, NewConf).
|
||||||
|
|
||||||
|
merge(undefined, New) -> New;
|
||||||
|
merge(Old, New) -> maps:merge(Old, New).
|
||||||
|
|
||||||
format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
|
format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
|
||||||
Now = erlang:system_time(microsecond),
|
Now = erlang:system_time(microsecond),
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
, stop/1
|
, stop/1
|
||||||
, get_description/0
|
, get_description/0
|
||||||
, get_release/0
|
, get_release/0
|
||||||
|
, set_init_config_load_done/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
@ -43,8 +44,8 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
emqx_config:load(),
|
ok = maybe_load_config(),
|
||||||
set_backtrace_depth(),
|
ok = set_backtrace_depth(),
|
||||||
print_otp_version_warning(),
|
print_otp_version_warning(),
|
||||||
print_banner(),
|
print_banner(),
|
||||||
%% Load application first for ekka_mnesia scanner
|
%% Load application first for ekka_mnesia scanner
|
||||||
|
@ -71,6 +72,22 @@ prep_stop(_State) ->
|
||||||
|
|
||||||
stop(_State) -> ok.
|
stop(_State) -> ok.
|
||||||
|
|
||||||
|
%% @doc Call this function to make emqx boot without loading config,
|
||||||
|
%% in case we want to delegate the config load to a higher level app
|
||||||
|
%% which manages emqx app.
|
||||||
|
set_init_config_load_done() ->
|
||||||
|
application:set_env(emqx, init_config_load_done, true).
|
||||||
|
|
||||||
|
maybe_load_config() ->
|
||||||
|
case application:get_env(emqx, init_config_load_done, false) of
|
||||||
|
true ->
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
%% the app env 'config_files' should be set before emqx get started.
|
||||||
|
ConfFiles = application:get_env(emqx, config_files, []),
|
||||||
|
emqx_config:init_load(emqx_schema, ConfFiles)
|
||||||
|
end.
|
||||||
|
|
||||||
set_backtrace_depth() ->
|
set_backtrace_depth() ->
|
||||||
Depth = emqx_config:get([node, backtrace_depth]),
|
Depth = emqx_config:get([node, backtrace_depth]),
|
||||||
_ = erlang:system_flag(backtrace_depth, Depth),
|
_ = erlang:system_flag(backtrace_depth, Depth),
|
||||||
|
|
|
@ -15,9 +15,9 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_config).
|
-module(emqx_config).
|
||||||
|
|
||||||
-compile({no_auto_import, [get/0, get/1]}).
|
-compile({no_auto_import, [get/0, get/1, put/2]}).
|
||||||
|
|
||||||
-export([ load/0
|
-export([ init_load/2
|
||||||
, read_override_conf/0
|
, read_override_conf/0
|
||||||
, save_configs/2
|
, save_configs/2
|
||||||
, save_to_app_env/1
|
, save_to_app_env/1
|
||||||
|
@ -27,8 +27,10 @@
|
||||||
, to_plainmap/1
|
, to_plainmap/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ get/0
|
-export([get_root/1,
|
||||||
, get/1
|
get_root_raw/1]).
|
||||||
|
|
||||||
|
-export([ get/1
|
||||||
, get/2
|
, get/2
|
||||||
, find/1
|
, find/1
|
||||||
, put/1
|
, put/1
|
||||||
|
@ -52,15 +54,14 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% raw configs is the config that is now parsed and tranlated by hocon schema
|
%% raw configs is the config that is now parsed and tranlated by hocon schema
|
||||||
-export([ get_raw/0
|
-export([ get_raw/1
|
||||||
, get_raw/1
|
|
||||||
, get_raw/2
|
, get_raw/2
|
||||||
, put_raw/1
|
, put_raw/1
|
||||||
, put_raw/2
|
, put_raw/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(CONF, ?MODULE).
|
-define(CONF, fun(ROOT) -> {?MODULE, bin(ROOT)} end).
|
||||||
-define(RAW_CONF, {?MODULE, raw}).
|
-define(RAW_CONF, fun(ROOT) -> {?MODULE, raw, bin(ROOT)} end).
|
||||||
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
|
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
|
||||||
-define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]).
|
-define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]).
|
||||||
|
|
||||||
|
@ -69,22 +70,28 @@
|
||||||
-type raw_config() :: #{binary() => term()} | undefined.
|
-type raw_config() :: #{binary() => term()} | undefined.
|
||||||
-type config() :: #{atom() => term()} | undefined.
|
-type config() :: #{atom() => term()} | undefined.
|
||||||
|
|
||||||
-spec get() -> map().
|
%% @doc For the given path, get root value enclosed in a single-key map.
|
||||||
get() ->
|
-spec get_root(emqx_map_lib:config_key_path()) -> map().
|
||||||
persistent_term:get(?CONF, #{}).
|
get_root([RootName | _]) ->
|
||||||
|
#{RootName => do_get(?CONF, [RootName], #{})}.
|
||||||
|
|
||||||
|
%% @doc For the given path, get raw root value enclosed in a single-key map.
|
||||||
|
%% key is ensured to be binary.
|
||||||
|
get_root_raw([RootName | _]) ->
|
||||||
|
#{bin(RootName) => do_get(?RAW_CONF, [RootName], #{})}.
|
||||||
|
|
||||||
|
%% @doc Get a config value for the given path.
|
||||||
|
%% The path should at least include root config name.
|
||||||
-spec get(emqx_map_lib:config_key_path()) -> term().
|
-spec get(emqx_map_lib:config_key_path()) -> term().
|
||||||
get(KeyPath) ->
|
get(KeyPath) -> do_get(?CONF, KeyPath).
|
||||||
emqx_map_lib:deep_get(KeyPath, get()).
|
|
||||||
|
|
||||||
-spec get(emqx_map_lib:config_key_path(), term()) -> term().
|
-spec get(emqx_map_lib:config_key_path(), term()) -> term().
|
||||||
get(KeyPath, Default) ->
|
get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default).
|
||||||
emqx_map_lib:deep_get(KeyPath, get(), Default).
|
|
||||||
|
|
||||||
-spec find(emqx_map_lib:config_key_path()) ->
|
-spec find(emqx_map_lib:config_key_path()) ->
|
||||||
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
|
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
|
||||||
find(KeyPath) ->
|
find(KeyPath) ->
|
||||||
emqx_map_lib:deep_find(KeyPath, get()).
|
emqx_map_lib:deep_find(KeyPath, get_root(KeyPath)).
|
||||||
|
|
||||||
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term().
|
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term().
|
||||||
get_zone_conf(Zone, KeyPath) ->
|
get_zone_conf(Zone, KeyPath) ->
|
||||||
|
@ -122,11 +129,12 @@ find_listener_conf(Zone, Listener, KeyPath) ->
|
||||||
|
|
||||||
-spec put(map()) -> ok.
|
-spec put(map()) -> ok.
|
||||||
put(Config) ->
|
put(Config) ->
|
||||||
persistent_term:put(?CONF, Config).
|
maps:fold(fun(RootName, RootValue, _) ->
|
||||||
|
?MODULE:put([RootName], RootValue)
|
||||||
|
end, [], Config).
|
||||||
|
|
||||||
-spec put(emqx_map_lib:config_key_path(), term()) -> ok.
|
-spec put(emqx_map_lib:config_key_path(), term()) -> ok.
|
||||||
put(KeyPath, Config) ->
|
put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config).
|
||||||
put(emqx_map_lib:deep_put(KeyPath, get(), Config)).
|
|
||||||
|
|
||||||
-spec update(emqx_map_lib:config_key_path(), update_request()) ->
|
-spec update(emqx_map_lib:config_key_path(), update_request()) ->
|
||||||
ok | {error, term()}.
|
ok | {error, term()}.
|
||||||
|
@ -137,37 +145,35 @@ update(ConfKeyPath, UpdateReq) ->
|
||||||
remove(ConfKeyPath) ->
|
remove(ConfKeyPath) ->
|
||||||
emqx_config_handler:remove_config(ConfKeyPath).
|
emqx_config_handler:remove_config(ConfKeyPath).
|
||||||
|
|
||||||
-spec get_raw() -> map().
|
|
||||||
get_raw() ->
|
|
||||||
persistent_term:get(?RAW_CONF, #{}).
|
|
||||||
|
|
||||||
-spec get_raw(emqx_map_lib:config_key_path()) -> term().
|
-spec get_raw(emqx_map_lib:config_key_path()) -> term().
|
||||||
get_raw(KeyPath) ->
|
get_raw(KeyPath) -> do_get(?RAW_CONF, KeyPath).
|
||||||
emqx_map_lib:deep_get(KeyPath, get_raw()).
|
|
||||||
|
|
||||||
-spec get_raw(emqx_map_lib:config_key_path(), term()) -> term().
|
-spec get_raw(emqx_map_lib:config_key_path(), term()) -> term().
|
||||||
get_raw(KeyPath, Default) ->
|
get_raw(KeyPath, Default) -> do_get(?RAW_CONF, KeyPath, Default).
|
||||||
emqx_map_lib:deep_get(KeyPath, get_raw(), Default).
|
|
||||||
|
|
||||||
-spec put_raw(map()) -> ok.
|
-spec put_raw(map()) -> ok.
|
||||||
put_raw(Config) ->
|
put_raw(Config) ->
|
||||||
persistent_term:put(?RAW_CONF, Config).
|
maps:fold(fun(RootName, RootV, _) ->
|
||||||
|
?MODULE:put_raw([RootName], RootV)
|
||||||
|
end, [], hocon_schema:get_value([], Config)).
|
||||||
|
|
||||||
-spec put_raw(emqx_map_lib:config_key_path(), term()) -> ok.
|
-spec put_raw(emqx_map_lib:config_key_path(), term()) -> ok.
|
||||||
put_raw(KeyPath, Config) ->
|
put_raw(KeyPath, Config) -> do_put(?RAW_CONF, KeyPath, Config).
|
||||||
put_raw(emqx_map_lib:deep_put(KeyPath, get_raw(), Config)).
|
|
||||||
|
|
||||||
%%============================================================================
|
%%============================================================================
|
||||||
%% Load/Update configs From/To files
|
%% Load/Update configs From/To files
|
||||||
%%============================================================================
|
%%============================================================================
|
||||||
load() ->
|
|
||||||
%% the app env 'config_files' should be set before emqx get started.
|
%% @doc Initial load of the given config files.
|
||||||
ConfFiles = application:get_env(emqx, config_files, []),
|
%% NOTE: The order of the files is significant, configs from files orderd
|
||||||
|
%% in the rear of the list overrides prior values.
|
||||||
|
-spec init_load(module(), [string()]) -> ok.
|
||||||
|
init_load(SchemaModule, ConfFiles) ->
|
||||||
RawRichConf = lists:foldl(fun(ConfFile, Acc) ->
|
RawRichConf = lists:foldl(fun(ConfFile, Acc) ->
|
||||||
Raw = load_hocon_file(ConfFile, richmap),
|
Raw = load_hocon_file(ConfFile, richmap),
|
||||||
emqx_map_lib:deep_merge(Acc, Raw)
|
emqx_map_lib:deep_merge(Acc, Raw)
|
||||||
end, #{}, ConfFiles),
|
end, #{}, ConfFiles),
|
||||||
{_MappedEnvs, RichConf} = hocon_schema:map_translate(emqx_schema, RawRichConf, #{}),
|
{_MappedEnvs, RichConf} = hocon_schema:map_translate(SchemaModule, RawRichConf, #{}),
|
||||||
ok = save_to_emqx_config(to_plainmap(RichConf), to_plainmap(RawRichConf)).
|
ok = save_to_emqx_config(to_plainmap(RichConf), to_plainmap(RawRichConf)).
|
||||||
|
|
||||||
-spec read_override_conf() -> raw_config().
|
-spec read_override_conf() -> raw_config().
|
||||||
|
@ -180,7 +186,8 @@ save_configs(RawConf, OverrideConf) ->
|
||||||
%% We may need also support hot config update for the apps that use application envs.
|
%% We may need also support hot config update for the apps that use application envs.
|
||||||
%% If that is the case uncomment the following line to update the configs to application env
|
%% If that is the case uncomment the following line to update the configs to application env
|
||||||
%save_to_app_env(_MappedEnvs),
|
%save_to_app_env(_MappedEnvs),
|
||||||
save_to_emqx_config(to_plainmap(RichConf), RawConf),
|
Conf = maps:with(maps:keys(RawConf), to_plainmap(RichConf)),
|
||||||
|
save_to_emqx_config(Conf, RawConf),
|
||||||
save_to_override_conf(OverrideConf).
|
save_to_override_conf(OverrideConf).
|
||||||
|
|
||||||
-spec save_to_app_env([tuple()]) -> ok.
|
-spec save_to_app_env([tuple()]) -> ok.
|
||||||
|
@ -191,8 +198,8 @@ save_to_app_env(AppEnvs) ->
|
||||||
|
|
||||||
-spec save_to_emqx_config(config(), raw_config()) -> ok.
|
-spec save_to_emqx_config(config(), raw_config()) -> ok.
|
||||||
save_to_emqx_config(Conf, RawConf) ->
|
save_to_emqx_config(Conf, RawConf) ->
|
||||||
emqx_config:put(emqx_map_lib:unsafe_atom_key_map(Conf)),
|
?MODULE:put(emqx_map_lib:unsafe_atom_key_map(Conf)),
|
||||||
emqx_config:put_raw(RawConf).
|
?MODULE:put_raw(RawConf).
|
||||||
|
|
||||||
-spec save_to_override_conf(raw_config()) -> ok | {error, term()}.
|
-spec save_to_override_conf(raw_config()) -> ok | {error, term()}.
|
||||||
save_to_override_conf(RawConf) ->
|
save_to_override_conf(RawConf) ->
|
||||||
|
@ -214,7 +221,7 @@ load_hocon_file(FileName, LoadType) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
emqx_override_conf_name() ->
|
emqx_override_conf_name() ->
|
||||||
filename:join([emqx_config:get([node, data_dir]), "emqx_override.conf"]).
|
filename:join([?MODULE:get([node, data_dir]), "emqx_override.conf"]).
|
||||||
|
|
||||||
to_richmap(Map) ->
|
to_richmap(Map) ->
|
||||||
{ok, RichMap} = hocon:binary(jsx:encode(Map), #{format => richmap}),
|
{ok, RichMap} = hocon:binary(jsx:encode(Map), #{format => richmap}),
|
||||||
|
@ -222,3 +229,25 @@ to_richmap(Map) ->
|
||||||
|
|
||||||
to_plainmap(RichMap) ->
|
to_plainmap(RichMap) ->
|
||||||
hocon_schema:richmap_to_map(RichMap).
|
hocon_schema:richmap_to_map(RichMap).
|
||||||
|
|
||||||
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
||||||
|
do_get(PtKey, KeyPath) ->
|
||||||
|
Ref = make_ref(),
|
||||||
|
Res = do_get(PtKey, KeyPath, Ref),
|
||||||
|
case Res =:= Ref of
|
||||||
|
true -> error({config_not_found, KeyPath});
|
||||||
|
false -> Res
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_get(PtKey, [RootName], Default) ->
|
||||||
|
persistent_term:get(PtKey(RootName), Default);
|
||||||
|
do_get(PtKey, [RootName | KeyPath], Default) ->
|
||||||
|
RootV = persistent_term:get(PtKey(RootName), #{}),
|
||||||
|
emqx_map_lib:deep_get(KeyPath, RootV, Default).
|
||||||
|
|
||||||
|
do_put(PtKey, [RootName | KeyPath], DeepValue) ->
|
||||||
|
OldValue = do_get(PtKey, [RootName], #{}),
|
||||||
|
NewValue = emqx_map_lib:deep_put(KeyPath, OldValue, DeepValue),
|
||||||
|
persistent_term:put(PtKey(RootName), NewValue).
|
||||||
|
|
|
@ -86,12 +86,12 @@ handle_call({add_child, ConfKeyPath, HandlerName}, _From,
|
||||||
|
|
||||||
handle_call({update_config, ConfKeyPath, UpdateReq}, _From,
|
handle_call({update_config, ConfKeyPath, UpdateReq}, _From,
|
||||||
#{handlers := Handlers} = State) ->
|
#{handlers := Handlers} = State) ->
|
||||||
OldConf = emqx_config:get(),
|
OldConf = emqx_config:get_root(ConfKeyPath),
|
||||||
OldRawConf = emqx_config:get_raw(),
|
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
|
||||||
try NewRawConf = do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq),
|
try NewRawConf = do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq),
|
||||||
OverrideConf = update_override_config(ConfKeyPath, NewRawConf),
|
OverrideConf = update_override_config(NewRawConf),
|
||||||
Result = emqx_config:save_configs(NewRawConf, OverrideConf),
|
Result = emqx_config:save_configs(NewRawConf, OverrideConf),
|
||||||
do_post_config_update(ConfKeyPath, Handlers, OldConf, emqx_config:get()),
|
do_post_config_update(ConfKeyPath, Handlers, OldConf, emqx_config:get_root(ConfKeyPath)),
|
||||||
{reply, Result, State}
|
{reply, Result, State}
|
||||||
catch
|
catch
|
||||||
Error : Reason : ST ->
|
Error : Reason : ST ->
|
||||||
|
@ -100,13 +100,13 @@ handle_call({update_config, ConfKeyPath, UpdateReq}, _From,
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({remove_config, ConfKeyPath}, _From, #{handlers := Handlers} = State) ->
|
handle_call({remove_config, ConfKeyPath}, _From, #{handlers := Handlers} = State) ->
|
||||||
OldConf = emqx_config:get(),
|
OldConf = emqx_config:get_root(ConfKeyPath),
|
||||||
OldRawConf = emqx_config:get_raw(),
|
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
|
||||||
BinKeyPath = bin_path(ConfKeyPath),
|
BinKeyPath = bin_path(ConfKeyPath),
|
||||||
try NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
|
try NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
|
||||||
OverrideConf = emqx_map_lib:deep_remove(BinKeyPath, emqx_config:read_override_conf()),
|
OverrideConf = emqx_map_lib:deep_remove(BinKeyPath, emqx_config:read_override_conf()),
|
||||||
Result = emqx_config:save_configs(NewRawConf, OverrideConf),
|
Result = emqx_config:save_configs(NewRawConf, OverrideConf),
|
||||||
do_post_config_update(ConfKeyPath, Handlers, OldConf, emqx_config:get()),
|
do_post_config_update(ConfKeyPath, Handlers, OldConf, emqx_config:get_root(ConfKeyPath)),
|
||||||
{reply, Result, State}
|
{reply, Result, State}
|
||||||
catch
|
catch
|
||||||
Error : Reason : ST ->
|
Error : Reason : ST ->
|
||||||
|
@ -176,18 +176,11 @@ merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf)
|
||||||
merge_to_old_config(UpdateReq, _RawConf) ->
|
merge_to_old_config(UpdateReq, _RawConf) ->
|
||||||
UpdateReq.
|
UpdateReq.
|
||||||
|
|
||||||
update_override_config(ConfKeyPath, RawConf) ->
|
update_override_config(RawConf) ->
|
||||||
%% We don't save the entire config to emqx_override.conf, but only the part
|
|
||||||
%% specified by the ConfKeyPath
|
|
||||||
PartialConf = maps:with(root_keys(ConfKeyPath), RawConf),
|
|
||||||
OldConf = emqx_config:read_override_conf(),
|
OldConf = emqx_config:read_override_conf(),
|
||||||
maps:merge(OldConf, PartialConf).
|
maps:merge(OldConf, RawConf).
|
||||||
|
|
||||||
root_keys([]) -> [];
|
|
||||||
root_keys([RootKey | _]) -> [bin(RootKey)].
|
|
||||||
|
|
||||||
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
|
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
|
||||||
|
|
||||||
bin(A) when is_atom(A) -> list_to_binary(atom_to_list(A));
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||||
bin(B) when is_binary(B) -> B;
|
bin(B) when is_binary(B) -> B.
|
||||||
bin(S) when is_list(S) -> list_to_binary(S).
|
|
||||||
|
|
|
@ -306,7 +306,7 @@ merge_zone_and_listener_confs(ZoneConf, ListenerConf) ->
|
||||||
|
|
||||||
apply_on_listener(ListenerId, Do) ->
|
apply_on_listener(ListenerId, Do) ->
|
||||||
{ZoneName, ListenerName} = decode_listener_id(ListenerId),
|
{ZoneName, ListenerName} = decode_listener_id(ListenerId),
|
||||||
case emqx_config:find([zones, ZoneName, listeners, ListenerName]) of
|
case emqx_config:find_listener_conf(ZoneName, ListenerName, []) of
|
||||||
{not_found, _, _} -> error({not_found, ListenerId});
|
{not_found, _, _} -> error({listener_config_not_found, ZoneName, ListenerName});
|
||||||
{ok, Conf} -> Do(ZoneName, ListenerName, Conf)
|
{ok, Conf} -> Do(ZoneName, ListenerName, Conf)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -32,9 +32,11 @@
|
||||||
%%-----------------------------------------------------------------
|
%%-----------------------------------------------------------------
|
||||||
-spec deep_get(config_key_path(), map()) -> term().
|
-spec deep_get(config_key_path(), map()) -> term().
|
||||||
deep_get(ConfKeyPath, Map) ->
|
deep_get(ConfKeyPath, Map) ->
|
||||||
case deep_find(ConfKeyPath, Map) of
|
Ref = make_ref(),
|
||||||
{not_found, KeyPath, Data} -> error({not_found, KeyPath, Data});
|
Res = deep_get(ConfKeyPath, Map, Ref),
|
||||||
{ok, Data} -> Data
|
case Res =:= Ref of
|
||||||
|
true -> error({config_not_found, ConfKeyPath});
|
||||||
|
false -> Res
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec deep_get(config_key_path(), map(), term()) -> term().
|
-spec deep_get(config_key_path(), map(), term()) -> term().
|
||||||
|
|
|
@ -153,10 +153,8 @@ fields("node") ->
|
||||||
sensitive => true,
|
sensitive => true,
|
||||||
override_env => "EMQX_NODE_COOKIE"
|
override_env => "EMQX_NODE_COOKIE"
|
||||||
})}
|
})}
|
||||||
, {"data_dir", t(string(), undefined, undefined)}
|
, {"data_dir", t(string())}
|
||||||
, {"config_files", t(list(string()), "emqx.config_files",
|
, {"config_files", t(comma_separated_list())}
|
||||||
[ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
|
|
||||||
])}
|
|
||||||
, {"global_gc_interval", t(duration(), undefined, "15m")}
|
, {"global_gc_interval", t(duration(), undefined, "15m")}
|
||||||
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
|
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
|
||||||
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
|
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
|
||||||
|
@ -511,14 +509,31 @@ base_listener() ->
|
||||||
, {"rate_limit", ref("rate_limit")}
|
, {"rate_limit", ref("rate_limit")}
|
||||||
].
|
].
|
||||||
|
|
||||||
translations() -> ["ekka", "kernel"].
|
translations() -> ["ekka", "kernel", "emqx"].
|
||||||
|
|
||||||
translation("ekka") ->
|
translation("ekka") ->
|
||||||
[ {"cluster_discovery", fun tr_cluster__discovery/1}];
|
[ {"cluster_discovery", fun tr_cluster__discovery/1}];
|
||||||
|
|
||||||
translation("kernel") ->
|
translation("kernel") ->
|
||||||
[ {"logger_level", fun tr_logger_level/1}
|
[ {"logger_level", fun tr_logger_level/1}
|
||||||
, {"logger", fun tr_logger/1}].
|
, {"logger", fun tr_logger/1}];
|
||||||
|
|
||||||
|
translation("emqx") ->
|
||||||
|
[ {"config_files", fun tr_config_files/1}
|
||||||
|
].
|
||||||
|
|
||||||
|
tr_config_files(Conf) ->
|
||||||
|
case conf_get("emqx.config_files", Conf) of
|
||||||
|
[_ | _] = Files ->
|
||||||
|
Files;
|
||||||
|
_ ->
|
||||||
|
case os:getenv("RUNNER_ETC_DIR") of
|
||||||
|
false ->
|
||||||
|
[filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])];
|
||||||
|
Dir ->
|
||||||
|
[filename:join([Dir, "emqx.conf"])]
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
tr_cluster__discovery(Conf) ->
|
tr_cluster__discovery(Conf) ->
|
||||||
Strategy = conf_get("cluster.discovery_strategy", Conf),
|
Strategy = conf_get("cluster.discovery_strategy", Conf),
|
||||||
|
@ -579,7 +594,7 @@ tr_logger(Conf) ->
|
||||||
filters => Filters,
|
filters => Filters,
|
||||||
filesync_repeat_interval => no_repeat
|
filesync_repeat_interval => no_repeat
|
||||||
}}
|
}}
|
||||||
|| {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf))],
|
|| {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf, #{}))],
|
||||||
|
|
||||||
[{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
|
[{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,6 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
ct:pal("------------config: ~p", [emqx_config:get()]),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
t_check_pub(_) ->
|
t_check_pub(_) ->
|
||||||
OldConf = emqx_config:get(),
|
OldConf = emqx_config:get([zones]),
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
|
emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
|
||||||
emqx_config:put_zone_conf(default, [mqtt, retain_available], false),
|
emqx_config:put_zone_conf(default, [mqtt, retain_available], false),
|
||||||
timer:sleep(50),
|
timer:sleep(50),
|
||||||
|
@ -36,10 +36,10 @@ t_check_pub(_) ->
|
||||||
PubFlags2 = #{qos => ?QOS_1, retain => true},
|
PubFlags2 = #{qos => ?QOS_1, retain => true},
|
||||||
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
|
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
|
||||||
emqx_mqtt_caps:check_pub(default, PubFlags2)),
|
emqx_mqtt_caps:check_pub(default, PubFlags2)),
|
||||||
emqx_config:put(OldConf).
|
emqx_config:put([zones], OldConf).
|
||||||
|
|
||||||
t_check_sub(_) ->
|
t_check_sub(_) ->
|
||||||
OldConf = emqx_config:get(),
|
OldConf = emqx_config:get([zones]),
|
||||||
SubOpts = #{rh => 0,
|
SubOpts = #{rh => 0,
|
||||||
rap => 0,
|
rap => 0,
|
||||||
nl => 0,
|
nl => 0,
|
||||||
|
@ -57,4 +57,4 @@ t_check_sub(_) ->
|
||||||
emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts)),
|
emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts)),
|
||||||
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||||
emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true})),
|
emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true})),
|
||||||
emqx_config:put(OldConf).
|
emqx_config:put([zones], OldConf).
|
||||||
|
|
|
@ -43,12 +43,13 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
|
||||||
%% important! let emqx_schema include the current app!
|
%% important! let emqx_schema include the current app!
|
||||||
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
||||||
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
||||||
|
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||||
|
|
||||||
%create_default_app(),
|
%create_default_app(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
|
@ -29,8 +29,6 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
|
||||||
|
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
|
|
||||||
|
@ -39,6 +37,8 @@ init_per_suite(Config) ->
|
||||||
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
||||||
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
||||||
|
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||||
|
|
||||||
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
||||||
ok = emqx_config:update([zones, default, authorization, enable], true),
|
ok = emqx_config:update([zones, default, authorization, enable], true),
|
||||||
Rules = [#{ <<"config">> => #{
|
Rules = [#{ <<"config">> => #{
|
||||||
|
|
|
@ -29,16 +29,15 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
|
||||||
|
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
|
||||||
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
|
||||||
|
|
||||||
%% important! let emqx_schema include the current app!
|
%% important! let emqx_schema include the current app!
|
||||||
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
||||||
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
||||||
|
|
||||||
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
|
||||||
|
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||||
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
||||||
ok = emqx_config:update([zones, default, authorization, enable], true),
|
ok = emqx_config:update([zones, default, authorization, enable], true),
|
||||||
Rules = [#{ <<"config">> => #{
|
Rules = [#{ <<"config">> => #{
|
||||||
|
|
|
@ -29,7 +29,6 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
|
||||||
|
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
|
@ -39,6 +38,8 @@ init_per_suite(Config) ->
|
||||||
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
||||||
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
||||||
|
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||||
|
|
||||||
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
||||||
ok = emqx_config:update([zones, default, authorization, enable], true),
|
ok = emqx_config:update([zones, default, authorization, enable], true),
|
||||||
Rules = [#{ <<"config">> => #{
|
Rules = [#{ <<"config">> => #{
|
||||||
|
|
|
@ -29,8 +29,6 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
|
||||||
|
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
|
|
||||||
|
@ -39,6 +37,8 @@ init_per_suite(Config) ->
|
||||||
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
||||||
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
||||||
|
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||||
|
|
||||||
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
||||||
ok = emqx_config:update([zones, default, authorization, enable], true),
|
ok = emqx_config:update([zones, default, authorization, enable], true),
|
||||||
Rules = [#{ <<"config">> => #{
|
Rules = [#{ <<"config">> => #{
|
||||||
|
|
|
@ -29,8 +29,6 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
|
||||||
|
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
|
|
||||||
|
@ -39,6 +37,8 @@ init_per_suite(Config) ->
|
||||||
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
meck:expect(emqx_schema, includes, fun() -> ["authorization"] end ),
|
||||||
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_authz_schema:fields(FieldName) end),
|
||||||
|
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_authz]),
|
||||||
|
|
||||||
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
|
||||||
ok = emqx_config:update([zones, default, authorization, enable], true),
|
ok = emqx_config:update([zones, default, authorization, enable], true),
|
||||||
Rules = [#{ <<"config">> => #{
|
Rules = [#{ <<"config">> => #{
|
||||||
|
|
|
@ -257,4 +257,8 @@ proc_name(Mod, Id) ->
|
||||||
list_to_atom(lists:concat([Mod, "_", Id])).
|
list_to_atom(lists:concat([Mod, "_", Id])).
|
||||||
|
|
||||||
pick(InstId) ->
|
pick(InstId) ->
|
||||||
gproc_pool:pick_worker(emqx_resource_instance, InstId).
|
Pid = gproc_pool:pick_worker(emqx_resource_instance, InstId),
|
||||||
|
case is_pid(Pid) of
|
||||||
|
true -> Pid;
|
||||||
|
false -> error({failed_to_pick_worker, emqx_resource_instance, InstId})
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue