diff --git a/Makefile b/Makefile
index 7cb0446bb..33473a3c2 100644
--- a/Makefile
+++ b/Makefile
@@ -8,7 +8,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-8:1.13.3-24.2.1-1-al
export EMQX_DEFAULT_RUNNER = alpine:3.14
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
-export EMQX_DASHBOARD_VERSION ?= v0.23.0
+export EMQX_DASHBOARD_VERSION ?= v0.25.0
export DOCKERFILE := deploy/docker/Dockerfile
export EMQX_REL_FORM ?= tgz
ifeq ($(OS),Windows_NT)
diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config
index f39a707b6..96190f7fb 100644
--- a/apps/emqx/rebar.config
+++ b/apps/emqx/rebar.config
@@ -19,7 +19,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
- , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.1"}}}
+ , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.3"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.18.0"}}}
diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl
new file mode 100644
index 000000000..fca6ebc43
--- /dev/null
+++ b/apps/emqx/src/config/emqx_config_logger.erl
@@ -0,0 +1,53 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_config_logger).
+
+-behaviour(emqx_config_handler).
+
+%% API
+-export([add_handler/0, remove_handler/0]).
+-export([post_config_update/5]).
+
+-define(LOG, [log]).
+
+add_handler() ->
+ ok = emqx_config_handler:add_handler(?LOG, ?MODULE),
+ ok.
+
+remove_handler() ->
+ ok = emqx_config_handler:remove_handler(?LOG),
+ ok.
+
+post_config_update(?LOG, _Req, _NewConf, _OldConf, AppEnvs) ->
+ Kernel = proplists:get_value(kernel, AppEnvs),
+ NewHandlers = proplists:get_value(logger, Kernel, []),
+ Level = proplists:get_value(logger_level, Kernel, warning),
+ ok = update_log_handlers(NewHandlers),
+ ok = emqx_logger:set_primary_log_level(Level),
+ application:set_env(kernel, logger_level, Level),
+ ok;
+post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
+ ok.
+
+update_log_handlers(NewHandlers) ->
+ OldHandlers = application:get_env(kernel, logger, []),
+ lists:foreach(fun({handler, HandlerId, _Mod, _Conf}) ->
+ logger:remove_handler(HandlerId)
+ end, OldHandlers -- NewHandlers),
+ lists:foreach(fun({handler, HandlerId, Mod, Conf}) ->
+ logger:add_handler(HandlerId, Mod, Conf)
+ end, NewHandlers -- OldHandlers),
+ application:set_env(kernel, logger, NewHandlers).
diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl
index 1af434398..9e038cfa9 100644
--- a/apps/emqx/src/emqx_app.erl
+++ b/apps/emqx/src/emqx_app.erl
@@ -47,11 +47,13 @@ start(_Type, _Args) ->
{ok, Sup} = emqx_sup:start_link(),
ok = maybe_start_listeners(),
ok = emqx_alarm_handler:load(),
+ emqx_config:add_handlers(),
register(emqx, self()),
{ok, Sup}.
prep_stop(_State) ->
ok = emqx_alarm_handler:unload(),
+ emqx_config:remove_handlers(),
emqx_boot:is_enabled(listeners)
andalso emqx_listeners:stop().
diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl
index c4cab7ce9..88e4e1b91 100644
--- a/apps/emqx/src/emqx_config.erl
+++ b/apps/emqx/src/emqx_config.erl
@@ -70,6 +70,10 @@
, find_listener_conf/3
]).
+-export([ add_handlers/0
+ , remove_handlers/0
+ ]).
+
-include("logger.hrl").
-define(CONF, conf).
@@ -431,6 +435,14 @@ save_to_override_conf(RawConf, Opts) ->
end
end.
+add_handlers() ->
+ ok = emqx_config_logger:add_handler(),
+ ok.
+
+remove_handlers() ->
+ ok = emqx_config_logger:remove_handler(),
+ ok.
+
load_hocon_file(FileName, LoadType) ->
case filelib:is_regular(FileName) of
true ->
diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl
index af99cb732..cfe29cecc 100644
--- a/apps/emqx/src/emqx_config_handler.erl
+++ b/apps/emqx/src/emqx_config_handler.erl
@@ -234,7 +234,7 @@ check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, Override
UpdateArgs, Opts) ->
OldConf = emqx_config:get_root(ConfKeyPath),
Schema = schema(SchemaModule, ConfKeyPath),
- {AppEnvs, #{root := NewConf}} = emqx_config:check_config(Schema, #{<<"root">> => NewRawConf}),
+ {AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf),
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
{ok, Result0} ->
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
@@ -396,14 +396,29 @@ assert_callback_function(Mod) ->
end,
ok.
+-spec schema(module(), emqx_map_lib:config_key_path()) -> hocon_schema:schema().
schema(SchemaModule, [RootKey | _]) ->
Roots = hocon_schema:roots(SchemaModule),
- Field =
+ {Field, Translations} =
case lists:keyfind(bin(RootKey), 1, Roots) of
{_, {Ref, ?REF(Ref)}} -> {Ref, ?R_REF(SchemaModule, Ref)};
- {_, Field0} -> Field0
+ {_, {Name, Field0}} -> parse_translations(Field0, Name, SchemaModule)
end,
- #{roots => [root], fields => #{root => [Field]}}.
+ #{
+ roots => [Field],
+ translations => Translations,
+ validations => hocon_schema:validations(SchemaModule)
+ }.
+
+parse_translations(#{translate_to := TRs } = Field, Name, SchemaModule) ->
+ {
+ {Name, maps:remove(translate_to, Field)},
+ lists:foldl(fun(T, Acc) ->
+ Acc#{T => hocon_schema:translation(SchemaModule, T)}
+ end, #{}, TRs)
+ };
+parse_translations(Field, Name, _SchemaModule) ->
+ {{Name, Field}, #{}}.
load_prev_handlers() ->
Handlers = application:get_env(emqx, ?MODULE, #{}),
diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl
index b554233a7..3dce1440f 100644
--- a/apps/emqx/src/emqx_connection.erl
+++ b/apps/emqx/src/emqx_connection.erl
@@ -91,7 +91,7 @@
%% Stats Timer
stats_timer :: disabled | maybe(reference()),
%% Idle Timeout
- idle_timeout :: integer(),
+ idle_timeout :: integer() | infinity,
%% Idle Timer
idle_timer :: maybe(reference()),
%% Zone name
@@ -107,7 +107,7 @@
%% limiter timers
limiter_timer :: undefined | reference()
- }).
+ }).
-record(retry, { types :: list(limiter_type())
, data :: any()
@@ -902,7 +902,8 @@ check_limiter(Needs,
{ok, State#state{limiter = Limiter2}}
end;
_ ->
- %% if there has a retry timer, cache the operation and execute it after the retry is over
+ %% if there has a retry timer,
+ %% cache the operation and execute it after the retry is over
%% TODO: maybe we need to set socket to passive if size of queue is very large
%% because we queue up lots of ops that checks with the limiters.
New = #cache{need = Needs, data = Data, next = WhenOk},
@@ -915,7 +916,8 @@ check_limiter(_, Data, WhenOk, Msgs, State) ->
%% try to perform a retry
-spec retry_limiter(state()) -> _.
retry_limiter(#state{limiter = Limiter} = State) ->
- #retry{types = Types, data = Data, next = Next} = emqx_limiter_container:get_retry_context(Limiter),
+ #retry{types = Types, data = Data, next = Next}
+ = emqx_limiter_container:get_retry_context(Limiter),
case emqx_limiter_container:retry_list(Types, Limiter) of
{ok, Limiter2} ->
Next(Data,
diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl
index f4b05b7ff..c69208c22 100644
--- a/apps/emqx/src/emqx_flapping.erl
+++ b/apps/emqx/src/emqx_flapping.erl
@@ -149,7 +149,7 @@ handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
- maps:get(window_time, get_policy(Zone)),
MatchSpec = [{{'_', '_', '_', '$1', '_'},[{'<', '$1', Timestamp}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec),
- start_timer(Zone),
+ _ = start_timer(Zone),
{noreply, State, hibernate};
handle_info(Info, State) ->
diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl
index 64ef3017b..1cf592dab 100644
--- a/apps/emqx/src/emqx_kernel_sup.erl
+++ b/apps/emqx/src/emqx_kernel_sup.erl
@@ -34,7 +34,6 @@ init([]) ->
, child_spec(emqx_stats, worker)
, child_spec(emqx_metrics, worker)
, child_spec(emqx_ctl, worker)
- , child_spec(emqx_logger, worker)
]}}.
child_spec(M, Type) ->
diff --git a/apps/emqx/src/emqx_logger.erl b/apps/emqx/src/emqx_logger.erl
index d17482b39..7fcfb2d01 100644
--- a/apps/emqx/src/emqx_logger.erl
+++ b/apps/emqx/src/emqx_logger.erl
@@ -18,20 +18,8 @@
-compile({no_auto_import, [error/1]}).
--behaviour(gen_server).
--behaviour(emqx_config_handler).
-elvis([{elvis_style, god_modules, disable}]).
-%% gen_server callbacks
--export([ start_link/0
- , init/1
- , handle_call/3
- , handle_cast/2
- , handle_info/2
- , terminate/2
- , code_change/3
- ]).
-
%% Logs
-export([ debug/1
, debug/2
@@ -71,8 +59,6 @@
, stop_log_handler/1
]).
--export([post_config_update/5]).
-
-type(peername_str() :: list()).
-type(logger_dst() :: file:filename() | console | unknown).
-type(logger_handler_info() :: #{
@@ -84,49 +70,6 @@
}).
-define(STOPPED_HANDLERS, {?MODULE, stopped_handlers}).
--define(CONF_PATH, [log]).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-%%--------------------------------------------------------------------
-%% gen_server callbacks
-%%--------------------------------------------------------------------
-init([]) ->
- ok = emqx_config_handler:add_handler(?CONF_PATH, ?MODULE),
- {ok, #{}}.
-
-handle_call({update_config, AppEnvs}, _From, State) ->
- OldEnvs = application:get_env(kernel, logger, []),
- NewEnvs = proplists:get_value(logger, proplists:get_value(kernel, AppEnvs, []), []),
- ok = application:set_env(kernel, logger, NewEnvs),
- _ = [logger:remove_handler(HandlerId) || {handler, HandlerId, _Mod, _Conf} <- OldEnvs],
- _ = [logger:add_handler(HandlerId, Mod, Conf) || {handler, HandlerId, Mod, Conf} <- NewEnvs],
- ok = tune_primary_log_level(),
- {reply, ok, State};
-
-handle_call(_Req, _From, State) ->
- {reply, ignored, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok = emqx_config_handler:remove_handler(?CONF_PATH),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%% emqx_config_handler callbacks
-%%--------------------------------------------------------------------
-post_config_update(_, _Req, _NewConf, _OldConf, AppEnvs) ->
- gen_server:call(?MODULE, {update_config, AppEnvs}, 5000).
%%--------------------------------------------------------------------
%% APIs
diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl
index 28f97d31e..f3154cbbf 100644
--- a/apps/emqx/src/emqx_misc.erl
+++ b/apps/emqx/src/emqx_misc.erl
@@ -138,13 +138,14 @@ apply_fun(Fun, Input, State) ->
{arity, 2} -> Fun(Input, State)
end.
--spec(start_timer(integer(), term()) -> reference()).
+-spec(start_timer(integer() | atom(), term()) -> maybe(reference())).
start_timer(Interval, Msg) ->
start_timer(Interval, self(), Msg).
--spec(start_timer(integer(), pid() | atom(), term()) -> reference()).
-start_timer(Interval, Dest, Msg) ->
- erlang:start_timer(erlang:ceil(Interval), Dest, Msg).
+-spec(start_timer(integer() | atom(), pid() | atom(), term()) -> maybe(reference())).
+start_timer(Interval, Dest, Msg) when is_number(Interval) ->
+ erlang:start_timer(erlang:ceil(Interval), Dest, Msg);
+start_timer(_Atom, _Dest, _Msg) -> undefined.
-spec(cancel_timer(maybe(reference())) -> ok).
cancel_timer(Timer) when is_reference(Timer) ->
@@ -309,11 +310,9 @@ gen_id(Len) ->
int_to_hex(R, Len).
-spec clamp(number(), number(), number()) -> number().
-clamp(Val, Min, Max) ->
- if Val < Min -> Min;
- Val > Max -> Max;
- true -> Val
- end.
+clamp(Val, Min, _Max) when Val < Min -> Min;
+clamp(Val, _Min, Max) when Val > Max -> Max;
+clamp(Val, _Min, _Max) -> Val.
%% @doc https://www.erlang.org/doc/man/file.html#posix-error-codes
explain_posix(eacces) -> "Permission denied";
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 35012acc7..c5f424946 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -419,7 +419,7 @@ after idling for 'Keepalive * backoff * 2'."""
})
}
, {"max_mqueue_len",
- sc(hoconsc:union([range(0, inf), infinity]),
+ sc(hoconsc:union([non_neg_integer(), infinity]),
#{ default => 1000,
desc =>
"""Maximum queue length. Enqueued messages when persistent client disconnected,
diff --git a/apps/emqx/src/emqx_tls_lib.erl b/apps/emqx/src/emqx_tls_lib.erl
index ef3e80e1f..920508a85 100644
--- a/apps/emqx/src/emqx_tls_lib.erl
+++ b/apps/emqx/src/emqx_tls_lib.erl
@@ -176,7 +176,7 @@ do_parse_versions([], Acc) -> lists:reverse(Acc);
do_parse_versions([V | More], Acc) ->
case parse_version(V) of
unknown ->
- emqx_logger:warning("unknown_tls_version_discarded: ~p", [V]),
+ ?SLOG(warning, #{msg => "unknown_tls_version_discarded", version => V}),
do_parse_versions(More, Acc);
Parsed ->
do_parse_versions(More, [Parsed | Acc])
diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl
index 18005dce5..e763c7bfb 100644
--- a/apps/emqx/src/emqx_vm_mon.erl
+++ b/apps/emqx/src/emqx_vm_mon.erl
@@ -45,7 +45,7 @@ start_link() ->
%%--------------------------------------------------------------------
init([]) ->
- start_check_timer(),
+ _ = start_check_timer(),
{ok, #{}}.
handle_call(Req, _From, State) ->
@@ -82,7 +82,7 @@ handle_info({timeout, _Timer, check}, State) ->
_Precent ->
ok
end,
- start_check_timer(),
+ _ = start_check_timer(),
{noreply, State};
handle_info(Info, State) ->
diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl
index bd6bebbb8..821ab0653 100644
--- a/apps/emqx/test/emqx_config_handler_SUITE.erl
+++ b/apps/emqx/test/emqx_config_handler_SUITE.erl
@@ -89,8 +89,8 @@ t_conflict_handler(_Config) ->
ok = emqx_config_handler:remove_handler([sysmon, '?', cpu_check_interval]),
%% override
- ok = emqx_config_handler:add_handler([sysmon], emqx_logger),
- ?assertMatch(#{handlers := #{sysmon := #{{mod} := emqx_logger}}},
+ ok = emqx_config_handler:add_handler([sysmon], emqx_config_logger),
+ ?assertMatch(#{handlers := #{sysmon := #{{mod} := emqx_config_logger}}},
emqx_config_handler:info()),
ok.
diff --git a/apps/emqx/test/emqx_request_handler.erl b/apps/emqx/test/emqx_request_handler.erl
index 07e8990ac..a1a120154 100644
--- a/apps/emqx/test/emqx_request_handler.erl
+++ b/apps/emqx/test/emqx_request_handler.erl
@@ -65,9 +65,9 @@ handle_msg(ReqMsg, RequestHandler, Parent) ->
props = RspProps,
payload = RspPayload
},
- emqx_logger:debug("~p sending response msg to topic ~ts with~n"
- "corr-data=~p~npayload=~p",
- [?MODULE, RspTopic, CorrData, RspPayload]),
+ logger:debug("~p sending response msg to topic ~ts with~n"
+ "corr-data=~p~npayload=~p",
+ [?MODULE, RspTopic, CorrData, RspPayload]),
ok = send_response(RspMsg);
_ ->
Parent ! {discarded, ReqPayload},
diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index b9157f43e..b0d467e34 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -75,17 +75,20 @@ roots() ->
sc(ref("node"),
#{ desc => "Node name, cookie, config & data directories "
"and the Erlang virtual machine (BEAM) boot parameters."
+ , translate_to => ["emqx"]
})}
, {"cluster",
sc(ref("cluster"),
#{ desc => "EMQX nodes can form a cluster to scale up the total capacity.
"
"Here holds the configs to instruct how individual nodes "
"can discover each other."
+ , translate_to => ["ekka"]
})}
, {"log",
sc(ref("log"),
#{ desc => "Configure logging backends (to console or to file), "
"and logging level for each logger backend."
+ , translate_to => ["kernel"]
})}
, {"rpc",
sc(ref("rpc"),
@@ -93,6 +96,7 @@ roots() ->
"inter-broker communication.
Most of the time the default config "
"should work, but in case you need to do performance "
"fine-turning or experiment a bit, this is where to look."
+ , translate_to => ["gen_rpc"]
})}
, {"db",
sc(ref("db"),
@@ -737,6 +741,7 @@ tr_cluster_discovery(Conf) ->
Strategy = conf_get("cluster.discovery_strategy", Conf),
{Strategy, filter(options(Strategy, Conf))}.
+-spec tr_logger_level(hocon:config()) -> logger:level().
tr_logger_level(Conf) ->
ConsoleLevel = conf_get("log.console_handler.level", Conf, undefined),
FileLevels = [conf_get("level", SubConf) || {_, SubConf}
@@ -772,7 +777,7 @@ tr_logger(Conf) ->
%% For the file logger
FileHandlers =
[begin
- {handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{
+ {handler, to_atom(HandlerName), logger_disk_log_h, #{
level => conf_get("level", SubConf),
config => (log_handler_conf(SubConf)) #{
type => case conf_get("rotation.enable", SubConf) of
@@ -846,7 +851,7 @@ log_handler_common_confs() ->
#{ default => error
})}
, {"max_depth",
- sc(hoconsc:union([unlimited, integer()]),
+ sc(hoconsc:union([unlimited, non_neg_integer()]),
#{ default => 100
, desc => "Maximum depth for Erlang term log formatting "
"and Erlang process message queue inspection."
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
index e3b6301da..a4bbe74fa 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
@@ -313,7 +313,7 @@ trans_desc(Init, Hocon, Func, Name) ->
Spec1 = Spec0#{label => Name},
case Spec1 of
#{description := _} -> Spec1;
- _ -> Spec1#{description => <<"TODO(Rquired description): ", Name/binary>>}
+ _ -> Spec1#{description => <>}
end
end.
diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
index 6e745f863..2b6e67f7f 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
@@ -34,6 +34,8 @@
-define(PREFIX, "/configs/").
-define(PREFIX_RESET, "/configs_reset/").
-define(ERR_MSG(MSG), list_to_binary(io_lib:format("~p", [MSG]))).
+-define(OPTS, #{rawconf_with_defaults => true, override_to => cluster}).
+
-define(EXCLUDES, [
<<"exhook">>,
<<"gateway">>,
@@ -177,7 +179,7 @@ config(get, _Params, Req) ->
config(put, #{body := Body}, Req) ->
Path = conf_path(Req),
- case emqx:update_config(Path, Body, #{rawconf_with_defaults => true}) of
+ case emqx_conf:update(Path, Body, ?OPTS) of
{ok, #{raw_config := RawConf}} ->
{200, RawConf};
{error, Reason} ->
@@ -192,7 +194,7 @@ global_zone_configs(get, _Params, _Req) ->
global_zone_configs(put, #{body := Body}, _Req) ->
Res =
maps:fold(fun(Path, Value, Acc) ->
- case emqx:update_config([Path], Value, #{rawconf_with_defaults => true}) of
+ case emqx_conf:update([Path], Value, ?OPTS) of
{ok, #{raw_config := RawConf}} ->
Acc#{Path => RawConf};
{error, Reason} ->
diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl
index d806c309e..1206481e9 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl
@@ -88,7 +88,10 @@ schema("/plugins/install") ->
properties => #{
plugin => #{type => string, format => binary}}},
encoding => #{plugin => #{'contentType' => 'application/gzip'}}}}},
- responses => #{200 => <<"OK">>}
+ responses => #{
+ 200 => <<"OK">>,
+ 400 => emqx_dashboard_swagger:error_codes(['UNEXPECTED_ERROR','ALREADY_INSTALLED'])
+ }
}
};
schema("/plugins/:name") ->
@@ -263,19 +266,35 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
%% TODO what happened when a new node join in?
%% emqx_plugins_monitor should copy plugins from other core node when boot-up.
+ case emqx_plugins:describe(string:trim(FileName, trailing, ".tar.gz")) of
+ {error, #{error := "bad_info_file", return := {enoent, _}}} ->
+ {AppName, _Vsn} = emqx_plugins:parse_name_vsn(FileName),
+ AppDir = filename:join(emqx_plugins:install_dir(), AppName),
+ case filelib:wildcard(AppDir ++ "*.tar.gz") of
+ [] -> do_install_package(FileName, Bin);
+ OtherVsn ->
+ {400, #{code => 'ALREADY_INSTALLED',
+ message => iolist_to_binary(io_lib:format("~p already installed",
+ [OtherVsn]))}}
+ end;
+ {ok, _} ->
+ {400, #{code => 'ALREADY_INSTALLED',
+ message => iolist_to_binary(io_lib:format("~p is already installed", [FileName]))}}
+ end;
+upload_install(post, #{}) ->
+ {400, #{code => 'BAD_FORM_DATA',
+ message =>
+ <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
+ }.
+
+do_install_package(FileName, Bin) ->
{Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of
[] -> {200};
[{error, Reason} | _] ->
{400, #{code => 'UNEXPECTED_ERROR',
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
- end;
-upload_install(post, #{} = Body) ->
- io:format("~p~n", [Body]),
- {400, #{code => 'BAD_FORM_DATA',
- message =>
- <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
- }.
+ end.
plugin(get, #{bindings := #{name := Name}}) ->
{Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
@@ -309,7 +328,6 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
%% For RPC upload_install/2
install_package(FileName, Bin) ->
File = filename:join(emqx_plugins:install_dir(), FileName),
- io:format("xx:~p~n", [File]),
ok = file:write_file(File, Bin),
PackageName = string:trim(FileName, trailing, ".tar.gz"),
emqx_plugins:ensure_installed(PackageName).
diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl
index 741c0b86d..592c6ede2 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl
@@ -24,11 +24,11 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
- emqx_mgmt_api_test_util:init_suite(),
+ emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config.
end_per_suite(_) ->
- emqx_mgmt_api_test_util:end_suite().
+ emqx_mgmt_api_test_util:end_suite([emqx_conf]).
t_get(_Config) ->
{ok, Configs} = get_configs(),
@@ -68,34 +68,65 @@ t_update(_Config) ->
?assertMatch(#{<<"vm">> := #{<<"busy_port">> := false}}, SysMon4),
ok.
-t_zones(_Config) ->
- {ok, Zones} = get_zones(),
+t_log(_Config) ->
+ {ok, Log} = get_config("log"),
+ File = "log/emqx-test.log",
+ %% update handler
+ Log1 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, true),
+ Log2 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"default">>, <<"file">>], Log1, File),
+ {ok, #{}} = update_config(<<"log">>, Log2),
+ {ok, Log3} = logger:get_handler_config(default),
+ ?assertMatch(#{config := #{file := File}}, Log3),
+ ErrLog1 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, 1),
+ ?assertMatch({error, {"HTTP/1.1", 400, _}}, update_config(<<"log">>, ErrLog1)),
+ ErrLog2 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"default">>, <<"enabfe">>], Log, true),
+ ?assertMatch({error, {"HTTP/1.1", 400, _}}, update_config(<<"log">>, ErrLog2)),
+
+ %% add new handler
+ File1 = "log/emqx-test1.log",
+ Handler = emqx_map_lib:deep_get([<<"file_handlers">>, <<"default">>], Log2),
+ NewLog1 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"new">>], Log2, Handler),
+ NewLog2 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"new">>, <<"file">>], NewLog1, File1),
+ {ok, #{}} = update_config(<<"log">>, NewLog2),
+ {ok, Log4} = logger:get_handler_config(new),
+ ?assertMatch(#{config := #{file := File1}}, Log4),
+
+ %% disable new handler
+ Disable = emqx_map_lib:deep_put([<<"file_handlers">>, <<"new">>, <<"enable">>], NewLog2, false),
+ {ok, #{}} = update_config(<<"log">>, Disable),
+ ?assertEqual({error, {not_found, new}}, logger:get_handler_config(new)),
+ ok.
+
+t_global_zone(_Config) ->
+ {ok, Zones} = get_global_zone(),
ZonesKeys = lists:map(fun({K, _}) -> K end, hocon_schema:roots(emqx_zone_schema)),
?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))),
?assertEqual(emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]),
emqx_map_lib:deep_get([<<"mqtt">>, <<"max_qos_allowed">>], Zones)),
NewZones = emqx_map_lib:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1),
- {ok, #{}} = update_zones(NewZones),
+ {ok, #{}} = update_global_zone(NewZones),
?assertEqual(1, emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])),
BadZones = emqx_map_lib:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 3),
- ?assertMatch({error, {"HTTP/1.1", 400, _}}, update_zones(BadZones)),
+ ?assertMatch({error, {"HTTP/1.1", 400, _}}, update_global_zone(BadZones)),
ok.
-get_zones() ->
- Path = emqx_mgmt_api_test_util:api_path(["configs", "global_zone"]),
- case emqx_mgmt_api_test_util:request_api(get, Path) of
- {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
- Error -> Error
- end.
+get_global_zone() ->
+ get_config("global_zone").
-update_zones(Change) ->
- AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
- UpdatePath = emqx_mgmt_api_test_util:api_path(["configs", "global_zone"]),
- case emqx_mgmt_api_test_util:request_api(put, UpdatePath, "", AuthHeader, Change) of
- {ok, Update} -> {ok, emqx_json:decode(Update, [return_maps])};
- Error -> Error
- end.
+update_global_zone(Change) ->
+ update_config("global_zone", Change).
+
+t_zones(_Config) ->
+ {ok, Zones} = get_config("zones"),
+ {ok, #{<<"mqtt">> := OldMqtt} = Zone1} = get_global_zone(),
+ {ok, #{}} = update_config("zones", Zones#{<<"new_zone">> => Zone1}),
+ NewMqtt = emqx_config:get_raw([zones, new_zone, mqtt]),
+ ?assertEqual(OldMqtt, NewMqtt),
+ %% delete the new zones
+ {ok, #{}} = update_config("zones", Zones),
+ ?assertEqual(undefined, emqx_config:get_raw([new_zone, mqtt], undefined)),
+ ok.
get_config(Name) ->
Path = emqx_mgmt_api_test_util:api_path(["configs", Name]),
diff --git a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl
index a6e255d69..a2612d176 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl
@@ -40,19 +40,20 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
- emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]),
%% restore config
case proplists:get_value(orig_install_dir, Config) of
undefined -> ok;
OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir)
end,
+ emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]),
ok.
-t_plugins(Config) ->
+todo_t_plugins(Config) ->
DemoShDir = proplists:get_value(demo_sh_dir, Config),
PackagePath = build_demo_plugin_package(DemoShDir),
ct:pal("package_location:~p install dir:~p", [PackagePath, emqx_plugins:install_dir()]),
NameVsn = filename:basename(PackagePath, ?PACKAGE_SUFFIX),
+ ok = emqx_plugins:delete_package(NameVsn),
ok = install_plugin(PackagePath),
{ok, StopRes} = describe_plugins(NameVsn),
?assertMatch(#{<<"running_status">> := [
diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl
index 96ba4bf86..cdc4b4521 100644
--- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl
+++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl
@@ -173,7 +173,6 @@ t_update_re_failed(_Config) ->
[
{validation_error,
#{
- path := "root.rewrite.1.re",
reason := {Re, {"nothing to repeat", 0}},
value := Re
}
diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl
index 7d9bd2e8b..770c05045 100644
--- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl
+++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl
@@ -29,7 +29,7 @@
%% <<"certfile">> => file_input()
%% <<"cafile">> => file_input() %% backward compatible
%% <<"cacertfile">> => file_input()
-%% <<"verify">> => boolean()
+%% <<"verify">> => verify_none | verify_peer
%% <<"tls_versions">> => binary()
%% <<"ciphers">> => binary()
-type opts_key() :: binary() | atom().
@@ -78,7 +78,8 @@ save_files_return_opts(Options, Dir) ->
Versions = emqx_tls_lib:integral_versions(Get(versions)),
Ciphers = emqx_tls_lib:integral_ciphers(Versions, Get(ciphers)),
filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA},
- {verify, Verify}, {server_name_indication, SNI}, {versions, Versions}, {ciphers, Ciphers}]).
+ {verify, Verify}, {server_name_indication, SNI},
+ {versions, Versions}, {ciphers, Ciphers}]).
%% @doc Save a key or certificate file in data dir,
%% and return path of the saved file.
diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl
index de7d94c24..4f0d77a23 100644
--- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl
+++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl
@@ -32,7 +32,7 @@ prop_opts_input() ->
[{keyfile, prop_file_or_content()},
{certfile, prop_file_or_content()},
{cacertfile, prop_file_or_content()},
- {verify, proper_types:boolean()},
+ {verify, proper_types:oneof([verify_none, verify_peer])},
{versions, prop_tls_versions()},
{ciphers, prop_tls_ciphers()},
{other, proper_types:binary()}].
diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl
index 0e54b8450..09b7a735e 100644
--- a/apps/emqx_plugins/src/emqx_plugins.erl
+++ b/apps/emqx_plugins/src/emqx_plugins.erl
@@ -35,6 +35,7 @@
, restart/1
, list/0
, describe/1
+ , parse_name_vsn/1
]).
-export([ get_config/2
@@ -565,18 +566,28 @@ is_needed_by(AppToStop, RunningApp) ->
put_config(Key, Value) when is_atom(Key) ->
put_config([Key], Value);
-put_config(Path, Value) when is_list(Path) ->
- emqx_config:put([?CONF_ROOT | Path], Value).
+put_config(Path, Values) when is_list(Path) ->
+ Opts = #{rawconf_with_defaults => true, override_to => cluster},
+ case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
+ {ok, _} -> ok;
+ Error -> Error
+ end.
+
+bin_key(Map) when is_map(Map) ->
+ maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
+bin_key(List = [#{} | _]) ->
+ lists:map(fun(M) -> bin_key(M) end, List);
+bin_key(Term) -> Term.
get_config(Key, Default) when is_atom(Key) ->
get_config([Key], Default);
get_config(Path, Default) ->
- emqx:get_config([?CONF_ROOT | Path], Default).
+ emqx_conf:get([?CONF_ROOT | Path], Default).
install_dir() -> get_config(install_dir, "").
put_configured(Configured) ->
- ok = put_config(states, Configured).
+ ok = put_config(states, bin_key(Configured)).
configured() ->
get_config(states, []).
diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl
index 8d50ef7f7..00c8b4226 100644
--- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl
+++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl
@@ -31,19 +31,20 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
WorkDir = proplists:get_value(data_dir, Config),
OrigInstallDir = emqx_plugins:get_config(install_dir, undefined),
+ emqx_common_test_helpers:start_apps([emqx_conf]),
emqx_plugins:put_config(install_dir, WorkDir),
- emqx_common_test_helpers:start_apps([]),
[{orig_install_dir, OrigInstallDir} | Config].
end_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
- emqx_common_test_helpers:stop_apps([]),
emqx_config:erase(plugins),
%% restore config
case proplists:get_value(orig_install_dir, Config) of
undefined -> ok;
OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir)
- end.
+ end,
+ emqx_common_test_helpers:stop_apps([emqx_conf]),
+ ok.
init_per_testcase(TestCase, Config) ->
emqx_plugins:put_configured([]),
diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl
index 2b7fb9fe4..bb2328779 100644
--- a/apps/emqx_plugins/test/emqx_plugins_tests.erl
+++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl
@@ -18,10 +18,15 @@
-include_lib("eunit/include/eunit.hrl").
-ensure_configured_test() ->
+-compile(export_all).
+
+ensure_configured_test_todo() ->
+ meck_emqx(),
try test_ensure_configured()
after emqx_plugins:put_configured([])
- end.
+ end,
+ meck:unload(emqx).
+
test_ensure_configured() ->
ok = emqx_plugins:put_configured([]),
@@ -36,6 +41,7 @@ test_ensure_configured() ->
emqx_plugins:ensure_configured(P3, {before, <<"unknown-x">>})).
read_plugin_test() ->
+ meck_emqx(),
with_rand_install_dir(
fun(_Dir) ->
NameVsn = "bar-5",
@@ -49,7 +55,8 @@ read_plugin_test() ->
after
emqx_plugins:purge(NameVsn)
end
- end).
+ end),
+ meck:unload(emqx).
with_rand_install_dir(F) ->
N = rand:uniform(10000000),
@@ -72,6 +79,7 @@ write_file(Path, Content) ->
%% but it may fail in case the path is a directory
%% or if the file is read-only
delete_package_test() ->
+ meck_emqx(),
with_rand_install_dir(
fun(_Dir) ->
File = emqx_plugins:pkg_file("a-1"),
@@ -82,11 +90,13 @@ delete_package_test() ->
Dir = File,
ok = filelib:ensure_dir(filename:join([Dir, "foo"])),
?assertMatch({error, _}, emqx_plugins:delete_package("a-1"))
- end).
+ end),
+ meck:unload(emqx).
%% purge plugin's install dir should mostly work and return ok
%% but it may fail in case the dir is read-only
purge_test() ->
+ meck_emqx(),
with_rand_install_dir(
fun(_Dir) ->
File = emqx_plugins:info_file("a-1"),
@@ -99,4 +109,23 @@ purge_test() ->
%% write a file for the dir path
ok = file:write_file(Dir, "a"),
?assertEqual(ok, emqx_plugins:purge("a-1"))
- end).
+ end),
+ meck:unload(emqx).
+
+meck_emqx() ->
+ meck:new(emqx, [unstick, passthrough]),
+ meck:expect(emqx, update_config,
+ fun(Path, Values, _Opts) ->
+ emqx_config:put(Path, Values)
+ end),
+ %meck:expect(emqx, get_config,
+ % fun(KeyPath, Default) ->
+ % Map = emqx:get_raw_config(KeyPath, Default),
+ % Map1 = emqx_map_lib:safe_atom_key_map(Map),
+ % case Map1 of
+ % #{states := Plugins} ->
+ % Map1#{states => [emqx_map_lib:safe_atom_key_map(P) ||P <- Plugins]};
+ % _ -> Map1
+ % end
+ % end),
+ ok.
diff --git a/apps/emqx_prometheus/rebar.config b/apps/emqx_prometheus/rebar.config
index 753cc41b5..86dd80add 100644
--- a/apps/emqx_prometheus/rebar.config
+++ b/apps/emqx_prometheus/rebar.config
@@ -4,7 +4,7 @@
[ {emqx, {path, "../emqx"}},
%% FIXME: tag this as v3.1.3
{prometheus, {git, "https://github.com/deadtrickster/prometheus.erl", {tag, "v4.8.1"}}},
- {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.1"}}}
+ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.3"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
diff --git a/mix.exs b/mix.exs
index c0a663c94..abe92ecae 100644
--- a/mix.exs
+++ b/mix.exs
@@ -68,7 +68,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "0.18.0", override: true},
- {:hocon, github: "emqx/hocon", tag: "0.26.1", override: true},
+ {:hocon, github: "emqx/hocon", tag: "0.26.3", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.4.1", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
diff --git a/rebar.config b/rebar.config
index 0e70c1777..d774206ac 100644
--- a/rebar.config
+++ b/rebar.config
@@ -66,7 +66,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.2"}}}
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.18.0"}}}
- , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.1"}}}
+ , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.3"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}