diff --git a/.gitignore b/.gitignore index 62e8ddc81..ceb12182f 100644 --- a/.gitignore +++ b/.gitignore @@ -43,8 +43,7 @@ tmp/ _packages elvis emqx_dialyzer_*_plt -*/emqx_dashboard/priv/www -*/emqx_dashboard/priv/i18n.conf +*/emqx_dashboard/priv/ dist.zip scripts/git-token apps/*/etc/*.all @@ -71,3 +70,5 @@ apps/emqx/test/emqx_static_checks_data/master.bpapi lux_logs/ /.prepare bom.json +ct_run*/ +apps/emqx_conf/etc/emqx.conf.all.rendered* diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index ee345e9d6..e69de29bb 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1,43 +0,0 @@ -listeners.tcp.default { - bind = "0.0.0.0:1883" - max_connections = 1024000 -} - -listeners.ssl.default { - bind = "0.0.0.0:8883" - max_connections = 512000 - ssl_options { - keyfile = "{{ platform_etc_dir }}/certs/key.pem" - certfile = "{{ platform_etc_dir }}/certs/cert.pem" - cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" - } -} - -listeners.ws.default { - bind = "0.0.0.0:8083" - max_connections = 1024000 - websocket.mqtt_path = "/mqtt" -} - -listeners.wss.default { - bind = "0.0.0.0:8084" - max_connections = 512000 - websocket.mqtt_path = "/mqtt" - ssl_options { - keyfile = "{{ platform_etc_dir }}/certs/key.pem" - certfile = "{{ platform_etc_dir }}/certs/cert.pem" - cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" - } -} - -# listeners.quic.default { -# enabled = true -# bind = "0.0.0.0:14567" -# max_connections = 1024000 -# ssl_options { -# verify = verify_none -# keyfile = "{{ platform_etc_dir }}/certs/key.pem" -# certfile = "{{ platform_etc_dir }}/certs/cert.pem" -# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" -# } -# } diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 0cfba8fe3..2f44d2e1a 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.24"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.3-alpha.3"). +-define(EMQX_RELEASE_EE, "5.0.3-alpha.5"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/include/emqx_schema.hrl b/apps/emqx/include/emqx_schema.hrl new file mode 100644 index 000000000..307bb20c5 --- /dev/null +++ b/apps/emqx/include/emqx_schema.hrl @@ -0,0 +1,23 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_SCHEMA_HRL). +-define(EMQX_SCHEMA_HRL, true). + +-define(TOMBSTONE_TYPE, marked_for_deletion). +-define(TOMBSTONE_VALUE, <<"marked_for_deletion">>). +-define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion). + +-endif. diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 3c132f20f..5053a353c 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.3"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index d42478fea..5ca8fc797 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -3,7 +3,7 @@ {id, "emqx"}, {description, "EMQX Core"}, % strict semver, bump manually! - {vsn, "5.0.24"}, + {vsn, "5.0.25"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 9561263ca..54648fca6 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -35,7 +35,6 @@ save_to_config_map/2, save_to_override_conf/3 ]). --export([raw_conf_with_default/4]). -export([merge_envs/2]). -export([ @@ -90,7 +89,7 @@ ]). -ifdef(TEST). --export([erase_schema_mod_and_names/0]). +-export([erase_all/0]). -endif. -include("logger.hrl"). @@ -329,7 +328,7 @@ init_load(SchemaMod, ConfFiles) -> -spec init_load(module(), [string()] | binary() | hocon:config()) -> ok. init_load(SchemaMod, Conf, Opts) when is_list(Conf) orelse is_binary(Conf) -> HasDeprecatedFile = has_deprecated_file(), - RawConf = parse_hocon(HasDeprecatedFile, Conf), + RawConf = load_config_files(HasDeprecatedFile, Conf), init_load(HasDeprecatedFile, SchemaMod, RawConf, Opts). init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) -> @@ -339,18 +338,16 @@ init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) -> RawConfWithEnvs = merge_envs(SchemaMod, RawConf), Overrides = read_override_confs(), RawConfWithOverrides = hocon:deep_merge(RawConfWithEnvs, Overrides), - RootNames = get_root_names(), - RawConfAll = raw_conf_with_default(SchemaMod, RootNames, RawConfWithOverrides, Opts), + RawConfAll = maybe_fill_defaults(SchemaMod, RawConfWithOverrides, Opts), %% check configs against the schema {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConfAll, #{}), save_to_app_env(AppEnvs), ok = save_to_config_map(CheckedConf, RawConfAll); init_load(false, SchemaMod, RawConf, Opts) when is_map(RawConf) -> ok = save_schema_mod_and_names(SchemaMod), - RootNames = get_root_names(), %% Merge environment variable overrides on top RawConfWithEnvs = merge_envs(SchemaMod, RawConf), - RawConfAll = raw_conf_with_default(SchemaMod, RootNames, RawConfWithEnvs, Opts), + RawConfAll = maybe_fill_defaults(SchemaMod, RawConfWithEnvs, Opts), %% check configs against the schema {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConfAll, #{}), save_to_app_env(AppEnvs), @@ -363,47 +360,61 @@ read_override_confs() -> hocon:deep_merge(ClusterOverrides, LocalOverrides). %% keep the raw and non-raw conf has the same keys to make update raw conf easier. -raw_conf_with_default(SchemaMod, RootNames, RawConf, #{raw_with_default := true}) -> - Fun = fun(Name, Acc) -> - case maps:is_key(Name, RawConf) of - true -> - Acc; - false -> - case lists:keyfind(Name, 1, hocon_schema:roots(SchemaMod)) of - false -> - Acc; - {_, {_, Schema}} -> - Acc#{Name => schema_default(Schema)} - end - end - end, - RawDefault = lists:foldl(Fun, #{}, RootNames), - maps:merge(RawConf, fill_defaults(SchemaMod, RawDefault, #{})); -raw_conf_with_default(_SchemaMod, _RootNames, RawConf, _Opts) -> +%% TODO: remove raw_with_default as it's now always true. +maybe_fill_defaults(SchemaMod, RawConf0, #{raw_with_default := true}) -> + RootSchemas = hocon_schema:roots(SchemaMod), + %% the roots which are missing from the loaded configs + MissingRoots = lists:filtermap( + fun({BinName, Sc}) -> + case maps:is_key(BinName, RawConf0) orelse is_already_loaded(BinName) of + true -> false; + false -> {true, Sc} + end + end, + RootSchemas + ), + RawConf = lists:foldl( + fun({RootName, Schema}, Acc) -> + Acc#{bin(RootName) => seed_default(Schema)} + end, + RawConf0, + MissingRoots + ), + fill_defaults(RawConf); +maybe_fill_defaults(_SchemaMod, RawConf, _Opts) -> RawConf. -schema_default(Schema) -> - case hocon_schema:field_schema(Schema, type) of - ?ARRAY(_) -> - []; - _ -> - #{} +%% So far, this can only return true when testing. +%% e.g. when testing an app, we need to load its config first +%% then start emqx_conf application which will load the +%% possibly empty config again (then filled with defaults). +is_already_loaded(Name) -> + ?MODULE:get_raw([Name], #{}) =/= #{}. + +%% if a root is not found in the raw conf, fill it with default values. +seed_default(Schema) -> + case hocon_schema:field_schema(Schema, default) of + undefined -> + %% so far all roots without a default value are objects + #{}; + Value -> + Value end. -parse_hocon(HasDeprecatedFile, Conf) -> +load_config_files(HasDeprecatedFile, Conf) -> IncDirs = include_dirs(), case do_parse_hocon(HasDeprecatedFile, Conf, IncDirs) of {ok, HoconMap} -> HoconMap; {error, Reason} -> ?SLOG(error, #{ - msg => "failed_to_load_hocon_file", + msg => "failed_to_load_config_file", reason => Reason, pwd => file:get_cwd(), include_dirs => IncDirs, config_file => Conf }), - error(failed_to_load_hocon_file) + error(failed_to_load_config_file) end. do_parse_hocon(true, Conf, IncDirs) -> @@ -548,7 +559,9 @@ save_schema_mod_and_names(SchemaMod) -> }). -ifdef(TEST). -erase_schema_mod_and_names() -> +erase_all() -> + Names = get_root_names(), + lists:foreach(fun erase/1, Names), persistent_term:erase(?PERSIS_SCHEMA_MODS). -endif. diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index e664a7dd7..0bad19f9e 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -18,6 +18,7 @@ -module(emqx_config_handler). -include("logger.hrl"). +-include("emqx_schema.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -behaviour(gen_server). @@ -447,11 +448,17 @@ merge_to_override_config(RawConf, Opts) -> up_req({remove, _Opts}) -> '$remove'; up_req({{update, Req}, _Opts}) -> Req. -return_change_result(ConfKeyPath, {{update, _Req}, Opts}) -> - #{ - config => emqx_config:get(ConfKeyPath), - raw_config => return_rawconf(ConfKeyPath, Opts) - }; +return_change_result(ConfKeyPath, {{update, Req}, Opts}) -> + case Req =/= ?TOMBSTONE_CONFIG_CHANGE_REQ of + true -> + #{ + config => emqx_config:get(ConfKeyPath), + raw_config => return_rawconf(ConfKeyPath, Opts) + }; + false -> + %% like remove, nothing to return + #{} + end; return_change_result(_ConfKeyPath, {remove, _Opts}) -> #{}. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index f82aebe7c..b3043effc 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -20,6 +20,7 @@ -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]). -include("emqx_mqtt.hrl"). +-include("emqx_schema.hrl"). -include("logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -33,7 +34,8 @@ is_running/1, current_conns/2, max_conns/2, - id_example/0 + id_example/0, + default_max_conn/0 ]). -export([ @@ -61,8 +63,11 @@ -export([certs_dir/2]). -endif. +-type listener_id() :: atom() | binary(). + -define(CONF_KEY_PATH, [listeners, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). +-define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ). -spec id_example() -> atom(). id_example() -> 'tcp:default'. @@ -105,19 +110,22 @@ do_list_raw() -> format_raw_listeners({Type0, Conf}) -> Type = binary_to_atom(Type0), - lists:map( - fun({LName, LConf0}) when is_map(LConf0) -> - Bind = parse_bind(LConf0), - Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), - LConf1 = maps:remove(<<"authentication">>, LConf0), - LConf3 = maps:put(<<"running">>, Running, LConf1), - CurrConn = - case Running of - true -> current_conns(Type, LName, Bind); - false -> 0 - end, - LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3), - {Type0, LName, LConf4} + lists:filtermap( + fun + ({LName, LConf0}) when is_map(LConf0) -> + Bind = parse_bind(LConf0), + Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), + LConf1 = maps:remove(<<"authentication">>, LConf0), + LConf2 = maps:put(<<"running">>, Running, LConf1), + CurrConn = + case Running of + true -> current_conns(Type, LName, Bind); + false -> 0 + end, + LConf = maps:put(<<"current_connections">>, CurrConn, LConf2), + {true, {Type0, LName, LConf}}; + ({_LName, _MarkDel}) -> + false end, maps:to_list(Conf) ). @@ -195,7 +203,7 @@ start() -> ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), foreach_listeners(fun start_listener/3). --spec start_listener(atom()) -> ok | {error, term()}. +-spec start_listener(listener_id()) -> ok | {error, term()}. start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). @@ -246,7 +254,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> restart() -> foreach_listeners(fun restart_listener/3). --spec restart_listener(atom()) -> ok | {error, term()}. +-spec restart_listener(listener_id()) -> ok | {error, term()}. restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). @@ -271,7 +279,7 @@ stop() -> _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), foreach_listeners(fun stop_listener/3). --spec stop_listener(atom()) -> ok | {error, term()}. +-spec stop_listener(listener_id()) -> ok | {error, term()}. stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). @@ -419,7 +427,9 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> end. %% Update the listeners at runtime -pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) -> +pre_config_update([listeners, Type, Name], {create, NewConf}, V) when + V =:= undefined orelse V =:= ?TOMBSTONE_VALUE +-> CertsDir = certs_dir(Type, Name), {ok, convert_certs(CertsDir, NewConf)}; pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) -> @@ -434,6 +444,8 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) -> pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) -> NewConf = emqx_utils_maps:deep_merge(RawConf, Updated), {ok, NewConf}; +pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) -> + {ok, ?TOMBSTONE_VALUE}; pre_config_update(_Path, _Request, RawConf) -> {ok, RawConf}. @@ -441,13 +453,15 @@ post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefin start_listener(Type, Name, NewConf); post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf), + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), case NewConf of #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf}); _ -> ok end; -post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) -> - ok; -post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) -> +post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when + Op =:= ?MARK_DEL andalso is_map(OldConf) +-> + ok = unregister_ocsp_stapling_refresh(Type, Name), case stop_listener(Type, Name, OldConf) of ok -> _ = emqx_authentication:delete_chain(listener_id(Type, Name)), @@ -460,10 +474,18 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo #{enabled := NewEnabled} = NewConf, #{enabled := OldEnabled} = OldConf, case {NewEnabled, OldEnabled} of - {true, true} -> restart_listener(Type, Name, {OldConf, NewConf}); - {true, false} -> start_listener(Type, Name, NewConf); - {false, true} -> stop_listener(Type, Name, OldConf); - {false, false} -> stop_listener(Type, Name, OldConf) + {true, true} -> + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + restart_listener(Type, Name, {OldConf, NewConf}); + {true, false} -> + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + start_listener(Type, Name, NewConf); + {false, true} -> + ok = unregister_ocsp_stapling_refresh(Type, Name), + stop_listener(Type, Name, OldConf); + {false, false} -> + ok = unregister_ocsp_stapling_refresh(Type, Name), + stop_listener(Type, Name, OldConf) end; post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. @@ -601,6 +623,7 @@ format_bind(Bin) when is_binary(Bin) -> listener_id(Type, ListenerName) -> list_to_atom(lists:append([str(Type), ":", str(ListenerName)])). +-spec parse_listener_id(listener_id()) -> {ok, #{type => atom(), name => atom()}} | {error, term()}. parse_listener_id(Id) -> case string:split(str(Id), ":", leading) of [Type, Name] -> @@ -813,3 +836,22 @@ inject_crl_config( }; inject_crl_config(Conf) -> Conf. + +maybe_unregister_ocsp_stapling_refresh( + ssl = Type, Name, #{ssl_options := #{ocsp := #{enable_ocsp_stapling := false}}} = _Conf +) -> + unregister_ocsp_stapling_refresh(Type, Name), + ok; +maybe_unregister_ocsp_stapling_refresh(_Type, _Name, _Conf) -> + ok. + +unregister_ocsp_stapling_refresh(Type, Name) -> + ListenerId = listener_id(Type, Name), + emqx_ocsp_cache:unregister_listener(ListenerId), + ok. + +%% There is currently an issue with frontend +%% infinity is not a good value for it, so we use 5m for now +default_max_conn() -> + %% TODO: <<"infinity">> + 5_000_000. diff --git a/apps/emqx/src/emqx_ocsp_cache.erl b/apps/emqx/src/emqx_ocsp_cache.erl index 3bb10ee5c..ef0411b37 100644 --- a/apps/emqx/src/emqx_ocsp_cache.erl +++ b/apps/emqx/src/emqx_ocsp_cache.erl @@ -30,6 +30,7 @@ sni_fun/2, fetch_response/1, register_listener/2, + unregister_listener/1, inject_sni_fun/2 ]). @@ -107,6 +108,9 @@ fetch_response(ListenerID) -> register_listener(ListenerID, Opts) -> gen_server:call(?MODULE, {register_listener, ListenerID, Opts}, ?CALL_TIMEOUT). +unregister_listener(ListenerID) -> + gen_server:cast(?MODULE, {unregister_listener, ListenerID}). + -spec inject_sni_fun(emqx_listeners:listener_id(), map()) -> map(). inject_sni_fun(ListenerID, Conf0) -> SNIFun = emqx_const_v1:make_sni_fun(ListenerID), @@ -160,6 +164,18 @@ handle_call({register_listener, ListenerID, Conf}, _From, State0) -> handle_call(Call, _From, State) -> {reply, {error, {unknown_call, Call}}, State}. +handle_cast({unregister_listener, ListenerID}, State0) -> + State2 = + case maps:take(?REFRESH_TIMER(ListenerID), State0) of + error -> + State0; + {TRef, State1} -> + emqx_utils:cancel_timer(TRef), + State1 + end, + State = maps:remove({refresh_interval, ListenerID}, State2), + ?tp(ocsp_cache_listener_unregistered, #{listener_id => ListenerID}), + {noreply, State}; handle_cast(_Cast, State) -> {noreply, State}. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 69f234e47..55c9ecaee 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -23,6 +23,7 @@ -dialyzer(no_fail_call). -elvis([{elvis_style, invalid_dynamic_call, disable}]). +-include("emqx_schema.hrl"). -include("emqx_authentication.hrl"). -include("emqx_access_control.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -77,7 +78,8 @@ user_lookup_fun_tr/2, validate_alarm_actions/1, non_empty_string/1, - validations/0 + validations/0, + naive_env_interpolation/1 ]). -export([qos/0]). @@ -110,6 +112,12 @@ convert_servers/2 ]). +%% tombstone types +-export([ + tombstone_map/2, + get_tombstone_map_value_type/1 +]). + -behaviour(hocon_schema). -reflect_type([ @@ -787,41 +795,48 @@ fields("listeners") -> [ {"tcp", sc( - map(name, ref("mqtt_tcp_listener")), + tombstone_map(name, ref("mqtt_tcp_listener")), #{ desc => ?DESC(fields_listeners_tcp), + converter => fun(X, _) -> + ensure_default_listener(X, tcp) + end, required => {false, recursively} } )}, {"ssl", sc( - map(name, ref("mqtt_ssl_listener")), + tombstone_map(name, ref("mqtt_ssl_listener")), #{ desc => ?DESC(fields_listeners_ssl), + converter => fun(X, _) -> ensure_default_listener(X, ssl) end, required => {false, recursively} } )}, {"ws", sc( - map(name, ref("mqtt_ws_listener")), + tombstone_map(name, ref("mqtt_ws_listener")), #{ desc => ?DESC(fields_listeners_ws), + converter => fun(X, _) -> ensure_default_listener(X, ws) end, required => {false, recursively} } )}, {"wss", sc( - map(name, ref("mqtt_wss_listener")), + tombstone_map(name, ref("mqtt_wss_listener")), #{ desc => ?DESC(fields_listeners_wss), + converter => fun(X, _) -> ensure_default_listener(X, wss) end, required => {false, recursively} } )}, {"quic", sc( - map(name, ref("mqtt_quic_listener")), + tombstone_map(name, ref("mqtt_quic_listener")), #{ desc => ?DESC(fields_listeners_quic), + converter => fun keep_default_tombstone/2, required => {false, recursively} } )} @@ -832,7 +847,7 @@ fields("crl_cache") -> %% same URL. If they had diverging timeout options, it would be %% confusing. [ - {"refresh_interval", + {refresh_interval, sc( duration(), #{ @@ -840,7 +855,7 @@ fields("crl_cache") -> desc => ?DESC("crl_cache_refresh_interval") } )}, - {"http_timeout", + {http_timeout, sc( duration(), #{ @@ -848,7 +863,7 @@ fields("crl_cache") -> desc => ?DESC("crl_cache_refresh_http_timeout") } )}, - {"capacity", + {capacity, sc( pos_integer(), #{ @@ -1365,7 +1380,7 @@ fields("ssl_client_opts") -> client_ssl_opts_schema(#{}); fields("ocsp") -> [ - {"enable_ocsp_stapling", + {enable_ocsp_stapling, sc( boolean(), #{ @@ -1373,7 +1388,7 @@ fields("ocsp") -> desc => ?DESC("server_ssl_opts_schema_enable_ocsp_stapling") } )}, - {"responder_url", + {responder_url, sc( url(), #{ @@ -1381,7 +1396,7 @@ fields("ocsp") -> desc => ?DESC("server_ssl_opts_schema_ocsp_responder_url") } )}, - {"issuer_pem", + {issuer_pem, sc( binary(), #{ @@ -1389,7 +1404,7 @@ fields("ocsp") -> desc => ?DESC("server_ssl_opts_schema_ocsp_issuer_pem") } )}, - {"refresh_interval", + {refresh_interval, sc( duration(), #{ @@ -1397,7 +1412,7 @@ fields("ocsp") -> desc => ?DESC("server_ssl_opts_schema_ocsp_refresh_interval") } )}, - {"refresh_http_timeout", + {refresh_http_timeout, sc( duration(), #{ @@ -1947,7 +1962,7 @@ base_listener(Bind) -> sc( hoconsc:union([infinity, pos_integer()]), #{ - default => <<"infinity">>, + default => emqx_listeners:default_max_conn(), desc => ?DESC(base_listener_max_connections) } )}, @@ -2323,12 +2338,12 @@ server_ssl_opts_schema(Defaults, IsRanchListener) -> Field || not IsRanchListener, Field <- [ - {"gc_after_handshake", + {gc_after_handshake, sc(boolean(), #{ default => false, desc => ?DESC(server_ssl_opts_schema_gc_after_handshake) })}, - {"ocsp", + {ocsp, sc( ref("ocsp"), #{ @@ -2336,7 +2351,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) -> validator => fun ocsp_inner_validator/1 } )}, - {"enable_crl_check", + {enable_crl_check, sc( boolean(), #{ @@ -2799,6 +2814,7 @@ authentication(Which) -> hoconsc:mk(Type, #{ desc => Desc, converter => fun ensure_array/2, + default => [], importance => Importance }). @@ -3203,3 +3219,138 @@ assert_required_field(Conf, Key, ErrorMessage) -> _ -> ok end. + +default_listener(tcp) -> + #{ + <<"bind">> => <<"0.0.0.0:1883">> + }; +default_listener(ws) -> + #{ + <<"bind">> => <<"0.0.0.0:8083">>, + <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>} + }; +default_listener(SSLListener) -> + %% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate + CertFile = fun(Name) -> + iolist_to_binary("${EMQX_ETC_DIR}/" ++ filename:join(["certs", Name])) + end, + SslOptions = #{ + <<"cacertfile">> => CertFile(<<"cacert.pem">>), + <<"certfile">> => CertFile(<<"cert.pem">>), + <<"keyfile">> => CertFile(<<"key.pem">>) + }, + case SSLListener of + ssl -> + #{ + <<"bind">> => <<"0.0.0.0:8883">>, + <<"ssl_options">> => SslOptions + }; + wss -> + #{ + <<"bind">> => <<"0.0.0.0:8084">>, + <<"ssl_options">> => SslOptions, + <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>} + } + end. + +%% @doc This function helps to perform a naive string interpolation which +%% only looks at the first segment of the string and tries to replace it. +%% For example +%% "$MY_FILE_PATH" +%% "${MY_FILE_PATH}" +%% "$ENV_VARIABLE/sub/path" +%% "${ENV_VARIABLE}/sub/path" +%% "${ENV_VARIABLE}\sub\path" # windows +%% This function returns undefined if the input is undefined +%% otherwise always return string. +naive_env_interpolation(undefined) -> + undefined; +naive_env_interpolation(Bin) when is_binary(Bin) -> + naive_env_interpolation(unicode:characters_to_list(Bin, utf8)); +naive_env_interpolation("$" ++ Maybe = Original) -> + {Env, Tail} = split_path(Maybe), + case resolve_env(Env) of + {ok, Path} -> + filename:join([Path, Tail]); + error -> + Original + end; +naive_env_interpolation(Other) -> + Other. + +split_path(Path) -> + split_path(Path, []). + +split_path([], Acc) -> + {lists:reverse(Acc), []}; +split_path([Char | Rest], Acc) when Char =:= $/ orelse Char =:= $\\ -> + {lists:reverse(Acc), string:trim(Rest, leading, "/\\")}; +split_path([Char | Rest], Acc) -> + split_path(Rest, [Char | Acc]). + +resolve_env(Name0) -> + Name = string:trim(Name0, both, "{}"), + Value = os:getenv(Name), + case Value =/= false andalso Value =/= "" of + true -> + {ok, Value}; + false -> + special_env(Name) + end. + +-ifdef(TEST). +%% when running tests, we need to mock the env variables +special_env("EMQX_ETC_DIR") -> + {ok, filename:join([code:lib_dir(emqx), etc])}; +special_env("EMQX_LOG_DIR") -> + {ok, "log"}; +special_env(_Name) -> + %% only in tests + error. +-else. +special_env(_Name) -> error. +-endif. + +%% The tombstone atom. +tombstone() -> + ?TOMBSTONE_TYPE. + +%% Make a map type, the value of which is allowed to be 'marked_for_deletion' +%% 'marked_for_delition' is a special value which means the key is deleted. +%% This is used to support the 'delete' operation in configs, +%% since deleting the key would result in default value being used. +tombstone_map(Name, Type) -> + %% marked_for_deletion must be the last member of the union + %% because we need to first union member to populate the default values + map(Name, ?UNION([Type, ?TOMBSTONE_TYPE])). + +%% inverse of mark_del_map +get_tombstone_map_value_type(Schema) -> + %% TODO: violation of abstraction, expose an API in hoconsc + %% hoconsc:map_value_type(Schema) + ?MAP(_Name, Union) = hocon_schema:field_schema(Schema, type), + %% TODO: violation of abstraction, fix hoconsc:union_members/1 + ?UNION(Members) = Union, + Tombstone = tombstone(), + [Type, Tombstone] = hoconsc:union_members(Members), + Type. + +%% Keep the 'default' tombstone, but delete others. +keep_default_tombstone(Map, _Opts) when is_map(Map) -> + maps:filter( + fun(Key, Value) -> + Key =:= <<"default">> orelse Value =/= ?TOMBSTONE_VALUE + end, + Map + ); +keep_default_tombstone(Value, _Opts) -> + Value. + +ensure_default_listener(undefined, ListenerType) -> + %% let the schema's default value do its job + #{<<"default">> => default_listener(ListenerType)}; +ensure_default_listener(#{<<"default">> := _} = Map, _ListenerType) -> + keep_default_tombstone(Map, #{}); +ensure_default_listener(Map, ListenerType) -> + NewMap = Map#{<<"default">> => default_listener(ListenerType)}, + keep_default_tombstone(NewMap, #{}). diff --git a/apps/emqx/src/emqx_tls_lib.erl b/apps/emqx/src/emqx_tls_lib.erl index d1c57bf0d..2683d2a9d 100644 --- a/apps/emqx/src/emqx_tls_lib.erl +++ b/apps/emqx/src/emqx_tls_lib.erl @@ -309,19 +309,19 @@ ensure_ssl_files(Dir, SSL, Opts) -> case ensure_ssl_file_key(SSL, RequiredKeys) of ok -> KeyPaths = ?SSL_FILE_OPT_PATHS ++ ?SSL_FILE_OPT_PATHS_A, - ensure_ssl_files(Dir, SSL, KeyPaths, Opts); + ensure_ssl_files_per_key(Dir, SSL, KeyPaths, Opts); {error, _} = Error -> Error end. -ensure_ssl_files(_Dir, SSL, [], _Opts) -> +ensure_ssl_files_per_key(_Dir, SSL, [], _Opts) -> {ok, SSL}; -ensure_ssl_files(Dir, SSL, [KeyPath | KeyPaths], Opts) -> +ensure_ssl_files_per_key(Dir, SSL, [KeyPath | KeyPaths], Opts) -> case ensure_ssl_file(Dir, KeyPath, SSL, emqx_utils_maps:deep_get(KeyPath, SSL, undefined), Opts) of {ok, NewSSL} -> - ensure_ssl_files(Dir, NewSSL, KeyPaths, Opts); + ensure_ssl_files_per_key(Dir, NewSSL, KeyPaths, Opts); {error, Reason} -> {error, Reason#{which_options => [KeyPath]}} end. @@ -472,7 +472,8 @@ hex_str(Bin) -> iolist_to_binary([io_lib:format("~2.16.0b", [X]) || <> <= Bin]). %% @doc Returns 'true' when the file is a valid pem, otherwise {error, Reason}. -is_valid_pem_file(Path) -> +is_valid_pem_file(Path0) -> + Path = resolve_cert_path_for_read(Path0), case file:read_file(Path) of {ok, Pem} -> is_pem(Pem) orelse {error, not_pem}; {error, Reason} -> {error, Reason} @@ -513,10 +514,16 @@ do_drop_invalid_certs([KeyPath | KeyPaths], SSL) -> to_server_opts(Type, Opts) -> Versions = integral_versions(Type, maps:get(versions, Opts, undefined)), Ciphers = integral_ciphers(Versions, maps:get(ciphers, Opts, undefined)), - maps:to_list(Opts#{ - ciphers => Ciphers, - versions => Versions - }). + Path = fun(Key) -> resolve_cert_path_for_read_strict(maps:get(Key, Opts, undefined)) end, + filter( + maps:to_list(Opts#{ + keyfile => Path(keyfile), + certfile => Path(certfile), + cacertfile => Path(cacertfile), + ciphers => Ciphers, + versions => Versions + }) + ). %% @doc Convert hocon-checked tls client options (map()) to %% proplist accepted by ssl library. @@ -530,11 +537,12 @@ to_client_opts(Opts) -> to_client_opts(Type, Opts) -> GetD = fun(Key, Default) -> fuzzy_map_get(Key, Opts, Default) end, Get = fun(Key) -> GetD(Key, undefined) end, + Path = fun(Key) -> resolve_cert_path_for_read_strict(Get(Key)) end, case GetD(enable, false) of true -> - KeyFile = ensure_str(Get(keyfile)), - CertFile = ensure_str(Get(certfile)), - CAFile = ensure_str(Get(cacertfile)), + KeyFile = Path(keyfile), + CertFile = Path(certfile), + CAFile = Path(cacertfile), Verify = GetD(verify, verify_none), SNI = ensure_sni(Get(server_name_indication)), Versions = integral_versions(Type, Get(versions)), @@ -556,6 +564,31 @@ to_client_opts(Type, Opts) -> [] end. +resolve_cert_path_for_read_strict(Path) -> + case resolve_cert_path_for_read(Path) of + undefined -> + undefined; + ResolvedPath -> + case filelib:is_regular(ResolvedPath) of + true -> + ResolvedPath; + false -> + PathToLog = ensure_str(Path), + LogData = + case PathToLog =:= ResolvedPath of + true -> + #{path => PathToLog}; + false -> + #{path => PathToLog, resolved_path => ResolvedPath} + end, + ?SLOG(error, LogData#{msg => "cert_file_not_found"}), + undefined + end + end. + +resolve_cert_path_for_read(Path) -> + emqx_schema:naive_env_interpolation(Path). + filter([]) -> []; filter([{_, undefined} | T]) -> filter(T); filter([{_, ""} | T]) -> filter(T); diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 71e1bee84..926331e56 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -31,6 +31,7 @@ start_apps/2, start_apps/3, stop_apps/1, + stop_apps/2, reload/2, app_path/2, proj_root/0, @@ -250,11 +251,20 @@ start_app(App, SpecAppConfig, Opts) -> case application:ensure_all_started(App) of {ok, _} -> ok = ensure_dashboard_listeners_started(App), + ok = wait_for_app_processes(App), ok; {error, Reason} -> error({failed_to_start_app, App, Reason}) end. +wait_for_app_processes(emqx_conf) -> + %% emqx_conf app has a gen_server which + %% initializes its state asynchronously + gen_server:call(emqx_cluster_rpc, dummy), + ok; +wait_for_app_processes(_) -> + ok. + app_conf_file(emqx_conf) -> "emqx.conf.all"; app_conf_file(App) -> atom_to_list(App) ++ ".conf". @@ -273,8 +283,7 @@ mustache_vars(App, Opts) -> Defaults = #{ node_cookie => atom_to_list(erlang:get_cookie()), platform_data_dir => app_path(App, "data"), - platform_etc_dir => app_path(App, "etc"), - platform_log_dir => app_path(App, "log") + platform_etc_dir => app_path(App, "etc") }, maps:merge(Defaults, ExtraMustacheVars). @@ -307,12 +316,21 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) -> -spec stop_apps(list()) -> ok. stop_apps(Apps) -> + stop_apps(Apps, #{}). + +stop_apps(Apps, Opts) -> [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]], ok = mria_mnesia:delete_schema(), %% to avoid inter-suite flakiness application:unset_env(emqx, init_config_load_done), persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY), - emqx_config:erase_schema_mod_and_names(), + case Opts of + #{erase_all_configs := false} -> + %% FIXME: this means inter-suite or inter-test dependencies + ok; + _ -> + emqx_config:erase_all() + end, ok = emqx_config:delete_override_conf_files(), application:unset_env(emqx, local_override_conf_file), application:unset_env(emqx, cluster_override_conf_file), @@ -492,7 +510,7 @@ load_config(SchemaModule, Config, Opts) -> ok. load_config(SchemaModule, Config) -> - load_config(SchemaModule, Config, #{raw_with_default => false}). + load_config(SchemaModule, Config, #{raw_with_default => true}). -spec is_all_tcp_servers_available(Servers) -> Result when Servers :: [{Host, Port}], diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 3b75ee4f3..6cabfbfe9 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -63,14 +63,14 @@ t_init_load(_Config) -> ConfFile = "./test_emqx.conf", ok = file:write_file(ConfFile, <<"">>), ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)), - emqx_config:erase_schema_mod_and_names(), + emqx_config:erase_all(), {ok, DeprecatedFile} = application:get_env(emqx, cluster_override_conf_file), ?assertEqual(false, filelib:is_regular(DeprecatedFile), DeprecatedFile), %% Don't has deprecated file ok = emqx_config:init_load(emqx_schema, [ConfFile]), ?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())), ?assertMatch({ok, #{raw_config := 256}}, emqx:update_config([mqtt, max_topic_levels], 256)), - emqx_config:erase_schema_mod_and_names(), + emqx_config:erase_all(), %% Has deprecated file ok = file:write_file(DeprecatedFile, <<"{}">>), ok = emqx_config:init_load(emqx_schema, [ConfFile]), diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index f0c18fa30..fa0713cf0 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -279,8 +279,7 @@ render_config_file() -> mustache_vars() -> [ {platform_data_dir, local_path(["data"])}, - {platform_etc_dir, local_path(["etc"])}, - {platform_log_dir, local_path(["log"])} + {platform_etc_dir, local_path(["etc"])} ]. generate_config() -> diff --git a/apps/emqx/test/emqx_logger_SUITE.erl b/apps/emqx/test/emqx_logger_SUITE.erl index c8ff63c75..e8d7d7a34 100644 --- a/apps/emqx/test/emqx_logger_SUITE.erl +++ b/apps/emqx/test/emqx_logger_SUITE.erl @@ -22,7 +22,6 @@ -include_lib("eunit/include/eunit.hrl"). -define(LOGGER, emqx_logger). --define(a, "a"). -define(SUPPORTED_LEVELS, [emergency, alert, critical, error, warning, notice, info, debug]). all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index 15ca29853..75c41b9fb 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -254,10 +254,15 @@ does_module_exist(Mod) -> end. assert_no_http_get() -> + Timeout = 0, + Error = should_be_cached, + assert_no_http_get(Timeout, Error). + +assert_no_http_get(Timeout, Error) -> receive {http_get, _URL} -> - error(should_be_cached) - after 0 -> + error(Error) + after Timeout -> ok end. @@ -702,7 +707,9 @@ do_t_update_listener(Config) -> %% the API converts that to an internally %% managed file <<"issuer_pem">> => IssuerPem, - <<"responder_url">> => <<"http://localhost:9877">> + <<"responder_url">> => <<"http://localhost:9877">>, + %% for quicker testing; min refresh in tests is 5 s. + <<"refresh_interval">> => <<"5s">> } } }, @@ -739,6 +746,70 @@ do_t_update_listener(Config) -> ) ), assert_http_get(1, 5_000), + + %% Disable OCSP Stapling; the periodic refreshes should stop + RefreshInterval = emqx_config:get([listeners, ssl, default, ssl_options, ocsp, refresh_interval]), + OCSPConfig1 = + #{ + <<"ssl_options">> => + #{ + <<"ocsp">> => + #{ + <<"enable_ocsp_stapling">> => false + } + } + }, + ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, OCSPConfig1), + {ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3), + ?assertMatch( + #{ + <<"ssl_options">> := + #{ + <<"ocsp">> := + #{ + <<"enable_ocsp_stapling">> := false + } + } + }, + ListenerData4 + ), + + assert_no_http_get(2 * RefreshInterval, should_stop_refreshing), + + ok. + +t_double_unregister(_Config) -> + ListenerID = <<"ssl:test_ocsp">>, + Conf = emqx_config:get_listener_conf(ssl, test_ocsp, []), + ?check_trace( + begin + {ok, {ok, _}} = + ?wait_async_action( + emqx_ocsp_cache:register_listener(ListenerID, Conf), + #{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID}, + 5_000 + ), + assert_http_get(1), + + {ok, {ok, _}} = + ?wait_async_action( + emqx_ocsp_cache:unregister_listener(ListenerID), + #{?snk_kind := ocsp_cache_listener_unregistered, listener_id := ListenerID}, + 5_000 + ), + + %% Should be idempotent and not crash + {ok, {ok, _}} = + ?wait_async_action( + emqx_ocsp_cache:unregister_listener(ListenerID), + #{?snk_kind := ocsp_cache_listener_unregistered, listener_id := ListenerID}, + 5_000 + ), + ok + end, + [] + ), + ok. t_ocsp_responder_error_responses(_Config) -> diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index cb51aca46..81991f26e 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -694,3 +694,81 @@ url_type_test_() -> typerefl:from_string(emqx_schema:url(), <<"">>) ) ]. + +env_test_() -> + Do = fun emqx_schema:naive_env_interpolation/1, + [ + {"undefined", fun() -> ?assertEqual(undefined, Do(undefined)) end}, + {"full env abs path", + with_env_fn( + "MY_FILE", + "/path/to/my/file", + fun() -> ?assertEqual("/path/to/my/file", Do("$MY_FILE")) end + )}, + {"full env relative path", + with_env_fn( + "MY_FILE", + "path/to/my/file", + fun() -> ?assertEqual("path/to/my/file", Do("${MY_FILE}")) end + )}, + %% we can not test windows style file join though + {"windows style", + with_env_fn( + "MY_FILE", + "path\\to\\my\\file", + fun() -> ?assertEqual("path\\to\\my\\file", Do("$MY_FILE")) end + )}, + {"dir no {}", + with_env_fn( + "MY_DIR", + "/mydir", + fun() -> ?assertEqual("/mydir/foobar", Do(<<"$MY_DIR/foobar">>)) end + )}, + {"dir with {}", + with_env_fn( + "MY_DIR", + "/mydir", + fun() -> ?assertEqual("/mydir/foobar", Do(<<"${MY_DIR}/foobar">>)) end + )}, + %% a trailing / should not cause the sub path to become absolute + {"env dir with trailing /", + with_env_fn( + "MY_DIR", + "/mydir//", + fun() -> ?assertEqual("/mydir/foobar", Do(<<"${MY_DIR}/foobar">>)) end + )}, + {"string dir with doulbe /", + with_env_fn( + "MY_DIR", + "/mydir/", + fun() -> ?assertEqual("/mydir/foobar", Do(<<"${MY_DIR}//foobar">>)) end + )}, + {"env not found", + with_env_fn( + "MY_DIR", + "/mydir/", + fun() -> ?assertEqual("${MY_DIR2}//foobar", Do(<<"${MY_DIR2}//foobar">>)) end + )} + ]. + +with_env_fn(Name, Value, F) -> + fun() -> + with_envs(F, [{Name, Value}]) + end. + +with_envs(Fun, Envs) -> + with_envs(Fun, [], Envs). + +with_envs(Fun, Args, [{_Name, _Value} | _] = Envs) -> + set_envs(Envs), + try + apply(Fun, Args) + after + unset_envs(Envs) + end. + +set_envs([{_Name, _Value} | _] = Envs) -> + lists:map(fun({Name, Value}) -> os:putenv(Name, Value) end, Envs). + +unset_envs([{_Name, _Value} | _] = Envs) -> + lists:map(fun({Name, _}) -> os:unsetenv(Name) end, Envs). diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 813656e6a..60abe3d3c 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -138,13 +138,13 @@ end_per_testcase(t_ws_non_check_origin, Config) -> del_bucket(), PrevConfig = ?config(prev_config, Config), emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), - emqx_common_test_helpers:stop_apps([]), + stop_apps(), ok; end_per_testcase(_, Config) -> del_bucket(), PrevConfig = ?config(prev_config, Config), emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), - emqx_common_test_helpers:stop_apps([]), + stop_apps(), Config. init_per_suite(Config) -> @@ -156,6 +156,10 @@ end_per_suite(_) -> emqx_common_test_helpers:stop_apps([]), ok. +%% FIXME: this is a temp fix to tests share configs. +stop_apps() -> + emqx_common_test_helpers:stop_apps([], #{erase_all_configs => false}). + %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- diff --git a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl index c7f718dfc..6d9203c95 100644 --- a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl @@ -67,7 +67,7 @@ init_per_suite(Config) -> emqx_config:erase(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY), _ = application:load(emqx_conf), ok = emqx_mgmt_api_test_util:init_suite( - [emqx_authn] + [emqx_conf, emqx_authn] ), ?AUTHN:delete_chain(?GLOBAL), diff --git a/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl b/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl index 59865ab41..98215e853 100644 --- a/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl @@ -42,15 +42,16 @@ init_per_testcase(_Case, Config) -> <<"backend">> => <<"built_in_database">>, <<"user_id_type">> => <<"clientid">> }, - emqx:update_config( + {ok, _} = emqx:update_config( ?PATH, {create_authenticator, ?GLOBAL, AuthnConfig} ), - - emqx_conf:update( - [listeners, tcp, listener_authn_enabled], {create, listener_mqtt_tcp_conf(18830, true)}, #{} + {ok, _} = emqx_conf:update( + [listeners, tcp, listener_authn_enabled], + {create, listener_mqtt_tcp_conf(18830, true)}, + #{} ), - emqx_conf:update( + {ok, _} = emqx_conf:update( [listeners, tcp, listener_authn_disabled], {create, listener_mqtt_tcp_conf(18831, false)}, #{} diff --git a/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl b/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl index 94c07ca96..bd18367b6 100644 --- a/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl @@ -37,7 +37,7 @@ init_per_testcase(_, Config) -> init_per_suite(Config) -> _ = application:load(emqx_conf), - emqx_common_test_helpers:start_apps([emqx_authn]), + emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]), application:ensure_all_started(emqx_resource), application:ensure_all_started(emqx_connector), Config. diff --git a/apps/emqx_authn/test/emqx_authn_schema_SUITE.erl b/apps/emqx_authn/test/emqx_authn_schema_SUITE.erl index 7a766281b..3afb8e973 100644 --- a/apps/emqx_authn/test/emqx_authn_schema_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_schema_SUITE.erl @@ -78,7 +78,8 @@ t_check_schema(_Config) -> ). t_union_member_selector(_) -> - ?assertMatch(#{authentication := undefined}, check(undefined)), + %% default value for authentication + ?assertMatch(#{authentication := []}, check(undefined)), C1 = #{<<"backend">> => <<"built_in_database">>}, ?assertThrow( #{ diff --git a/apps/emqx_authz/etc/emqx_authz.conf b/apps/emqx_authz/etc/emqx_authz.conf index 3bdc180c5..167b12b3f 100644 --- a/apps/emqx_authz/etc/emqx_authz.conf +++ b/apps/emqx_authz/etc/emqx_authz.conf @@ -2,14 +2,4 @@ authorization { deny_action = ignore no_match = allow cache = { enable = true } - sources = [ - { - type = file - enable = true - # This file is immutable to EMQX. - # Once new rules are created from dashboard UI or HTTP API, - # the file 'data/authz/acl.conf' is used instead of this one - path = "{{ platform_etc_dir }}/acl.conf" - } - ] } diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index dd658a6aa..dd0325694 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.18"}, + {vsn, "0.1.19"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 2220e8f6e..d332f009f 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -205,7 +205,7 @@ sources(get, _) -> }, AccIn ) -> - case file:read_file(Path) of + case emqx_authz_file:read_file(Path) of {ok, Rules} -> lists:append(AccIn, [ #{ @@ -242,7 +242,7 @@ source(get, #{bindings := #{type := Type}}) -> Type, fun (#{<<"type">> := <<"file">>, <<"enable">> := Enable, <<"path">> := Path}) -> - case file:read_file(Path) of + case emqx_authz_file:read_file(Path) of {ok, Rules} -> {200, #{ type => file, diff --git a/apps/emqx_authz/src/emqx_authz_file.erl b/apps/emqx_authz/src/emqx_authz_file.erl index ede4a9582..54f1775c6 100644 --- a/apps/emqx_authz/src/emqx_authz_file.erl +++ b/apps/emqx_authz/src/emqx_authz_file.erl @@ -32,13 +32,15 @@ create/1, update/1, destroy/1, - authorize/4 + authorize/4, + read_file/1 ]). description() -> "AuthZ with static rules". -create(#{path := Path} = Source) -> +create(#{path := Path0} = Source) -> + Path = filename(Path0), Rules = case file:consult(Path) of {ok, Terms} -> @@ -63,3 +65,9 @@ destroy(_Source) -> ok. authorize(Client, PubSub, Topic, #{annotations := #{rules := Rules}}) -> emqx_authz_rule:matches(Client, PubSub, Topic, Rules). + +read_file(Path) -> + file:read_file(filename(Path)). + +filename(PathMaybeTemplate) -> + emqx_schema:naive_env_interpolation(PathMaybeTemplate). diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 7aaa68b62..a2a7c6b52 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -491,7 +491,7 @@ authz_fields() -> ?HOCON( ?ARRAY(?UNION(UnionMemberSelector)), #{ - default => [], + default => [default_authz()], desc => ?DESC(sources), %% doc_lift is force a root level reference instead of nesting sub-structs extra => #{doc_lift => true}, @@ -501,3 +501,10 @@ authz_fields() -> } )} ]. + +default_authz() -> + #{ + <<"type">> => <<"file">>, + <<"enable">> => true, + <<"path">> => <<"${EMQX_ETC_DIR}/acl.conf">> + }. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 4b9b7e3fe..f58805b6b 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -230,7 +230,12 @@ webhook_bridge_converter(Conf0, _HoconOpts) -> undefined -> undefined; _ -> - do_convert_webhook_config(Conf1) + maps:map( + fun(_Name, Conf) -> + do_convert_webhook_config(Conf) + end, + Conf1 + ) end. do_convert_webhook_config( diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 58e4a1984..1bde274f3 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, ecql]}, {env, []}, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index d0a1df7a8..a3032a9df 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -281,7 +281,7 @@ proc_cql_params(query, SQL, Params, _State) -> exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query -> - case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of + case exec(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}) of {error, Reason} = Result -> ?tp( error, @@ -295,7 +295,7 @@ exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when end. exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> - case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of + case exec(PoolName, {?MODULE, batch_query, [Async, CQLs]}) of {error, Reason} = Result -> ?tp( error, @@ -308,6 +308,13 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> Result end. +%% Pick one of the pool members to do the query. +%% Using 'no_handoever' strategy, +%% meaning the buffer worker does the gen_server call or gen_server cast +%% towards the connection process. +exec(PoolName, Query) -> + ecpool:pick_and_do(PoolName, Query, no_handover). + on_get_status(_InstId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of true -> @@ -346,17 +353,23 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepar query(Conn, sync, CQL, Params) -> ecql:query(Conn, CQL, Params); query(Conn, {async, Callback}, CQL, Params) -> - ecql:async_query(Conn, CQL, Params, one, Callback). + ok = ecql:async_query(Conn, CQL, Params, one, Callback), + %% return the connection pid for buffer worker to monitor + {ok, Conn}. prepared_query(Conn, sync, PreparedKey, Params) -> ecql:execute(Conn, PreparedKey, Params); prepared_query(Conn, {async, Callback}, PreparedKey, Params) -> - ecql:async_execute(Conn, PreparedKey, Params, Callback). + ok = ecql:async_execute(Conn, PreparedKey, Params, Callback), + %% return the connection pid for buffer worker to monitor + {ok, Conn}. batch_query(Conn, sync, Rows) -> ecql:batch(Conn, Rows); batch_query(Conn, {async, Callback}, Rows) -> - ecql:async_batch(Conn, Rows, Callback). + ok = ecql:async_batch(Conn, Rows, Callback), + %% return the connection pid for buffer worker to monitor + {ok, Conn}. %%-------------------------------------------------------------------- %% callbacks for ecpool diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 7865f0415..cda27f6e4 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -404,7 +404,7 @@ t_setup_via_config_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(cassandra_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _Pid}}], Trace), ok end ), @@ -443,7 +443,7 @@ t_setup_via_http_api_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(cassandra_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _Pid}}], Trace), ok end ), @@ -604,7 +604,7 @@ t_missing_data(Config) -> fun(Trace0) -> %% 1. ecql driver will return `ok` first in async query Trace = ?of_kind(cassandra_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _Pid}}], Trace), %% 2. then it will return an error in callback function Trace1 = ?of_kind(handle_async_reply, Trace0), ?assertMatch([#{result := {error, {8704, _}}}], Trace1), diff --git a/apps/emqx_conf/etc/emqx_conf.conf b/apps/emqx_conf/etc/emqx_conf.conf index a54894dcd..76e3c0805 100644 --- a/apps/emqx_conf/etc/emqx_conf.conf +++ b/apps/emqx_conf/etc/emqx_conf.conf @@ -1,7 +1,13 @@ ## NOTE: -## The EMQX configuration is prioritized (overlayed) in the following order: -## `data/configs/cluster.hocon < etc/emqx.conf < environment variables`. - +## This config file overrides data/configs/cluster.hocon, +## and is merged with environment variables which start with 'EMQX_' prefix. +## +## Config changes made from EMQX dashboard UI, management HTTP API, or CLI +## are stored in data/configs/cluster.hocon. +## To avoid confusion, please do not store the same configs in both files. +## +## See https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html +## Configuration full example can be found in emqx.conf.example node { name = "emqx@127.0.0.1" @@ -9,13 +15,6 @@ node { data_dir = "{{ platform_data_dir }}" } -log { - file_handlers.default { - level = warning - file = "{{ platform_log_dir }}/emqx.log" - } -} - cluster { name = emqxcl discovery_strategy = manual diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index eaa16ab5a..6668d6424 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -18,12 +18,14 @@ -compile({no_auto_import, [get/1, get/2]}). -include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). -export([add_handler/2, remove_handler/1]). -export([get/1, get/2, get_raw/1, get_raw/2, get_all/1]). -export([get_by_node/2, get_by_node/3]). -export([update/3, update/4]). -export([remove/2, remove/3]). +-export([tombstone/2]). -export([reset/2, reset/3]). -export([dump_schema/2]). -export([schema_module/0]). @@ -114,6 +116,10 @@ update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> update(Node, KeyPath, UpdateReq, Opts) -> emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts). +%% @doc Mark the specified key path as tombstone +tombstone(KeyPath, Opts) -> + update(KeyPath, ?TOMBSTONE_CONFIG_CHANGE_REQ, Opts). + %% @doc remove all value of key path in cluster-override.conf or local-override.conf. -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index fbfb97a79..2231b8336 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -32,12 +32,8 @@ start(_StartType, _StartArgs) -> ok = init_conf() catch C:E:St -> - ?SLOG(critical, #{ - msg => failed_to_init_config, - exception => C, - reason => E, - stacktrace => St - }), + %% logger is not quite ready. + io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]), init:stop(1) end, ok = emqx_config_logger:refresh_config(), @@ -92,15 +88,8 @@ sync_data_from_node() -> %% Internal functions %% ------------------------------------------------------------------------------ --ifdef(TEST). -init_load() -> - emqx_config:init_load(emqx_conf:schema_module(), #{raw_with_default => false}). - --else. - init_load() -> emqx_config:init_load(emqx_conf:schema_module(), #{raw_with_default => true}). --endif. init_conf() -> %% Workaround for https://github.com/emqx/mria/issues/94: diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index abccca9fb..e6ccc3842 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -93,7 +93,10 @@ roots() -> {"log", sc( ?R_REF("log"), - #{translate_to => ["kernel"]} + #{ + translate_to => ["kernel"], + importance => ?IMPORTANCE_HIGH + } )}, {"rpc", sc( @@ -472,7 +475,7 @@ fields("node") -> %% for now, it's tricky to use a different data_dir %% otherwise data paths in cluster config may differ %% TODO: change configurable data file paths to relative - importance => ?IMPORTANCE_HIDDEN, + importance => ?IMPORTANCE_LOW, desc => ?DESC(node_data_dir) } )}, @@ -863,15 +866,25 @@ fields("rpc") -> ]; fields("log") -> [ - {"console_handler", ?R_REF("console_handler")}, + {"console_handler", + sc( + ?R_REF("console_handler"), + #{importance => ?IMPORTANCE_HIGH} + )}, {"file_handlers", sc( map(name, ?R_REF("log_file_handler")), - #{desc => ?DESC("log_file_handlers")} + #{ + desc => ?DESC("log_file_handlers"), + %% because file_handlers is a map + %% so there has to be a default value in order to populate the raw configs + default => #{<<"default">> => #{<<"level">> => <<"warning">>}}, + importance => ?IMPORTANCE_HIGH + } )} ]; fields("console_handler") -> - log_handler_common_confs(false); + log_handler_common_confs(console); fields("log_file_handler") -> [ {"file", @@ -879,6 +892,8 @@ fields("log_file_handler") -> file(), #{ desc => ?DESC("log_file_handler_file"), + default => <<"${EMQX_LOG_DIR}/emqx.log">>, + converter => fun emqx_schema:naive_env_interpolation/1, validator => fun validate_file_location/1 } )}, @@ -892,10 +907,11 @@ fields("log_file_handler") -> hoconsc:union([infinity, emqx_schema:bytesize()]), #{ default => <<"50MB">>, - desc => ?DESC("log_file_handler_max_size") + desc => ?DESC("log_file_handler_max_size"), + importance => ?IMPORTANCE_MEDIUM } )} - ] ++ log_handler_common_confs(true); + ] ++ log_handler_common_confs(file); fields("log_rotation") -> [ {"enable", @@ -1104,14 +1120,33 @@ tr_logger_level(Conf) -> tr_logger_handlers(Conf) -> emqx_config_logger:tr_handlers(Conf). -log_handler_common_confs(Enable) -> +log_handler_common_confs(Handler) -> + lists:map( + fun + ({_Name, #{importance := _}} = F) -> F; + ({Name, Sc}) -> {Name, Sc#{importance => ?IMPORTANCE_LOW}} + end, + do_log_handler_common_confs(Handler) + ). +do_log_handler_common_confs(Handler) -> + %% we rarely support dynamic defaults like this + %% for this one, we have build-time defualut the same as runtime default + %% so it's less tricky + EnableValues = + case Handler of + console -> ["console", "both"]; + file -> ["file", "both", "", false] + end, + EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"), + Enable = lists:member(EnvValue, EnableValues), [ {"enable", sc( boolean(), #{ default => Enable, - desc => ?DESC("common_handler_enable") + desc => ?DESC("common_handler_enable"), + importance => ?IMPORTANCE_LOW } )}, {"level", @@ -1128,7 +1163,8 @@ log_handler_common_confs(Enable) -> #{ default => <<"system">>, desc => ?DESC("common_handler_time_offset"), - validator => fun validate_time_offset/1 + validator => fun validate_time_offset/1, + importance => ?IMPORTANCE_LOW } )}, {"chars_limit", @@ -1136,7 +1172,8 @@ log_handler_common_confs(Enable) -> hoconsc:union([unlimited, range(100, inf)]), #{ default => unlimited, - desc => ?DESC("common_handler_chars_limit") + desc => ?DESC("common_handler_chars_limit"), + importance => ?IMPORTANCE_LOW } )}, {"formatter", @@ -1144,7 +1181,8 @@ log_handler_common_confs(Enable) -> hoconsc:enum([text, json]), #{ default => text, - desc => ?DESC("common_handler_formatter") + desc => ?DESC("common_handler_formatter"), + importance => ?IMPORTANCE_MEDIUM } )}, {"single_line", @@ -1152,7 +1190,8 @@ log_handler_common_confs(Enable) -> boolean(), #{ default => true, - desc => ?DESC("common_handler_single_line") + desc => ?DESC("common_handler_single_line"), + importance => ?IMPORTANCE_LOW } )}, {"sync_mode_qlen", @@ -1200,7 +1239,7 @@ log_handler_common_confs(Enable) -> ]. crash_dump_file_default() -> - case os:getenv("RUNNER_LOG_DIR") of + case os:getenv("EMQX_LOG_DIR") of false -> %% testing, or running emqx app as deps <<"log/erl_crash.dump">>; diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index c0a19824c..db55c7032 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.21"}, + {vsn, "0.1.22"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 5b71e3f3a..ffb4bd8a9 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -305,7 +305,20 @@ on_query( Retry ) of - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + {error, Reason} when + Reason =:= econnrefused; + Reason =:= timeout; + Reason =:= {shutdown, normal}; + Reason =:= {shutdown, closed} + -> + ?SLOG(warning, #{ + msg => "http_connector_do_request_failed", + reason => Reason, + connector => InstId + }), + {error, {recoverable_error, Reason}}; + {error, {closed, _Message} = Reason} -> + %% _Message = "The connection was lost." ?SLOG(warning, #{ msg => "http_connector_do_request_failed", reason => Reason, @@ -593,7 +606,16 @@ reply_delegator(ReplyFunAndArgs, Result) -> case Result of %% The normal reason happens when the HTTP connection times out before %% the request has been fully processed - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal -> + {error, Reason} when + Reason =:= econnrefused; + Reason =:= timeout; + Reason =:= normal; + Reason =:= {shutdown, normal} + -> + Result1 = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + {error, {closed, _Message} = Reason} -> + %% _Message = "The connection was lost." Result1 = {error, {recoverable_error, Reason}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); _ -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index fec9717ba..528fcd972 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -286,18 +286,17 @@ parse_spec_ref(Module, Path, Options) -> Schema = try erlang:apply(Module, schema, [Path]) - %% better error message catch - error:Reason:Stacktrace -> - %% raise a new error with the same stacktrace. - %% it's a bug if this happens. - %% i.e. if a path is listed in the spec but the module doesn't - %% implement it or crashes when trying to build the schema. - erlang:raise( - error, - #{mfa => {Module, schema, [Path]}, reason => Reason}, - Stacktrace - ) + Error:Reason:Stacktrace -> + %% This error is intended to fail the build + %% hence print to standard_error + io:format( + standard_error, + "Failed to generate swagger for path ~p in module ~p~n" + "error:~p~nreason:~p~n~p~n", + [Module, Path, Error, Reason, Stacktrace] + ), + error({failed_to_generate_swagger_spec, Module, Path}) end, {Specs, Refs} = maps:fold( fun(Method, Meta, {Acc, RefsAcc}) -> diff --git a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl index f2ba56e08..af4b901b2 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl @@ -308,10 +308,7 @@ t_nest_ref(_Config) -> t_none_ref(_Config) -> Path = "/ref/none", ?assertError( - #{ - mfa := {?MODULE, schema, [Path]}, - reason := function_clause - }, + {failed_to_generate_swagger_spec, ?MODULE, Path}, emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}) ), ok. diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl index cda533cc2..a3d2b4e75 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl @@ -278,10 +278,7 @@ t_bad_ref(_Config) -> t_none_ref(_Config) -> Path = "/ref/none", ?assertError( - #{ - mfa := {?MODULE, schema, ["/ref/none"]}, - reason := function_clause - }, + {failed_to_generate_swagger_spec, ?MODULE, Path}, validate(Path, #{}, []) ), ok. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index d12f99917..ff313c8c8 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -301,10 +301,10 @@ t_cluster_name(_) -> ok end, - emqx_common_test_helpers:stop_apps([emqx, emqx_exhook]), + stop_apps([emqx, emqx_exhook]), emqx_common_test_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), on_exit(fun() -> - emqx_common_test_helpers:stop_apps([emqx, emqx_exhook]), + stop_apps([emqx, emqx_exhook]), load_cfg(?CONF_DEFAULT), emqx_common_test_helpers:start_apps([emqx_exhook]), mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]) @@ -489,3 +489,7 @@ data_file(Name) -> cert_file(Name) -> data_file(filename:join(["certs", Name])). + +%% FIXME: this creats inter-test dependency +stop_apps(Apps) -> + emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}). diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index de86700ef..152ccc599 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -293,12 +293,14 @@ listeners_type() -> listeners_info(Opts) -> Listeners = hocon_schema:fields(emqx_schema, "listeners"), lists:map( - fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> - Fields0 = hocon_schema:fields(Mod, Field), + fun({ListenerType, Schema}) -> + Type = emqx_schema:get_tombstone_map_value_type(Schema), + ?R_REF(Mod, StructName) = Type, + Fields0 = hocon_schema:fields(Mod, StructName), Fields1 = lists:keydelete("authentication", 1, Fields0), Fields3 = required_bind(Fields1, Opts), - Ref = listeners_ref(Type, Opts), - TypeAtom = list_to_existing_atom(Type), + Ref = listeners_ref(ListenerType, Opts), + TypeAtom = list_to_existing_atom(ListenerType), #{ ref => ?R_REF(Ref), schema => [ @@ -642,7 +644,7 @@ create(Path, Conf) -> wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))). ensure_remove(Path) -> - wrap(emqx_conf:remove(Path, ?OPTS(cluster))). + wrap(emqx_conf:tombstone(Path, ?OPTS(cluster))). wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason}; wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 33cb66eb2..977c81c2b 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -20,18 +20,51 @@ -include_lib("eunit/include/eunit.hrl"). --define(PORT, (20000 + ?LINE)). +-define(PORT(Base), (Base + ?LINE)). +-define(PORT, ?PORT(20000)). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, with_defaults_in_file}, + {group, without_defaults_in_file} + ]. + +groups() -> + AllTests = emqx_common_test_helpers:all(?MODULE), + [ + {with_defaults_in_file, AllTests}, + {without_defaults_in_file, AllTests} + ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite([emqx_conf]), Config. -end_per_suite(_) -> - emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}), - emqx_conf:remove([listeners, tcp, new1], #{override_to => local}), +end_per_suite(_Config) -> + ok. + +init_per_group(without_defaults_in_file, Config) -> + emqx_mgmt_api_test_util:init_suite([emqx_conf]), + Config; +init_per_group(with_defaults_in_file, Config) -> + %% we have to materialize the config file with default values for this test group + %% because we want to test the deletion of non-existing listener + %% if there is no config file, the such deletion would result in a deletion + %% of the default listener. + Name = atom_to_list(?MODULE) ++ "-default-listeners", + TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hocon_text()), + emqx_mgmt_api_test_util:init_suite([emqx_conf]), + [{injected_conf_file, TmpConfFullPath} | Config]. + +end_per_group(Group, Config) -> + emqx_conf:tombstone([listeners, tcp, new], #{override_to => cluster}), + emqx_conf:tombstone([listeners, tcp, new1], #{override_to => local}), + case Group =:= with_defaults_in_file of + true -> + {_, File} = lists:keyfind(injected_conf_file, 1, Config), + ok = file:delete(File); + false -> + ok + end, emqx_mgmt_api_test_util:end_suite([emqx_conf]). init_per_testcase(Case, Config) -> @@ -52,30 +85,25 @@ end_per_testcase(Case, Config) -> t_max_connection_default({init, Config}) -> emqx_mgmt_api_test_util:end_suite([emqx_conf]), - Etc = filename:join(["etc", "emqx.conf.all"]), - TmpConfName = atom_to_list(?FUNCTION_NAME) ++ ".conf", - Inc = filename:join(["etc", TmpConfName]), - ConfFile = emqx_common_test_helpers:app_path(emqx_conf, Etc), - IncFile = emqx_common_test_helpers:app_path(emqx_conf, Inc), Port = integer_to_binary(?PORT), Bin = <<"listeners.tcp.max_connection_test {bind = \"0.0.0.0:", Port/binary, "\"}">>, - ok = file:write_file(IncFile, Bin), - ok = file:write_file(ConfFile, ["include \"", TmpConfName, "\""], [append]), + TmpConfName = atom_to_list(?FUNCTION_NAME) ++ ".conf", + TmpConfFullPath = inject_tmp_config_content(TmpConfName, Bin), emqx_mgmt_api_test_util:init_suite([emqx_conf]), - [{tmp_config_file, IncFile} | Config]; + [{tmp_config_file, TmpConfFullPath} | Config]; t_max_connection_default({'end', Config}) -> ok = file:delete(proplists:get_value(tmp_config_file, Config)); t_max_connection_default(Config) when is_list(Config) -> - %% Check infinity is binary not atom. #{<<"listeners">> := Listeners} = emqx_mgmt_api_listeners:do_list_listeners(), Target = lists:filter( fun(#{<<"id">> := Id}) -> Id =:= 'tcp:max_connection_test' end, Listeners ), - ?assertMatch([#{<<"max_connections">> := <<"infinity">>}], Target), + DefaultMaxConn = emqx_listeners:default_max_conn(), + ?assertMatch([#{<<"max_connections">> := DefaultMaxConn}], Target), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:max_connection_test"]), - ?assertMatch(#{<<"max_connections">> := <<"infinity">>}, request(get, NewPath, [], [])), - emqx_conf:remove([listeners, tcp, max_connection_test], #{override_to => cluster}), + ?assertMatch(#{<<"max_connections">> := DefaultMaxConn}, request(get, NewPath, [], [])), + emqx_conf:tombstone([listeners, tcp, max_connection_test], #{override_to => cluster}), ok. t_list_listeners(Config) when is_list(Config) -> @@ -86,7 +114,7 @@ t_list_listeners(Config) when is_list(Config) -> %% POST /listeners ListenerId = <<"tcp:default">>, - NewListenerId = <<"tcp:new">>, + NewListenerId = <<"tcp:new11">>, OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), @@ -100,7 +128,7 @@ t_list_listeners(Config) when is_list(Config) -> OriginListener2 = maps:remove(<<"id">>, OriginListener), Port = integer_to_binary(?PORT), NewConf = OriginListener2#{ - <<"name">> => <<"new">>, + <<"name">> => <<"new11">>, <<"bind">> => <<"0.0.0.0:", Port/binary>>, <<"max_connections">> := <<"infinity">> }, @@ -123,7 +151,7 @@ t_tcp_crud_listeners_by_id(Config) when is_list(Config) -> MinListenerId = <<"tcp:min">>, BadId = <<"tcp:bad">>, Type = <<"tcp">>, - crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 31000). t_ssl_crud_listeners_by_id(Config) when is_list(Config) -> ListenerId = <<"ssl:default">>, @@ -131,7 +159,7 @@ t_ssl_crud_listeners_by_id(Config) when is_list(Config) -> MinListenerId = <<"ssl:min">>, BadId = <<"ssl:bad">>, Type = <<"ssl">>, - crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 32000). t_ws_crud_listeners_by_id(Config) when is_list(Config) -> ListenerId = <<"ws:default">>, @@ -139,7 +167,7 @@ t_ws_crud_listeners_by_id(Config) when is_list(Config) -> MinListenerId = <<"ws:min">>, BadId = <<"ws:bad">>, Type = <<"ws">>, - crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 33000). t_wss_crud_listeners_by_id(Config) when is_list(Config) -> ListenerId = <<"wss:default">>, @@ -147,7 +175,7 @@ t_wss_crud_listeners_by_id(Config) when is_list(Config) -> MinListenerId = <<"wss:min">>, BadId = <<"wss:bad">>, Type = <<"wss">>, - crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 34000). t_api_listeners_list_not_ready(Config) when is_list(Config) -> net_kernel:start(['listeners@127.0.0.1', longnames]), @@ -266,7 +294,7 @@ cluster(Specs) -> end} ]). -crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) -> +crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, PortBase) -> OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), OriginListener = request(get, OriginPath, [], []), @@ -274,8 +302,8 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) -> %% create with full options ?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), - Port1 = integer_to_binary(?PORT), - Port2 = integer_to_binary(?PORT), + Port1 = integer_to_binary(?PORT(PortBase)), + Port2 = integer_to_binary(?PORT(PortBase)), NewConf = OriginListener#{ <<"id">> => NewListenerId, <<"bind">> => <<"0.0.0.0:", Port1/binary>> @@ -284,7 +312,7 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) -> ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))), Get1 = request(get, NewPath, [], []), ?assertMatch(Create, Get1), - ?assert(is_running(NewListenerId)), + ?assertEqual({true, NewListenerId}, {is_running(NewListenerId), NewListenerId}), %% create with required options MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]), @@ -417,3 +445,21 @@ data_file(Name) -> cert_file(Name) -> data_file(filename:join(["certs", Name])). + +default_listeners_hocon_text() -> + Sc = #{roots => emqx_schema:fields("listeners")}, + Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}), + Config = #{<<"listeners">> => Listeners}, + hocon_pp:do(Config, #{}). + +%% inject a 'include' at the end of emqx.conf.all +%% the 'include' can be kept after test, +%% as long as the file has been deleted it is a no-op +inject_tmp_config_content(TmpFile, Content) -> + Etc = filename:join(["etc", "emqx.conf.all"]), + Inc = filename:join(["etc", TmpFile]), + ConfFile = emqx_common_test_helpers:app_path(emqx_conf, Etc), + TmpFileFullPath = emqx_common_test_helpers:app_path(emqx_conf, Inc), + ok = file:write_file(TmpFileFullPath, Content), + ok = file:write_file(ConfFile, ["\ninclude \"", TmpFileFullPath, "\"\n"], [append]), + TmpFileFullPath. diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index ffea436bb..59ab0269c 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -169,10 +169,10 @@ t_cluster(_) -> emqx_delayed_proto_v1:get_delayed_message(node(), Id) ), - ?assertEqual( - emqx_delayed:get_delayed_message(Id), - emqx_delayed_proto_v1:get_delayed_message(node(), Id) - ), + %% The 'local' and the 'fake-remote' values should be the same, + %% however there is a race condition, so we are just assert that they are both 'ok' tuples + ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)), + ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)), ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id), diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index bb5f39c1f..cc9a8b20f 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -463,6 +463,16 @@ t_num_clients(_Config) -> ok. t_advanced_mqtt_features(_) -> + try + ok = test_advanced_mqtt_features() + catch + _:_ -> + %% delayed messages' metrics might not be reported yet + timer:sleep(1000), + test_advanced_mqtt_features() + end. + +test_advanced_mqtt_features() -> {ok, TelemetryData} = emqx_telemetry:get_telemetry(), AdvFeats = get_value(advanced_mqtt_features, TelemetryData), ?assertEqual( diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index b5669e4b9..1f7c4688e 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -92,8 +92,10 @@ t_server_validator(_) -> ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?DEFAULT_CONF, #{ raw_with_default => true }), - undefined = emqx_conf:get_raw([statsd, server], undefined), - ?assertMatch("127.0.0.1:8125", emqx_conf:get([statsd, server])), + DefaultServer = default_server(), + ?assertEqual(DefaultServer, emqx_conf:get_raw([statsd, server])), + DefaultServerStr = binary_to_list(DefaultServer), + ?assertEqual(DefaultServerStr, emqx_conf:get([statsd, server])), %% recover ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF, #{ raw_with_default => true @@ -204,3 +206,7 @@ request(Method, Body) -> {ok, _Status, _} -> error end. + +default_server() -> + {server, Schema} = lists:keyfind(server, 1, emqx_statsd_schema:fields("statsd")), + hocon_schema:field_schema(Schema, default). diff --git a/bin/emqx b/bin/emqx index 6d4c5cd4e..60c292f9c 100755 --- a/bin/emqx +++ b/bin/emqx @@ -304,7 +304,7 @@ if [ "$ES" -ne 0 ]; then fi # Make sure log directory exists -mkdir -p "$RUNNER_LOG_DIR" +mkdir -p "$EMQX_LOG_DIR" # turn off debug as this is static set +x @@ -757,7 +757,7 @@ generate_config() { local node_name="$2" ## Delete the *.siz files first or it can't start after ## changing the config 'log.rotation.size' - rm -f "${RUNNER_LOG_DIR}"/*.siz + rm -f "${EMQX_LOG_DIR}"/*.siz ## timestamp for each generation local NOW_TIME @@ -861,7 +861,13 @@ wait_until_return_val() { done } -# backward compatible with 4.x +# First, there is EMQX_DEFAULT_LOG_HANDLER which can control the default values +# to be used when generating configs. +# It's set in docker entrypoint and in systemd service file. +# +# To be backward compatible with 4.x and v5.0.0 ~ v5.0.24/e5.0.2: +# if EMQX_LOG__TO is set, we try to enable handlers from environment variables. +# i.e. it overrides the default value set in EMQX_DEFAULT_LOG_HANDLER tr_log_to_env() { local log_to=${EMQX_LOG__TO:-undefined} # unset because it's unknown to 5.0 @@ -893,13 +899,11 @@ tr_log_to_env() { maybe_log_to_console() { if [ "${EMQX_LOG__TO:-}" = 'default' ]; then - # want to use config file defaults, do nothing + # want to use defaults, do nothing unset EMQX_LOG__TO else tr_log_to_env - # ensure defaults - export EMQX_LOG__CONSOLE_HANDLER__ENABLE="${EMQX_LOG__CONSOLE_HANDLER__ENABLE:-true}" - export EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE="${EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE:-false}" + export EMQX_DEFAULT_LOG_HANDLER=${EMQX_DEFAULT_LOG_HANDLER:-console} fi } @@ -979,7 +983,7 @@ diagnose_boot_failure_and_die() { local ps_line ps_line="$(find_emqx_process)" if [ -z "$ps_line" ]; then - echo "Find more information in the latest log file: ${RUNNER_LOG_DIR}/erlang.log.*" + echo "Find more information in the latest log file: ${EMQX_LOG_DIR}/erlang.log.*" exit 1 fi if ! relx_nodetool "ping" > /dev/null; then @@ -990,7 +994,7 @@ diagnose_boot_failure_and_die() { fi if ! relx_nodetool 'eval' 'true = emqx:is_running()' > /dev/null; then logerr "$NAME node is started, but failed to complete the boot sequence in time." - echo "Please collect the logs in ${RUNNER_LOG_DIR} and report a bug to EMQX team at https://github.com/emqx/emqx/issues/new/choose" + echo "Please collect the logs in ${EMQX_LOG_DIR} and report a bug to EMQX team at https://github.com/emqx/emqx/issues/new/choose" pipe_shutdown exit 3 fi @@ -1065,7 +1069,7 @@ case "${COMMAND}" in mkdir -p "$PIPE_DIR" - "$BINDIR/run_erl" -daemon "$PIPE_DIR" "$RUNNER_LOG_DIR" \ + "$BINDIR/run_erl" -daemon "$PIPE_DIR" "$EMQX_LOG_DIR" \ "$(relx_start_command)" WAIT_TIME=${EMQX_WAIT_FOR_START:-120} diff --git a/bin/node_dump b/bin/node_dump index 1c4df08b5..60c995885 100755 --- a/bin/node_dump +++ b/bin/node_dump @@ -10,10 +10,10 @@ echo "Running node dump in ${RUNNER_ROOT_DIR}" cd "${RUNNER_ROOT_DIR}" -DUMP="$RUNNER_LOG_DIR/node_dump_$(date +"%Y%m%d_%H%M%S").tar.gz" -CONF_DUMP="$RUNNER_LOG_DIR/conf.dump" -LICENSE_INFO="$RUNNER_LOG_DIR/license_info.txt" -SYSINFO="$RUNNER_LOG_DIR/sysinfo.txt" +DUMP="$EMQX_LOG_DIR/node_dump_$(date +"%Y%m%d_%H%M%S").tar.gz" +CONF_DUMP="$EMQX_LOG_DIR/conf.dump" +LICENSE_INFO="$EMQX_LOG_DIR/license_info.txt" +SYSINFO="$EMQX_LOG_DIR/sysinfo.txt" LOG_MAX_AGE_DAYS=3 @@ -74,7 +74,7 @@ done # Pack files { - find "$RUNNER_LOG_DIR" -mtime -"${LOG_MAX_AGE_DAYS}" \( -name '*.log.*' -or -name 'run_erl.log*' \) + find "$EMQX_LOG_DIR" -mtime -"${LOG_MAX_AGE_DAYS}" \( -name '*.log.*' -or -name 'run_erl.log*' \) echo "${SYSINFO}" echo "${CONF_DUMP}" echo "${LICENSE_INFO}" diff --git a/changes/ce/fix-10548.en.md b/changes/ce/fix-10548.en.md new file mode 100644 index 000000000..d96f0b57f --- /dev/null +++ b/changes/ce/fix-10548.en.md @@ -0,0 +1,2 @@ +Fixed a race condition in the HTTP driver that would result in an error rather than a retry of the request. +Related fix in the driver: https://github.com/emqx/ehttpc/pull/45 diff --git a/deploy/docker/Dockerfile.msodbc b/deploy/docker/Dockerfile.msodbc new file mode 100644 index 000000000..d7b3457ac --- /dev/null +++ b/deploy/docker/Dockerfile.msodbc @@ -0,0 +1,25 @@ +## This Dockerfile should not run in GitHub Action or any other automated process. +## It should be manually executed by the needs of the user. +## +## Before manaually execute: +## Please confirm the EMQX-Enterprise version you are using and modify the base layer image tag +## ```bash +## $ docker build -f=Dockerfile.msodbc -t emqx-enterprise-with-msodbc:5.0.3-alpha.2 . +## ``` + +# FROM emqx/emqx-enterprise:latest +FROM emqx/emqx-enterprise:5.0.3-alpha.2 + +USER root + +RUN apt-get update \ + && apt-get install -y gnupg2 curl apt-utils \ + && curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \ + && curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-mkc crelease.list \ + && apt-get update \ + && ACCEPT_EULA=Y apt-get install -y msodbcsql17 unixodbc-dev \ + && sed -i 's/ODBC Driver 17 for SQL Server/ms-sql/g' /etc/odbcinst.ini \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +USER emqx diff --git a/deploy/docker/docker-entrypoint.sh b/deploy/docker/docker-entrypoint.sh index 1824e1ee0..056f0675f 100755 --- a/deploy/docker/docker-entrypoint.sh +++ b/deploy/docker/docker-entrypoint.sh @@ -1,9 +1,7 @@ #!/usr/bin/env bash -## EMQ docker image start script -# Huang Rui -# EMQX Team -## Shell setting +## EMQ docker image start script + if [[ -n "$DEBUG" ]]; then set -ex else diff --git a/deploy/packages/emqx.service b/deploy/packages/emqx.service index d826e358b..2dbe550bc 100644 --- a/deploy/packages/emqx.service +++ b/deploy/packages/emqx.service @@ -10,8 +10,8 @@ Group=emqx Type=simple Environment=HOME=/var/lib/emqx -# Enable logging to file -Environment=EMQX_LOG__TO=default +# log to file by default (if no log handler config) +Environment=EMQX_DEFAULT_LOG_HANDLER=file # Start 'foreground' but not 'start' (daemon) mode. # Because systemd monitor/restarts 'simple' services diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 3baf056ec..b18872cf1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -207,7 +207,7 @@ kafka_structs() -> #{ desc => <<"Kafka Producer Bridge Config">>, required => false, - converter => fun emqx_bridge_kafka:kafka_producer_converter/2 + converter => fun kafka_producer_converter/2 } )}, {kafka_consumer, @@ -302,3 +302,13 @@ sqlserver_structs() -> } )} ]. + +kafka_producer_converter(undefined, _) -> + undefined; +kafka_producer_converter(Map, Opts) -> + maps:map( + fun(_Name, Config) -> + emqx_bridge_kafka:kafka_producer_converter(Config, Opts) + end, + Map + ). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 82f556bdb..baf54eff1 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 2e1730b52..1e597f813 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -114,18 +114,19 @@ on_start( sync_timeout => SyncTimeout, templates => Templates, producers_map_pid => ProducersMapPID, - producers_opts => ProducerOpts + producers_opts => emqx_secret:wrap(ProducerOpts) }, case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; - {error, _Reason} = Error -> + {error, Reason0} -> + Reason = redact(Reason0), ?tp( rocketmq_connector_start_failed, - #{error => _Reason} + #{error => Reason} ), - Error + {error, Reason} end. on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> @@ -222,7 +223,7 @@ safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, R produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) catch _Type:Reason -> - {error, {unrecoverable_error, Reason}} + {error, {unrecoverable_error, redact(Reason)}} end. produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) -> @@ -337,7 +338,7 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> _ -> ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]), {ok, Producers0} = rocketmq:ensure_supervised_producers( - ClientId, ProducerGroup, Topic1, ProducerOpts + ClientId, ProducerGroup, Topic1, emqx_secret:unwrap(ProducerOpts) ), ets:insert(ClientId, {TopicKey, Producers0}), Producers0 diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 90d90cb36..bb5a1f78d 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -34,8 +34,6 @@ on_stop/2, on_query/3, on_batch_query/3, - on_query_async/4, - on_batch_query_async/4, on_get_status/2 ]). @@ -43,7 +41,7 @@ -export([connect/1]). %% Internal exports used to execute code with ecpool worker --export([do_get_status/1, worker_do_insert/3, do_async_reply/2]). +-export([do_get_status/1, worker_do_insert/3]). -import(emqx_plugin_libs_rule, [str/1]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -51,7 +49,6 @@ -define(ACTION_SEND_MESSAGE, send_message). -define(SYNC_QUERY_MODE, handover). --define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}). -define(SQLSERVER_HOST_OPTIONS, #{ default_port => 1433 @@ -169,7 +166,7 @@ server() -> %% Callbacks defined in emqx_resource %%==================================================================== -callback_mode() -> async_if_possible. +callback_mode() -> always_sync. is_buffer_supported() -> false. @@ -252,27 +249,6 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> ), do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). --spec on_query_async( - manager_id(), - {?ACTION_SEND_MESSAGE, map()}, - {ReplyFun :: function(), Args :: list()}, - state() -) -> - {ok, any()} - | {error, term()}. -on_query_async( - InstanceId, - {?ACTION_SEND_MESSAGE, _Msg} = Query, - ReplyFunAndArgs, - State -) -> - ?TRACE( - "SINGLE_QUERY_ASYNC", - "bridge_sqlserver_received", - #{requests => Query, connector => InstanceId, state => State} - ), - do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). - -spec on_batch_query( manager_id(), [{?ACTION_SEND_MESSAGE, map()}], @@ -290,20 +266,6 @@ on_batch_query(InstanceId, BatchRequests, State) -> ), do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). --spec on_batch_query_async( - manager_id(), - [{?ACTION_SEND_MESSAGE, map()}], - {ReplyFun :: function(), Args :: list()}, - state() -) -> {ok, any()}. -on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> - ?TRACE( - "BATCH_QUERY_ASYNC", - "bridge_sqlserver_received", - #{requests => Requests, connector => InstanceId, state => State} - ), - do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). - on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, @@ -364,13 +326,11 @@ conn_str([{password, Password} | Opts], Acc) -> conn_str([{_, _} | Opts], Acc) -> conn_str(Opts, Acc). -%% Sync & Async query with singe & batch sql statement +%% Query with singe & batch sql statement -spec do_query( manager_id(), Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], - ApplyMode :: - handover - | {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}}, + ApplyMode :: handover, state() ) -> {ok, list()} @@ -530,6 +490,3 @@ apply_template(Query, Templates) -> %% TODO: more detail infomatoin ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), {error, failed_to_apply_sql_template}. - -do_async_reply(Result, {ReplyFun, Args}) -> - erlang:apply(ReplyFun, Args ++ [Result]). diff --git a/mix.exs b/mix.exs index 8e100967b..4f5ecceea 100644 --- a/mix.exs +++ b/mix.exs @@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do {:redbug, "2.0.8"}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.7", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, @@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.3", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.4", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, @@ -702,7 +702,6 @@ defmodule EMQXUmbrella.MixProject do emqx_default_erlang_cookie: default_cookie(), platform_data_dir: "data", platform_etc_dir: "etc", - platform_log_dir: "log", platform_plugins_dir: "plugins", runner_bin_dir: "$RUNNER_ROOT_DIR/bin", emqx_etc_dir: "$RUNNER_ROOT_DIR/etc", @@ -725,7 +724,6 @@ defmodule EMQXUmbrella.MixProject do emqx_default_erlang_cookie: default_cookie(), platform_data_dir: "/var/lib/emqx", platform_etc_dir: "/etc/emqx", - platform_log_dir: "/var/log/emqx", platform_plugins_dir: "/var/lib/emqx/plugins", runner_bin_dir: "/usr/bin", emqx_etc_dir: "/etc/emqx", diff --git a/rebar.config b/rebar.config index 05e9b9a28..e11f0c108 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {gpb, "4.19.7"} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} @@ -75,7 +75,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.3"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 5c83d1ea0..61d420e48 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -352,7 +352,6 @@ overlay_vars_pkg(bin) -> [ {platform_data_dir, "data"}, {platform_etc_dir, "etc"}, - {platform_log_dir, "log"}, {platform_plugins_dir, "plugins"}, {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"}, {emqx_etc_dir, "$RUNNER_ROOT_DIR/etc"}, @@ -365,7 +364,6 @@ overlay_vars_pkg(pkg) -> [ {platform_data_dir, "/var/lib/emqx"}, {platform_etc_dir, "/etc/emqx"}, - {platform_log_dir, "/var/log/emqx"}, {platform_plugins_dir, "/var/lib/emqx/plugins"}, {runner_bin_dir, "/usr/bin"}, {emqx_etc_dir, "/etc/emqx"}, diff --git a/rel/emqx_vars b/rel/emqx_vars index e3965d40c..f37968f1f 100644 --- a/rel/emqx_vars +++ b/rel/emqx_vars @@ -11,7 +11,8 @@ RUNNER_LIB_DIR="{{ runner_lib_dir }}" IS_ELIXIR="${IS_ELIXIR:-{{ is_elixir }}}" ## Allow users to pre-set `EMQX_LOG_DIR` because it only affects boot commands like `start` and `console`, ## but not other commands such as `ping` and `ctl`. -RUNNER_LOG_DIR="${EMQX_LOG_DIR:-${RUNNER_LOG_DIR:-{{ runner_log_dir }}}}" +## RUNNER_LOG_DIR is kept for backward compatibility. +export EMQX_LOG_DIR="${EMQX_LOG_DIR:-${RUNNER_LOG_DIR:-{{ runner_log_dir }}}}" EMQX_ETC_DIR="{{ emqx_etc_dir }}" RUNNER_USER="{{ runner_user }}" SCHEMA_MOD="{{ emqx_schema_mod }}" diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index f4016d32e..079132454 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -84,8 +84,10 @@ common_handler_max_depth.label: """Max Depth""" desc_log.desc: -"""EMQX logging supports multiple sinks for the log events. -Each sink is represented by a _log handler_, which can be configured independently.""" +"""EMQX supports multiple log handlers, one console handler and multiple file handlers. +EMQX by default logs to console when running in docker or in console/foreground mode, +otherwise it logs to file $EMQX_LOG_DIR/emqx.log. +For advanced configuration, you can find more parameters in this section.""" desc_log.label: """Log""" diff --git a/rel/i18n/zh/emqx_conf_schema.hocon b/rel/i18n/zh/emqx_conf_schema.hocon index 4c1edbdee..d52ae44cc 100644 --- a/rel/i18n/zh/emqx_conf_schema.hocon +++ b/rel/i18n/zh/emqx_conf_schema.hocon @@ -90,7 +90,8 @@ common_handler_max_depth.label: """最大深度""" desc_log.desc: -"""EMQX 日志记录支持日志事件的多个接收器。 每个接收器由一个_log handler_表示,可以独立配置。""" +"""EMQX 支持同时多个日志输出,一个控制台输出,和多个文件输出。 +默认情况下,EMQX 运行在容器中,或者在 'console' 或 'foreground' 模式下运行时,会输出到 控制台,否则输出到文件。""" desc_log.label: """日志""" diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index ad0736bb3..3e4e3d44b 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -47,7 +47,7 @@ while [ "$#" -gt 0 ]; do exit 0 ;; --app) - WHICH_APP="$2" + WHICH_APP="${2%/}" shift 2 ;; --only-up)