diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl
index 5dedfd285..c4cab7ce9 100644
--- a/apps/emqx/src/emqx_config.erl
+++ b/apps/emqx/src/emqx_config.erl
@@ -256,7 +256,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
init_load(SchemaMod, parse_hocon(Conf));
init_load(SchemaMod, RawConf) when is_map(RawConf) ->
ok = save_schema_mod_and_names(SchemaMod),
- %% Merge environment varialbe overrides on top
+ %% Merge environment variable overrides on top
RawConfWithEnvs = merge_envs(SchemaMod, RawConf),
ClusterOverrides = read_override_conf(#{override_to => cluster}),
LocalOverrides = read_override_conf(#{override_to => local}),
@@ -267,7 +267,7 @@ init_load(SchemaMod, RawConf) when is_map(RawConf) ->
check_config(SchemaMod, RawConfWithOverrides , #{}),
RootNames = get_root_names(),
ok = save_to_config_map(maps:with(get_atom_root_names(), CheckedConf),
- maps:with(RootNames, RawConfWithEnvs)).
+ maps:with(RootNames, RawConfWithOverrides)).
parse_hocon(Conf) ->
IncDirs = include_dirs(),
@@ -301,7 +301,7 @@ merge_envs(SchemaMod, RawConf) ->
},
hocon_tconf:merge_env_overrides(SchemaMod, RawConf, all, Opts).
--spec check_config(module(), raw_config()) -> {AppEnvs, CheckedConf}
+-spec check_config(hocon_schema:schema(), raw_config()) -> {AppEnvs, CheckedConf}
when AppEnvs :: app_envs(), CheckedConf :: config().
check_config(SchemaMod, RawConf) ->
check_config(SchemaMod, RawConf, #{}).
@@ -454,7 +454,7 @@ do_get(Type, [], Default) ->
AccIn#{conf_key(Type0, RootName) => Conf};
(_, AccIn) -> AccIn
end, #{}, persistent_term:get()),
- case map_size(AllConf) == 0 of
+ case AllConf =:= #{} of
true -> Default;
false -> AllConf
end;
diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl
index 14b9567ee..af99cb732 100644
--- a/apps/emqx/src/emqx_config_handler.erl
+++ b/apps/emqx/src/emqx_config_handler.erl
@@ -18,6 +18,7 @@
-module(emqx_config_handler).
-include("logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
-behaviour(gen_server).
@@ -28,6 +29,7 @@
, remove_handler/1
, update_config/3
, get_raw_cluster_override_conf/0
+ , info/0
, merge_to_old_config/2
]).
@@ -74,9 +76,11 @@ update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity).
--spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
+-spec add_handler(emqx_config:config_key_path(), handler_name()) ->
+ ok | {error, {conflict, list()}}.
add_handler(ConfKeyPath, HandlerName) ->
- gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}, infinity).
+ assert_callback_function(HandlerName),
+ gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}).
%% @doc Remove handler asynchronously
-spec remove_handler(emqx_config:config_key_path()) -> ok.
@@ -86,19 +90,21 @@ remove_handler(ConfKeyPath) ->
get_raw_cluster_override_conf() ->
gen_server:call(?MODULE, get_raw_cluster_override_conf).
+info() ->
+ gen_server:call(?MODULE, info).
+
%%============================================================================
-spec init(term()) -> {ok, state()}.
init(_) ->
process_flag(trap_exit, true),
- {ok, #{handlers => #{?MOD => ?MODULE}}}.
+ Handlers = load_prev_handlers(),
+ {ok, #{handlers => Handlers#{?MOD => ?MODULE}}}.
handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers := Handlers}) ->
case deep_put_handler(ConfKeyPath, Handlers, HandlerName) of
- {ok, NewHandlers} ->
- {reply, ok, State#{handlers => NewHandlers}};
- Error ->
- {reply, Error, State}
+ {ok, NewHandlers} -> {reply, ok, State#{handlers => NewHandlers}};
+ {error, _Reason} = Error -> {reply, Error, State}
end;
handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
@@ -108,48 +114,62 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
handle_call(get_raw_cluster_override_conf, _From, State) ->
Reply = emqx_config:read_override_conf(#{override_to => cluster}),
{reply, Reply, State};
+handle_call(info, _From, State) ->
+ {reply, State, State};
handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
+ {reply, ok, State}.
-handle_cast({remove_handler, ConfKeyPath},
- State = #{handlers := Handlers}) ->
- {noreply, State#{handlers => emqx_map_lib:deep_remove(ConfKeyPath ++ [?MOD], Handlers)}};
+handle_cast({remove_handler, ConfKeyPath}, State = #{handlers := Handlers}) ->
+ NewHandlers = do_remove_handler(ConfKeyPath, Handlers),
+ {noreply, State#{handlers => NewHandlers}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(_Reason, #{handlers := Handlers}) ->
+ save_handlers(Handlers),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-deep_put_handler([], Handlers, Mod) when is_map(Handlers) ->
+deep_put_handler([], Handlers, Mod) ->
{ok, Handlers#{?MOD => Mod}};
-deep_put_handler([], _Handlers, Mod) ->
- {ok, #{?MOD => Mod}};
-deep_put_handler([?WKEY | KeyPath], Handlers, Mod) ->
- deep_put_handler2(?WKEY, KeyPath, Handlers, Mod);
deep_put_handler([Key | KeyPath], Handlers, Mod) ->
- case maps:find(?WKEY, Handlers) of
- error ->
- deep_put_handler2(Key, KeyPath, Handlers, Mod);
- {ok, _SubHandlers} ->
- {error, {cannot_override_a_wildcard_path, [?WKEY | KeyPath]}}
- end.
-
-deep_put_handler2(Key, KeyPath, Handlers, Mod) ->
SubHandlers = maps:get(Key, Handlers, #{}),
case deep_put_handler(KeyPath, SubHandlers, Mod) of
- {ok, SubHandlers1} ->
- {ok, Handlers#{Key => SubHandlers1}};
- Error ->
- Error
+ {ok, NewSubHandlers} ->
+ NewHandlers = Handlers#{Key => NewSubHandlers},
+ case check_handler_conflict(NewHandlers) of
+ ok -> {ok, NewHandlers};
+ {error, Reason} -> {error, Reason}
+ end;
+ {error, _Reason} = Error -> Error
end.
+%% Make sure that Specify Key and ?WKEY cannot be on the same level.
+%%
+%% [k1, ?, ?], [k1, ?], [k1] is allow.
+%% [K1, ?, k2], [k1, ?, k3] is allow.
+%% [k1, ?, ?], [k1, ?, k2] is not allow.
+check_handler_conflict(Handlers) ->
+ Keys = filter_top_level_handlers(Handlers),
+ case lists:member(?WKEY, Keys) of
+ true when length(Keys) =:= 1 -> ok;
+ true -> {error, {conflict, Keys}};
+ false -> ok
+ end.
+
+filter_top_level_handlers(Handlers) ->
+ maps:fold(
+ fun
+ (K, #{?MOD := _}, Acc) -> [K | Acc];
+ (_K, #{}, Acc) -> Acc;
+ (?MOD, _, Acc) -> Acc
+ end, [], Handlers).
+
handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
try
do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs)
@@ -157,9 +177,12 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
throw : Reason ->
{error, Reason};
Error : Reason : ST ->
- ?SLOG(error, #{msg => "change_config_failed",
+ ?SLOG(error, #{msg => "change_config_crashed",
exception => Error,
reason => Reason,
+ update_req => UpdateArgs,
+ module => SchemaModule,
+ key_path => ConfKeyPath,
stacktrace => ST
}),
{error, config_update_crashed}
@@ -174,11 +197,12 @@ do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
{error, Result}
end.
+process_update_request([_], _Handlers, {remove, _Opts}) ->
+ {error, "remove_root_is_forbidden"};
process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
BinKeyPath = bin_path(ConfKeyPath),
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
- _ = remove_from_local_if_cluster_change(BinKeyPath, Opts),
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
{ok, NewRawConf, OverrideConf, Opts};
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
@@ -198,25 +222,22 @@ do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
UpdateReq, ConfKeyPath0) ->
ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
- SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
+ ConfKeyBin = bin(ConfKey),
+ SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
- {ok, NewUpdateReq} ->
- call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq},
- ConfKeyPath);
- Error ->
- Error
+ {ok, NewUpdateReq} -> merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf);
+ Error -> Error
end.
check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, OverrideConf,
UpdateArgs, Opts) ->
OldConf = emqx_config:get_root(ConfKeyPath),
- FullRawConf = with_full_raw_confs(NewRawConf),
- {AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, FullRawConf),
- NewConf = maps:with(maps:keys(OldConf), CheckedConf),
- _ = remove_from_local_if_cluster_change(ConfKeyPath, Opts),
+ Schema = schema(SchemaModule, ConfKeyPath),
+ {AppEnvs, #{root := NewConf}} = emqx_config:check_config(Schema, #{<<"root">> => NewRawConf}),
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
{ok, Result0} ->
+ remove_from_local_if_cluster_change(ConfKeyPath, Opts),
case save_configs(ConfKeyPath, AppEnvs, NewConf, NewRawConf, OverrideConf,
UpdateArgs, Opts) of
{ok, Result1} ->
@@ -259,8 +280,7 @@ get_sub_config(ConfKey, Conf) when is_map(Conf) ->
get_sub_config(_, _Conf) -> %% the Conf is a primitive
undefined.
-call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
- HandlerName = maps:get(?MOD, Handlers, undefined),
+call_pre_config_update(#{?MOD := HandlerName}, OldRawConf, UpdateReq, ConfKeyPath) ->
case erlang:function_exported(HandlerName, pre_config_update, 3) of
true ->
case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
@@ -268,21 +288,25 @@ call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
{error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
end;
false -> merge_to_old_config(UpdateReq, OldRawConf)
- end.
+ end;
+call_pre_config_update(_Handlers, OldRawConf, UpdateReq, _ConfKeyPath) ->
+ merge_to_old_config(UpdateReq, OldRawConf).
-call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result, ConfKeyPath) ->
- HandlerName = maps:get(?MOD, Handlers, undefined),
+call_post_config_update(#{?MOD := HandlerName}, OldConf, NewConf,
+ AppEnvs, UpdateReq, Result, ConfKeyPath) ->
case erlang:function_exported(HandlerName, post_config_update, 5) of
true ->
- case HandlerName:post_config_update(ConfKeyPath, UpdateReq, NewConf, OldConf,
- AppEnvs) of
+ case HandlerName:post_config_update(ConfKeyPath, UpdateReq,
+ NewConf, OldConf, AppEnvs) of
ok -> {ok, Result};
- {ok, Result1} ->
- {ok, Result#{HandlerName => Result1}};
+ {ok, Result1} -> {ok, Result#{HandlerName => Result1}};
{error, Reason} -> {error, {post_config_update, HandlerName, Reason}}
end;
false -> {ok, Result}
- end.
+ end;
+call_post_config_update(_Handlers, _OldConf, _NewConf, _AppEnvs,
+ _UpdateReq, Result, _ConfKeyPath) ->
+ {ok, Result}.
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
@@ -295,6 +319,7 @@ save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, Update
%% 1. the old config is undefined
%% 2. either the old or the new config is not of map type
%% the behaviour is merging the new the config to the old config if they are maps.
+
merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf) ->
{ok, maps:merge(RawConf, UpdateReq)};
merge_to_old_config(UpdateReq, _RawConf) ->
@@ -302,13 +327,12 @@ merge_to_old_config(UpdateReq, _RawConf) ->
%% local-override.conf priority is higher than cluster-override.conf
%% If we want cluster to take effect, we must remove the local.
-remove_from_local_if_cluster_change(BinKeyPath, Opts) ->
- case maps:get(override, Opts, local) of
- local -> ok;
- cluster ->
- Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
- emqx_config:save_to_override_conf(Local, Opts)
- end.
+remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
+ Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
+ _ = emqx_config:save_to_override_conf(Local, Opts),
+ ok;
+remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
+ ok.
remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
undefined;
@@ -337,9 +361,6 @@ return_rawconf(ConfKeyPath, #{rawconf_with_defaults := true}) ->
return_rawconf(ConfKeyPath, _) ->
emqx_config:get_raw(ConfKeyPath).
-with_full_raw_confs(PartialConf) ->
- maps:merge(emqx_config:get_raw([]), PartialConf).
-
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
@@ -351,3 +372,43 @@ atom(Str) when is_list(Str) ->
list_to_atom(Str);
atom(Atom) when is_atom(Atom) ->
Atom.
+
+-dialyzer({nowarn_function, do_remove_handler/2}).
+do_remove_handler(ConfKeyPath, Handlers) ->
+ NewHandlers = emqx_map_lib:deep_remove(ConfKeyPath ++ [?MOD], Handlers),
+ remove_empty_leaf(ConfKeyPath, NewHandlers).
+
+remove_empty_leaf([], Handlers) -> Handlers;
+remove_empty_leaf(KeyPath, Handlers) ->
+ case emqx_map_lib:deep_find(KeyPath, Handlers) =:= {ok, #{}} of
+ true -> %% empty leaf
+ Handlers1 = emqx_map_lib:deep_remove(KeyPath, Handlers),
+ SubKeyPath = lists:sublist(KeyPath, length(KeyPath) - 1),
+ remove_empty_leaf(SubKeyPath, Handlers1);
+ false -> Handlers
+ end.
+
+assert_callback_function(Mod) ->
+ case erlang:function_exported(Mod, pre_config_update, 3) orelse
+ erlang:function_exported(Mod, post_config_update, 5) of
+ true -> ok;
+ false -> error(#{msg => "bad_emqx_config_handler_callback", module => Mod})
+ end,
+ ok.
+
+schema(SchemaModule, [RootKey | _]) ->
+ Roots = hocon_schema:roots(SchemaModule),
+ Field =
+ case lists:keyfind(bin(RootKey), 1, Roots) of
+ {_, {Ref, ?REF(Ref)}} -> {Ref, ?R_REF(SchemaModule, Ref)};
+ {_, Field0} -> Field0
+ end,
+ #{roots => [root], fields => #{root => [Field]}}.
+
+load_prev_handlers() ->
+ Handlers = application:get_env(emqx, ?MODULE, #{}),
+ application:unset_env(emqx, ?MODULE),
+ Handlers.
+
+save_handlers(Handlers) ->
+ application:set_env(emqx, ?MODULE, Handlers).
diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl
index 8a5ecefad..ba3482f2f 100644
--- a/apps/emqx/src/emqx_map_lib.erl
+++ b/apps/emqx/src/emqx_map_lib.erl
@@ -61,8 +61,8 @@ deep_find([Key | KeyPath] = Path, Map) when is_map(Map) ->
{ok, SubMap} -> deep_find(KeyPath, SubMap);
error -> {not_found, Path, Map}
end;
-deep_find(_KeyPath, Data) ->
- {not_found, _KeyPath, Data}.
+deep_find(KeyPath, Data) ->
+ {not_found, KeyPath, Data}.
-spec deep_put(config_key_path(), map(), term()) -> map().
deep_put([], _Map, Data) ->
@@ -152,7 +152,7 @@ diff_maps(NewMap, OldMap) ->
binary_string_kv(K, V, JsonableFun) ->
case JsonableFun(K, V) of
drop -> drop;
- {K1, V1} -> {binary_string(K1), binary_string(V1)}
+ {K1, V1} -> {binary_string(K1), V1}
end.
binary_string([]) -> [];
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index f60e0c387..d09acd25e 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -21,6 +21,7 @@
-dialyzer(no_contracts).
-dialyzer(no_unused).
-dialyzer(no_fail_call).
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
-include("emqx_authentication.hrl").
-include_lib("typerefl/include/types.hrl").
@@ -385,7 +386,8 @@ after idling for 'Keepalive * backoff * 2'."""
, {"max_inflight",
sc(range(1, 65535),
#{ default => 32,
- desc => "Maximum size of the Inflight Window storing QoS1/2 messages delivered but un-acked."
+ desc => "Maximum size of the Inflight Window storing QoS1/2"
+ " messages delivered but un-acked."
})
}
, {"retry_interval",
@@ -403,7 +405,8 @@ after idling for 'Keepalive * backoff * 2'."""
, {"await_rel_timeout",
sc(duration(),
#{ default => "300s",
- desc => "The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout."
+ desc => "The QoS2 messages (Client -> Broker) will be dropped"
+ " if awaiting PUBREL timeout."
})
}
, {"session_expiry_interval",
@@ -1102,7 +1105,8 @@ fields("trace") ->
default => text,
desc => """
Determine the format of the payload format in the trace file.
-`text`: Text-based protocol or plain text protocol. It is recommended when payload is JSON encoded.
+`text`: Text-based protocol or plain text protocol.
+ It is recommended when payload is JSON encoded.
`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.
`hidden`: payload is obfuscated as `******`
"""
diff --git a/apps/emqx/test/emqx_authentication_SUITE.erl b/apps/emqx/test/emqx_authentication_SUITE.erl
index c02685884..434109163 100644
--- a/apps/emqx/test/emqx_authentication_SUITE.erl
+++ b/apps/emqx/test/emqx_authentication_SUITE.erl
@@ -35,7 +35,9 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
-roots() -> [{config, #{type => hoconsc:union([hoconsc:ref(type1), hoconsc:ref(type2)])}}].
+roots() -> [{config, #{type => hoconsc:union([
+ hoconsc:ref(?MODULE, type1),
+ hoconsc:ref(?MODULE, type2)])}}].
fields(type1) ->
[ {mechanism, {enum, ['password-based']}}
diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl
new file mode 100644
index 000000000..bd6bebbb8
--- /dev/null
+++ b/apps/emqx/test/emqx_config_handler_SUITE.erl
@@ -0,0 +1,314 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_config_handler_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(MOD, {mod}).
+-define(WKEY, '?').
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+ emqx_common_test_helpers:boot_modules(all),
+ emqx_common_test_helpers:start_apps([]),
+ Config.
+
+end_per_suite(_Config) ->
+ emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_Case, Config) ->
+ Config.
+
+end_per_testcase(_Case, _Config) ->
+ ok.
+
+t_handler(_Config) ->
+ BadCallBackMod = emqx,
+ RootKey = sysmon,
+ %% bad
+ ?assertError(#{msg := "bad_emqx_config_handler_callback", module := BadCallBackMod},
+ emqx_config_handler:add_handler([RootKey], BadCallBackMod)),
+ %% simple
+ ok = emqx_config_handler:add_handler([RootKey], ?MODULE),
+ #{handlers := Handlers0} = emqx_config_handler:info(),
+ ?assertMatch(#{RootKey := #{?MOD := ?MODULE}}, Handlers0),
+ ok = emqx_config_handler:remove_handler([RootKey]),
+ #{handlers := Handlers1} = emqx_config_handler:info(),
+ ct:pal("Key:~p simple: ~p~n", [RootKey, Handlers1]),
+ ?assertEqual(false, maps:is_key(RootKey, Handlers1)),
+ %% wildcard 1
+ Wildcard1 = [RootKey, '?', cpu_check_interval],
+ ok = emqx_config_handler:add_handler(Wildcard1, ?MODULE),
+ #{handlers := Handlers2} = emqx_config_handler:info(),
+ ?assertMatch(#{RootKey := #{?WKEY := #{cpu_check_interval := #{?MOD := ?MODULE}}}}, Handlers2),
+ ok = emqx_config_handler:remove_handler(Wildcard1),
+ #{handlers := Handlers3} = emqx_config_handler:info(),
+ ct:pal("Key:~p wildcard1: ~p~n", [Wildcard1, Handlers3]),
+ ?assertEqual(false, maps:is_key(RootKey, Handlers3)),
+
+ %% can_override_a_wildcard_path
+ ok = emqx_config_handler:add_handler(Wildcard1, ?MODULE),
+ ?assertEqual(ok, emqx_config_handler:add_handler([RootKey, os, cpu_check_interval], ?MODULE)),
+ ok = emqx_config_handler:remove_handler(Wildcard1),
+ ok = emqx_config_handler:remove_handler([RootKey, os, cpu_check_interval]),
+
+ ok = emqx_config_handler:add_handler([RootKey, os, cpu_check_interval], ?MODULE),
+ ok = emqx_config_handler:add_handler(Wildcard1, ?MODULE),
+ ok = emqx_config_handler:remove_handler([RootKey, os, cpu_check_interval]),
+ ok = emqx_config_handler:remove_handler(Wildcard1),
+ ok.
+
+t_conflict_handler(_Config) ->
+ ok = emqx_config_handler:add_handler([sysmon, '?', '?'], ?MODULE),
+ ?assertMatch({error, {conflict, _}},
+ emqx_config_handler:add_handler([sysmon, '?', cpu_check_interval], ?MODULE)),
+ ok = emqx_config_handler:remove_handler([sysmon, '?', '?']),
+
+ ok = emqx_config_handler:add_handler([sysmon, '?', cpu_check_interval], ?MODULE),
+ ?assertMatch({error, {conflict, _}},
+ emqx_config_handler:add_handler([sysmon, '?', '?'], ?MODULE)),
+ ok = emqx_config_handler:remove_handler([sysmon, '?', cpu_check_interval]),
+
+ %% override
+ ok = emqx_config_handler:add_handler([sysmon], emqx_logger),
+ ?assertMatch(#{handlers := #{sysmon := #{{mod} := emqx_logger}}},
+ emqx_config_handler:info()),
+ ok.
+
+t_root_key_update(_Config) ->
+ PathKey = [sysmon],
+ Opts = #{rawconf_with_defaults => true},
+ ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
+ %% update
+ Old = #{<<"os">> := OS} = emqx:get_raw_config(PathKey),
+ {ok, Res} = emqx:update_config(PathKey,
+ Old#{<<"os">> => OS#{<<"cpu_check_interval">> => <<"12s">>}}, Opts),
+ ?assertMatch(#{config := #{os := #{cpu_check_interval := 12000}},
+ post_config_update := #{?MODULE := ok},
+ raw_config := #{<<"os">> := #{<<"cpu_check_interval">> := <<"12s">>}}},
+ Res),
+ ?assertMatch(#{os := #{cpu_check_interval := 12000}}, emqx:get_config(PathKey)),
+
+ %% update sub key
+ SubKey = PathKey ++ [os, cpu_high_watermark],
+ ?assertEqual({ok,#{config => 0.81,
+ post_config_update => #{?MODULE => ok},
+ raw_config => <<"81%">>}},
+ emqx:update_config(SubKey, "81%", Opts)),
+ ?assertEqual(0.81, emqx:get_config(SubKey)),
+ ?assertEqual("81%", emqx:get_raw_config(SubKey)),
+ %% remove
+ ?assertEqual({error, "remove_root_is_forbidden"}, emqx:remove_config(PathKey)),
+ ?assertMatch(true, is_map(emqx:get_raw_config(PathKey))),
+
+ ok = emqx_config_handler:remove_handler(PathKey),
+ ok.
+
+t_sub_key_update_remove(_Config) ->
+ KeyPath = [sysmon, os, cpu_check_interval],
+ Opts = #{},
+ ok = emqx_config_handler:add_handler(KeyPath, ?MODULE),
+ {ok, Res} = emqx:update_config(KeyPath, <<"60s">>, Opts),
+ ?assertMatch(#{config := 60000,
+ post_config_update := #{?MODULE := ok},
+ raw_config := <<"60s">>},
+ Res),
+ ?assertMatch(60000, emqx:get_config(KeyPath)),
+
+ KeyPath2 = [sysmon, os, cpu_low_watermark],
+ ok = emqx_config_handler:add_handler(KeyPath2, ?MODULE),
+ {ok, Res1} = emqx:update_config(KeyPath2, <<"40%">>, Opts),
+ ?assertMatch(#{config := 0.4,
+ post_config_update := #{},
+ raw_config := <<"40%">>},
+ Res1),
+ ?assertMatch(0.4, emqx:get_config(KeyPath2)),
+
+ %% remove
+ ?assertEqual({ok,#{post_config_update => #{emqx_config_handler_SUITE => ok}}},
+ emqx:remove_config(KeyPath)),
+ ?assertError({config_not_found, KeyPath}, emqx:get_raw_config(KeyPath)),
+ OSKey = maps:keys(emqx:get_raw_config([sysmon, os])),
+ ?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)),
+ ?assert(length(OSKey) > 0),
+
+ ?assertEqual({ok,#{config => 60000,
+ post_config_update => #{?MODULE => ok},
+ raw_config => <<"60s">>}}, emqx:reset_config(KeyPath, Opts)),
+ OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])),
+ ?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)),
+ ?assert(length(OSKey1) > 1),
+
+ ok = emqx_config_handler:remove_handler(KeyPath),
+ ok = emqx_config_handler:remove_handler(KeyPath2),
+ ok.
+
+t_check_failed(_Config) ->
+ KeyPath = [sysmon, os, cpu_check_interval],
+ Opts = #{rawconf_with_defaults => true},
+ Origin = emqx:get_raw_config(KeyPath),
+ ok = emqx_config_handler:add_handler(KeyPath, ?MODULE),
+ %% It should be a duration("1h"), but we set it as a percent.
+ ?assertMatch({error, _Res}, emqx:update_config(KeyPath, <<"80%">>, Opts)),
+ New = emqx:get_raw_config(KeyPath),
+ ?assertEqual(Origin, New),
+ ok = emqx_config_handler:remove_handler(KeyPath),
+ ok.
+
+t_stop(_Config) ->
+ OldPid = erlang:whereis(emqx_config_handler),
+ OldInfo = emqx_config_handler:info(),
+ emqx_config_handler:stop(),
+ NewPid = wait_for_new_pid(),
+ NewInfo = emqx_config_handler:info(),
+ ?assertNotEqual(OldPid, NewPid),
+ ?assertEqual(OldInfo, NewInfo),
+ ok.
+
+t_callback_crash(_Config) ->
+ CrashPath = [sysmon, os, cpu_high_watermark],
+ Opts = #{rawconf_with_defaults => true},
+ ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
+ Old = emqx:get_raw_config(CrashPath),
+ ?assertEqual({error, config_update_crashed}, emqx:update_config(CrashPath, <<"89%">>, Opts)),
+ New = emqx:get_raw_config(CrashPath),
+ ?assertEqual(Old, New),
+ ok = emqx_config_handler:remove_handler(CrashPath),
+ ok.
+
+t_pre_callback_error(_Config) ->
+ callback_error([sysmon, os, mem_check_interval], <<"100s">>,
+ {error, {pre_config_update, ?MODULE, pre_config_update_error}}),
+ ok.
+
+t_post_update_error(_Config) ->
+ callback_error([sysmon, os, sysmem_high_watermark], <<"60%">>,
+ {error, {post_config_update, ?MODULE, post_config_update_error}}),
+ ok.
+
+t_handler_root() ->
+ %% Don't rely on default emqx_config_handler's merge behaviour.
+ RootKey = [],
+ Opts = #{rawconf_with_defaults => true},
+ ok = emqx_config_handler:add_handler(RootKey, ?MODULE),
+ %% update
+ Old = #{<<"sysmon">> := #{<<"os">> := OS}} = emqx:get_raw_config(RootKey),
+ {ok, Res} = emqx:update_config(RootKey,
+ Old#{<<"sysmon">> => #{<<"os">> => OS#{<<"cpu_check_interval">> => <<"12s">>}}},
+ Opts),
+ ?assertMatch(#{config := #{os := #{cpu_check_interval := 12000}},
+ post_config_update := #{?MODULE := ok},
+ raw_config := #{<<"os">> := #{<<"cpu_check_interval">> := <<"12s">>}}},
+ Res),
+ ?assertMatch(#{sysmon := #{os := #{cpu_check_interval := 12000}}}, emqx:get_config(RootKey)),
+ ok = emqx_config_handler:remove_handler(RootKey),
+ ok.
+
+t_get_raw_cluster_override_conf(_Config) ->
+ Raw0 = emqx_config:read_override_conf(#{override_to => cluster}),
+ Raw1 = emqx_config_handler:get_raw_cluster_override_conf(),
+ ?assertEqual(Raw0, Raw1),
+ OldPid = erlang:whereis(emqx_config_handler),
+ OldInfo = emqx_config_handler:info(),
+
+ ?assertEqual(ok, gen_server:call(emqx_config_handler, bad_call_msg)),
+ gen_server:cast(emqx_config_handler, bad_cast_msg),
+ erlang:send(emqx_config_handler, bad_info_msg),
+
+ NewPid = erlang:whereis(emqx_config_handler),
+ NewInfo = emqx_config_handler:info(),
+ ?assertEqual(OldPid, NewPid),
+ ?assertEqual(OldInfo, NewInfo),
+ ok.
+
+t_save_config_failed(_Config) ->
+ ok.
+
+t_update_sub(_Config) ->
+ PathKey = [sysmon],
+ Opts = #{rawconf_with_defaults => true},
+ ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
+ %% update sub key
+ #{<<"os">> := OS1} = emqx:get_raw_config(PathKey),
+ {ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts),
+ ?assertMatch(#{config := 120000,
+ post_config_update := #{?MODULE := ok},
+ raw_config := <<"120s">>},
+ Res),
+ ?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)),
+ #{<<"os">> := OS2} = emqx:get_raw_config(PathKey),
+ ?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))),
+
+ %% update sub key
+ SubKey = PathKey ++ [os, cpu_high_watermark],
+ ?assertEqual({ok,#{config => 0.81,
+ post_config_update => #{?MODULE => ok},
+ raw_config => <<"81%">>}},
+ emqx:update_config(SubKey, "81%", Opts)),
+ ?assertEqual(0.81, emqx:get_config(SubKey)),
+ ?assertEqual("81%", emqx:get_raw_config(SubKey)),
+
+ ok = emqx_config_handler:remove_handler(PathKey),
+ ok.
+
+
+pre_config_update([sysmon], UpdateReq, _RawConf) ->
+ {ok, UpdateReq};
+pre_config_update([sysmon, os], UpdateReq, _RawConf) ->
+ {ok, UpdateReq};
+pre_config_update([sysmon, os, cpu_check_interval], UpdateReq, _RawConf) ->
+ {ok, UpdateReq};
+pre_config_update([sysmon, os, cpu_low_watermark], UpdateReq, _RawConf) ->
+ {ok, UpdateReq};
+pre_config_update([sysmon, os, sysmem_high_watermark], UpdateReq, _RawConf) ->
+ {ok, UpdateReq};
+pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
+ {error, pre_config_update_error}.
+
+post_config_update([sysmon], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
+ {ok, ok};
+post_config_update([sysmon, os], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
+ {ok, ok};
+post_config_update([sysmon, os, cpu_check_interval], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
+ {ok, ok};
+post_config_update([sysmon, os, cpu_low_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
+ ok;
+post_config_update([sysmon, os, sysmem_high_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
+ {error, post_config_update_error}.
+
+wait_for_new_pid() ->
+ case erlang:whereis(emqx_config_handler) of
+ undefined ->
+ ct:sleep(10),
+ wait_for_new_pid();
+ Pid -> Pid
+ end.
+
+callback_error(FailedPath, Update, Error) ->
+ Opts = #{rawconf_with_defaults => true},
+ ok = emqx_config_handler:add_handler(FailedPath, ?MODULE),
+ Old = emqx:get_raw_config(FailedPath),
+ ?assertEqual(Error, emqx:update_config(FailedPath, Update, Opts)),
+ New = emqx:get_raw_config(FailedPath),
+ ?assertEqual(Old, New),
+ ok = emqx_config_handler:remove_handler(FailedPath),
+ ok.
diff --git a/apps/emqx_authz/src/emqx_authz_api_schema.erl b/apps/emqx_authz/src/emqx_authz_api_schema.erl
index f74792f42..663de11ad 100644
--- a/apps/emqx_authz/src/emqx_authz_api_schema.erl
+++ b/apps/emqx_authz/src/emqx_authz_api_schema.erl
@@ -19,7 +19,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
--import(hoconsc, [mk/2, ref/1, ref/2, array/1, enum/1]).
+-import(hoconsc, [mk/2, enum/1]).
-import(emqx_schema, [mk_duration/2]).
-export([fields/1, authz_sources_types/1]).
diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl
index 4a58adaca..51061e551 100644
--- a/apps/emqx_authz/src/emqx_authz_api_sources.erl
+++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl
@@ -22,7 +22,7 @@
-include("emqx_authz.hrl").
-include_lib("emqx/include/logger.hrl").
--import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]).
+-import(hoconsc, [mk/1, mk/2, ref/2, array/1, enum/1]).
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
@@ -83,26 +83,33 @@ schema("/authorization/sources") ->
, get =>
#{ description => <<"List all authorization sources">>
, responses =>
- #{ 200 => mk( array(hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
+ #{ 200 => mk( array(hoconsc:union(
+ [ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
, #{desc => <<"Authorization source">>})
}
}
, post =>
#{ description => <<"Add a new source">>
- , 'requestBody' => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)])
+ , 'requestBody' => mk( hoconsc:union(
+ [ref(?API_SCHEMA_MODULE, Type)
+ || Type <- authz_sources_types(detailed)])
, #{desc => <<"Source config">>})
, responses =>
#{ 204 => <<"Authorization source created successfully">>
- , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
+ , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST],
+ <<"Bad Request">>)
}
}
, put =>
#{ description => <<"Update all sources">>
- , 'requestBody' => mk( array(hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
+ , 'requestBody' => mk( array(hoconsc:union(
+ [ref(?API_SCHEMA_MODULE, Type)
+ || Type <- authz_sources_types(detailed)]))
, #{desc => <<"Sources">>})
, responses =>
#{ 204 => <<"Authorization source updated successfully">>
- , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
+ , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST],
+ <<"Bad Request">>)
}
}
};
@@ -112,7 +119,9 @@ schema("/authorization/sources/:type") ->
#{ description => <<"Get a authorization source">>
, parameters => parameters_field()
, responses =>
- #{ 200 => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)])
+ #{ 200 => mk( hoconsc:union(
+ [ref(?API_SCHEMA_MODULE, Type)
+ || Type <- authz_sources_types(detailed)])
, #{desc => <<"Authorization source">>})
, 404 => emqx_dashboard_swagger:error_codes([?NOT_FOUND], <<"Not Found">>)
}
@@ -120,7 +129,8 @@ schema("/authorization/sources/:type") ->
, put =>
#{ description => <<"Update source">>
, parameters => parameters_field()
- , 'requestBody' => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
+ , 'requestBody' => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type)
+ || Type <- authz_sources_types(detailed)]))
, responses =>
#{ 204 => <<"Authorization source updated successfully">>
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl
index 9240d4a82..f192cf73c 100644
--- a/apps/emqx_bridge/src/emqx_bridge_app.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_app.erl
@@ -29,8 +29,8 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_bridge:load(),
ok = emqx_bridge:load_hook(),
- emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
- emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
+ ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
+ ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
{ok, Sup}.
stop(_State) ->
diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index b9ad43cba..4603c255e 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -72,23 +72,23 @@ roots() ->
end,
emqx_schema_high_prio_roots() ++
[ {"node",
- sc(hoconsc:ref("node"),
+ sc(ref("node"),
#{ desc => "Node name, cookie, config & data directories "
"and the Erlang virtual machine (BEAM) boot parameters."
})}
, {"cluster",
- sc(hoconsc:ref("cluster"),
+ sc(ref("cluster"),
#{ desc => "EMQX nodes can form a cluster to scale up the total capacity.
"
"Here holds the configs to instruct how individual nodes "
"can discover each other."
})}
, {"log",
- sc(hoconsc:ref("log"),
+ sc(ref("log"),
#{ desc => "Configure logging backends (to console or to file), "
"and logging level for each logger backend."
})}
, {"rpc",
- sc(hoconsc:ref("rpc"),
+ sc(ref("rpc"),
#{ desc => "EMQX uses a library called gen_rpc
for "
"inter-broker communication.
Most of the time the default config "
"should work, but in case you need to do performance "
@@ -315,19 +315,22 @@ a crash dump"
sc(emqx_schema:duration(),
#{ mapping => "vm_args.-kernel net_ticktime"
, default => "2m"
- , desc => "This is the approximate time an EMQX node may be unresponsive until it is considered down and thereby disconnected."
+ , desc => "This is the approximate time an EMQX node may"
+ " be unresponsive until it is considered down and thereby disconnected."
})}
, {"dist_listen_min",
sc(range(1024, 65535),
#{ mapping => "kernel.inet_dist_listen_min"
, default => 6369
- , desc => "Lower bound for the port range where EMQX broker listens for peer connections."
+ , desc => "Lower bound for the port range where"
+ " EMQX broker listens for peer connections."
})}
, {"dist_listen_max",
sc(range(1024, 65535),
#{ mapping => "kernel.inet_dist_listen_max"
, default => 6369
- , desc => "Upper bound for the port range where EMQX broker listens for peer connections."
+ , desc => "Upper bound for the port range "
+ "where EMQX broker listens for peer connections."
})}
, {"backtrace_depth",
sc(integer(),
@@ -455,7 +458,8 @@ fields("rpc") ->
#{ mapping => "gen_rpc.port_discovery"
, default => stateless
, desc => "manual
: discover ports by tcp_server_port
.
"
- "stateless
: discover ports in a stateless manner, using the following algorithm. "
+ "stateless
: discover ports in a stateless manner,"
+ " using the following algorithm. "
"If node name is emqxN@127.0.0.1
, where the N is an integer, "
"then the listening port will be 5370 + N."
})}
@@ -464,7 +468,8 @@ fields("rpc") ->
#{ mapping => "gen_rpc.tcp_server_port"
, default => 5369
, desc => "Listening port used by RPC local service.
"
- "Note that this config only takes effect when rpc.port_discovery is set to manual."
+ "Note that this config only takes effect "
+ "when rpc.port_discovery is set to manual."
})}
, {"ssl_server_port",
sc(integer(),
@@ -497,7 +502,8 @@ fields("rpc") ->
sc(file(),
#{ mapping => "gen_rpc.keyfile"
, desc => "Path to the private key file for the rpc.certfile
.
"
- "Note: contents of this file are secret, so it's necessary to set permissions to 600."
+ "Note: contents of this file are secret, so it's necessary to "
+ "set permissions to 600."
})}
, {"cacertfile",
sc(file(),
@@ -528,7 +534,8 @@ fields("rpc") ->
sc(emqx_schema:duration_s(),
#{ mapping => "gen_rpc.socket_keepalive_idle"
, default => "7200s"
- , desc => "How long the connections between the brokers should remain open after the last message is sent."
+ , desc => "How long the connections between the brokers should remain open "
+ "after the last message is sent."
})}
, {"socket_keepalive_interval",
sc(emqx_schema:duration_s(),
@@ -941,7 +948,7 @@ roots(Module) ->
emqx_schema_high_prio_roots() ->
Roots = emqx_schema:roots(high),
Authz = {"authorization",
- sc(hoconsc:ref("authorization"),
+ sc(hoconsc:ref(?MODULE, "authorization"),
#{ desc => """
Authorization a.k.a. ACL.
In EMQX, MQTT client access control is extremely flexible.
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index 7c83671a9..4b7138165 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -91,12 +91,14 @@ init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector,
emqx_bridge, emqx_dashboard], fun set_special_configs/1),
ok = emqx_common_test_helpers:load_config(emqx_connector_schema, <<"connectors: {}">>),
- ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>),
+ ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema,
+ <<"rule_engine {rules {}}">>),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
- emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]),
+ emqx_common_test_helpers:stop_apps([emqx_rule_engine,
+ emqx_connector, emqx_bridge, emqx_dashboard]),
ok.
set_special_configs(emqx_dashboard) ->
@@ -113,8 +115,14 @@ set_special_configs(_) ->
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
%% assert we there's no connectors and no bridges at first
- {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+ {ok, 200, Connectors} = request(get, uri(["connectors"]), []),
+ lists:foreach(fun(#{<<"id">> := ConnectorID}) ->
+ {ok, 200, <<>>} = request(delete, uri(["connectors", ConnectorID]), [])
+ end, jsx:decode(Connectors)),
+ {ok, 200, Bridges} = request(get, uri(["bridges"]), []),
+ lists:foreach(fun(#{<<"id">> := BridgeID}) ->
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), [])
+ end, jsx:decode(Bridges)),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
index 8f9797843..bbe4a6ecb 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
@@ -45,7 +45,7 @@
]).
api_spec() ->
- emqx_dashboard_swagger:spec(?MODULE).
+ emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
namespace() -> "configuration".
@@ -53,7 +53,6 @@ paths() ->
["/configs", "/configs_reset/:rootname"] ++
lists:map(fun({Name, _Type}) -> ?PREFIX ++ to_list(Name) end, config_list(?EXCLUDES)).
-
schema("/configs") ->
#{
'operationId' => configs,
@@ -156,7 +155,7 @@ config(put, #{body := Body}, Req) ->
Path = conf_path(Req),
case emqx:update_config(Path, Body, #{rawconf_with_defaults => true}) of
{ok, #{raw_config := RawConf}} ->
- {200, emqx_map_lib:jsonable_map(RawConf)};
+ {200, RawConf};
{error, Reason} ->
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
end.
@@ -194,8 +193,7 @@ conf_path_reset(Req) ->
string:lexemes(Path, "/ ").
get_full_config() ->
- emqx_map_lib:jsonable_map(
- emqx_config:fill_defaults(emqx:get_raw_config([]))).
+ emqx_config:fill_defaults(emqx:get_raw_config([])).
conf_path_from_querystr(Req) ->
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of
diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl
index f51642d30..96ba4bf86 100644
--- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl
+++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl
@@ -160,20 +160,28 @@ t_update_disable(_Config) ->
t_update_re_failed(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
+ Re = <<"*^test/*">>,
Rules = [#{
<<"source_topic">> => <<"test/#">>,
- <<"re">> => <<"*^test/*">>,
+ <<"re">> => Re,
<<"dest_topic">> => <<"test1/$2">>,
<<"action">> => <<"publish">>
}],
- Error = {badmatch,
+ ?assertError({badmatch,
{error,
- {emqx_modules_schema,
- [{validation_error,
- #{path => "rewrite.1.re",
- reason => {<<"*^test/*">>,{"nothing to repeat",0}},
- value => <<"*^test/*">>}}]}}},
- ?assertError(Error, emqx_rewrite:update(Rules)),
+ {_,
+ [
+ {validation_error,
+ #{
+ path := "root.rewrite.1.re",
+ reason := {Re, {"nothing to repeat", 0}},
+ value := Re
+ }
+ }
+ ]
+ }
+ }
+ }, emqx_rewrite:update(Rules)),
ok.
%%--------------------------------------------------------------------
diff --git a/apps/emqx_plugins/src/emqx_plugins_schema.erl b/apps/emqx_plugins/src/emqx_plugins_schema.erl
index 9dfb77589..2de8430a1 100644
--- a/apps/emqx_plugins/src/emqx_plugins_schema.erl
+++ b/apps/emqx_plugins/src/emqx_plugins_schema.erl
@@ -68,7 +68,7 @@ root_fields() ->
, {install_dir, fun install_dir/1}
].
-states(type) -> hoconsc:array(hoconsc:ref(state));
+states(type) -> hoconsc:array(hoconsc:ref(?MODULE, state));
states(required) -> false;
states(default) -> [];
states(desc) -> "An array of plugins in the desired states.
"
diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl
index 387d3cb6a..a29dea123 100644
--- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl
+++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl
@@ -140,8 +140,7 @@ init([]) ->
Enable = emqx:get_config([slow_subs, enable]),
{ok, check_enable(Enable, InitState)}.
-handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
- emqx_config:put([slow_subs], Conf),
+handle_call({update_settings, #{enable := Enable}}, _From, State) ->
State2 = check_enable(Enable, State),
{reply, ok, State2};