Merge branch 'master' into sync-release-50-to-master
This commit is contained in:
commit
a953b951fe
|
@ -24,9 +24,6 @@ jobs:
|
|||
profile:
|
||||
- ['emqx', 'master']
|
||||
- ['emqx-enterprise', 'release-50']
|
||||
branch:
|
||||
- master
|
||||
- release-50
|
||||
otp:
|
||||
- 24.3.4.2-3
|
||||
arch:
|
||||
|
|
|
@ -24,12 +24,12 @@
|
|||
{deps, [
|
||||
{emqx_utils, {path, "../emqx_utils"}},
|
||||
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
|
||||
{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
|
||||
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.6"}}},
|
||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
|
||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
||||
|
|
|
@ -112,8 +112,8 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
|
|||
end,
|
||||
ok.
|
||||
|
||||
id_for_log(console) -> "log.console_handler";
|
||||
id_for_log(Other) -> "log.file_handlers." ++ atom_to_list(Other).
|
||||
id_for_log(console) -> "log.console";
|
||||
id_for_log(Other) -> "log.file." ++ atom_to_list(Other).
|
||||
|
||||
atom(Id) when is_binary(Id) -> binary_to_atom(Id, utf8);
|
||||
atom(Id) when is_atom(Id) -> Id.
|
||||
|
@ -126,12 +126,12 @@ tr_handlers(Conf) ->
|
|||
|
||||
%% For the default logger that outputs to console
|
||||
tr_console_handler(Conf) ->
|
||||
case conf_get("log.console_handler.enable", Conf) of
|
||||
case conf_get("log.console.enable", Conf) of
|
||||
true ->
|
||||
ConsoleConf = conf_get("log.console_handler", Conf),
|
||||
ConsoleConf = conf_get("log.console", Conf),
|
||||
[
|
||||
{handler, console, logger_std_h, #{
|
||||
level => conf_get("log.console_handler.level", Conf),
|
||||
level => conf_get("log.console.level", Conf),
|
||||
config => (log_handler_conf(ConsoleConf))#{type => standard_io},
|
||||
formatter => log_formatter(ConsoleConf),
|
||||
filters => log_filter(ConsoleConf)
|
||||
|
@ -150,14 +150,10 @@ tr_file_handler({HandlerName, SubConf}) ->
|
|||
{handler, atom(HandlerName), logger_disk_log_h, #{
|
||||
level => conf_get("level", SubConf),
|
||||
config => (log_handler_conf(SubConf))#{
|
||||
type =>
|
||||
case conf_get("rotation.enable", SubConf) of
|
||||
true -> wrap;
|
||||
_ -> halt
|
||||
end,
|
||||
file => conf_get("file", SubConf),
|
||||
max_no_files => conf_get("rotation.count", SubConf),
|
||||
max_no_bytes => conf_get("max_size", SubConf)
|
||||
type => wrap,
|
||||
file => conf_get("to", SubConf),
|
||||
max_no_files => conf_get("rotation_count", SubConf),
|
||||
max_no_bytes => conf_get("rotation_size", SubConf)
|
||||
},
|
||||
formatter => log_formatter(SubConf),
|
||||
filters => log_filter(SubConf),
|
||||
|
@ -165,14 +161,11 @@ tr_file_handler({HandlerName, SubConf}) ->
|
|||
}}.
|
||||
|
||||
logger_file_handlers(Conf) ->
|
||||
Handlers = maps:to_list(conf_get("log.file_handlers", Conf, #{})),
|
||||
lists:filter(
|
||||
fun({_Name, Opts}) ->
|
||||
B = conf_get("enable", Opts),
|
||||
true = is_boolean(B),
|
||||
B
|
||||
fun({_Name, Handler}) ->
|
||||
conf_get("enable", Handler, false)
|
||||
end,
|
||||
Handlers
|
||||
maps:to_list(conf_get("log.file", Conf, #{}))
|
||||
).
|
||||
|
||||
conf_get(Key, Conf) -> emqx_schema:conf_get(Key, Conf).
|
||||
|
@ -237,12 +230,8 @@ log_filter(Conf) ->
|
|||
end.
|
||||
|
||||
tr_level(Conf) ->
|
||||
ConsoleLevel = conf_get("log.console_handler.level", Conf, undefined),
|
||||
FileLevels = [
|
||||
conf_get("level", SubConf)
|
||||
|| {_, SubConf} <-
|
||||
logger_file_handlers(Conf)
|
||||
],
|
||||
ConsoleLevel = conf_get("log.console.level", Conf, undefined),
|
||||
FileLevels = [conf_get("level", SubConf) || {_, SubConf} <- logger_file_handlers(Conf)],
|
||||
case FileLevels ++ [ConsoleLevel || ConsoleLevel =/= undefined] of
|
||||
%% warning is the default level we should use
|
||||
[] -> warning;
|
||||
|
|
|
@ -29,9 +29,13 @@
|
|||
authn_type/1
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([convert_certs/2, convert_certs/3, clear_certs/2]).
|
||||
-endif.
|
||||
%% Used in emqx_gateway
|
||||
-export([
|
||||
certs_dir/2,
|
||||
convert_certs/2,
|
||||
convert_certs/3,
|
||||
clear_certs/2
|
||||
]).
|
||||
|
||||
-export_type([config/0]).
|
||||
|
||||
|
|
|
@ -2289,6 +2289,17 @@ common_ssl_opts_schema(Defaults, Type) ->
|
|||
desc => ?DESC(common_ssl_opts_schema_secure_renegotiate)
|
||||
}
|
||||
)},
|
||||
{"log_level",
|
||||
sc(
|
||||
hoconsc:enum([
|
||||
emergency, alert, critical, error, warning, notice, info, debug, none, all
|
||||
]),
|
||||
#{
|
||||
default => notice,
|
||||
desc => ?DESC(common_ssl_opts_schema_log_level),
|
||||
importance => ?IMPORTANCE_LOW
|
||||
}
|
||||
)},
|
||||
|
||||
{"hibernate_after",
|
||||
sc(
|
||||
|
|
|
@ -291,16 +291,16 @@ stats(Session) -> info(?STATS_KEYS, Session).
|
|||
|
||||
ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
|
||||
Subs = info(subscriptions, Session),
|
||||
lists:dropwhile(
|
||||
lists:filter(
|
||||
fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
|
||||
case maps:find(Topic, Subs) of
|
||||
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
|
||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = emqx_metrics:inc('delivery.dropped.no_local'),
|
||||
true;
|
||||
false;
|
||||
_ ->
|
||||
false
|
||||
true
|
||||
end
|
||||
end,
|
||||
Delivers
|
||||
|
|
|
@ -251,6 +251,7 @@ start_app(App, SpecAppConfig, Opts) ->
|
|||
{ok, _} ->
|
||||
ok = ensure_dashboard_listeners_started(App),
|
||||
ok = wait_for_app_processes(App),
|
||||
ok = perform_sanity_checks(App),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
error({failed_to_start_app, App, Reason})
|
||||
|
@ -264,6 +265,27 @@ wait_for_app_processes(emqx_conf) ->
|
|||
wait_for_app_processes(_) ->
|
||||
ok.
|
||||
|
||||
%% These are checks to detect inter-suite or inter-testcase flakiness
|
||||
%% early. For example, one suite might forget one application running
|
||||
%% and stop others, and then the `application:start/2' callback is
|
||||
%% never called again for this application.
|
||||
perform_sanity_checks(emqx_rule_engine) ->
|
||||
ensure_config_handler(emqx_rule_engine, [rule_engine, rules]),
|
||||
ok;
|
||||
perform_sanity_checks(emqx_bridge) ->
|
||||
ensure_config_handler(emqx_bridge, [bridges]),
|
||||
ok;
|
||||
perform_sanity_checks(_App) ->
|
||||
ok.
|
||||
|
||||
ensure_config_handler(Module, ConfigPath) ->
|
||||
#{handlers := Handlers} = sys:get_state(emqx_config_handler),
|
||||
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
|
||||
#{{mod} := Module} -> ok;
|
||||
_NotFound -> error({config_handler_missing, ConfigPath, Module})
|
||||
end,
|
||||
ok.
|
||||
|
||||
app_conf_file(emqx_conf) -> "emqx.conf.all";
|
||||
app_conf_file(App) -> atom_to_list(App) ++ ".conf".
|
||||
|
||||
|
|
|
@ -829,6 +829,42 @@ t_subscribe_no_local(Config) ->
|
|||
?assertEqual(1, length(receive_messages(2))),
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
t_subscribe_no_local_mixed(Config) ->
|
||||
ConnFun = ?config(conn_fun, Config),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
|
||||
{ok, _} = emqtt:ConnFun(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
|
||||
{ok, _} = emqtt:ConnFun(Client2),
|
||||
|
||||
%% Given tow clients and client1 subscribe to topic with 'no local' set to true
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
|
||||
|
||||
%% When mixed publish traffic are sent from both clients (Client1 sent 6 and Client2 sent 2)
|
||||
CB = {fun emqtt:sync_publish_result/3, [self(), async_res]},
|
||||
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed1">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed2">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed3">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed4">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed5">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed6">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed7">>, 0, CB),
|
||||
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed8">>, 0, CB),
|
||||
[
|
||||
receive
|
||||
{async_res, Res} -> ?assertEqual(ok, Res)
|
||||
end
|
||||
|| _ <- lists:seq(1, 8)
|
||||
],
|
||||
|
||||
%% Then only two messages from clients 2 are received
|
||||
PubRecvd = receive_messages(9),
|
||||
ct:pal("~p", [PubRecvd]),
|
||||
?assertEqual(2, length(PubRecvd)),
|
||||
ok = emqtt:disconnect(Client1),
|
||||
ok = emqtt:disconnect(Client2).
|
||||
|
||||
t_subscribe_actions(Config) ->
|
||||
ConnFun = ?config(conn_fun, Config),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
|
|
@ -129,7 +129,7 @@ assert_upgraded1(Map) ->
|
|||
?assert(maps:is_key(<<"ssl">>, Map)).
|
||||
|
||||
check(Conf) when is_map(Conf) ->
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{required => false}).
|
||||
|
||||
%% erlfmt-ignore
|
||||
%% this is config generated from v5.0.11
|
||||
|
|
|
@ -687,11 +687,12 @@ fields("rpc") ->
|
|||
desc => ?DESC(rpc_mode)
|
||||
}
|
||||
)},
|
||||
{"driver",
|
||||
{"protocol",
|
||||
sc(
|
||||
hoconsc:enum([tcp, ssl]),
|
||||
#{
|
||||
mapping => "gen_rpc.driver",
|
||||
aliases => [driver],
|
||||
default => tcp,
|
||||
desc => ?DESC(rpc_driver)
|
||||
}
|
||||
|
@ -866,19 +867,22 @@ fields("rpc") ->
|
|||
];
|
||||
fields("log") ->
|
||||
[
|
||||
{"console_handler",
|
||||
{"console",
|
||||
sc(?R_REF("console_handler"), #{
|
||||
aliases => [console_handler],
|
||||
importance => ?IMPORTANCE_HIGH
|
||||
})},
|
||||
{"file",
|
||||
sc(
|
||||
?R_REF("console_handler"),
|
||||
#{importance => ?IMPORTANCE_HIGH}
|
||||
)},
|
||||
{"file_handlers",
|
||||
sc(
|
||||
map(name, ?R_REF("log_file_handler")),
|
||||
?UNION([
|
||||
?R_REF("log_file_handler"),
|
||||
?MAP(handler_name, ?R_REF("log_file_handler"))
|
||||
]),
|
||||
#{
|
||||
desc => ?DESC("log_file_handlers"),
|
||||
%% because file_handlers is a map
|
||||
%% so there has to be a default value in order to populate the raw configs
|
||||
default => #{<<"default">> => #{<<"level">> => <<"warning">>}},
|
||||
converter => fun ensure_file_handlers/2,
|
||||
default => #{<<"level">> => <<"warning">>},
|
||||
aliases => [file_handlers],
|
||||
importance => ?IMPORTANCE_HIGH
|
||||
}
|
||||
)}
|
||||
|
@ -887,50 +891,40 @@ fields("console_handler") ->
|
|||
log_handler_common_confs(console);
|
||||
fields("log_file_handler") ->
|
||||
[
|
||||
{"file",
|
||||
{"to",
|
||||
sc(
|
||||
file(),
|
||||
#{
|
||||
desc => ?DESC("log_file_handler_file"),
|
||||
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
|
||||
converter => fun emqx_schema:naive_env_interpolation/1,
|
||||
validator => fun validate_file_location/1
|
||||
validator => fun validate_file_location/1,
|
||||
aliases => [file],
|
||||
importance => ?IMPORTANCE_HIGH
|
||||
}
|
||||
)},
|
||||
{"rotation",
|
||||
{"rotation_count",
|
||||
sc(
|
||||
?R_REF("log_rotation"),
|
||||
#{}
|
||||
range(1, 128),
|
||||
#{
|
||||
aliases => [rotation],
|
||||
default => 10,
|
||||
converter => fun convert_rotation/2,
|
||||
desc => ?DESC("log_rotation_count"),
|
||||
importance => ?IMPORTANCE_MEDIUM
|
||||
}
|
||||
)},
|
||||
{"max_size",
|
||||
{"rotation_size",
|
||||
sc(
|
||||
hoconsc:union([infinity, emqx_schema:bytesize()]),
|
||||
#{
|
||||
default => <<"50MB">>,
|
||||
desc => ?DESC("log_file_handler_max_size"),
|
||||
aliases => [max_size],
|
||||
importance => ?IMPORTANCE_MEDIUM
|
||||
}
|
||||
)}
|
||||
] ++ log_handler_common_confs(file);
|
||||
fields("log_rotation") ->
|
||||
[
|
||||
{"enable",
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
desc => ?DESC("log_rotation_enable")
|
||||
}
|
||||
)},
|
||||
{"count",
|
||||
sc(
|
||||
range(1, 2048),
|
||||
#{
|
||||
default => 10,
|
||||
desc => ?DESC("log_rotation_count")
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields("log_overload_kill") ->
|
||||
[
|
||||
{"enable",
|
||||
|
@ -1038,8 +1032,8 @@ translation("ekka") ->
|
|||
[{"cluster_discovery", fun tr_cluster_discovery/1}];
|
||||
translation("kernel") ->
|
||||
[
|
||||
{"logger_level", fun tr_logger_level/1},
|
||||
{"logger", fun tr_logger_handlers/1},
|
||||
{"logger_level", fun emqx_config_logger:tr_level/1},
|
||||
{"logger", fun emqx_config_logger:tr_handlers/1},
|
||||
{"error_logger", fun(_) -> silent end}
|
||||
];
|
||||
translation("emqx") ->
|
||||
|
@ -1113,24 +1107,9 @@ tr_cluster_discovery(Conf) ->
|
|||
Strategy = conf_get("cluster.discovery_strategy", Conf),
|
||||
{Strategy, filter(cluster_options(Strategy, Conf))}.
|
||||
|
||||
-spec tr_logger_level(hocon:config()) -> logger:level().
|
||||
tr_logger_level(Conf) ->
|
||||
emqx_config_logger:tr_level(Conf).
|
||||
|
||||
tr_logger_handlers(Conf) ->
|
||||
emqx_config_logger:tr_handlers(Conf).
|
||||
|
||||
log_handler_common_confs(Handler) ->
|
||||
lists:map(
|
||||
fun
|
||||
({_Name, #{importance := _}} = F) -> F;
|
||||
({Name, Sc}) -> {Name, Sc#{importance => ?IMPORTANCE_LOW}}
|
||||
end,
|
||||
do_log_handler_common_confs(Handler)
|
||||
).
|
||||
do_log_handler_common_confs(Handler) ->
|
||||
%% we rarely support dynamic defaults like this
|
||||
%% for this one, we have build-time defualut the same as runtime default
|
||||
%% for this one, we have build-time default the same as runtime default
|
||||
%% so it's less tricky
|
||||
EnableValues =
|
||||
case Handler of
|
||||
|
@ -1140,21 +1119,31 @@ do_log_handler_common_confs(Handler) ->
|
|||
EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"),
|
||||
Enable = lists:member(EnvValue, EnableValues),
|
||||
[
|
||||
{"level",
|
||||
sc(
|
||||
log_level(),
|
||||
#{
|
||||
default => warning,
|
||||
desc => ?DESC("common_handler_level"),
|
||||
importance => ?IMPORTANCE_HIGH
|
||||
}
|
||||
)},
|
||||
{"enable",
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
default => Enable,
|
||||
desc => ?DESC("common_handler_enable"),
|
||||
importance => ?IMPORTANCE_LOW
|
||||
importance => ?IMPORTANCE_MEDIUM
|
||||
}
|
||||
)},
|
||||
{"level",
|
||||
{"formatter",
|
||||
sc(
|
||||
log_level(),
|
||||
hoconsc:enum([text, json]),
|
||||
#{
|
||||
default => warning,
|
||||
desc => ?DESC("common_handler_level")
|
||||
default => text,
|
||||
desc => ?DESC("common_handler_formatter"),
|
||||
importance => ?IMPORTANCE_MEDIUM
|
||||
}
|
||||
)},
|
||||
{"time_offset",
|
||||
|
@ -1173,16 +1162,7 @@ do_log_handler_common_confs(Handler) ->
|
|||
#{
|
||||
default => unlimited,
|
||||
desc => ?DESC("common_handler_chars_limit"),
|
||||
importance => ?IMPORTANCE_LOW
|
||||
}
|
||||
)},
|
||||
{"formatter",
|
||||
sc(
|
||||
hoconsc:enum([text, json]),
|
||||
#{
|
||||
default => text,
|
||||
desc => ?DESC("common_handler_formatter"),
|
||||
importance => ?IMPORTANCE_MEDIUM
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"single_line",
|
||||
|
@ -1191,7 +1171,7 @@ do_log_handler_common_confs(Handler) ->
|
|||
#{
|
||||
default => true,
|
||||
desc => ?DESC("common_handler_single_line"),
|
||||
importance => ?IMPORTANCE_LOW
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"sync_mode_qlen",
|
||||
|
@ -1199,7 +1179,8 @@ do_log_handler_common_confs(Handler) ->
|
|||
non_neg_integer(),
|
||||
#{
|
||||
default => 100,
|
||||
desc => ?DESC("common_handler_sync_mode_qlen")
|
||||
desc => ?DESC("common_handler_sync_mode_qlen"),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"drop_mode_qlen",
|
||||
|
@ -1207,7 +1188,8 @@ do_log_handler_common_confs(Handler) ->
|
|||
pos_integer(),
|
||||
#{
|
||||
default => 3000,
|
||||
desc => ?DESC("common_handler_drop_mode_qlen")
|
||||
desc => ?DESC("common_handler_drop_mode_qlen"),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"flush_qlen",
|
||||
|
@ -1215,17 +1197,19 @@ do_log_handler_common_confs(Handler) ->
|
|||
pos_integer(),
|
||||
#{
|
||||
default => 8000,
|
||||
desc => ?DESC("common_handler_flush_qlen")
|
||||
desc => ?DESC("common_handler_flush_qlen"),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"overload_kill", sc(?R_REF("log_overload_kill"), #{})},
|
||||
{"burst_limit", sc(?R_REF("log_burst_limit"), #{})},
|
||||
{"overload_kill", sc(?R_REF("log_overload_kill"), #{importance => ?IMPORTANCE_HIDDEN})},
|
||||
{"burst_limit", sc(?R_REF("log_burst_limit"), #{importance => ?IMPORTANCE_HIDDEN})},
|
||||
{"supervisor_reports",
|
||||
sc(
|
||||
hoconsc:enum([error, progress]),
|
||||
#{
|
||||
default => error,
|
||||
desc => ?DESC("common_handler_supervisor_reports")
|
||||
desc => ?DESC("common_handler_supervisor_reports"),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"max_depth",
|
||||
|
@ -1233,7 +1217,8 @@ do_log_handler_common_confs(Handler) ->
|
|||
hoconsc:union([unlimited, non_neg_integer()]),
|
||||
#{
|
||||
default => 100,
|
||||
desc => ?DESC("common_handler_max_depth")
|
||||
desc => ?DESC("common_handler_max_depth"),
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
@ -1355,3 +1340,19 @@ validator_string_re(Val, RE, Error) ->
|
|||
|
||||
node_array() ->
|
||||
hoconsc:union([emqx_schema:comma_separated_atoms(), hoconsc:array(atom())]).
|
||||
|
||||
ensure_file_handlers(Conf, _Opts) ->
|
||||
FileFields = lists:flatmap(
|
||||
fun({F, Schema}) ->
|
||||
Alias = [atom_to_binary(A) || A <- maps:get(aliases, Schema, [])],
|
||||
[list_to_binary(F) | Alias]
|
||||
end,
|
||||
fields("log_file_handler")
|
||||
),
|
||||
HandlersWithoutName = maps:with(FileFields, Conf),
|
||||
HandlersWithName = maps:without(FileFields, Conf),
|
||||
emqx_utils_maps:deep_merge(#{<<"default">> => HandlersWithoutName}, HandlersWithName).
|
||||
|
||||
convert_rotation(undefined, _Opts) -> undefined;
|
||||
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
|
||||
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
|
||||
|
|
|
@ -47,6 +47,198 @@ array_nodes_test() ->
|
|||
),
|
||||
ok.
|
||||
|
||||
%% erlfmt-ignore
|
||||
-define(OUTDATED_LOG_CONF,
|
||||
"""
|
||||
log.console_handler {
|
||||
burst_limit {
|
||||
enable = true
|
||||
max_count = 10000
|
||||
window_time = 1000
|
||||
}
|
||||
chars_limit = unlimited
|
||||
drop_mode_qlen = 3000
|
||||
enable = true
|
||||
flush_qlen = 8000
|
||||
formatter = text
|
||||
level = warning
|
||||
max_depth = 100
|
||||
overload_kill {
|
||||
enable = true
|
||||
mem_size = 31457280
|
||||
qlen = 20000
|
||||
restart_after = 5000
|
||||
}
|
||||
single_line = true
|
||||
supervisor_reports = error
|
||||
sync_mode_qlen = 100
|
||||
time_offset = \"+02:00\"
|
||||
}
|
||||
log.file_handlers {
|
||||
default {
|
||||
burst_limit {
|
||||
enable = true
|
||||
max_count = 10000
|
||||
window_time = 1000
|
||||
}
|
||||
chars_limit = unlimited
|
||||
drop_mode_qlen = 3000
|
||||
enable = true
|
||||
file = \"log/my-emqx.log\"
|
||||
flush_qlen = 8000
|
||||
formatter = text
|
||||
level = debug
|
||||
max_depth = 100
|
||||
max_size = \"1024MB\"
|
||||
overload_kill {
|
||||
enable = true
|
||||
mem_size = 31457280
|
||||
qlen = 20000
|
||||
restart_after = 5000
|
||||
}
|
||||
rotation {count = 20, enable = true}
|
||||
single_line = true
|
||||
supervisor_reports = error
|
||||
sync_mode_qlen = 100
|
||||
time_offset = \"+01:00\"
|
||||
}
|
||||
}
|
||||
"""
|
||||
).
|
||||
-define(FORMATTER(TimeOffset),
|
||||
{emqx_logger_textfmt, #{
|
||||
chars_limit => unlimited,
|
||||
depth => 100,
|
||||
single_line => true,
|
||||
template => [time, " [", level, "] ", msg, "\n"],
|
||||
time_offset => TimeOffset
|
||||
}}
|
||||
).
|
||||
|
||||
-define(FILTERS, [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]).
|
||||
-define(LOG_CONFIG, #{
|
||||
burst_limit_enable => true,
|
||||
burst_limit_max_count => 10000,
|
||||
burst_limit_window_time => 1000,
|
||||
drop_mode_qlen => 3000,
|
||||
flush_qlen => 8000,
|
||||
overload_kill_enable => true,
|
||||
overload_kill_mem_size => 31457280,
|
||||
overload_kill_qlen => 20000,
|
||||
overload_kill_restart_after => 5000,
|
||||
sync_mode_qlen => 100
|
||||
}).
|
||||
|
||||
outdated_log_test() ->
|
||||
validate_log(?OUTDATED_LOG_CONF).
|
||||
|
||||
validate_log(Conf) ->
|
||||
BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]),
|
||||
Conf0 = <<BaseConf/binary, (list_to_binary(Conf))/binary>>,
|
||||
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
|
||||
ConfList = hocon_tconf:generate(emqx_conf_schema, ConfMap0),
|
||||
Kernel = proplists:get_value(kernel, ConfList),
|
||||
|
||||
?assertEqual(silent, proplists:get_value(error_logger, Kernel)),
|
||||
?assertEqual(debug, proplists:get_value(logger_level, Kernel)),
|
||||
Loggers = proplists:get_value(logger, Kernel),
|
||||
FileHandler = lists:keyfind(logger_disk_log_h, 3, Loggers),
|
||||
?assertEqual(
|
||||
{handler, default, logger_disk_log_h, #{
|
||||
config => ?LOG_CONFIG#{
|
||||
type => wrap,
|
||||
file => "log/my-emqx.log",
|
||||
max_no_bytes => 1073741824,
|
||||
max_no_files => 20
|
||||
},
|
||||
filesync_repeat_interval => no_repeat,
|
||||
filters => ?FILTERS,
|
||||
formatter => ?FORMATTER("+01:00"),
|
||||
level => debug
|
||||
}},
|
||||
FileHandler
|
||||
),
|
||||
ConsoleHandler = lists:keyfind(logger_std_h, 3, Loggers),
|
||||
?assertEqual(
|
||||
{handler, console, logger_std_h, #{
|
||||
config => ?LOG_CONFIG#{type => standard_io},
|
||||
filters => ?FILTERS,
|
||||
formatter => ?FORMATTER("+02:00"),
|
||||
level => warning
|
||||
}},
|
||||
ConsoleHandler
|
||||
).
|
||||
|
||||
%% erlfmt-ignore
|
||||
-define(KERNEL_LOG_CONF,
|
||||
"""
|
||||
log.console {
|
||||
enable = true
|
||||
formatter = text
|
||||
level = warning
|
||||
time_offset = \"+02:00\"
|
||||
}
|
||||
log.file {
|
||||
enable = false
|
||||
file = \"log/xx-emqx.log\"
|
||||
formatter = text
|
||||
level = debug
|
||||
rotation_count = 20
|
||||
rotation_size = \"1024MB\"
|
||||
time_offset = \"+01:00\"
|
||||
}
|
||||
log.file_handlers.default {
|
||||
enable = true
|
||||
file = \"log/my-emqx.log\"
|
||||
}
|
||||
"""
|
||||
).
|
||||
|
||||
log_test() ->
|
||||
validate_log(?KERNEL_LOG_CONF).
|
||||
|
||||
%% erlfmt-ignore
|
||||
log_rotation_count_limit_test() ->
|
||||
Format =
|
||||
"""
|
||||
log.file {
|
||||
enable = true
|
||||
to = \"log/emqx.log\"
|
||||
formatter = text
|
||||
level = debug
|
||||
rotation = {count = ~w}
|
||||
rotation_size = \"1024MB\"
|
||||
}
|
||||
""",
|
||||
BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]),
|
||||
lists:foreach(fun({Conf, Count}) ->
|
||||
Conf0 = <<BaseConf/binary, Conf/binary>>,
|
||||
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
|
||||
ConfList = hocon_tconf:generate(emqx_conf_schema, ConfMap0),
|
||||
Kernel = proplists:get_value(kernel, ConfList),
|
||||
Loggers = proplists:get_value(logger, Kernel),
|
||||
?assertMatch(
|
||||
{handler, default, logger_disk_log_h, #{
|
||||
config := #{max_no_files := Count}
|
||||
}},
|
||||
lists:keyfind(logger_disk_log_h, 3, Loggers)
|
||||
)
|
||||
end,
|
||||
[{to_bin(Format, [1]), 1}, {to_bin(Format, [128]), 128}]),
|
||||
lists:foreach(fun({Conf, Count}) ->
|
||||
Conf0 = <<BaseConf/binary, Conf/binary>>,
|
||||
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
|
||||
?assertThrow({emqx_conf_schema,
|
||||
[#{kind := validation_error,
|
||||
mismatches := #{"handler_name" :=
|
||||
#{kind := validation_error,
|
||||
path := "log.file.default.rotation_count",
|
||||
reason := #{expected_type := "1..128"},
|
||||
value := Count}
|
||||
}}]},
|
||||
hocon_tconf:generate(emqx_conf_schema, ConfMap0))
|
||||
end, [{to_bin(Format, [0]), 0}, {to_bin(Format, [129]), 129}]).
|
||||
|
||||
%% erlfmt-ignore
|
||||
-define(BASE_AUTHN_ARRAY,
|
||||
"""
|
||||
|
@ -84,36 +276,44 @@ authn_validations_test() ->
|
|||
OKHttps = to_bin(?BASE_AUTHN_ARRAY, [post, true, <<"https://127.0.0.1:8080">>]),
|
||||
Conf0 = <<BaseConf/binary, OKHttps/binary>>,
|
||||
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
|
||||
?assert(is_list(hocon_tconf:generate(emqx_conf_schema, ConfMap0))),
|
||||
{_, Res0} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap0, #{format => richmap}),
|
||||
Headers0 = authentication_headers(Res0),
|
||||
?assertEqual(<<"application/json">>, maps:get(<<"content-type">>, Headers0)),
|
||||
%% accept from converter
|
||||
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers0)),
|
||||
|
||||
OKHttp = to_bin(?BASE_AUTHN_ARRAY, [post, false, <<"http://127.0.0.1:8080">>]),
|
||||
Conf1 = <<BaseConf/binary, OKHttp/binary>>,
|
||||
{ok, ConfMap1} = hocon:binary(Conf1, #{format => richmap}),
|
||||
?assert(is_list(hocon_tconf:generate(emqx_conf_schema, ConfMap1))),
|
||||
{_, Res1} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap1, #{format => richmap}),
|
||||
Headers1 = authentication_headers(Res1),
|
||||
?assertEqual(<<"application/json">>, maps:get(<<"content-type">>, Headers1), Headers1),
|
||||
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers1), Headers1),
|
||||
|
||||
DisableSSLWithHttps = to_bin(?BASE_AUTHN_ARRAY, [post, false, <<"https://127.0.0.1:8080">>]),
|
||||
Conf2 = <<BaseConf/binary, DisableSSLWithHttps/binary>>,
|
||||
{ok, ConfMap2} = hocon:binary(Conf2, #{format => richmap}),
|
||||
?assertThrow(
|
||||
?ERROR(check_http_ssl_opts),
|
||||
hocon_tconf:generate(emqx_conf_schema, ConfMap2)
|
||||
hocon_tconf:map_translate(emqx_conf_schema, ConfMap2, #{format => richmap})
|
||||
),
|
||||
|
||||
BadHeader = to_bin(?BASE_AUTHN_ARRAY, [get, true, <<"https://127.0.0.1:8080">>]),
|
||||
Conf3 = <<BaseConf/binary, BadHeader/binary>>,
|
||||
{ok, ConfMap3} = hocon:binary(Conf3, #{format => richmap}),
|
||||
?assertThrow(
|
||||
?ERROR(check_http_headers),
|
||||
hocon_tconf:generate(emqx_conf_schema, ConfMap3)
|
||||
),
|
||||
{_, Res3} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap3, #{format => richmap}),
|
||||
Headers3 = authentication_headers(Res3),
|
||||
%% remove the content-type header when get method
|
||||
?assertEqual(false, maps:is_key(<<"content-type">>, Headers3), Headers3),
|
||||
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers3), Headers3),
|
||||
|
||||
BadHeaderWithTuple = binary:replace(BadHeader, [<<"[">>, <<"]">>], <<"">>, [global]),
|
||||
Conf4 = <<BaseConf/binary, BadHeaderWithTuple/binary>>,
|
||||
{ok, ConfMap4} = hocon:binary(Conf4, #{format => richmap}),
|
||||
?assertThrow(
|
||||
?ERROR(check_http_headers),
|
||||
hocon_tconf:generate(emqx_conf_schema, ConfMap4)
|
||||
),
|
||||
{_, Res4} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap4, #{}),
|
||||
Headers4 = authentication_headers(Res4),
|
||||
?assertEqual(false, maps:is_key(<<"content-type">>, Headers4), Headers4),
|
||||
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers4), Headers4),
|
||||
ok.
|
||||
|
||||
%% erlfmt-ignore
|
||||
|
@ -197,6 +397,10 @@ listeners_test() ->
|
|||
),
|
||||
ok.
|
||||
|
||||
authentication_headers(Conf) ->
|
||||
[#{<<"headers">> := Headers}] = hocon_maps:get("authentication", Conf),
|
||||
Headers.
|
||||
|
||||
doc_gen_test() ->
|
||||
%% the json file too large to encode.
|
||||
{
|
||||
|
|
|
@ -448,10 +448,12 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
|
|||
)
|
||||
of
|
||||
undefined ->
|
||||
CertsDir = authn_certs_dir(GwName, Conf),
|
||||
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf),
|
||||
{ok,
|
||||
emqx_utils_maps:deep_merge(
|
||||
RawConf,
|
||||
#{GwName => #{?AUTHN_BIN => Conf}}
|
||||
#{GwName => #{?AUTHN_BIN => Conf1}}
|
||||
)};
|
||||
_ ->
|
||||
badres_authn(already_exist, GwName)
|
||||
|
@ -469,7 +471,9 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
|||
Listener ->
|
||||
case maps:get(?AUTHN_BIN, Listener, undefined) of
|
||||
undefined ->
|
||||
NListener = maps:put(?AUTHN_BIN, Conf, Listener),
|
||||
CertsDir = authn_certs_dir(GwName, LType, LName, Conf),
|
||||
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf),
|
||||
NListener = maps:put(?AUTHN_BIN, Conf1, Listener),
|
||||
NGateway = #{
|
||||
GwName =>
|
||||
#{
|
||||
|
@ -490,8 +494,10 @@ pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
|
|||
of
|
||||
undefined ->
|
||||
badres_authn(not_found, GwName);
|
||||
_Authn ->
|
||||
{ok, emqx_utils_maps:deep_put([GwName, ?AUTHN_BIN], RawConf, Conf)}
|
||||
OldAuthnConf ->
|
||||
CertsDir = authn_certs_dir(GwName, Conf),
|
||||
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf, OldAuthnConf),
|
||||
{ok, emqx_utils_maps:deep_put([GwName, ?AUTHN_BIN], RawConf, Conf1)}
|
||||
end;
|
||||
pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||
case
|
||||
|
@ -507,10 +513,16 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
|||
case maps:get(?AUTHN_BIN, Listener, undefined) of
|
||||
undefined ->
|
||||
badres_listener_authn(not_found, GwName, LType, LName);
|
||||
_Auth ->
|
||||
OldAuthnConf ->
|
||||
CertsDir = authn_certs_dir(GwName, LType, LName, OldAuthnConf),
|
||||
Conf1 = emqx_authentication_config:convert_certs(
|
||||
CertsDir,
|
||||
Conf,
|
||||
OldAuthnConf
|
||||
),
|
||||
NListener = maps:put(
|
||||
?AUTHN_BIN,
|
||||
Conf,
|
||||
Conf1,
|
||||
Listener
|
||||
),
|
||||
{ok,
|
||||
|
@ -522,12 +534,36 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
|||
end
|
||||
end;
|
||||
pre_config_update(_, {remove_authn, GwName}, RawConf) ->
|
||||
case
|
||||
emqx_utils_maps:deep_get(
|
||||
[GwName, ?AUTHN_BIN], RawConf, undefined
|
||||
)
|
||||
of
|
||||
undefined ->
|
||||
ok;
|
||||
OldAuthnConf ->
|
||||
CertsDir = authn_certs_dir(GwName, OldAuthnConf),
|
||||
emqx_authentication_config:clear_certs(CertsDir, OldAuthnConf)
|
||||
end,
|
||||
{ok,
|
||||
emqx_utils_maps:deep_remove(
|
||||
[GwName, ?AUTHN_BIN], RawConf
|
||||
)};
|
||||
pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
|
||||
Path = [GwName, <<"listeners">>, LType, LName, ?AUTHN_BIN],
|
||||
case
|
||||
emqx_utils_maps:deep_get(
|
||||
Path,
|
||||
RawConf,
|
||||
undefined
|
||||
)
|
||||
of
|
||||
undefined ->
|
||||
ok;
|
||||
OldAuthnConf ->
|
||||
CertsDir = authn_certs_dir(GwName, LType, LName, OldAuthnConf),
|
||||
emqx_authentication_config:clear_certs(CertsDir, OldAuthnConf)
|
||||
end,
|
||||
{ok, emqx_utils_maps:deep_remove(Path, RawConf)};
|
||||
pre_config_update(_, UnknownReq, _RawConf) ->
|
||||
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
|
||||
|
@ -678,6 +714,18 @@ apply_to_gateway_basic_confs(_Fun, _GwName, Conf) ->
|
|||
certs_dir(GwName) when is_binary(GwName) ->
|
||||
GwName.
|
||||
|
||||
authn_certs_dir(GwName, ListenerType, ListenerName, AuthnConf) ->
|
||||
ChainName = emqx_gateway_utils:listener_chain(GwName, ListenerType, ListenerName),
|
||||
emqx_authentication_config:certs_dir(ChainName, AuthnConf).
|
||||
|
||||
authn_certs_dir(GwName, AuthnConf) when is_binary(GwName) ->
|
||||
authn_certs_dir(binary_to_existing_atom(GwName), AuthnConf);
|
||||
authn_certs_dir(GwName, AuthnConf) ->
|
||||
emqx_authentication_config:certs_dir(
|
||||
emqx_gateway_utils:global_chain(GwName),
|
||||
AuthnConf
|
||||
).
|
||||
|
||||
convert_certs(SubDir, Conf) ->
|
||||
convert_certs(<<"dtls_options">>, SubDir, convert_certs(<<"ssl_options">>, SubDir, Conf)).
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_gateway_mqttsn, [
|
||||
{description, "MQTT-SN Gateway"},
|
||||
{vsn, "0.1.0"},
|
||||
{vsn, "0.1.1"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, emqx, emqx_gateway]},
|
||||
{env, []},
|
||||
|
|
|
@ -2045,15 +2045,15 @@ handle_deliver(
|
|||
|
||||
ignore_local(Delivers, Subscriber, Session, Ctx) ->
|
||||
Subs = emqx_session:info(subscriptions, Session),
|
||||
lists:dropwhile(
|
||||
lists:filter(
|
||||
fun({deliver, Topic, #message{from = Publisher}}) ->
|
||||
case maps:find(Topic, Subs) of
|
||||
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
|
||||
ok = metrics_inc(Ctx, 'delivery.dropped'),
|
||||
ok = metrics_inc(Ctx, 'delivery.dropped.no_local'),
|
||||
true;
|
||||
false;
|
||||
_ ->
|
||||
false
|
||||
true
|
||||
end
|
||||
end,
|
||||
Delivers
|
||||
|
|
|
@ -151,7 +151,7 @@ log_path() ->
|
|||
Configs = logger:get_handler_config(),
|
||||
case get_log_path(Configs) of
|
||||
undefined ->
|
||||
<<"log.file_handler.default.enable is false, not logging to file.">>;
|
||||
<<"log.file.enable is false, not logging to file.">>;
|
||||
Path ->
|
||||
iolist_to_binary(filename:join(RootDir, Path))
|
||||
end.
|
||||
|
|
|
@ -224,8 +224,6 @@ config(put, #{body := NewConf}, Req) ->
|
|||
case emqx_conf:update(Path, NewConf, ?OPTS) of
|
||||
{ok, #{raw_config := RawConf}} ->
|
||||
{200, RawConf};
|
||||
{error, {permission_denied, Reason}} ->
|
||||
{403, #{code => 'UPDATE_FAILED', message => Reason}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
|
||||
end.
|
||||
|
@ -270,8 +268,6 @@ config_reset(post, _Params, Req) ->
|
|||
case emqx_conf:reset(Path, ?OPTS) of
|
||||
{ok, _} ->
|
||||
{200};
|
||||
{error, {permission_denied, Reason}} ->
|
||||
{403, #{code => 'REST_FAILED', message => Reason}};
|
||||
{error, no_default_value} ->
|
||||
{400, #{code => 'NO_DEFAULT_VALUE', message => <<"No Default Value.">>}};
|
||||
{error, Reason} ->
|
||||
|
|
|
@ -99,24 +99,24 @@ t_log(_Config) ->
|
|||
{ok, Log} = get_config("log"),
|
||||
File = "log/emqx-test.log",
|
||||
%% update handler
|
||||
Log1 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, true),
|
||||
Log2 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"default">>, <<"file">>], Log1, File),
|
||||
Log1 = emqx_utils_maps:deep_put([<<"file">>, <<"default">>, <<"enable">>], Log, true),
|
||||
Log2 = emqx_utils_maps:deep_put([<<"file">>, <<"default">>, <<"to">>], Log1, File),
|
||||
{ok, #{}} = update_config(<<"log">>, Log2),
|
||||
{ok, Log3} = logger:get_handler_config(default),
|
||||
?assertMatch(#{config := #{file := File}}, Log3),
|
||||
ErrLog1 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, 1),
|
||||
ErrLog1 = emqx_utils_maps:deep_put([<<"file">>, <<"default">>, <<"enable">>], Log, 1),
|
||||
?assertMatch({error, {"HTTP/1.1", 400, _}}, update_config(<<"log">>, ErrLog1)),
|
||||
ErrLog2 = emqx_utils_maps:deep_put(
|
||||
[<<"file_handlers">>, <<"default">>, <<"enabfe">>], Log, true
|
||||
[<<"file">>, <<"default">>, <<"enabfe">>], Log, true
|
||||
),
|
||||
?assertMatch({error, {"HTTP/1.1", 400, _}}, update_config(<<"log">>, ErrLog2)),
|
||||
|
||||
%% add new handler
|
||||
File1 = "log/emqx-test1.log",
|
||||
Handler = emqx_utils_maps:deep_get([<<"file_handlers">>, <<"default">>], Log2),
|
||||
NewLog1 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"new">>], Log2, Handler),
|
||||
Handler = emqx_utils_maps:deep_get([<<"file">>, <<"default">>], Log2),
|
||||
NewLog1 = emqx_utils_maps:deep_put([<<"file">>, <<"new">>], Log2, Handler),
|
||||
NewLog2 = emqx_utils_maps:deep_put(
|
||||
[<<"file_handlers">>, <<"new">>, <<"file">>], NewLog1, File1
|
||||
[<<"file">>, <<"new">>, <<"to">>], NewLog1, File1
|
||||
),
|
||||
{ok, #{}} = update_config(<<"log">>, NewLog2),
|
||||
{ok, Log4} = logger:get_handler_config(new),
|
||||
|
@ -124,7 +124,7 @@ t_log(_Config) ->
|
|||
|
||||
%% disable new handler
|
||||
Disable = emqx_utils_maps:deep_put(
|
||||
[<<"file_handlers">>, <<"new">>, <<"enable">>], NewLog2, false
|
||||
[<<"file">>, <<"new">>, <<"enable">>], NewLog2, false
|
||||
),
|
||||
{ok, #{}} = update_config(<<"log">>, Disable),
|
||||
?assertEqual({error, {not_found, new}}, logger:get_handler_config(new)),
|
||||
|
|
|
@ -34,8 +34,8 @@ init_per_testcase(t_log_path, Config) ->
|
|||
emqx_config_logger:add_handler(),
|
||||
Log = emqx_conf:get_raw([log], #{}),
|
||||
File = "log/emqx-test.log",
|
||||
Log1 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, true),
|
||||
Log2 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"default">>, <<"file">>], Log1, File),
|
||||
Log1 = emqx_utils_maps:deep_put([<<"file">>, <<"default">>, <<"enable">>], Log, true),
|
||||
Log2 = emqx_utils_maps:deep_put([<<"file">>, <<"default">>, <<"to">>], Log1, File),
|
||||
{ok, #{}} = emqx_conf:update([log], Log2, #{rawconf_with_defaults => true}),
|
||||
Config;
|
||||
init_per_testcase(_, Config) ->
|
||||
|
@ -43,7 +43,7 @@ init_per_testcase(_, Config) ->
|
|||
|
||||
end_per_testcase(t_log_path, Config) ->
|
||||
Log = emqx_conf:get_raw([log], #{}),
|
||||
Log1 = emqx_utils_maps:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, false),
|
||||
Log1 = emqx_utils_maps:deep_put([<<"file">>, <<"default">>, <<"enable">>], Log, false),
|
||||
{ok, #{}} = emqx_conf:update([log], Log1, #{rawconf_with_defaults => true}),
|
||||
emqx_config_logger:remove_handler(),
|
||||
Config;
|
||||
|
|
|
@ -206,6 +206,8 @@ inflight_get(ID) ->
|
|||
dropped_inc(ID) ->
|
||||
dropped_inc(ID, 1).
|
||||
|
||||
dropped_inc(_ID, 0) ->
|
||||
ok;
|
||||
dropped_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, dropped], #{counter_inc => Val}, #{resource_id => ID}).
|
||||
|
||||
|
@ -216,6 +218,8 @@ dropped_get(ID) ->
|
|||
dropped_other_inc(ID) ->
|
||||
dropped_other_inc(ID, 1).
|
||||
|
||||
dropped_other_inc(_ID, 0) ->
|
||||
ok;
|
||||
dropped_other_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, dropped_other], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -228,6 +232,8 @@ dropped_other_get(ID) ->
|
|||
dropped_expired_inc(ID) ->
|
||||
dropped_expired_inc(ID, 1).
|
||||
|
||||
dropped_expired_inc(_ID, 0) ->
|
||||
ok;
|
||||
dropped_expired_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, dropped_expired], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -240,6 +246,8 @@ dropped_expired_get(ID) ->
|
|||
late_reply_inc(ID) ->
|
||||
late_reply_inc(ID, 1).
|
||||
|
||||
late_reply_inc(_ID, 0) ->
|
||||
ok;
|
||||
late_reply_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, late_reply], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -252,6 +260,8 @@ late_reply_get(ID) ->
|
|||
dropped_queue_full_inc(ID) ->
|
||||
dropped_queue_full_inc(ID, 1).
|
||||
|
||||
dropped_queue_full_inc(_ID, 0) ->
|
||||
ok;
|
||||
dropped_queue_full_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_full], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -264,6 +274,8 @@ dropped_queue_full_get(ID) ->
|
|||
dropped_resource_not_found_inc(ID) ->
|
||||
dropped_resource_not_found_inc(ID, 1).
|
||||
|
||||
dropped_resource_not_found_inc(_ID, 0) ->
|
||||
ok;
|
||||
dropped_resource_not_found_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_not_found], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -276,6 +288,8 @@ dropped_resource_not_found_get(ID) ->
|
|||
dropped_resource_stopped_inc(ID) ->
|
||||
dropped_resource_stopped_inc(ID, 1).
|
||||
|
||||
dropped_resource_stopped_inc(_ID, 0) ->
|
||||
ok;
|
||||
dropped_resource_stopped_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_stopped], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -288,6 +302,8 @@ dropped_resource_stopped_get(ID) ->
|
|||
matched_inc(ID) ->
|
||||
matched_inc(ID, 1).
|
||||
|
||||
matched_inc(_ID, 0) ->
|
||||
ok;
|
||||
matched_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, matched], #{counter_inc => Val}, #{resource_id => ID}).
|
||||
|
||||
|
@ -298,6 +314,8 @@ matched_get(ID) ->
|
|||
received_inc(ID) ->
|
||||
received_inc(ID, 1).
|
||||
|
||||
received_inc(_ID, 0) ->
|
||||
ok;
|
||||
received_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, received], #{counter_inc => Val}, #{resource_id => ID}).
|
||||
|
||||
|
@ -308,6 +326,8 @@ received_get(ID) ->
|
|||
retried_inc(ID) ->
|
||||
retried_inc(ID, 1).
|
||||
|
||||
retried_inc(_ID, 0) ->
|
||||
ok;
|
||||
retried_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, retried], #{counter_inc => Val}, #{resource_id => ID}).
|
||||
|
||||
|
@ -318,6 +338,8 @@ retried_get(ID) ->
|
|||
failed_inc(ID) ->
|
||||
failed_inc(ID, 1).
|
||||
|
||||
failed_inc(_ID, 0) ->
|
||||
ok;
|
||||
failed_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, failed], #{counter_inc => Val}, #{resource_id => ID}).
|
||||
|
||||
|
@ -328,6 +350,8 @@ failed_get(ID) ->
|
|||
retried_failed_inc(ID) ->
|
||||
retried_failed_inc(ID, 1).
|
||||
|
||||
retried_failed_inc(_ID, 0) ->
|
||||
ok;
|
||||
retried_failed_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, retried_failed], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -340,6 +364,8 @@ retried_failed_get(ID) ->
|
|||
retried_success_inc(ID) ->
|
||||
retried_success_inc(ID, 1).
|
||||
|
||||
retried_success_inc(_ID, 0) ->
|
||||
ok;
|
||||
retried_success_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, retried_success], #{counter_inc => Val}, #{
|
||||
resource_id => ID
|
||||
|
@ -352,6 +378,8 @@ retried_success_get(ID) ->
|
|||
success_inc(ID) ->
|
||||
success_inc(ID, 1).
|
||||
|
||||
success_inc(_ID, 0) ->
|
||||
ok;
|
||||
success_inc(ID, Val) ->
|
||||
telemetry:execute([?TELEMETRY_PREFIX, success], #{counter_inc => Val}, #{resource_id => ID}).
|
||||
|
||||
|
|
12
bin/emqx
12
bin/emqx
|
@ -875,16 +875,16 @@ tr_log_to_env() {
|
|||
unset EMQX_LOG__TO
|
||||
case "${log_to}" in
|
||||
console)
|
||||
export EMQX_LOG__CONSOLE_HANDLER__ENABLE='true'
|
||||
export EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE='false'
|
||||
export EMQX_LOG__CONSOLE__ENABLE='true'
|
||||
export EMQX_LOG__FILE__ENABLE='false'
|
||||
;;
|
||||
file)
|
||||
export EMQX_LOG__CONSOLE_HANDLER__ENABLE='false'
|
||||
export EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE='true'
|
||||
export EMQX_LOG__CONSOLE__ENABLE='false'
|
||||
export EMQX_LOG__FILE__ENABLE='true'
|
||||
;;
|
||||
both)
|
||||
export EMQX_LOG__CONSOLE_HANDLER__ENABLE='true'
|
||||
export EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE='true'
|
||||
export EMQX_LOG__CONSOLE__ENABLE='true'
|
||||
export EMQX_LOG__FILE__ENABLE='true'
|
||||
;;
|
||||
default)
|
||||
# want to use config file defaults, do nothing
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Add log level configuration to SSL communication
|
|
@ -0,0 +1,2 @@
|
|||
Corrected an issue where the no_local flag was not functioning correctly.
|
||||
|
|
@ -0,0 +1 @@
|
|||
Store gateway authentication TLS certificates and keys in the data directory.
|
|
@ -0,0 +1 @@
|
|||
Optimized counter increment calls to avoid work if increment is zero.
|
4
mix.exs
4
mix.exs
|
@ -50,7 +50,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
|
||||
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true},
|
||||
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
||||
{:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
|
||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
|
||||
|
@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
# in conflict by emqtt and hocon
|
||||
{:getopt, "1.0.2", override: true},
|
||||
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
|
||||
{:hocon, github: "emqx/hocon", tag: "0.39.4", override: true},
|
||||
{:hocon, github: "emqx/hocon", tag: "0.39.6", override: true},
|
||||
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
|
||||
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
|
||||
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
|
||||
|
|
|
@ -57,7 +57,7 @@
|
|||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
|
||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}}
|
||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||
, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
|
||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
|
||||
|
@ -75,7 +75,7 @@
|
|||
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
||||
, {getopt, "1.0.2"}
|
||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
|
||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}}
|
||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.6"}}}
|
||||
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
|
||||
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
||||
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
|
||||
|
|
|
@ -1321,6 +1321,11 @@ you drop support for the insecure renegotiation, prone to MitM attacks."""
|
|||
common_ssl_opts_schema_secure_renegotiate.label:
|
||||
"""SSL renegotiate"""
|
||||
|
||||
common_ssl_opts_schema_log_level.desc:
|
||||
"""Log level for SSL communication. Default is 'notice'. Set to 'debug' to inspect TLS handshake messages."""
|
||||
common_ssl_opts_schema_log_level.label:
|
||||
"""SSL log level"""
|
||||
|
||||
sysmon_vm_busy_port.desc:
|
||||
"""When a port (e.g. TCP socket) is overloaded, there will be a <code>busy_port</code> warning log,
|
||||
and an MQTT message is published to the system topic <code>$SYS/sysmon/busy_port</code>."""
|
||||
|
|
Loading…
Reference in New Issue