Merge remote-tracking branch 'origin/release-50' into 0502-merge-release-50-back-to-master

Zaiming (Stone) Shi 2023-05-02 15:27:08 +02:00
commit d5f5f35787
64 changed files with 929 additions and 379 deletions

.gitignore (vendored)

@@ -43,8 +43,7 @@ tmp/
 _packages
 elvis
 emqx_dialyzer_*_plt
-*/emqx_dashboard/priv/www
-*/emqx_dashboard/priv/i18n.conf
+*/emqx_dashboard/priv/
 dist.zip
 scripts/git-token
 apps/*/etc/*.all
@@ -71,3 +70,5 @@ apps/emqx/test/emqx_static_checks_data/master.bpapi
 lux_logs/
 /.prepare
 bom.json
+ct_run*/
+apps/emqx_conf/etc/emqx.conf.all.rendered*


@ -1,43 +0,0 @@
listeners.tcp.default {
bind = "0.0.0.0:1883"
max_connections = 1024000
}
listeners.ssl.default {
bind = "0.0.0.0:8883"
max_connections = 512000
ssl_options {
keyfile = "{{ platform_etc_dir }}/certs/key.pem"
certfile = "{{ platform_etc_dir }}/certs/cert.pem"
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
}
}
listeners.ws.default {
bind = "0.0.0.0:8083"
max_connections = 1024000
websocket.mqtt_path = "/mqtt"
}
listeners.wss.default {
bind = "0.0.0.0:8084"
max_connections = 512000
websocket.mqtt_path = "/mqtt"
ssl_options {
keyfile = "{{ platform_etc_dir }}/certs/key.pem"
certfile = "{{ platform_etc_dir }}/certs/cert.pem"
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
}
}
# listeners.quic.default {
# enabled = true
# bind = "0.0.0.0:14567"
# max_connections = 1024000
# ssl_options {
# verify = verify_none
# keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# }


@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.0.24").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.0.3-alpha.3").
+-define(EMQX_RELEASE_EE, "5.0.3-alpha.5").
 
 %% the HTTP API version
 -define(EMQX_API_VERSION, "5.0").


@ -0,0 +1,23 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_SCHEMA_HRL).
-define(EMQX_SCHEMA_HRL, true).
-define(TOMBSTONE_TYPE, marked_for_deletion).
-define(TOMBSTONE_VALUE, <<"marked_for_deletion">>).
-define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion).
-endif.
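
For illustration only: a minimal sketch of how these macros are used, assuming a running node with emqx_conf started. The key path is just an example; emqx_conf:tombstone/2 is introduced later in this changeset and routes the change request through the config handlers, which store the tombstone value in the raw config instead of dropping the key.

%% Sketch, not part of the diff: mark a listener for deletion.
%% The handlers translate ?TOMBSTONE_CONFIG_CHANGE_REQ into
%% ?TOMBSTONE_VALUE (<<"marked_for_deletion">>) in the raw conf.
{ok, _} = emqx_conf:tombstone([listeners, tcp, default], #{}).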


@@ -29,7 +29,7 @@
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.3"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},


@@ -3,7 +3,7 @@
     {id, "emqx"},
     {description, "EMQX Core"},
     % strict semver, bump manually!
-    {vsn, "5.0.24"},
+    {vsn, "5.0.25"},
     {modules, []},
     {registered, []},
     {applications, [


@ -35,7 +35,6 @@
save_to_config_map/2, save_to_config_map/2,
save_to_override_conf/3 save_to_override_conf/3
]). ]).
-export([raw_conf_with_default/4]).
-export([merge_envs/2]). -export([merge_envs/2]).
-export([ -export([
@ -90,7 +89,7 @@
]). ]).
-ifdef(TEST). -ifdef(TEST).
-export([erase_schema_mod_and_names/0]). -export([erase_all/0]).
-endif. -endif.
-include("logger.hrl"). -include("logger.hrl").
@ -329,7 +328,7 @@ init_load(SchemaMod, ConfFiles) ->
-spec init_load(module(), [string()] | binary() | hocon:config()) -> ok. -spec init_load(module(), [string()] | binary() | hocon:config()) -> ok.
init_load(SchemaMod, Conf, Opts) when is_list(Conf) orelse is_binary(Conf) -> init_load(SchemaMod, Conf, Opts) when is_list(Conf) orelse is_binary(Conf) ->
HasDeprecatedFile = has_deprecated_file(), HasDeprecatedFile = has_deprecated_file(),
RawConf = parse_hocon(HasDeprecatedFile, Conf), RawConf = load_config_files(HasDeprecatedFile, Conf),
init_load(HasDeprecatedFile, SchemaMod, RawConf, Opts). init_load(HasDeprecatedFile, SchemaMod, RawConf, Opts).
init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) -> init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) ->
@ -339,18 +338,16 @@ init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) ->
RawConfWithEnvs = merge_envs(SchemaMod, RawConf), RawConfWithEnvs = merge_envs(SchemaMod, RawConf),
Overrides = read_override_confs(), Overrides = read_override_confs(),
RawConfWithOverrides = hocon:deep_merge(RawConfWithEnvs, Overrides), RawConfWithOverrides = hocon:deep_merge(RawConfWithEnvs, Overrides),
RootNames = get_root_names(), RawConfAll = maybe_fill_defaults(SchemaMod, RawConfWithOverrides, Opts),
RawConfAll = raw_conf_with_default(SchemaMod, RootNames, RawConfWithOverrides, Opts),
%% check configs against the schema %% check configs against the schema
{AppEnvs, CheckedConf} = check_config(SchemaMod, RawConfAll, #{}), {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConfAll, #{}),
save_to_app_env(AppEnvs), save_to_app_env(AppEnvs),
ok = save_to_config_map(CheckedConf, RawConfAll); ok = save_to_config_map(CheckedConf, RawConfAll);
init_load(false, SchemaMod, RawConf, Opts) when is_map(RawConf) -> init_load(false, SchemaMod, RawConf, Opts) when is_map(RawConf) ->
ok = save_schema_mod_and_names(SchemaMod), ok = save_schema_mod_and_names(SchemaMod),
RootNames = get_root_names(),
%% Merge environment variable overrides on top %% Merge environment variable overrides on top
RawConfWithEnvs = merge_envs(SchemaMod, RawConf), RawConfWithEnvs = merge_envs(SchemaMod, RawConf),
RawConfAll = raw_conf_with_default(SchemaMod, RootNames, RawConfWithEnvs, Opts), RawConfAll = maybe_fill_defaults(SchemaMod, RawConfWithEnvs, Opts),
%% check configs against the schema %% check configs against the schema
{AppEnvs, CheckedConf} = check_config(SchemaMod, RawConfAll, #{}), {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConfAll, #{}),
save_to_app_env(AppEnvs), save_to_app_env(AppEnvs),
@@ -363,47 +360,61 @@ read_override_confs() ->
     hocon:deep_merge(ClusterOverrides, LocalOverrides).
 
 %% keep the raw and non-raw conf has the same keys to make update raw conf easier.
-raw_conf_with_default(SchemaMod, RootNames, RawConf, #{raw_with_default := true}) ->
-    Fun = fun(Name, Acc) ->
-        case maps:is_key(Name, RawConf) of
-            true ->
-                Acc;
-            false ->
-                case lists:keyfind(Name, 1, hocon_schema:roots(SchemaMod)) of
-                    false ->
-                        Acc;
-                    {_, {_, Schema}} ->
-                        Acc#{Name => schema_default(Schema)}
-                end
-        end
-    end,
-    RawDefault = lists:foldl(Fun, #{}, RootNames),
-    maps:merge(RawConf, fill_defaults(SchemaMod, RawDefault, #{}));
-raw_conf_with_default(_SchemaMod, _RootNames, RawConf, _Opts) ->
+%% TODO: remove raw_with_default as it's now always true.
+maybe_fill_defaults(SchemaMod, RawConf0, #{raw_with_default := true}) ->
+    RootSchemas = hocon_schema:roots(SchemaMod),
+    %% the roots which are missing from the loaded configs
+    MissingRoots = lists:filtermap(
+        fun({BinName, Sc}) ->
+            case maps:is_key(BinName, RawConf0) orelse is_already_loaded(BinName) of
+                true -> false;
+                false -> {true, Sc}
+            end
+        end,
+        RootSchemas
+    ),
+    RawConf = lists:foldl(
+        fun({RootName, Schema}, Acc) ->
+            Acc#{bin(RootName) => seed_default(Schema)}
+        end,
+        RawConf0,
+        MissingRoots
+    ),
+    fill_defaults(RawConf);
+maybe_fill_defaults(_SchemaMod, RawConf, _Opts) ->
     RawConf.
 
-schema_default(Schema) ->
-    case hocon_schema:field_schema(Schema, type) of
-        ?ARRAY(_) ->
-            [];
-        _ ->
-            #{}
+%% So far, this can only return true when testing.
+%% e.g. when testing an app, we need to load its config first
+%% then start emqx_conf application which will load the
+%% possibly empty config again (then filled with defaults).
+is_already_loaded(Name) ->
+    ?MODULE:get_raw([Name], #{}) =/= #{}.
+
+%% if a root is not found in the raw conf, fill it with default values.
+seed_default(Schema) ->
+    case hocon_schema:field_schema(Schema, default) of
+        undefined ->
+            %% so far all roots without a default value are objects
+            #{};
+        Value ->
+            Value
     end.
 
-parse_hocon(HasDeprecatedFile, Conf) ->
+load_config_files(HasDeprecatedFile, Conf) ->
     IncDirs = include_dirs(),
     case do_parse_hocon(HasDeprecatedFile, Conf, IncDirs) of
        {ok, HoconMap} ->
            HoconMap;
        {error, Reason} ->
            ?SLOG(error, #{
-                msg => "failed_to_load_hocon_file",
+                msg => "failed_to_load_config_file",
                reason => Reason,
                pwd => file:get_cwd(),
                include_dirs => IncDirs,
                config_file => Conf
            }),
-            error(failed_to_load_hocon_file)
+            error(failed_to_load_config_file)
    end.
 
 do_parse_hocon(true, Conf, IncDirs) ->
@ -548,7 +559,9 @@ save_schema_mod_and_names(SchemaMod) ->
}). }).
-ifdef(TEST). -ifdef(TEST).
erase_schema_mod_and_names() -> erase_all() ->
Names = get_root_names(),
lists:foreach(fun erase/1, Names),
persistent_term:erase(?PERSIS_SCHEMA_MODS). persistent_term:erase(?PERSIS_SCHEMA_MODS).
-endif. -endif.
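
The net effect of maybe_fill_defaults/3 above is that any schema root missing from the loaded files is seeded with its schema default before the config is checked. A rough sketch, with example root names and values rather than exact output:

%% Illustrative only: a loaded config that sets just one mqtt option
%% keeps that root as-is, while an absent root such as <<"listeners">>
%% is seeded by seed_default/1 with the root's schema default (or #{}
%% when it has none) and then run through fill_defaults/1 with the rest.
RawConf0 = #{<<"mqtt">> => #{<<"max_packet_size">> => <<"1MB">>}}.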


@ -18,6 +18,7 @@
-module(emqx_config_handler). -module(emqx_config_handler).
-include("logger.hrl"). -include("logger.hrl").
-include("emqx_schema.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-behaviour(gen_server). -behaviour(gen_server).
@@ -447,11 +448,17 @@ merge_to_override_config(RawConf, Opts) ->
 
 up_req({remove, _Opts}) -> '$remove';
 up_req({{update, Req}, _Opts}) -> Req.
 
-return_change_result(ConfKeyPath, {{update, _Req}, Opts}) ->
-    #{
-        config => emqx_config:get(ConfKeyPath),
-        raw_config => return_rawconf(ConfKeyPath, Opts)
-    };
+return_change_result(ConfKeyPath, {{update, Req}, Opts}) ->
+    case Req =/= ?TOMBSTONE_CONFIG_CHANGE_REQ of
+        true ->
+            #{
+                config => emqx_config:get(ConfKeyPath),
+                raw_config => return_rawconf(ConfKeyPath, Opts)
+            };
+        false ->
+            %% like remove, nothing to return
+            #{}
+    end;
 return_change_result(_ConfKeyPath, {remove, _Opts}) ->
     #{}.


@ -20,6 +20,7 @@
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]). -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]).
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("emqx_schema.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -33,7 +34,8 @@
is_running/1, is_running/1,
current_conns/2, current_conns/2,
max_conns/2, max_conns/2,
id_example/0 id_example/0,
default_max_conn/0
]). ]).
-export([ -export([
@ -61,8 +63,11 @@
-export([certs_dir/2]). -export([certs_dir/2]).
-endif. -endif.
-type listener_id() :: atom() | binary().
-define(CONF_KEY_PATH, [listeners, '?', '?']). -define(CONF_KEY_PATH, [listeners, '?', '?']).
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
-define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ).
-spec id_example() -> atom(). -spec id_example() -> atom().
id_example() -> 'tcp:default'. id_example() -> 'tcp:default'.
@ -105,19 +110,22 @@ do_list_raw() ->
format_raw_listeners({Type0, Conf}) -> format_raw_listeners({Type0, Conf}) ->
Type = binary_to_atom(Type0), Type = binary_to_atom(Type0),
lists:map( lists:filtermap(
fun({LName, LConf0}) when is_map(LConf0) -> fun
Bind = parse_bind(LConf0), ({LName, LConf0}) when is_map(LConf0) ->
Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), Bind = parse_bind(LConf0),
LConf1 = maps:remove(<<"authentication">>, LConf0), Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
LConf3 = maps:put(<<"running">>, Running, LConf1), LConf1 = maps:remove(<<"authentication">>, LConf0),
CurrConn = LConf2 = maps:put(<<"running">>, Running, LConf1),
case Running of CurrConn =
true -> current_conns(Type, LName, Bind); case Running of
false -> 0 true -> current_conns(Type, LName, Bind);
end, false -> 0
LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3), end,
{Type0, LName, LConf4} LConf = maps:put(<<"current_connections">>, CurrConn, LConf2),
{true, {Type0, LName, LConf}};
({_LName, _MarkDel}) ->
false
end, end,
maps:to_list(Conf) maps:to_list(Conf)
). ).
@ -195,7 +203,7 @@ start() ->
ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
foreach_listeners(fun start_listener/3). foreach_listeners(fun start_listener/3).
-spec start_listener(atom()) -> ok | {error, term()}. -spec start_listener(listener_id()) -> ok | {error, term()}.
start_listener(ListenerId) -> start_listener(ListenerId) ->
apply_on_listener(ListenerId, fun start_listener/3). apply_on_listener(ListenerId, fun start_listener/3).
@ -246,7 +254,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
restart() -> restart() ->
foreach_listeners(fun restart_listener/3). foreach_listeners(fun restart_listener/3).
-spec restart_listener(atom()) -> ok | {error, term()}. -spec restart_listener(listener_id()) -> ok | {error, term()}.
restart_listener(ListenerId) -> restart_listener(ListenerId) ->
apply_on_listener(ListenerId, fun restart_listener/3). apply_on_listener(ListenerId, fun restart_listener/3).
@ -271,7 +279,7 @@ stop() ->
_ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
foreach_listeners(fun stop_listener/3). foreach_listeners(fun stop_listener/3).
-spec stop_listener(atom()) -> ok | {error, term()}. -spec stop_listener(listener_id()) -> ok | {error, term()}.
stop_listener(ListenerId) -> stop_listener(ListenerId) ->
apply_on_listener(ListenerId, fun stop_listener/3). apply_on_listener(ListenerId, fun stop_listener/3).
@ -419,7 +427,9 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
end. end.
%% Update the listeners at runtime %% Update the listeners at runtime
pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) -> pre_config_update([listeners, Type, Name], {create, NewConf}, V) when
V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
->
CertsDir = certs_dir(Type, Name), CertsDir = certs_dir(Type, Name),
{ok, convert_certs(CertsDir, NewConf)}; {ok, convert_certs(CertsDir, NewConf)};
pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) -> pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
@ -434,6 +444,8 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) -> pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
NewConf = emqx_utils_maps:deep_merge(RawConf, Updated), NewConf = emqx_utils_maps:deep_merge(RawConf, Updated),
{ok, NewConf}; {ok, NewConf};
pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) ->
{ok, ?TOMBSTONE_VALUE};
pre_config_update(_Path, _Request, RawConf) -> pre_config_update(_Path, _Request, RawConf) ->
{ok, RawConf}. {ok, RawConf}.
@ -441,13 +453,15 @@ post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefin
start_listener(Type, Name, NewConf); start_listener(Type, Name, NewConf);
post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf), try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf),
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
case NewConf of case NewConf of
#{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf}); #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf});
_ -> ok _ -> ok
end; end;
post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) -> post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when
ok; Op =:= ?MARK_DEL andalso is_map(OldConf)
post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) -> ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
case stop_listener(Type, Name, OldConf) of case stop_listener(Type, Name, OldConf) of
ok -> ok ->
_ = emqx_authentication:delete_chain(listener_id(Type, Name)), _ = emqx_authentication:delete_chain(listener_id(Type, Name)),
@ -460,10 +474,18 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo
#{enabled := NewEnabled} = NewConf, #{enabled := NewEnabled} = NewConf,
#{enabled := OldEnabled} = OldConf, #{enabled := OldEnabled} = OldConf,
case {NewEnabled, OldEnabled} of case {NewEnabled, OldEnabled} of
{true, true} -> restart_listener(Type, Name, {OldConf, NewConf}); {true, true} ->
{true, false} -> start_listener(Type, Name, NewConf); ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
{false, true} -> stop_listener(Type, Name, OldConf); restart_listener(Type, Name, {OldConf, NewConf});
{false, false} -> stop_listener(Type, Name, OldConf) {true, false} ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
start_listener(Type, Name, NewConf);
{false, true} ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
stop_listener(Type, Name, OldConf);
{false, false} ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
stop_listener(Type, Name, OldConf)
end; end;
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
ok. ok.
@ -601,6 +623,7 @@ format_bind(Bin) when is_binary(Bin) ->
listener_id(Type, ListenerName) -> listener_id(Type, ListenerName) ->
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])). list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
-spec parse_listener_id(listener_id()) -> {ok, #{type => atom(), name => atom()}} | {error, term()}.
parse_listener_id(Id) -> parse_listener_id(Id) ->
case string:split(str(Id), ":", leading) of case string:split(str(Id), ":", leading) of
[Type, Name] -> [Type, Name] ->
@ -813,3 +836,22 @@ inject_crl_config(
}; };
inject_crl_config(Conf) -> inject_crl_config(Conf) ->
Conf. Conf.
maybe_unregister_ocsp_stapling_refresh(
ssl = Type, Name, #{ssl_options := #{ocsp := #{enable_ocsp_stapling := false}}} = _Conf
) ->
unregister_ocsp_stapling_refresh(Type, Name),
ok;
maybe_unregister_ocsp_stapling_refresh(_Type, _Name, _Conf) ->
ok.
unregister_ocsp_stapling_refresh(Type, Name) ->
ListenerId = listener_id(Type, Name),
emqx_ocsp_cache:unregister_listener(ListenerId),
ok.
%% There is currently an issue with frontend
%% infinity is not a good value for it, so we use 5m for now
default_max_conn() ->
%% TODO: <<"infinity">>
5_000_000.
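
Together with the schema changes further down, deleting a listener now goes through the tombstone request rather than '$remove'. A hedged end-to-end sketch on a running node; the listener name 'backplane' and its bind address are made up for the example:

%% Delete: pre_config_update/3 maps ?MARK_DEL to ?TOMBSTONE_VALUE and
%% post_config_update/5 stops the listener and unregisters OCSP refresh.
{ok, _} = emqx_conf:tombstone([listeners, tcp, backplane], #{}),
%% Re-create: the tombstoned raw value matches the first create clause
%% above (V =:= ?TOMBSTONE_VALUE), so the same name can be reused.
{ok, _} = emqx_conf:update(
    [listeners, tcp, backplane],
    {create, #{<<"bind">> => <<"0.0.0.0:1884">>}},
    #{}
).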


@ -30,6 +30,7 @@
sni_fun/2, sni_fun/2,
fetch_response/1, fetch_response/1,
register_listener/2, register_listener/2,
unregister_listener/1,
inject_sni_fun/2 inject_sni_fun/2
]). ]).
@ -107,6 +108,9 @@ fetch_response(ListenerID) ->
register_listener(ListenerID, Opts) -> register_listener(ListenerID, Opts) ->
gen_server:call(?MODULE, {register_listener, ListenerID, Opts}, ?CALL_TIMEOUT). gen_server:call(?MODULE, {register_listener, ListenerID, Opts}, ?CALL_TIMEOUT).
unregister_listener(ListenerID) ->
gen_server:cast(?MODULE, {unregister_listener, ListenerID}).
-spec inject_sni_fun(emqx_listeners:listener_id(), map()) -> map(). -spec inject_sni_fun(emqx_listeners:listener_id(), map()) -> map().
inject_sni_fun(ListenerID, Conf0) -> inject_sni_fun(ListenerID, Conf0) ->
SNIFun = emqx_const_v1:make_sni_fun(ListenerID), SNIFun = emqx_const_v1:make_sni_fun(ListenerID),
@ -160,6 +164,18 @@ handle_call({register_listener, ListenerID, Conf}, _From, State0) ->
handle_call(Call, _From, State) -> handle_call(Call, _From, State) ->
{reply, {error, {unknown_call, Call}}, State}. {reply, {error, {unknown_call, Call}}, State}.
handle_cast({unregister_listener, ListenerID}, State0) ->
State2 =
case maps:take(?REFRESH_TIMER(ListenerID), State0) of
error ->
State0;
{TRef, State1} ->
emqx_utils:cancel_timer(TRef),
State1
end,
State = maps:remove({refresh_interval, ListenerID}, State2),
?tp(ocsp_cache_listener_unregistered, #{listener_id => ListenerID}),
{noreply, State};
handle_cast(_Cast, State) -> handle_cast(_Cast, State) ->
{noreply, State}. {noreply, State}.
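
Unregistration is a cast, so it is asynchronous and safe to repeat; the listener shutdown path added in emqx_listeners can call it without checking whether a refresh was ever registered (the test suite later in this commit adds t_double_unregister for this). A minimal sketch with a made-up listener id:

%% Both calls return ok immediately; the second is a no-op because
%% maps:take/2 returns 'error' once the refresh timer entry is gone.
ok = emqx_ocsp_cache:unregister_listener('ssl:edge'),
ok = emqx_ocsp_cache:unregister_listener('ssl:edge').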


@ -23,6 +23,7 @@
-dialyzer(no_fail_call). -dialyzer(no_fail_call).
-elvis([{elvis_style, invalid_dynamic_call, disable}]). -elvis([{elvis_style, invalid_dynamic_call, disable}]).
-include("emqx_schema.hrl").
-include("emqx_authentication.hrl"). -include("emqx_authentication.hrl").
-include("emqx_access_control.hrl"). -include("emqx_access_control.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
@ -77,7 +78,8 @@
user_lookup_fun_tr/2, user_lookup_fun_tr/2,
validate_alarm_actions/1, validate_alarm_actions/1,
non_empty_string/1, non_empty_string/1,
validations/0 validations/0,
naive_env_interpolation/1
]). ]).
-export([qos/0]). -export([qos/0]).
@ -110,6 +112,12 @@
convert_servers/2 convert_servers/2
]). ]).
%% tombstone types
-export([
tombstone_map/2,
get_tombstone_map_value_type/1
]).
-behaviour(hocon_schema). -behaviour(hocon_schema).
-reflect_type([ -reflect_type([
@ -787,41 +795,48 @@ fields("listeners") ->
[ [
{"tcp", {"tcp",
sc( sc(
map(name, ref("mqtt_tcp_listener")), tombstone_map(name, ref("mqtt_tcp_listener")),
#{ #{
desc => ?DESC(fields_listeners_tcp), desc => ?DESC(fields_listeners_tcp),
converter => fun(X, _) ->
ensure_default_listener(X, tcp)
end,
required => {false, recursively} required => {false, recursively}
} }
)}, )},
{"ssl", {"ssl",
sc( sc(
map(name, ref("mqtt_ssl_listener")), tombstone_map(name, ref("mqtt_ssl_listener")),
#{ #{
desc => ?DESC(fields_listeners_ssl), desc => ?DESC(fields_listeners_ssl),
converter => fun(X, _) -> ensure_default_listener(X, ssl) end,
required => {false, recursively} required => {false, recursively}
} }
)}, )},
{"ws", {"ws",
sc( sc(
map(name, ref("mqtt_ws_listener")), tombstone_map(name, ref("mqtt_ws_listener")),
#{ #{
desc => ?DESC(fields_listeners_ws), desc => ?DESC(fields_listeners_ws),
converter => fun(X, _) -> ensure_default_listener(X, ws) end,
required => {false, recursively} required => {false, recursively}
} }
)}, )},
{"wss", {"wss",
sc( sc(
map(name, ref("mqtt_wss_listener")), tombstone_map(name, ref("mqtt_wss_listener")),
#{ #{
desc => ?DESC(fields_listeners_wss), desc => ?DESC(fields_listeners_wss),
converter => fun(X, _) -> ensure_default_listener(X, wss) end,
required => {false, recursively} required => {false, recursively}
} }
)}, )},
{"quic", {"quic",
sc( sc(
map(name, ref("mqtt_quic_listener")), tombstone_map(name, ref("mqtt_quic_listener")),
#{ #{
desc => ?DESC(fields_listeners_quic), desc => ?DESC(fields_listeners_quic),
converter => fun keep_default_tombstone/2,
required => {false, recursively} required => {false, recursively}
} }
)} )}
@ -832,7 +847,7 @@ fields("crl_cache") ->
%% same URL. If they had diverging timeout options, it would be %% same URL. If they had diverging timeout options, it would be
%% confusing. %% confusing.
[ [
{"refresh_interval", {refresh_interval,
sc( sc(
duration(), duration(),
#{ #{
@ -840,7 +855,7 @@ fields("crl_cache") ->
desc => ?DESC("crl_cache_refresh_interval") desc => ?DESC("crl_cache_refresh_interval")
} }
)}, )},
{"http_timeout", {http_timeout,
sc( sc(
duration(), duration(),
#{ #{
@ -848,7 +863,7 @@ fields("crl_cache") ->
desc => ?DESC("crl_cache_refresh_http_timeout") desc => ?DESC("crl_cache_refresh_http_timeout")
} }
)}, )},
{"capacity", {capacity,
sc( sc(
pos_integer(), pos_integer(),
#{ #{
@ -1365,7 +1380,7 @@ fields("ssl_client_opts") ->
client_ssl_opts_schema(#{}); client_ssl_opts_schema(#{});
fields("ocsp") -> fields("ocsp") ->
[ [
{"enable_ocsp_stapling", {enable_ocsp_stapling,
sc( sc(
boolean(), boolean(),
#{ #{
@ -1373,7 +1388,7 @@ fields("ocsp") ->
desc => ?DESC("server_ssl_opts_schema_enable_ocsp_stapling") desc => ?DESC("server_ssl_opts_schema_enable_ocsp_stapling")
} }
)}, )},
{"responder_url", {responder_url,
sc( sc(
url(), url(),
#{ #{
@ -1381,7 +1396,7 @@ fields("ocsp") ->
desc => ?DESC("server_ssl_opts_schema_ocsp_responder_url") desc => ?DESC("server_ssl_opts_schema_ocsp_responder_url")
} }
)}, )},
{"issuer_pem", {issuer_pem,
sc( sc(
binary(), binary(),
#{ #{
@ -1389,7 +1404,7 @@ fields("ocsp") ->
desc => ?DESC("server_ssl_opts_schema_ocsp_issuer_pem") desc => ?DESC("server_ssl_opts_schema_ocsp_issuer_pem")
} }
)}, )},
{"refresh_interval", {refresh_interval,
sc( sc(
duration(), duration(),
#{ #{
@ -1397,7 +1412,7 @@ fields("ocsp") ->
desc => ?DESC("server_ssl_opts_schema_ocsp_refresh_interval") desc => ?DESC("server_ssl_opts_schema_ocsp_refresh_interval")
} }
)}, )},
{"refresh_http_timeout", {refresh_http_timeout,
sc( sc(
duration(), duration(),
#{ #{
@ -1947,7 +1962,7 @@ base_listener(Bind) ->
sc( sc(
hoconsc:union([infinity, pos_integer()]), hoconsc:union([infinity, pos_integer()]),
#{ #{
default => <<"infinity">>, default => emqx_listeners:default_max_conn(),
desc => ?DESC(base_listener_max_connections) desc => ?DESC(base_listener_max_connections)
} }
)}, )},
@ -2323,12 +2338,12 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
Field Field
|| not IsRanchListener, || not IsRanchListener,
Field <- [ Field <- [
{"gc_after_handshake", {gc_after_handshake,
sc(boolean(), #{ sc(boolean(), #{
default => false, default => false,
desc => ?DESC(server_ssl_opts_schema_gc_after_handshake) desc => ?DESC(server_ssl_opts_schema_gc_after_handshake)
})}, })},
{"ocsp", {ocsp,
sc( sc(
ref("ocsp"), ref("ocsp"),
#{ #{
@ -2336,7 +2351,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
validator => fun ocsp_inner_validator/1 validator => fun ocsp_inner_validator/1
} }
)}, )},
{"enable_crl_check", {enable_crl_check,
sc( sc(
boolean(), boolean(),
#{ #{
@ -2799,6 +2814,7 @@ authentication(Which) ->
hoconsc:mk(Type, #{ hoconsc:mk(Type, #{
desc => Desc, desc => Desc,
converter => fun ensure_array/2, converter => fun ensure_array/2,
default => [],
importance => Importance importance => Importance
}). }).
@ -3203,3 +3219,138 @@ assert_required_field(Conf, Key, ErrorMessage) ->
_ -> _ ->
ok ok
end. end.
default_listener(tcp) ->
#{
<<"bind">> => <<"0.0.0.0:1883">>
};
default_listener(ws) ->
#{
<<"bind">> => <<"0.0.0.0:8083">>,
<<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
};
default_listener(SSLListener) ->
%% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate
CertFile = fun(Name) ->
iolist_to_binary("${EMQX_ETC_DIR}/" ++ filename:join(["certs", Name]))
end,
SslOptions = #{
<<"cacertfile">> => CertFile(<<"cacert.pem">>),
<<"certfile">> => CertFile(<<"cert.pem">>),
<<"keyfile">> => CertFile(<<"key.pem">>)
},
case SSLListener of
ssl ->
#{
<<"bind">> => <<"0.0.0.0:8883">>,
<<"ssl_options">> => SslOptions
};
wss ->
#{
<<"bind">> => <<"0.0.0.0:8084">>,
<<"ssl_options">> => SslOptions,
<<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
}
end.
%% @doc This function helps to perform a naive string interpolation which
%% only looks at the first segment of the string and tries to replace it.
%% For example
%% "$MY_FILE_PATH"
%% "${MY_FILE_PATH}"
%% "$ENV_VARIABLE/sub/path"
%% "${ENV_VARIABLE}/sub/path"
%% "${ENV_VARIABLE}\sub\path" # windows
%% This function returns undefined if the input is undefined
%% otherwise always return string.
naive_env_interpolation(undefined) ->
undefined;
naive_env_interpolation(Bin) when is_binary(Bin) ->
naive_env_interpolation(unicode:characters_to_list(Bin, utf8));
naive_env_interpolation("$" ++ Maybe = Original) ->
{Env, Tail} = split_path(Maybe),
case resolve_env(Env) of
{ok, Path} ->
filename:join([Path, Tail]);
error ->
Original
end;
naive_env_interpolation(Other) ->
Other.
split_path(Path) ->
split_path(Path, []).
split_path([], Acc) ->
{lists:reverse(Acc), []};
split_path([Char | Rest], Acc) when Char =:= $/ orelse Char =:= $\\ ->
{lists:reverse(Acc), string:trim(Rest, leading, "/\\")};
split_path([Char | Rest], Acc) ->
split_path(Rest, [Char | Acc]).
resolve_env(Name0) ->
Name = string:trim(Name0, both, "{}"),
Value = os:getenv(Name),
case Value =/= false andalso Value =/= "" of
true ->
{ok, Value};
false ->
special_env(Name)
end.
-ifdef(TEST).
%% when running tests, we need to mock the env variables
special_env("EMQX_ETC_DIR") ->
{ok, filename:join([code:lib_dir(emqx), etc])};
special_env("EMQX_LOG_DIR") ->
{ok, "log"};
special_env(_Name) ->
%% only in tests
error.
-else.
special_env(_Name) -> error.
-endif.
%% The tombstone atom.
tombstone() ->
?TOMBSTONE_TYPE.
%% Make a map type, the value of which is allowed to be 'marked_for_deletion'
%% 'marked_for_deletion' is a special value which means the key is deleted.
%% This is used to support the 'delete' operation in configs,
%% since deleting the key would result in default value being used.
tombstone_map(Name, Type) ->
%% marked_for_deletion must be the last member of the union
%% because we need the first union member to populate the default values
map(Name, ?UNION([Type, ?TOMBSTONE_TYPE])).
%% inverse of mark_del_map
get_tombstone_map_value_type(Schema) ->
%% TODO: violation of abstraction, expose an API in hoconsc
%% hoconsc:map_value_type(Schema)
?MAP(_Name, Union) = hocon_schema:field_schema(Schema, type),
%% TODO: violation of abstraction, fix hoconsc:union_members/1
?UNION(Members) = Union,
Tombstone = tombstone(),
[Type, Tombstone] = hoconsc:union_members(Members),
Type.
%% Keep the 'default' tombstone, but delete others.
keep_default_tombstone(Map, _Opts) when is_map(Map) ->
maps:filter(
fun(Key, Value) ->
Key =:= <<"default">> orelse Value =/= ?TOMBSTONE_VALUE
end,
Map
);
keep_default_tombstone(Value, _Opts) ->
Value.
ensure_default_listener(undefined, ListenerType) ->
%% let the schema's default value do its job
#{<<"default">> => default_listener(ListenerType)};
ensure_default_listener(#{<<"default">> := _} = Map, _ListenerType) ->
keep_default_tombstone(Map, #{});
ensure_default_listener(Map, ListenerType) ->
NewMap = Map#{<<"default">> => default_listener(ListenerType)},
keep_default_tombstone(NewMap, #{}).
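
A quick shell sketch of the new emqx_schema:naive_env_interpolation/1, assuming EMQX_ETC_DIR resolves to /opt/emqx/etc in this particular environment; only the first path segment is ever substituted, and unset or empty variables leave the input unchanged:

1> emqx_schema:naive_env_interpolation(<<"${EMQX_ETC_DIR}/certs/cert.pem">>).
"/opt/emqx/etc/certs/cert.pem"
2> emqx_schema:naive_env_interpolation("$NOT_SET/certs/cert.pem").
"$NOT_SET/certs/cert.pem"

This is what lets the new default listener and authz entries ship with "${EMQX_ETC_DIR}/..." paths instead of mustache templates.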


@ -309,19 +309,19 @@ ensure_ssl_files(Dir, SSL, Opts) ->
case ensure_ssl_file_key(SSL, RequiredKeys) of case ensure_ssl_file_key(SSL, RequiredKeys) of
ok -> ok ->
KeyPaths = ?SSL_FILE_OPT_PATHS ++ ?SSL_FILE_OPT_PATHS_A, KeyPaths = ?SSL_FILE_OPT_PATHS ++ ?SSL_FILE_OPT_PATHS_A,
ensure_ssl_files(Dir, SSL, KeyPaths, Opts); ensure_ssl_files_per_key(Dir, SSL, KeyPaths, Opts);
{error, _} = Error -> {error, _} = Error ->
Error Error
end. end.
ensure_ssl_files(_Dir, SSL, [], _Opts) -> ensure_ssl_files_per_key(_Dir, SSL, [], _Opts) ->
{ok, SSL}; {ok, SSL};
ensure_ssl_files(Dir, SSL, [KeyPath | KeyPaths], Opts) -> ensure_ssl_files_per_key(Dir, SSL, [KeyPath | KeyPaths], Opts) ->
case case
ensure_ssl_file(Dir, KeyPath, SSL, emqx_utils_maps:deep_get(KeyPath, SSL, undefined), Opts) ensure_ssl_file(Dir, KeyPath, SSL, emqx_utils_maps:deep_get(KeyPath, SSL, undefined), Opts)
of of
{ok, NewSSL} -> {ok, NewSSL} ->
ensure_ssl_files(Dir, NewSSL, KeyPaths, Opts); ensure_ssl_files_per_key(Dir, NewSSL, KeyPaths, Opts);
{error, Reason} -> {error, Reason} ->
{error, Reason#{which_options => [KeyPath]}} {error, Reason#{which_options => [KeyPath]}}
end. end.
@ -472,7 +472,8 @@ hex_str(Bin) ->
iolist_to_binary([io_lib:format("~2.16.0b", [X]) || <<X:8>> <= Bin]). iolist_to_binary([io_lib:format("~2.16.0b", [X]) || <<X:8>> <= Bin]).
%% @doc Returns 'true' when the file is a valid pem, otherwise {error, Reason}. %% @doc Returns 'true' when the file is a valid pem, otherwise {error, Reason}.
is_valid_pem_file(Path) -> is_valid_pem_file(Path0) ->
Path = resolve_cert_path_for_read(Path0),
case file:read_file(Path) of case file:read_file(Path) of
{ok, Pem} -> is_pem(Pem) orelse {error, not_pem}; {ok, Pem} -> is_pem(Pem) orelse {error, not_pem};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
@ -513,10 +514,16 @@ do_drop_invalid_certs([KeyPath | KeyPaths], SSL) ->
to_server_opts(Type, Opts) -> to_server_opts(Type, Opts) ->
Versions = integral_versions(Type, maps:get(versions, Opts, undefined)), Versions = integral_versions(Type, maps:get(versions, Opts, undefined)),
Ciphers = integral_ciphers(Versions, maps:get(ciphers, Opts, undefined)), Ciphers = integral_ciphers(Versions, maps:get(ciphers, Opts, undefined)),
maps:to_list(Opts#{ Path = fun(Key) -> resolve_cert_path_for_read_strict(maps:get(Key, Opts, undefined)) end,
ciphers => Ciphers, filter(
versions => Versions maps:to_list(Opts#{
}). keyfile => Path(keyfile),
certfile => Path(certfile),
cacertfile => Path(cacertfile),
ciphers => Ciphers,
versions => Versions
})
).
%% @doc Convert hocon-checked tls client options (map()) to %% @doc Convert hocon-checked tls client options (map()) to
%% proplist accepted by ssl library. %% proplist accepted by ssl library.
@ -530,11 +537,12 @@ to_client_opts(Opts) ->
to_client_opts(Type, Opts) -> to_client_opts(Type, Opts) ->
GetD = fun(Key, Default) -> fuzzy_map_get(Key, Opts, Default) end, GetD = fun(Key, Default) -> fuzzy_map_get(Key, Opts, Default) end,
Get = fun(Key) -> GetD(Key, undefined) end, Get = fun(Key) -> GetD(Key, undefined) end,
Path = fun(Key) -> resolve_cert_path_for_read_strict(Get(Key)) end,
case GetD(enable, false) of case GetD(enable, false) of
true -> true ->
KeyFile = ensure_str(Get(keyfile)), KeyFile = Path(keyfile),
CertFile = ensure_str(Get(certfile)), CertFile = Path(certfile),
CAFile = ensure_str(Get(cacertfile)), CAFile = Path(cacertfile),
Verify = GetD(verify, verify_none), Verify = GetD(verify, verify_none),
SNI = ensure_sni(Get(server_name_indication)), SNI = ensure_sni(Get(server_name_indication)),
Versions = integral_versions(Type, Get(versions)), Versions = integral_versions(Type, Get(versions)),
@ -556,6 +564,31 @@ to_client_opts(Type, Opts) ->
[] []
end. end.
resolve_cert_path_for_read_strict(Path) ->
case resolve_cert_path_for_read(Path) of
undefined ->
undefined;
ResolvedPath ->
case filelib:is_regular(ResolvedPath) of
true ->
ResolvedPath;
false ->
PathToLog = ensure_str(Path),
LogData =
case PathToLog =:= ResolvedPath of
true ->
#{path => PathToLog};
false ->
#{path => PathToLog, resolved_path => ResolvedPath}
end,
?SLOG(error, LogData#{msg => "cert_file_not_found"}),
undefined
end
end.
resolve_cert_path_for_read(Path) ->
emqx_schema:naive_env_interpolation(Path).
filter([]) -> []; filter([]) -> [];
filter([{_, undefined} | T]) -> filter(T); filter([{_, undefined} | T]) -> filter(T);
filter([{_, ""} | T]) -> filter(T); filter([{_, ""} | T]) -> filter(T);


@ -31,6 +31,7 @@
start_apps/2, start_apps/2,
start_apps/3, start_apps/3,
stop_apps/1, stop_apps/1,
stop_apps/2,
reload/2, reload/2,
app_path/2, app_path/2,
proj_root/0, proj_root/0,
@ -250,11 +251,20 @@ start_app(App, SpecAppConfig, Opts) ->
case application:ensure_all_started(App) of case application:ensure_all_started(App) of
{ok, _} -> {ok, _} ->
ok = ensure_dashboard_listeners_started(App), ok = ensure_dashboard_listeners_started(App),
ok = wait_for_app_processes(App),
ok; ok;
{error, Reason} -> {error, Reason} ->
error({failed_to_start_app, App, Reason}) error({failed_to_start_app, App, Reason})
end. end.
wait_for_app_processes(emqx_conf) ->
%% emqx_conf app has a gen_server which
%% initializes its state asynchronously
gen_server:call(emqx_cluster_rpc, dummy),
ok;
wait_for_app_processes(_) ->
ok.
app_conf_file(emqx_conf) -> "emqx.conf.all"; app_conf_file(emqx_conf) -> "emqx.conf.all";
app_conf_file(App) -> atom_to_list(App) ++ ".conf". app_conf_file(App) -> atom_to_list(App) ++ ".conf".
@ -273,8 +283,7 @@ mustache_vars(App, Opts) ->
Defaults = #{ Defaults = #{
node_cookie => atom_to_list(erlang:get_cookie()), node_cookie => atom_to_list(erlang:get_cookie()),
platform_data_dir => app_path(App, "data"), platform_data_dir => app_path(App, "data"),
platform_etc_dir => app_path(App, "etc"), platform_etc_dir => app_path(App, "etc")
platform_log_dir => app_path(App, "log")
}, },
maps:merge(Defaults, ExtraMustacheVars). maps:merge(Defaults, ExtraMustacheVars).
@ -307,12 +316,21 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
-spec stop_apps(list()) -> ok. -spec stop_apps(list()) -> ok.
stop_apps(Apps) -> stop_apps(Apps) ->
stop_apps(Apps, #{}).
stop_apps(Apps, Opts) ->
[application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]], [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
ok = mria_mnesia:delete_schema(), ok = mria_mnesia:delete_schema(),
%% to avoid inter-suite flakiness %% to avoid inter-suite flakiness
application:unset_env(emqx, init_config_load_done), application:unset_env(emqx, init_config_load_done),
persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY), persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY),
emqx_config:erase_schema_mod_and_names(), case Opts of
#{erase_all_configs := false} ->
%% FIXME: this means inter-suite or inter-test dependencies
ok;
_ ->
emqx_config:erase_all()
end,
ok = emqx_config:delete_override_conf_files(), ok = emqx_config:delete_override_conf_files(),
application:unset_env(emqx, local_override_conf_file), application:unset_env(emqx, local_override_conf_file),
application:unset_env(emqx, cluster_override_conf_file), application:unset_env(emqx, cluster_override_conf_file),
@ -492,7 +510,7 @@ load_config(SchemaModule, Config, Opts) ->
ok. ok.
load_config(SchemaModule, Config) -> load_config(SchemaModule, Config) ->
load_config(SchemaModule, Config, #{raw_with_default => false}). load_config(SchemaModule, Config, #{raw_with_default => true}).
-spec is_all_tcp_servers_available(Servers) -> Result when -spec is_all_tcp_servers_available(Servers) -> Result when
Servers :: [{Host, Port}], Servers :: [{Host, Port}],


@ -63,14 +63,14 @@ t_init_load(_Config) ->
ConfFile = "./test_emqx.conf", ConfFile = "./test_emqx.conf",
ok = file:write_file(ConfFile, <<"">>), ok = file:write_file(ConfFile, <<"">>),
ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)), ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)),
emqx_config:erase_schema_mod_and_names(), emqx_config:erase_all(),
{ok, DeprecatedFile} = application:get_env(emqx, cluster_override_conf_file), {ok, DeprecatedFile} = application:get_env(emqx, cluster_override_conf_file),
?assertEqual(false, filelib:is_regular(DeprecatedFile), DeprecatedFile), ?assertEqual(false, filelib:is_regular(DeprecatedFile), DeprecatedFile),
%% Don't has deprecated file %% Don't has deprecated file
ok = emqx_config:init_load(emqx_schema, [ConfFile]), ok = emqx_config:init_load(emqx_schema, [ConfFile]),
?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())), ?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())),
?assertMatch({ok, #{raw_config := 256}}, emqx:update_config([mqtt, max_topic_levels], 256)), ?assertMatch({ok, #{raw_config := 256}}, emqx:update_config([mqtt, max_topic_levels], 256)),
emqx_config:erase_schema_mod_and_names(), emqx_config:erase_all(),
%% Has deprecated file %% Has deprecated file
ok = file:write_file(DeprecatedFile, <<"{}">>), ok = file:write_file(DeprecatedFile, <<"{}">>),
ok = emqx_config:init_load(emqx_schema, [ConfFile]), ok = emqx_config:init_load(emqx_schema, [ConfFile]),


@@ -279,8 +279,7 @@ render_config_file() ->
 mustache_vars() ->
     [
         {platform_data_dir, local_path(["data"])},
-        {platform_etc_dir, local_path(["etc"])},
-        {platform_log_dir, local_path(["log"])}
+        {platform_etc_dir, local_path(["etc"])}
     ].
generate_config() -> generate_config() ->


@@ -22,7 +22,6 @@
 -include_lib("eunit/include/eunit.hrl").
 
 -define(LOGGER, emqx_logger).
--define(a, "a").
 -define(SUPPORTED_LEVELS, [emergency, alert, critical, error, warning, notice, info, debug]).
 
 all() -> emqx_common_test_helpers:all(?MODULE).


@ -254,10 +254,15 @@ does_module_exist(Mod) ->
end. end.
assert_no_http_get() -> assert_no_http_get() ->
Timeout = 0,
Error = should_be_cached,
assert_no_http_get(Timeout, Error).
assert_no_http_get(Timeout, Error) ->
receive receive
{http_get, _URL} -> {http_get, _URL} ->
error(should_be_cached) error(Error)
after 0 -> after Timeout ->
ok ok
end. end.
@ -702,7 +707,9 @@ do_t_update_listener(Config) ->
%% the API converts that to an internally %% the API converts that to an internally
%% managed file %% managed file
<<"issuer_pem">> => IssuerPem, <<"issuer_pem">> => IssuerPem,
<<"responder_url">> => <<"http://localhost:9877">> <<"responder_url">> => <<"http://localhost:9877">>,
%% for quicker testing; min refresh in tests is 5 s.
<<"refresh_interval">> => <<"5s">>
} }
} }
}, },
@ -739,6 +746,70 @@ do_t_update_listener(Config) ->
) )
), ),
assert_http_get(1, 5_000), assert_http_get(1, 5_000),
%% Disable OCSP Stapling; the periodic refreshes should stop
RefreshInterval = emqx_config:get([listeners, ssl, default, ssl_options, ocsp, refresh_interval]),
OCSPConfig1 =
#{
<<"ssl_options">> =>
#{
<<"ocsp">> =>
#{
<<"enable_ocsp_stapling">> => false
}
}
},
ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, OCSPConfig1),
{ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"ocsp">> :=
#{
<<"enable_ocsp_stapling">> := false
}
}
},
ListenerData4
),
assert_no_http_get(2 * RefreshInterval, should_stop_refreshing),
ok.
t_double_unregister(_Config) ->
ListenerID = <<"ssl:test_ocsp">>,
Conf = emqx_config:get_listener_conf(ssl, test_ocsp, []),
?check_trace(
begin
{ok, {ok, _}} =
?wait_async_action(
emqx_ocsp_cache:register_listener(ListenerID, Conf),
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID},
5_000
),
assert_http_get(1),
{ok, {ok, _}} =
?wait_async_action(
emqx_ocsp_cache:unregister_listener(ListenerID),
#{?snk_kind := ocsp_cache_listener_unregistered, listener_id := ListenerID},
5_000
),
%% Should be idempotent and not crash
{ok, {ok, _}} =
?wait_async_action(
emqx_ocsp_cache:unregister_listener(ListenerID),
#{?snk_kind := ocsp_cache_listener_unregistered, listener_id := ListenerID},
5_000
),
ok
end,
[]
),
ok. ok.
t_ocsp_responder_error_responses(_Config) -> t_ocsp_responder_error_responses(_Config) ->


@ -694,3 +694,81 @@ url_type_test_() ->
typerefl:from_string(emqx_schema:url(), <<"">>) typerefl:from_string(emqx_schema:url(), <<"">>)
) )
]. ].
env_test_() ->
Do = fun emqx_schema:naive_env_interpolation/1,
[
{"undefined", fun() -> ?assertEqual(undefined, Do(undefined)) end},
{"full env abs path",
with_env_fn(
"MY_FILE",
"/path/to/my/file",
fun() -> ?assertEqual("/path/to/my/file", Do("$MY_FILE")) end
)},
{"full env relative path",
with_env_fn(
"MY_FILE",
"path/to/my/file",
fun() -> ?assertEqual("path/to/my/file", Do("${MY_FILE}")) end
)},
%% we can not test windows style file join though
{"windows style",
with_env_fn(
"MY_FILE",
"path\\to\\my\\file",
fun() -> ?assertEqual("path\\to\\my\\file", Do("$MY_FILE")) end
)},
{"dir no {}",
with_env_fn(
"MY_DIR",
"/mydir",
fun() -> ?assertEqual("/mydir/foobar", Do(<<"$MY_DIR/foobar">>)) end
)},
{"dir with {}",
with_env_fn(
"MY_DIR",
"/mydir",
fun() -> ?assertEqual("/mydir/foobar", Do(<<"${MY_DIR}/foobar">>)) end
)},
%% a trailing / should not cause the sub path to become absolute
{"env dir with trailing /",
with_env_fn(
"MY_DIR",
"/mydir//",
fun() -> ?assertEqual("/mydir/foobar", Do(<<"${MY_DIR}/foobar">>)) end
)},
{"string dir with doulbe /",
with_env_fn(
"MY_DIR",
"/mydir/",
fun() -> ?assertEqual("/mydir/foobar", Do(<<"${MY_DIR}//foobar">>)) end
)},
{"env not found",
with_env_fn(
"MY_DIR",
"/mydir/",
fun() -> ?assertEqual("${MY_DIR2}//foobar", Do(<<"${MY_DIR2}//foobar">>)) end
)}
].
with_env_fn(Name, Value, F) ->
fun() ->
with_envs(F, [{Name, Value}])
end.
with_envs(Fun, Envs) ->
with_envs(Fun, [], Envs).
with_envs(Fun, Args, [{_Name, _Value} | _] = Envs) ->
set_envs(Envs),
try
apply(Fun, Args)
after
unset_envs(Envs)
end.
set_envs([{_Name, _Value} | _] = Envs) ->
lists:map(fun({Name, Value}) -> os:putenv(Name, Value) end, Envs).
unset_envs([{_Name, _Value} | _] = Envs) ->
lists:map(fun({Name, _}) -> os:unsetenv(Name) end, Envs).


@ -138,13 +138,13 @@ end_per_testcase(t_ws_non_check_origin, Config) ->
del_bucket(), del_bucket(),
PrevConfig = ?config(prev_config, Config), PrevConfig = ?config(prev_config, Config),
emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
emqx_common_test_helpers:stop_apps([]), stop_apps(),
ok; ok;
end_per_testcase(_, Config) -> end_per_testcase(_, Config) ->
del_bucket(), del_bucket(),
PrevConfig = ?config(prev_config, Config), PrevConfig = ?config(prev_config, Config),
emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
emqx_common_test_helpers:stop_apps([]), stop_apps(),
Config. Config.
init_per_suite(Config) -> init_per_suite(Config) ->
@ -156,6 +156,10 @@ end_per_suite(_) ->
emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:stop_apps([]),
ok. ok.
%% FIXME: this is a temp fix to let tests share configs.
stop_apps() ->
emqx_common_test_helpers:stop_apps([], #{erase_all_configs => false}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases %% Test Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------


@@ -67,7 +67,7 @@ init_per_suite(Config) ->
     emqx_config:erase(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY),
     _ = application:load(emqx_conf),
     ok = emqx_mgmt_api_test_util:init_suite(
-        [emqx_authn]
+        [emqx_conf, emqx_authn]
     ),
     ?AUTHN:delete_chain(?GLOBAL),


@@ -42,15 +42,16 @@ init_per_testcase(_Case, Config) ->
         <<"backend">> => <<"built_in_database">>,
         <<"user_id_type">> => <<"clientid">>
     },
-    emqx:update_config(
+    {ok, _} = emqx:update_config(
         ?PATH,
         {create_authenticator, ?GLOBAL, AuthnConfig}
     ),
-    emqx_conf:update(
-        [listeners, tcp, listener_authn_enabled], {create, listener_mqtt_tcp_conf(18830, true)}, #{}
+    {ok, _} = emqx_conf:update(
+        [listeners, tcp, listener_authn_enabled],
+        {create, listener_mqtt_tcp_conf(18830, true)},
+        #{}
     ),
-    emqx_conf:update(
+    {ok, _} = emqx_conf:update(
         [listeners, tcp, listener_authn_disabled],
         {create, listener_mqtt_tcp_conf(18831, false)},
         #{}


@@ -37,7 +37,7 @@ init_per_testcase(_, Config) ->
 
 init_per_suite(Config) ->
     _ = application:load(emqx_conf),
-    emqx_common_test_helpers:start_apps([emqx_authn]),
+    emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]),
     application:ensure_all_started(emqx_resource),
     application:ensure_all_started(emqx_connector),
     Config.


@@ -78,7 +78,8 @@ t_check_schema(_Config) ->
     ).
 
 t_union_member_selector(_) ->
-    ?assertMatch(#{authentication := undefined}, check(undefined)),
+    %% default value for authentication
+    ?assertMatch(#{authentication := []}, check(undefined)),
     C1 = #{<<"backend">> => <<"built_in_database">>},
     ?assertThrow(
         #{


@@ -2,14 +2,4 @@ authorization {
   deny_action = ignore
   no_match = allow
   cache = { enable = true }
-  sources = [
-    {
-      type = file
-      enable = true
-      # This file is immutable to EMQX.
-      # Once new rules are created from dashboard UI or HTTP API,
-      # the file 'data/authz/acl.conf' is used instead of this one
-      path = "{{ platform_etc_dir }}/acl.conf"
-    }
-  ]
 }


@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authz, [
     {description, "An OTP application"},
-    {vsn, "0.1.18"},
+    {vsn, "0.1.19"},
     {registered, []},
     {mod, {emqx_authz_app, []}},
     {applications, [


@@ -205,7 +205,7 @@ sources(get, _) ->
             },
             AccIn
         ) ->
-            case file:read_file(Path) of
+            case emqx_authz_file:read_file(Path) of
                 {ok, Rules} ->
                     lists:append(AccIn, [
                         #{
@@ -242,7 +242,7 @@ source(get, #{bindings := #{type := Type}}) ->
         Type,
         fun
             (#{<<"type">> := <<"file">>, <<"enable">> := Enable, <<"path">> := Path}) ->
-                case file:read_file(Path) of
+                case emqx_authz_file:read_file(Path) of
                     {ok, Rules} ->
                         {200, #{
                             type => file,


@ -32,13 +32,15 @@
create/1, create/1,
update/1, update/1,
destroy/1, destroy/1,
authorize/4 authorize/4,
read_file/1
]). ]).
description() -> description() ->
"AuthZ with static rules". "AuthZ with static rules".
create(#{path := Path} = Source) -> create(#{path := Path0} = Source) ->
Path = filename(Path0),
Rules = Rules =
case file:consult(Path) of case file:consult(Path) of
{ok, Terms} -> {ok, Terms} ->
@ -63,3 +65,9 @@ destroy(_Source) -> ok.
authorize(Client, PubSub, Topic, #{annotations := #{rules := Rules}}) -> authorize(Client, PubSub, Topic, #{annotations := #{rules := Rules}}) ->
emqx_authz_rule:matches(Client, PubSub, Topic, Rules). emqx_authz_rule:matches(Client, PubSub, Topic, Rules).
read_file(Path) ->
file:read_file(filename(Path)).
filename(PathMaybeTemplate) ->
emqx_schema:naive_env_interpolation(PathMaybeTemplate).


@@ -491,7 +491,7 @@ authz_fields() ->
         ?HOCON(
             ?ARRAY(?UNION(UnionMemberSelector)),
             #{
-                default => [],
+                default => [default_authz()],
                 desc => ?DESC(sources),
                 %% doc_lift is force a root level reference instead of nesting sub-structs
                 extra => #{doc_lift => true},
@@ -501,3 +501,10 @@
             }
         )}
     ].
+
+default_authz() ->
+    #{
+        <<"type">> => <<"file">>,
+        <<"enable">> => true,
+        <<"path">> => <<"${EMQX_ETC_DIR}/acl.conf">>
+    }.


@@ -230,7 +230,12 @@ webhook_bridge_converter(Conf0, _HoconOpts) ->
         undefined ->
             undefined;
         _ ->
-            do_convert_webhook_config(Conf1)
+            maps:map(
+                fun(_Name, Conf) ->
+                    do_convert_webhook_config(Conf)
+                end,
+                Conf1
+            )
     end.
 
 do_convert_webhook_config(


@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [kernel, stdlib, ecql]},
     {env, []},


@ -281,7 +281,7 @@ proc_cql_params(query, SQL, Params, _State) ->
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
Type == query; Type == prepared_query Type == query; Type == prepared_query
-> ->
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of case exec(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}) of
{error, Reason} = Result -> {error, Reason} = Result ->
?tp( ?tp(
error, error,
@ -295,7 +295,7 @@ exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
end. end.
exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of case exec(PoolName, {?MODULE, batch_query, [Async, CQLs]}) of
{error, Reason} = Result -> {error, Reason} = Result ->
?tp( ?tp(
error, error,
@ -308,6 +308,13 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
Result Result
end. end.
%% Pick one of the pool members to do the query.
%% Using the 'no_handover' strategy,
%% meaning the buffer worker does the gen_server call or gen_server cast
%% towards the connection process.
exec(PoolName, Query) ->
ecpool:pick_and_do(PoolName, Query, no_handover).
on_get_status(_InstId, #{pool_name := PoolName} = State) -> on_get_status(_InstId, #{pool_name := PoolName} = State) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true -> true ->
@ -346,17 +353,23 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepar
query(Conn, sync, CQL, Params) -> query(Conn, sync, CQL, Params) ->
ecql:query(Conn, CQL, Params); ecql:query(Conn, CQL, Params);
query(Conn, {async, Callback}, CQL, Params) -> query(Conn, {async, Callback}, CQL, Params) ->
ecql:async_query(Conn, CQL, Params, one, Callback). ok = ecql:async_query(Conn, CQL, Params, one, Callback),
%% return the connection pid for buffer worker to monitor
{ok, Conn}.
prepared_query(Conn, sync, PreparedKey, Params) -> prepared_query(Conn, sync, PreparedKey, Params) ->
ecql:execute(Conn, PreparedKey, Params); ecql:execute(Conn, PreparedKey, Params);
prepared_query(Conn, {async, Callback}, PreparedKey, Params) -> prepared_query(Conn, {async, Callback}, PreparedKey, Params) ->
ecql:async_execute(Conn, PreparedKey, Params, Callback). ok = ecql:async_execute(Conn, PreparedKey, Params, Callback),
%% return the connection pid for buffer worker to monitor
{ok, Conn}.
batch_query(Conn, sync, Rows) -> batch_query(Conn, sync, Rows) ->
ecql:batch(Conn, Rows); ecql:batch(Conn, Rows);
batch_query(Conn, {async, Callback}, Rows) -> batch_query(Conn, {async, Callback}, Rows) ->
ecql:async_batch(Conn, Rows, Callback). ok = ecql:async_batch(Conn, Rows, Callback),
%% return the connection pid for buffer worker to monitor
{ok, Conn}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for ecpool %% callbacks for ecpool


@ -404,7 +404,7 @@ t_setup_via_config_and_publish(Config) ->
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace), ?assertMatch([#{result := {ok, _Pid}}], Trace),
ok ok
end end
), ),
@ -443,7 +443,7 @@ t_setup_via_http_api_and_publish(Config) ->
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace), ?assertMatch([#{result := {ok, _Pid}}], Trace),
ok ok
end end
), ),
@ -604,7 +604,7 @@ t_missing_data(Config) ->
fun(Trace0) -> fun(Trace0) ->
%% 1. ecql driver will return `ok` first in async query %% 1. ecql driver will return `ok` first in async query
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace), ?assertMatch([#{result := {ok, _Pid}}], Trace),
%% 2. then it will return an error in callback function %% 2. then it will return an error in callback function
Trace1 = ?of_kind(handle_async_reply, Trace0), Trace1 = ?of_kind(handle_async_reply, Trace0),
?assertMatch([#{result := {error, {8704, _}}}], Trace1), ?assertMatch([#{result := {error, {8704, _}}}], Trace1),


@ -1,7 +1,13 @@
## NOTE: ## NOTE:
## The EMQX configuration is prioritized (overlayed) in the following order: ## This config file overrides data/configs/cluster.hocon,
## `data/configs/cluster.hocon < etc/emqx.conf < environment variables`. ## and is merged with environment variables which start with the 'EMQX_' prefix.
##
## Config changes made from EMQX dashboard UI, management HTTP API, or CLI
## are stored in data/configs/cluster.hocon.
## To avoid confusion, please do not store the same configs in both files.
##
## See https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html
## A full configuration example can be found in emqx.conf.example
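##
## Illustrative example (not part of the shipped file): environment variables
## use '__' for nesting, e.g. EMQX_NODE__NAME="emqx@10.0.0.1" overrides the
## node.name value configured below.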
node { node {
name = "emqx@127.0.0.1" name = "emqx@127.0.0.1"
@ -9,13 +15,6 @@ node {
data_dir = "{{ platform_data_dir }}" data_dir = "{{ platform_data_dir }}"
} }
log {
file_handlers.default {
level = warning
file = "{{ platform_log_dir }}/emqx.log"
}
}
cluster { cluster {
name = emqxcl name = emqxcl
discovery_strategy = manual discovery_strategy = manual


@ -18,12 +18,14 @@
-compile({no_auto_import, [get/1, get/2]}). -compile({no_auto_import, [get/1, get/2]}).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/emqx_schema.hrl").
-export([add_handler/2, remove_handler/1]). -export([add_handler/2, remove_handler/1]).
-export([get/1, get/2, get_raw/1, get_raw/2, get_all/1]). -export([get/1, get/2, get_raw/1, get_raw/2, get_all/1]).
-export([get_by_node/2, get_by_node/3]). -export([get_by_node/2, get_by_node/3]).
-export([update/3, update/4]). -export([update/3, update/4]).
-export([remove/2, remove/3]). -export([remove/2, remove/3]).
-export([tombstone/2]).
-export([reset/2, reset/3]). -export([reset/2, reset/3]).
-export([dump_schema/2]). -export([dump_schema/2]).
-export([schema_module/0]). -export([schema_module/0]).
@ -114,6 +116,10 @@ update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
update(Node, KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts) ->
emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts). emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts).
%% @doc Mark the specified key path as a tombstone.
tombstone(KeyPath, Opts) ->
update(KeyPath, ?TOMBSTONE_CONFIG_CHANGE_REQ, Opts).
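%% A minimal usage sketch (illustrative; the listener name 'my_listener' is
%% hypothetical): tombstone a listener config path cluster-wide so that a
%% config reload does not resurrect it from file defaults, e.g.
%%   emqx_conf:tombstone([listeners, tcp, my_listener], #{override_to => cluster}).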
%% @doc remove all value of key path in cluster-override.conf or local-override.conf. %% @doc remove all value of key path in cluster-override.conf or local-override.conf.
-spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.


@ -32,12 +32,8 @@ start(_StartType, _StartArgs) ->
ok = init_conf() ok = init_conf()
catch catch
C:E:St -> C:E:St ->
?SLOG(critical, #{ %% logger is not quite ready.
msg => failed_to_init_config, io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]),
exception => C,
reason => E,
stacktrace => St
}),
init:stop(1) init:stop(1)
end, end,
ok = emqx_config_logger:refresh_config(), ok = emqx_config_logger:refresh_config(),
@ -92,15 +88,8 @@ sync_data_from_node() ->
%% Internal functions %% Internal functions
%% ------------------------------------------------------------------------------ %% ------------------------------------------------------------------------------
-ifdef(TEST).
init_load() ->
emqx_config:init_load(emqx_conf:schema_module(), #{raw_with_default => false}).
-else.
init_load() -> init_load() ->
emqx_config:init_load(emqx_conf:schema_module(), #{raw_with_default => true}). emqx_config:init_load(emqx_conf:schema_module(), #{raw_with_default => true}).
-endif.
init_conf() -> init_conf() ->
%% Workaround for https://github.com/emqx/mria/issues/94: %% Workaround for https://github.com/emqx/mria/issues/94:


@ -93,7 +93,10 @@ roots() ->
{"log", {"log",
sc( sc(
?R_REF("log"), ?R_REF("log"),
#{translate_to => ["kernel"]} #{
translate_to => ["kernel"],
importance => ?IMPORTANCE_HIGH
}
)}, )},
{"rpc", {"rpc",
sc( sc(
@ -472,7 +475,7 @@ fields("node") ->
%% for now, it's tricky to use a different data_dir %% for now, it's tricky to use a different data_dir
%% otherwise data paths in cluster config may differ %% otherwise data paths in cluster config may differ
%% TODO: change configurable data file paths to relative %% TODO: change configurable data file paths to relative
importance => ?IMPORTANCE_HIDDEN, importance => ?IMPORTANCE_LOW,
desc => ?DESC(node_data_dir) desc => ?DESC(node_data_dir)
} }
)}, )},
@ -863,15 +866,25 @@ fields("rpc") ->
]; ];
fields("log") -> fields("log") ->
[ [
{"console_handler", ?R_REF("console_handler")}, {"console_handler",
sc(
?R_REF("console_handler"),
#{importance => ?IMPORTANCE_HIGH}
)},
{"file_handlers", {"file_handlers",
sc( sc(
map(name, ?R_REF("log_file_handler")), map(name, ?R_REF("log_file_handler")),
#{desc => ?DESC("log_file_handlers")} #{
desc => ?DESC("log_file_handlers"),
%% because file_handlers is a map,
%% a default value is needed here to populate the raw configs
default => #{<<"default">> => #{<<"level">> => <<"warning">>}},
importance => ?IMPORTANCE_HIGH
}
)} )}
]; ];
fields("console_handler") -> fields("console_handler") ->
log_handler_common_confs(false); log_handler_common_confs(console);
fields("log_file_handler") -> fields("log_file_handler") ->
[ [
{"file", {"file",
@ -879,6 +892,8 @@ fields("log_file_handler") ->
file(), file(),
#{ #{
desc => ?DESC("log_file_handler_file"), desc => ?DESC("log_file_handler_file"),
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
converter => fun emqx_schema:naive_env_interpolation/1,
validator => fun validate_file_location/1 validator => fun validate_file_location/1
} }
)}, )},
@ -892,10 +907,11 @@ fields("log_file_handler") ->
hoconsc:union([infinity, emqx_schema:bytesize()]), hoconsc:union([infinity, emqx_schema:bytesize()]),
#{ #{
default => <<"50MB">>, default => <<"50MB">>,
desc => ?DESC("log_file_handler_max_size") desc => ?DESC("log_file_handler_max_size"),
importance => ?IMPORTANCE_MEDIUM
} }
)} )}
] ++ log_handler_common_confs(true); ] ++ log_handler_common_confs(file);
fields("log_rotation") -> fields("log_rotation") ->
[ [
{"enable", {"enable",
@ -1104,14 +1120,33 @@ tr_logger_level(Conf) ->
tr_logger_handlers(Conf) -> tr_logger_handlers(Conf) ->
emqx_config_logger:tr_handlers(Conf). emqx_config_logger:tr_handlers(Conf).
log_handler_common_confs(Enable) -> log_handler_common_confs(Handler) ->
lists:map(
fun
({_Name, #{importance := _}} = F) -> F;
({Name, Sc}) -> {Name, Sc#{importance => ?IMPORTANCE_LOW}}
end,
do_log_handler_common_confs(Handler)
).
do_log_handler_common_confs(Handler) ->
%% we rarely support dynamic defaults like this
%% for this one, the build-time default is the same as the runtime default,
%% so it's less tricky
EnableValues =
case Handler of
console -> ["console", "both"];
file -> ["file", "both", "", false]
end,
EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"),
Enable = lists:member(EnvValue, EnableValues),
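%% Illustrative mapping (derived from the lists above):
%%   EMQX_DEFAULT_LOG_HANDLER=console -> console handler enabled by default
%%   EMQX_DEFAULT_LOG_HANDLER=file    -> file handler enabled by default
%%   EMQX_DEFAULT_LOG_HANDLER=both    -> both handlers enabled by default
%%   unset or empty                   -> only the file handler enabled by default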
[ [
{"enable", {"enable",
sc( sc(
boolean(), boolean(),
#{ #{
default => Enable, default => Enable,
desc => ?DESC("common_handler_enable") desc => ?DESC("common_handler_enable"),
importance => ?IMPORTANCE_LOW
} }
)}, )},
{"level", {"level",
@ -1128,7 +1163,8 @@ log_handler_common_confs(Enable) ->
#{ #{
default => <<"system">>, default => <<"system">>,
desc => ?DESC("common_handler_time_offset"), desc => ?DESC("common_handler_time_offset"),
validator => fun validate_time_offset/1 validator => fun validate_time_offset/1,
importance => ?IMPORTANCE_LOW
} }
)}, )},
{"chars_limit", {"chars_limit",
@ -1136,7 +1172,8 @@ log_handler_common_confs(Enable) ->
hoconsc:union([unlimited, range(100, inf)]), hoconsc:union([unlimited, range(100, inf)]),
#{ #{
default => unlimited, default => unlimited,
desc => ?DESC("common_handler_chars_limit") desc => ?DESC("common_handler_chars_limit"),
importance => ?IMPORTANCE_LOW
} }
)}, )},
{"formatter", {"formatter",
@ -1144,7 +1181,8 @@ log_handler_common_confs(Enable) ->
hoconsc:enum([text, json]), hoconsc:enum([text, json]),
#{ #{
default => text, default => text,
desc => ?DESC("common_handler_formatter") desc => ?DESC("common_handler_formatter"),
importance => ?IMPORTANCE_MEDIUM
} }
)}, )},
{"single_line", {"single_line",
@ -1152,7 +1190,8 @@ log_handler_common_confs(Enable) ->
boolean(), boolean(),
#{ #{
default => true, default => true,
desc => ?DESC("common_handler_single_line") desc => ?DESC("common_handler_single_line"),
importance => ?IMPORTANCE_LOW
} }
)}, )},
{"sync_mode_qlen", {"sync_mode_qlen",
@ -1200,7 +1239,7 @@ log_handler_common_confs(Enable) ->
]. ].
crash_dump_file_default() -> crash_dump_file_default() ->
case os:getenv("RUNNER_LOG_DIR") of case os:getenv("EMQX_LOG_DIR") of
false -> false ->
%% testing, or running emqx app as deps %% testing, or running emqx app as deps
<<"log/erl_crash.dump">>; <<"log/erl_crash.dump">>;


@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_connector, [ {application, emqx_connector, [
{description, "EMQX Data Integration Connectors"}, {description, "EMQX Data Integration Connectors"},
{vsn, "0.1.21"}, {vsn, "0.1.22"},
{registered, []}, {registered, []},
{mod, {emqx_connector_app, []}}, {mod, {emqx_connector_app, []}},
{applications, [ {applications, [


@ -305,7 +305,20 @@ on_query(
Retry Retry
) )
of of
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> {error, Reason} when
Reason =:= econnrefused;
Reason =:= timeout;
Reason =:= {shutdown, normal};
Reason =:= {shutdown, closed}
->
?SLOG(warning, #{
msg => "http_connector_do_request_failed",
reason => Reason,
connector => InstId
}),
{error, {recoverable_error, Reason}};
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "http_connector_do_request_failed", msg => "http_connector_do_request_failed",
reason => Reason, reason => Reason,
@ -593,7 +606,16 @@ reply_delegator(ReplyFunAndArgs, Result) ->
case Result of case Result of
%% The normal reason happens when the HTTP connection times out before %% The normal reason happens when the HTTP connection times out before
%% the request has been fully processed %% the request has been fully processed
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal -> {error, Reason} when
Reason =:= econnrefused;
Reason =:= timeout;
Reason =:= normal;
Reason =:= {shutdown, normal}
->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
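%% A hedged note (assumption about emqx_resource behaviour): replying with
%% {error, {recoverable_error, Reason}} lets the buffer worker treat the
%% failure as retryable and re-attempt the request later, instead of
%% reporting it as a final failure.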
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
Result1 = {error, {recoverable_error, Reason}}, Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
_ -> _ ->


@ -286,18 +286,17 @@ parse_spec_ref(Module, Path, Options) ->
Schema = Schema =
try try
erlang:apply(Module, schema, [Path]) erlang:apply(Module, schema, [Path])
%% better error message
catch catch
error:Reason:Stacktrace -> Error:Reason:Stacktrace ->
%% raise a new error with the same stacktrace. %% This error is intended to fail the build
%% it's a bug if this happens. %% hence print to standard_error
%% i.e. if a path is listed in the spec but the module doesn't io:format(
%% implement it or crashes when trying to build the schema. standard_error,
erlang:raise( "Failed to generate swagger for path ~p in module ~p~n"
error, "error:~p~nreason:~p~n~p~n",
#{mfa => {Module, schema, [Path]}, reason => Reason}, [Module, Path, Error, Reason, Stacktrace]
Stacktrace ),
) error({failed_to_generate_swagger_spec, Module, Path})
end, end,
{Specs, Refs} = maps:fold( {Specs, Refs} = maps:fold(
fun(Method, Meta, {Acc, RefsAcc}) -> fun(Method, Meta, {Acc, RefsAcc}) ->


@ -308,10 +308,7 @@ t_nest_ref(_Config) ->
t_none_ref(_Config) -> t_none_ref(_Config) ->
Path = "/ref/none", Path = "/ref/none",
?assertError( ?assertError(
#{ {failed_to_generate_swagger_spec, ?MODULE, Path},
mfa := {?MODULE, schema, [Path]},
reason := function_clause
},
emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}) emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{})
), ),
ok. ok.


@ -278,10 +278,7 @@ t_bad_ref(_Config) ->
t_none_ref(_Config) -> t_none_ref(_Config) ->
Path = "/ref/none", Path = "/ref/none",
?assertError( ?assertError(
#{ {failed_to_generate_swagger_spec, ?MODULE, Path},
mfa := {?MODULE, schema, ["/ref/none"]},
reason := function_clause
},
validate(Path, #{}, []) validate(Path, #{}, [])
), ),
ok. ok.


@ -301,10 +301,10 @@ t_cluster_name(_) ->
ok ok
end, end,
emqx_common_test_helpers:stop_apps([emqx, emqx_exhook]), stop_apps([emqx, emqx_exhook]),
emqx_common_test_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), emqx_common_test_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
on_exit(fun() -> on_exit(fun() ->
emqx_common_test_helpers:stop_apps([emqx, emqx_exhook]), stop_apps([emqx, emqx_exhook]),
load_cfg(?CONF_DEFAULT), load_cfg(?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_exhook]), emqx_common_test_helpers:start_apps([emqx_exhook]),
mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]) mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT])
@ -489,3 +489,7 @@ data_file(Name) ->
cert_file(Name) -> cert_file(Name) ->
data_file(filename:join(["certs", Name])). data_file(filename:join(["certs", Name])).
%% FIXME: this creates an inter-test dependency
stop_apps(Apps) ->
emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}).


@ -293,12 +293,14 @@ listeners_type() ->
listeners_info(Opts) -> listeners_info(Opts) ->
Listeners = hocon_schema:fields(emqx_schema, "listeners"), Listeners = hocon_schema:fields(emqx_schema, "listeners"),
lists:map( lists:map(
fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> fun({ListenerType, Schema}) ->
Fields0 = hocon_schema:fields(Mod, Field), Type = emqx_schema:get_tombstone_map_value_type(Schema),
?R_REF(Mod, StructName) = Type,
Fields0 = hocon_schema:fields(Mod, StructName),
Fields1 = lists:keydelete("authentication", 1, Fields0), Fields1 = lists:keydelete("authentication", 1, Fields0),
Fields3 = required_bind(Fields1, Opts), Fields3 = required_bind(Fields1, Opts),
Ref = listeners_ref(Type, Opts), Ref = listeners_ref(ListenerType, Opts),
TypeAtom = list_to_existing_atom(Type), TypeAtom = list_to_existing_atom(ListenerType),
#{ #{
ref => ?R_REF(Ref), ref => ?R_REF(Ref),
schema => [ schema => [
@ -642,7 +644,7 @@ create(Path, Conf) ->
wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))). wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
ensure_remove(Path) -> ensure_remove(Path) ->
wrap(emqx_conf:remove(Path, ?OPTS(cluster))). wrap(emqx_conf:tombstone(Path, ?OPTS(cluster))).
wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason}; wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason}; wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};


@ -20,18 +20,51 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(PORT, (20000 + ?LINE)). -define(PORT(Base), (Base + ?LINE)).
-define(PORT, ?PORT(20000)).
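%% Illustrative expansion (not part of the change): ?PORT(31000) becomes
%% 31000 + ?LINE, so each call site in this suite gets a distinct listener
%% port within the base range passed by its test case.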
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). [
{group, with_defaults_in_file},
{group, without_defaults_in_file}
].
groups() ->
AllTests = emqx_common_test_helpers:all(?MODULE),
[
{with_defaults_in_file, AllTests},
{without_defaults_in_file, AllTests}
].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config. Config.
end_per_suite(_) -> end_per_suite(_Config) ->
emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}), ok.
emqx_conf:remove([listeners, tcp, new1], #{override_to => local}),
init_per_group(without_defaults_in_file, Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config;
init_per_group(with_defaults_in_file, Config) ->
%% we have to materialize the config file with default values for this test group
%% because we want to test the deletion of a non-existing listener.
%% Without such a config file, the deletion would instead remove
%% the default listener.
Name = atom_to_list(?MODULE) ++ "-default-listeners",
TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hocon_text()),
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
[{injected_conf_file, TmpConfFullPath} | Config].
end_per_group(Group, Config) ->
emqx_conf:tombstone([listeners, tcp, new], #{override_to => cluster}),
emqx_conf:tombstone([listeners, tcp, new1], #{override_to => local}),
case Group =:= with_defaults_in_file of
true ->
{_, File} = lists:keyfind(injected_conf_file, 1, Config),
ok = file:delete(File);
false ->
ok
end,
emqx_mgmt_api_test_util:end_suite([emqx_conf]). emqx_mgmt_api_test_util:end_suite([emqx_conf]).
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
@ -52,30 +85,25 @@ end_per_testcase(Case, Config) ->
t_max_connection_default({init, Config}) -> t_max_connection_default({init, Config}) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]), emqx_mgmt_api_test_util:end_suite([emqx_conf]),
Etc = filename:join(["etc", "emqx.conf.all"]),
TmpConfName = atom_to_list(?FUNCTION_NAME) ++ ".conf",
Inc = filename:join(["etc", TmpConfName]),
ConfFile = emqx_common_test_helpers:app_path(emqx_conf, Etc),
IncFile = emqx_common_test_helpers:app_path(emqx_conf, Inc),
Port = integer_to_binary(?PORT), Port = integer_to_binary(?PORT),
Bin = <<"listeners.tcp.max_connection_test {bind = \"0.0.0.0:", Port/binary, "\"}">>, Bin = <<"listeners.tcp.max_connection_test {bind = \"0.0.0.0:", Port/binary, "\"}">>,
ok = file:write_file(IncFile, Bin), TmpConfName = atom_to_list(?FUNCTION_NAME) ++ ".conf",
ok = file:write_file(ConfFile, ["include \"", TmpConfName, "\""], [append]), TmpConfFullPath = inject_tmp_config_content(TmpConfName, Bin),
emqx_mgmt_api_test_util:init_suite([emqx_conf]), emqx_mgmt_api_test_util:init_suite([emqx_conf]),
[{tmp_config_file, IncFile} | Config]; [{tmp_config_file, TmpConfFullPath} | Config];
t_max_connection_default({'end', Config}) -> t_max_connection_default({'end', Config}) ->
ok = file:delete(proplists:get_value(tmp_config_file, Config)); ok = file:delete(proplists:get_value(tmp_config_file, Config));
t_max_connection_default(Config) when is_list(Config) -> t_max_connection_default(Config) when is_list(Config) ->
%% Check infinity is binary not atom.
#{<<"listeners">> := Listeners} = emqx_mgmt_api_listeners:do_list_listeners(), #{<<"listeners">> := Listeners} = emqx_mgmt_api_listeners:do_list_listeners(),
Target = lists:filter( Target = lists:filter(
fun(#{<<"id">> := Id}) -> Id =:= 'tcp:max_connection_test' end, fun(#{<<"id">> := Id}) -> Id =:= 'tcp:max_connection_test' end,
Listeners Listeners
), ),
?assertMatch([#{<<"max_connections">> := <<"infinity">>}], Target), DefaultMaxConn = emqx_listeners:default_max_conn(),
?assertMatch([#{<<"max_connections">> := DefaultMaxConn}], Target),
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:max_connection_test"]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:max_connection_test"]),
?assertMatch(#{<<"max_connections">> := <<"infinity">>}, request(get, NewPath, [], [])), ?assertMatch(#{<<"max_connections">> := DefaultMaxConn}, request(get, NewPath, [], [])),
emqx_conf:remove([listeners, tcp, max_connection_test], #{override_to => cluster}), emqx_conf:tombstone([listeners, tcp, max_connection_test], #{override_to => cluster}),
ok. ok.
t_list_listeners(Config) when is_list(Config) -> t_list_listeners(Config) when is_list(Config) ->
@ -86,7 +114,7 @@ t_list_listeners(Config) when is_list(Config) ->
%% POST /listeners %% POST /listeners
ListenerId = <<"tcp:default">>, ListenerId = <<"tcp:default">>,
NewListenerId = <<"tcp:new">>, NewListenerId = <<"tcp:new11">>,
OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
@ -100,7 +128,7 @@ t_list_listeners(Config) when is_list(Config) ->
OriginListener2 = maps:remove(<<"id">>, OriginListener), OriginListener2 = maps:remove(<<"id">>, OriginListener),
Port = integer_to_binary(?PORT), Port = integer_to_binary(?PORT),
NewConf = OriginListener2#{ NewConf = OriginListener2#{
<<"name">> => <<"new">>, <<"name">> => <<"new11">>,
<<"bind">> => <<"0.0.0.0:", Port/binary>>, <<"bind">> => <<"0.0.0.0:", Port/binary>>,
<<"max_connections">> := <<"infinity">> <<"max_connections">> := <<"infinity">>
}, },
@ -123,7 +151,7 @@ t_tcp_crud_listeners_by_id(Config) when is_list(Config) ->
MinListenerId = <<"tcp:min">>, MinListenerId = <<"tcp:min">>,
BadId = <<"tcp:bad">>, BadId = <<"tcp:bad">>,
Type = <<"tcp">>, Type = <<"tcp">>,
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 31000).
t_ssl_crud_listeners_by_id(Config) when is_list(Config) -> t_ssl_crud_listeners_by_id(Config) when is_list(Config) ->
ListenerId = <<"ssl:default">>, ListenerId = <<"ssl:default">>,
@ -131,7 +159,7 @@ t_ssl_crud_listeners_by_id(Config) when is_list(Config) ->
MinListenerId = <<"ssl:min">>, MinListenerId = <<"ssl:min">>,
BadId = <<"ssl:bad">>, BadId = <<"ssl:bad">>,
Type = <<"ssl">>, Type = <<"ssl">>,
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 32000).
t_ws_crud_listeners_by_id(Config) when is_list(Config) -> t_ws_crud_listeners_by_id(Config) when is_list(Config) ->
ListenerId = <<"ws:default">>, ListenerId = <<"ws:default">>,
@ -139,7 +167,7 @@ t_ws_crud_listeners_by_id(Config) when is_list(Config) ->
MinListenerId = <<"ws:min">>, MinListenerId = <<"ws:min">>,
BadId = <<"ws:bad">>, BadId = <<"ws:bad">>,
Type = <<"ws">>, Type = <<"ws">>,
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 33000).
t_wss_crud_listeners_by_id(Config) when is_list(Config) -> t_wss_crud_listeners_by_id(Config) when is_list(Config) ->
ListenerId = <<"wss:default">>, ListenerId = <<"wss:default">>,
@ -147,7 +175,7 @@ t_wss_crud_listeners_by_id(Config) when is_list(Config) ->
MinListenerId = <<"wss:min">>, MinListenerId = <<"wss:min">>,
BadId = <<"wss:bad">>, BadId = <<"wss:bad">>,
Type = <<"wss">>, Type = <<"wss">>,
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 34000).
t_api_listeners_list_not_ready(Config) when is_list(Config) -> t_api_listeners_list_not_ready(Config) when is_list(Config) ->
net_kernel:start(['listeners@127.0.0.1', longnames]), net_kernel:start(['listeners@127.0.0.1', longnames]),
@ -266,7 +294,7 @@ cluster(Specs) ->
end} end}
]). ]).
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) -> crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, PortBase) ->
OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
OriginListener = request(get, OriginPath, [], []), OriginListener = request(get, OriginPath, [], []),
@ -274,8 +302,8 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) ->
%% create with full options %% create with full options
?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertEqual({error, not_found}, is_running(NewListenerId)),
?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
Port1 = integer_to_binary(?PORT), Port1 = integer_to_binary(?PORT(PortBase)),
Port2 = integer_to_binary(?PORT), Port2 = integer_to_binary(?PORT(PortBase)),
NewConf = OriginListener#{ NewConf = OriginListener#{
<<"id">> => NewListenerId, <<"id">> => NewListenerId,
<<"bind">> => <<"0.0.0.0:", Port1/binary>> <<"bind">> => <<"0.0.0.0:", Port1/binary>>
@ -284,7 +312,7 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) ->
?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))), ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))),
Get1 = request(get, NewPath, [], []), Get1 = request(get, NewPath, [], []),
?assertMatch(Create, Get1), ?assertMatch(Create, Get1),
?assert(is_running(NewListenerId)), ?assertEqual({true, NewListenerId}, {is_running(NewListenerId), NewListenerId}),
%% create with required options %% create with required options
MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]), MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]),
@ -417,3 +445,21 @@ data_file(Name) ->
cert_file(Name) -> cert_file(Name) ->
data_file(filename:join(["certs", Name])). data_file(filename:join(["certs", Name])).
default_listeners_hocon_text() ->
Sc = #{roots => emqx_schema:fields("listeners")},
Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}),
Config = #{<<"listeners">> => Listeners},
hocon_pp:do(Config, #{}).
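%% A hedged note (assumption about the hocon API used above):
%% hocon_tconf:make_serializable/3 fills in schema defaults and returns a raw
%% (binary-keyed) config map, which hocon_pp:do/2 then pretty-prints as HOCON
%% text, so the generated file lists the default listeners explicitly.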
%% Inject an 'include' at the end of emqx.conf.all.
%% The 'include' can be kept after the test:
%% as long as the included file has been deleted, it is a no-op.
inject_tmp_config_content(TmpFile, Content) ->
Etc = filename:join(["etc", "emqx.conf.all"]),
Inc = filename:join(["etc", TmpFile]),
ConfFile = emqx_common_test_helpers:app_path(emqx_conf, Etc),
TmpFileFullPath = emqx_common_test_helpers:app_path(emqx_conf, Inc),
ok = file:write_file(TmpFileFullPath, Content),
ok = file:write_file(ConfFile, ["\ninclude \"", TmpFileFullPath, "\"\n"], [append]),
TmpFileFullPath.


@ -169,10 +169,10 @@ t_cluster(_) ->
emqx_delayed_proto_v1:get_delayed_message(node(), Id) emqx_delayed_proto_v1:get_delayed_message(node(), Id)
), ),
?assertEqual( %% The 'local' and the 'fake-remote' values should be the same,
emqx_delayed:get_delayed_message(Id), %% however there is a race condition, so we just assert that both are 'ok' tuples
emqx_delayed_proto_v1:get_delayed_message(node(), Id) ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)),
), ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)),
ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id), ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id),


@ -463,6 +463,16 @@ t_num_clients(_Config) ->
ok. ok.
t_advanced_mqtt_features(_) -> t_advanced_mqtt_features(_) ->
try
ok = test_advanced_mqtt_features()
catch
_:_ ->
%% delayed messages' metrics might not be reported yet
timer:sleep(1000),
test_advanced_mqtt_features()
end.
test_advanced_mqtt_features() ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(), {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
AdvFeats = get_value(advanced_mqtt_features, TelemetryData), AdvFeats = get_value(advanced_mqtt_features, TelemetryData),
?assertEqual( ?assertEqual(


@ -92,8 +92,10 @@ t_server_validator(_) ->
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?DEFAULT_CONF, #{ ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?DEFAULT_CONF, #{
raw_with_default => true raw_with_default => true
}), }),
undefined = emqx_conf:get_raw([statsd, server], undefined), DefaultServer = default_server(),
?assertMatch("127.0.0.1:8125", emqx_conf:get([statsd, server])), ?assertEqual(DefaultServer, emqx_conf:get_raw([statsd, server])),
DefaultServerStr = binary_to_list(DefaultServer),
?assertEqual(DefaultServerStr, emqx_conf:get([statsd, server])),
%% recover %% recover
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF, #{ ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
@ -204,3 +206,7 @@ request(Method, Body) ->
{ok, _Status, _} -> {ok, _Status, _} ->
error error
end. end.
default_server() ->
{server, Schema} = lists:keyfind(server, 1, emqx_statsd_schema:fields("statsd")),
hocon_schema:field_schema(Schema, default).


@ -304,7 +304,7 @@ if [ "$ES" -ne 0 ]; then
fi fi
# Make sure log directory exists # Make sure log directory exists
mkdir -p "$RUNNER_LOG_DIR" mkdir -p "$EMQX_LOG_DIR"
# turn off debug as this is static # turn off debug as this is static
set +x set +x
@ -757,7 +757,7 @@ generate_config() {
local node_name="$2" local node_name="$2"
## Delete the *.siz files first or it can't start after ## Delete the *.siz files first or it can't start after
## changing the config 'log.rotation.size' ## changing the config 'log.rotation.size'
rm -f "${RUNNER_LOG_DIR}"/*.siz rm -f "${EMQX_LOG_DIR}"/*.siz
## timestamp for each generation ## timestamp for each generation
local NOW_TIME local NOW_TIME
@ -861,7 +861,13 @@ wait_until_return_val() {
done done
} }
# backward compatible with 4.x # First, there is EMQX_DEFAULT_LOG_HANDLER which can control the default values
# to be used when generating configs.
# It's set in docker entrypoint and in systemd service file.
#
# To be backward compatible with 4.x and v5.0.0 ~ v5.0.24/e5.0.2:
# if EMQX_LOG__TO is set, we try to enable handlers from environment variables.
# i.e. it overrides the default value set in EMQX_DEFAULT_LOG_HANDLER
tr_log_to_env() { tr_log_to_env() {
local log_to=${EMQX_LOG__TO:-undefined} local log_to=${EMQX_LOG__TO:-undefined}
# unset because it's unknown to 5.0 # unset because it's unknown to 5.0
@ -893,13 +899,11 @@ tr_log_to_env() {
maybe_log_to_console() { maybe_log_to_console() {
if [ "${EMQX_LOG__TO:-}" = 'default' ]; then if [ "${EMQX_LOG__TO:-}" = 'default' ]; then
# want to use config file defaults, do nothing # want to use defaults, do nothing
unset EMQX_LOG__TO unset EMQX_LOG__TO
else else
tr_log_to_env tr_log_to_env
# ensure defaults export EMQX_DEFAULT_LOG_HANDLER=${EMQX_DEFAULT_LOG_HANDLER:-console}
export EMQX_LOG__CONSOLE_HANDLER__ENABLE="${EMQX_LOG__CONSOLE_HANDLER__ENABLE:-true}"
export EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE="${EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE:-false}"
fi fi
} }
@ -979,7 +983,7 @@ diagnose_boot_failure_and_die() {
local ps_line local ps_line
ps_line="$(find_emqx_process)" ps_line="$(find_emqx_process)"
if [ -z "$ps_line" ]; then if [ -z "$ps_line" ]; then
echo "Find more information in the latest log file: ${RUNNER_LOG_DIR}/erlang.log.*" echo "Find more information in the latest log file: ${EMQX_LOG_DIR}/erlang.log.*"
exit 1 exit 1
fi fi
if ! relx_nodetool "ping" > /dev/null; then if ! relx_nodetool "ping" > /dev/null; then
@ -990,7 +994,7 @@ diagnose_boot_failure_and_die() {
fi fi
if ! relx_nodetool 'eval' 'true = emqx:is_running()' > /dev/null; then if ! relx_nodetool 'eval' 'true = emqx:is_running()' > /dev/null; then
logerr "$NAME node is started, but failed to complete the boot sequence in time." logerr "$NAME node is started, but failed to complete the boot sequence in time."
echo "Please collect the logs in ${RUNNER_LOG_DIR} and report a bug to EMQX team at https://github.com/emqx/emqx/issues/new/choose" echo "Please collect the logs in ${EMQX_LOG_DIR} and report a bug to EMQX team at https://github.com/emqx/emqx/issues/new/choose"
pipe_shutdown pipe_shutdown
exit 3 exit 3
fi fi
@ -1065,7 +1069,7 @@ case "${COMMAND}" in
mkdir -p "$PIPE_DIR" mkdir -p "$PIPE_DIR"
"$BINDIR/run_erl" -daemon "$PIPE_DIR" "$RUNNER_LOG_DIR" \ "$BINDIR/run_erl" -daemon "$PIPE_DIR" "$EMQX_LOG_DIR" \
"$(relx_start_command)" "$(relx_start_command)"
WAIT_TIME=${EMQX_WAIT_FOR_START:-120} WAIT_TIME=${EMQX_WAIT_FOR_START:-120}


@ -10,10 +10,10 @@ echo "Running node dump in ${RUNNER_ROOT_DIR}"
cd "${RUNNER_ROOT_DIR}" cd "${RUNNER_ROOT_DIR}"
DUMP="$RUNNER_LOG_DIR/node_dump_$(date +"%Y%m%d_%H%M%S").tar.gz" DUMP="$EMQX_LOG_DIR/node_dump_$(date +"%Y%m%d_%H%M%S").tar.gz"
CONF_DUMP="$RUNNER_LOG_DIR/conf.dump" CONF_DUMP="$EMQX_LOG_DIR/conf.dump"
LICENSE_INFO="$RUNNER_LOG_DIR/license_info.txt" LICENSE_INFO="$EMQX_LOG_DIR/license_info.txt"
SYSINFO="$RUNNER_LOG_DIR/sysinfo.txt" SYSINFO="$EMQX_LOG_DIR/sysinfo.txt"
LOG_MAX_AGE_DAYS=3 LOG_MAX_AGE_DAYS=3
@ -74,7 +74,7 @@ done
# Pack files # Pack files
{ {
find "$RUNNER_LOG_DIR" -mtime -"${LOG_MAX_AGE_DAYS}" \( -name '*.log.*' -or -name 'run_erl.log*' \) find "$EMQX_LOG_DIR" -mtime -"${LOG_MAX_AGE_DAYS}" \( -name '*.log.*' -or -name 'run_erl.log*' \)
echo "${SYSINFO}" echo "${SYSINFO}"
echo "${CONF_DUMP}" echo "${CONF_DUMP}"
echo "${LICENSE_INFO}" echo "${LICENSE_INFO}"


@ -0,0 +1,2 @@
Fixed a race condition in the HTTP driver that would result in an error rather than a retry of the request.
Related fix in the driver: https://github.com/emqx/ehttpc/pull/45


@ -0,0 +1,25 @@
## This Dockerfile should not be run in GitHub Actions or any other automated process.
## It should be executed manually, as needed by the user.
##
## Before executing it manually:
## please confirm the EMQX Enterprise version you are using and modify the base layer image tag
## ```bash
## $ docker build -f=Dockerfile.msodbc -t emqx-enterprise-with-msodbc:5.0.3-alpha.2 .
## ```
# FROM emqx/emqx-enterprise:latest
FROM emqx/emqx-enterprise:5.0.3-alpha.2
USER root
RUN apt-get update \
&& apt-get install -y gnupg2 curl apt-utils \
&& curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \
&& curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list \
&& apt-get update \
&& ACCEPT_EULA=Y apt-get install -y msodbcsql17 unixodbc-dev \
&& sed -i 's/ODBC Driver 17 for SQL Server/ms-sql/g' /etc/odbcinst.ini \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
USER emqx


@ -1,9 +1,7 @@
#!/usr/bin/env bash #!/usr/bin/env bash
## EMQ docker image start script
# Huang Rui <vowstar@gmail.com>
# EMQX Team <support@emqx.io>
## Shell setting ## EMQ docker image start script
if [[ -n "$DEBUG" ]]; then if [[ -n "$DEBUG" ]]; then
set -ex set -ex
else else


@ -10,8 +10,8 @@ Group=emqx
Type=simple Type=simple
Environment=HOME=/var/lib/emqx Environment=HOME=/var/lib/emqx
# Enable logging to file # log to file by default (if no log handler config)
Environment=EMQX_LOG__TO=default Environment=EMQX_DEFAULT_LOG_HANDLER=file
# Start 'foreground' but not 'start' (daemon) mode. # Start 'foreground' but not 'start' (daemon) mode.
# Because systemd monitor/restarts 'simple' services # Because systemd monitor/restarts 'simple' services


@ -207,7 +207,7 @@ kafka_structs() ->
#{ #{
desc => <<"Kafka Producer Bridge Config">>, desc => <<"Kafka Producer Bridge Config">>,
required => false, required => false,
converter => fun emqx_bridge_kafka:kafka_producer_converter/2 converter => fun kafka_producer_converter/2
} }
)}, )},
{kafka_consumer, {kafka_consumer,
@ -302,3 +302,13 @@ sqlserver_structs() ->
} }
)} )}
]. ].
kafka_producer_converter(undefined, _) ->
undefined;
kafka_producer_converter(Map, Opts) ->
maps:map(
fun(_Name, Config) ->
emqx_bridge_kafka:kafka_producer_converter(Config, Opts)
end,
Map
).
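%% A hedged note (assumption): under this schema the 'kafka' bridge config is a
%% map of bridge-name => producer config, so the upstream converter, which
%% expects a single producer config, has to be applied to each value of the map.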


@ -1,6 +1,6 @@
{application, emqx_ee_connector, [ {application, emqx_ee_connector, [
{description, "EMQX Enterprise connectors"}, {description, "EMQX Enterprise connectors"},
{vsn, "0.1.11"}, {vsn, "0.1.12"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,


@ -114,18 +114,19 @@ on_start(
sync_timeout => SyncTimeout, sync_timeout => SyncTimeout,
templates => Templates, templates => Templates,
producers_map_pid => ProducersMapPID, producers_map_pid => ProducersMapPID,
producers_opts => ProducerOpts producers_opts => emqx_secret:wrap(ProducerOpts)
}, },
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
{ok, _Pid} -> {ok, _Pid} ->
{ok, State}; {ok, State};
{error, _Reason} = Error -> {error, Reason0} ->
Reason = redact(Reason0),
?tp( ?tp(
rocketmq_connector_start_failed, rocketmq_connector_start_failed,
#{error => _Reason} #{error => Reason}
), ),
Error {error, Reason}
end. end.
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
@ -222,7 +223,7 @@ safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, R
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
catch catch
_Type:Reason -> _Type:Reason ->
{error, {unrecoverable_error, Reason}} {error, {unrecoverable_error, redact(Reason)}}
end. end.
produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) -> produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) ->
@ -337,7 +338,7 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
_ -> _ ->
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]), ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]),
{ok, Producers0} = rocketmq:ensure_supervised_producers( {ok, Producers0} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic1, ProducerOpts ClientId, ProducerGroup, Topic1, emqx_secret:unwrap(ProducerOpts)
), ),
ets:insert(ClientId, {TopicKey, Producers0}), ets:insert(ClientId, {TopicKey, Producers0}),
Producers0 Producers0
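%% A hedged note (assumption about emqx_secret): wrap/1 hides the producer
%% options behind a zero-arity fun so that logging the connector state or an
%% error term does not leak credentials; unwrap/1 evaluates the fun to recover
%% the original options just before they are handed to the rocketmq client.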


@ -34,8 +34,6 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -43,7 +41,7 @@
-export([connect/1]). -export([connect/1]).
%% Internal exports used to execute code with ecpool worker %% Internal exports used to execute code with ecpool worker
-export([do_get_status/1, worker_do_insert/3, do_async_reply/2]). -export([do_get_status/1, worker_do_insert/3]).
-import(emqx_plugin_libs_rule, [str/1]). -import(emqx_plugin_libs_rule, [str/1]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
@ -51,7 +49,6 @@
-define(ACTION_SEND_MESSAGE, send_message). -define(ACTION_SEND_MESSAGE, send_message).
-define(SYNC_QUERY_MODE, handover). -define(SYNC_QUERY_MODE, handover).
-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).
-define(SQLSERVER_HOST_OPTIONS, #{ -define(SQLSERVER_HOST_OPTIONS, #{
default_port => 1433 default_port => 1433
@ -169,7 +166,7 @@ server() ->
%% Callbacks defined in emqx_resource %% Callbacks defined in emqx_resource
%%==================================================================== %%====================================================================
callback_mode() -> async_if_possible. callback_mode() -> always_sync.
is_buffer_supported() -> false. is_buffer_supported() -> false.
@ -252,27 +249,6 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
), ),
do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State).
-spec on_query_async(
manager_id(),
{?ACTION_SEND_MESSAGE, map()},
{ReplyFun :: function(), Args :: list()},
state()
) ->
{ok, any()}
| {error, term()}.
on_query_async(
InstanceId,
{?ACTION_SEND_MESSAGE, _Msg} = Query,
ReplyFunAndArgs,
State
) ->
?TRACE(
"SINGLE_QUERY_ASYNC",
"bridge_sqlserver_received",
#{requests => Query, connector => InstanceId, state => State}
),
do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
-spec on_batch_query( -spec on_batch_query(
manager_id(), manager_id(),
[{?ACTION_SEND_MESSAGE, map()}], [{?ACTION_SEND_MESSAGE, map()}],
@ -290,20 +266,6 @@ on_batch_query(InstanceId, BatchRequests, State) ->
), ),
do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State).
-spec on_batch_query_async(
manager_id(),
[{?ACTION_SEND_MESSAGE, map()}],
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, any()}.
on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"BATCH_QUERY_ASYNC",
"bridge_sqlserver_received",
#{requests => Requests, connector => InstanceId, state => State}
),
do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
Health = emqx_resource_pool:health_check_workers( Health = emqx_resource_pool:health_check_workers(
PoolName, PoolName,
@ -364,13 +326,11 @@ conn_str([{password, Password} | Opts], Acc) ->
conn_str([{_, _} | Opts], Acc) -> conn_str([{_, _} | Opts], Acc) ->
conn_str(Opts, Acc). conn_str(Opts, Acc).
%% Sync & Async query with singe & batch sql statement %% Query with single & batch SQL statements
-spec do_query( -spec do_query(
manager_id(), manager_id(),
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
ApplyMode :: ApplyMode :: handover,
handover
| {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}},
state() state()
) -> ) ->
{ok, list()} {ok, list()}
@ -530,6 +490,3 @@ apply_template(Query, Templates) ->
%% TODO: more detail infomatoin %% TODO: more detail infomatoin
?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}),
{error, failed_to_apply_sql_template}. {error, failed_to_apply_sql_template}.
do_async_reply(Result, {ReplyFun, Args}) ->
erlang:apply(ReplyFun, Args ++ [Result]).


@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do
{:redbug, "2.0.8"}, {:redbug, "2.0.8"},
{:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.7", override: true}, {:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true},
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon # in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true}, {:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.3", override: true}, {:hocon, github: "emqx/hocon", tag: "0.39.4", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
@ -702,7 +702,6 @@ defmodule EMQXUmbrella.MixProject do
emqx_default_erlang_cookie: default_cookie(), emqx_default_erlang_cookie: default_cookie(),
platform_data_dir: "data", platform_data_dir: "data",
platform_etc_dir: "etc", platform_etc_dir: "etc",
platform_log_dir: "log",
platform_plugins_dir: "plugins", platform_plugins_dir: "plugins",
runner_bin_dir: "$RUNNER_ROOT_DIR/bin", runner_bin_dir: "$RUNNER_ROOT_DIR/bin",
emqx_etc_dir: "$RUNNER_ROOT_DIR/etc", emqx_etc_dir: "$RUNNER_ROOT_DIR/etc",
@ -725,7 +724,6 @@ defmodule EMQXUmbrella.MixProject do
emqx_default_erlang_cookie: default_cookie(), emqx_default_erlang_cookie: default_cookie(),
platform_data_dir: "/var/lib/emqx", platform_data_dir: "/var/lib/emqx",
platform_etc_dir: "/etc/emqx", platform_etc_dir: "/etc/emqx",
platform_log_dir: "/var/log/emqx",
platform_plugins_dir: "/var/lib/emqx/plugins", platform_plugins_dir: "/var/lib/emqx/plugins",
runner_bin_dir: "/usr/bin", runner_bin_dir: "/usr/bin",
emqx_etc_dir: "/etc/emqx", emqx_etc_dir: "/etc/emqx",


@ -56,7 +56,7 @@
, {gpb, "4.19.7"} , {gpb, "4.19.7"}
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
@ -75,7 +75,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.3"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}


@ -352,7 +352,6 @@ overlay_vars_pkg(bin) ->
[ [
{platform_data_dir, "data"}, {platform_data_dir, "data"},
{platform_etc_dir, "etc"}, {platform_etc_dir, "etc"},
{platform_log_dir, "log"},
{platform_plugins_dir, "plugins"}, {platform_plugins_dir, "plugins"},
{runner_bin_dir, "$RUNNER_ROOT_DIR/bin"}, {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"},
{emqx_etc_dir, "$RUNNER_ROOT_DIR/etc"}, {emqx_etc_dir, "$RUNNER_ROOT_DIR/etc"},
@ -365,7 +364,6 @@ overlay_vars_pkg(pkg) ->
[ [
{platform_data_dir, "/var/lib/emqx"}, {platform_data_dir, "/var/lib/emqx"},
{platform_etc_dir, "/etc/emqx"}, {platform_etc_dir, "/etc/emqx"},
{platform_log_dir, "/var/log/emqx"},
{platform_plugins_dir, "/var/lib/emqx/plugins"}, {platform_plugins_dir, "/var/lib/emqx/plugins"},
{runner_bin_dir, "/usr/bin"}, {runner_bin_dir, "/usr/bin"},
{emqx_etc_dir, "/etc/emqx"}, {emqx_etc_dir, "/etc/emqx"},


@ -11,7 +11,8 @@ RUNNER_LIB_DIR="{{ runner_lib_dir }}"
IS_ELIXIR="${IS_ELIXIR:-{{ is_elixir }}}" IS_ELIXIR="${IS_ELIXIR:-{{ is_elixir }}}"
## Allow users to pre-set `EMQX_LOG_DIR` because it only affects boot commands like `start` and `console`, ## Allow users to pre-set `EMQX_LOG_DIR` because it only affects boot commands like `start` and `console`,
## but not other commands such as `ping` and `ctl`. ## but not other commands such as `ping` and `ctl`.
RUNNER_LOG_DIR="${EMQX_LOG_DIR:-${RUNNER_LOG_DIR:-{{ runner_log_dir }}}}" ## RUNNER_LOG_DIR is kept for backward compatibility.
export EMQX_LOG_DIR="${EMQX_LOG_DIR:-${RUNNER_LOG_DIR:-{{ runner_log_dir }}}}"
EMQX_ETC_DIR="{{ emqx_etc_dir }}" EMQX_ETC_DIR="{{ emqx_etc_dir }}"
RUNNER_USER="{{ runner_user }}" RUNNER_USER="{{ runner_user }}"
SCHEMA_MOD="{{ emqx_schema_mod }}" SCHEMA_MOD="{{ emqx_schema_mod }}"


@ -84,8 +84,10 @@ common_handler_max_depth.label:
"""Max Depth""" """Max Depth"""
desc_log.desc: desc_log.desc:
"""EMQX logging supports multiple sinks for the log events. """EMQX supports multiple log handlers, one console handler and multiple file handlers.
Each sink is represented by a _log handler_, which can be configured independently.""" By default, EMQX logs to the console when running in Docker or in console/foreground mode,
otherwise it logs to the file $EMQX_LOG_DIR/emqx.log.
For advanced configuration, you can find more parameters in this section."""
desc_log.label: desc_log.label:
"""Log""" """Log"""


@ -90,7 +90,8 @@ common_handler_max_depth.label:
"""最大深度""" """最大深度"""
desc_log.desc: desc_log.desc:
"""EMQX 日志记录支持日志事件的多个接收器。 每个接收器由一个_log handler_表示可以独立配置。""" """EMQX 支持同时多个日志输出,一个控制台输出,和多个文件输出。
默认情况下，EMQX 运行在容器中，或者在 'console' 或 'foreground' 模式下运行时，会输出到控制台，否则输出到文件。"""
desc_log.label: desc_log.label:
"""日志""" """日志"""


@ -47,7 +47,7 @@ while [ "$#" -gt 0 ]; do
exit 0 exit 0
;; ;;
--app) --app)
WHICH_APP="$2" WHICH_APP="${2%/}"
shift 2 shift 2
;; ;;
--only-up) --only-up)