diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index 489c47862..63dad42aa 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -83,6 +83,28 @@ end)() ). +-define(assertExceptionOneOf(CT1, CT2, EXPR), + (fun() -> + X__Attrs = [ + {module, ?MODULE}, + {line, ?LINE}, + {expression, (??EXPR)}, + {pattern, "[ " ++ (??CT1) ++ ", " ++ (??CT2) ++ " ]"} + ], + X__Exc = + try (EXPR) of + X__V -> erlang:error({assertException, [{unexpected_success, X__V} | X__Attrs]}) + catch + X__C:X__T:X__S -> {X__C, X__T, X__S} + end, + case {element(1, X__Exc), element(2, X__Exc)} of + CT1 -> ok; + CT2 -> ok; + _ -> erlang:error({assertException, [{unexpected_exception, X__Exc} | X__Attrs]}) + end + end)() +). + -define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin __TEST_CASE = ?FUNCTION_NAME, (fun diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 171c65be8..ea90bf5dd 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.4.0-alpha.2"). +-define(EMQX_RELEASE_CE, "5.4.0-rc.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.4.0-alpha.2"). +-define(EMQX_RELEASE_EE, "5.4.0-rc.1"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index ad711a709..01b51ed88 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,9 +27,9 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}}, - {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}, + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.3"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 617cb9b30..672ecb1da 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -94,6 +94,7 @@ -export([ensure_atom_conf_path/2]). -export([load_config_files/2]). +-export([upgrade_raw_conf/2]). -ifdef(TEST). -export([erase_all/0, backup_and_write/2]). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 5cf631ccb..dc1f6d9ad 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -55,7 +55,6 @@ ]). -export([pre_config_update/3, post_config_update/5]). --export([create_listener/3, remove_listener/3, update_listener/3]). -export([format_bind/1]). @@ -66,6 +65,11 @@ -export_type([listener_id/0]). -type listener_id() :: atom() | binary(). +-type listener_type() :: tcp | ssl | ws | wss | quic | dtls. + +-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)). +-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)). + -define(ROOT_KEY, listeners). -define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). @@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) -> -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> - case - [ - Running - || {Id, #{running := Running}} <- list(), - Id =:= ListenerId - ] - of - [] -> {error, not_found}; - [IsRunning] -> IsRunning + case lists:keyfind(ListenerId, 1, list()) of + {_Id, #{running := Running}} -> Running; + false -> {error, not_found} end. is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> @@ -229,24 +227,26 @@ start() -> start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). --spec start_listener(atom(), atom(), map()) -> ok | {error, term()}. -start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - case do_start_listener(Type, ListenerName, Conf) of +-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) -> + ListenerId = listener_id(Type, Name), + Limiter = limiter(Conf), + ok = add_limiter_bucket(ListenerId, Limiter), + case do_start_listener(Type, Name, ListenerId, Conf) of {ok, {skipped, Reason}} when - Reason =:= listener_disabled; Reason =:= quic_app_missing -> ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), console_print( "Listener ~ts is NOT started due to: ~p.~n", - [listener_id(Type, ListenerName), Reason] + [ListenerId, Reason] ), ok; {ok, _} -> ?tp(listener_started, #{type => Type, bind => Bind}), console_print( "Listener ~ts on ~ts started.~n", - [listener_id(Type, ListenerName), format_bind(Bind)] + [ListenerId, format_bind(Bind)] ), ok; {error, {already_started, Pid}} -> @@ -255,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> }), {error, {already_started, Pid}}; {error, Reason} -> + ok = del_limiter_bucket(ListenerId, Limiter), ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}), - ListenerId = listener_id(Type, ListenerName), BindStr = format_bind(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p.~n", @@ -269,7 +269,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ) ), {error, {failed_to_start, Msg}} - end. + end; +start_listener(Type, Name, #{enable := false}) -> + console_print( + "Listener ~ts is NOT started due to: disabled.~n", + [listener_id(Type, Name)] + ), + ok. %% @doc Restart all listeners -spec restart() -> ok. @@ -280,16 +286,33 @@ restart() -> restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). --spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}. -restart_listener(Type, ListenerName, {OldConf, NewConf}) -> - restart_listener(Type, ListenerName, OldConf, NewConf); +-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}. restart_listener(Type, ListenerName, Conf) -> restart_listener(Type, ListenerName, Conf, Conf). -restart_listener(Type, ListenerName, OldConf, NewConf) -> - case stop_listener(Type, ListenerName, OldConf) of - ok -> start_listener(Type, ListenerName, NewConf); - {error, Reason} -> {error, Reason} +update_listener(_Type, _Name, #{enable := false}, #{enable := false}) -> + ok; +update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> + stop_listener(Type, Name, Conf); +update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> + start_listener(Type, Name, Conf); +update_listener(Type, Name, OldConf, NewConf) -> + Id = listener_id(Type, Name), + ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)), + case do_update_listener(Type, Name, OldConf, NewConf) of + ok -> + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + ok; + {error, _Reason} -> + restart_listener(Type, Name, OldConf, NewConf) + end. + +restart_listener(Type, Name, OldConf, NewConf) -> + case stop_listener(Type, Name, OldConf) of + ok -> + start_listener(Type, Name, NewConf); + {error, Reason} -> + {error, Reason} end. %% @doc Stop all listeners. @@ -305,9 +328,10 @@ stop() -> stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). -stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - Id = listener_id(Type, ListenerName), - ok = del_limiter_bucket(Id, Conf), +stop_listener(Type, Name, #{bind := Bind} = Conf) -> + Id = listener_id(Type, Name), + ok = del_limiter_bucket(Id, limiter(Conf)), + ok = unregister_ocsp_stapling_refresh(Type, Name), case do_stop_listener(Type, Id, Conf) of ok -> console_print( @@ -325,11 +349,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> {error, Reason} end. --spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}. - -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl -> +-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) -> esockd:close(Id, ListenOn); -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss -> +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) -> case cowboy:stop_listener(Id) of ok -> wait_listener_stopped(ListenOn); @@ -369,45 +392,25 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args). console_print(_Fmt, _Args) -> ok. -endif. -%% Start MQTT/TCP listener --spec do_start_listener(atom(), atom(), map()) -> +-spec do_start_listener(listener_type(), atom(), listener_id(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}. -do_start_listener(_Type, _ListenerName, #{enable := false}) -> - {ok, {skipped, listener_disabled}}; -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == tcp; Type == ssl --> - Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), +%% Start MQTT/TCP listener +do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> esockd:open( Id, ListenOn, - merge_default(esockd_opts(Id, Type, Opts)), - {emqx_connection, start_link, [ - #{ - listener => {Type, ListenerName}, - zone => zone(Opts), - limiter => Limiter, - enable_authn => enable_authn(Opts) - } - ]} + merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == ws; Type == wss --> - Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), - RanchOpts = ranch_opts(Type, ListenOn, Opts), - WsOpts = ws_opts(Type, ListenerName, Opts, Limiter), +do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) -> + RanchOpts = ranch_opts(Type, Opts), + WsOpts = ws_opts(Type, Name, Opts), case Type of ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) end; %% Start MQTT/QUIC listener -do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> +do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) -> ListenOn = case Bind of {Addr, Port} when tuple_size(Addr) == 4 -> @@ -457,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), zone => zone(Opts), - listener => {quic, ListenerName}, + listener => {quic, Name}, limiter => Limiter }, StreamOpts = #{ stream_callback => emqx_quic_stream, active => 1 }, - - Id = listener_id(quic, ListenerName), - add_limiter_bucket(Id, Limiter), quicer:spawn_listener( Id, ListenOn, @@ -476,6 +476,39 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {ok, {skipped, quic_app_missing}} end. +do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when + ?ESOCKD_LISTENER(Type) +-> + Id = listener_id(Type, Name), + case maps:get(bind, OldConf) of + ListenOn -> + esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); + _Different -> + %% TODO + %% Again, we're not strictly required to drop live connections in this case. + {error, not_supported} + end; +do_update_listener(Type, Name, OldConf, NewConf) when + ?COWBOY_LISTENER(Type) +-> + Id = listener_id(Type, Name), + RanchOpts = ranch_opts(Type, NewConf), + WsOpts = ws_opts(Type, Name, NewConf), + case ranch_opts(Type, OldConf) of + RanchOpts -> + %% Transport options did not change, no need to touch the listener. + ok; + _Different -> + %% Transport options changed, we need to tear down the listener. + ok = ranch:suspend_listener(Id), + ok = ranch:set_transport_options(Id, RanchOpts) + end, + ok = ranch:set_protocol_options(Id, WsOpts), + %% No-op if the listener was not suspended. + ranch:resume_listener(Id); +do_update_listener(_Type, _Name, _OldConf, _NewConf) -> + {error, not_supported}. + %% Update the listeners at runtime pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when V =:= undefined orelse V =:= ?TOMBSTONE_VALUE @@ -501,69 +534,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE -> - create_listener(Type, Name, NewConf); + start_listener(Type, Name, NewConf); post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> - update_listener(Type, Name, {OldConf, NewConf}); + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) -> - remove_listener(Type, Name, OldConf); + stop_listener(Type, Name, OldConf); post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> - #{enable := NewEnabled} = NewConf, - #{enable := OldEnabled} = OldConf, - case {NewEnabled, OldEnabled} of - {true, true} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}); - {true, false} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - start_listener(Type, Name, NewConf); - {false, true} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf); - {false, false} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf) - end; + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) -> ok; post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) -> #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf), - Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed), - perform_listener_changes([ - {fun ?MODULE:remove_listener/3, Removed}, - {fun ?MODULE:update_listener/3, Updated}, - {fun ?MODULE:create_listener/3, Added} - ]); + %% TODO + %% This currently lacks transactional semantics. If one of the changes fails, + %% previous changes will not be rolled back. + perform_listener_changes( + [{update, L} || L <- Changed] ++ + [{stop, L} || L <- Removed] ++ + [{start, L} || L <- Added] + ); post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. -create_listener(Type, Name, NewConf) -> - start_listener(Type, Name, NewConf). - -remove_listener(Type, Name, OldConf) -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf). - -update_listener(Type, Name, {OldConf, NewConf}) -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}). - perform_listener_changes([]) -> ok; -perform_listener_changes([{Action, ConfL} | Tasks]) -> - case perform_listener_changes(Action, ConfL) of - ok -> perform_listener_changes(Tasks); +perform_listener_changes([{Action, Listener} | Rest]) -> + case perform_listener_change(Action, Listener) of + ok -> perform_listener_changes(Rest); {error, Reason} -> {error, Reason} end. -perform_listener_changes(_Action, []) -> - ok; -perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) -> - case Action(Type, Name, Diff) of - ok -> perform_listener_changes(Action, MapConf); - {error, Reason} -> {error, Reason} - end. +perform_listener_change(start, {Type, Name, Conf}) -> + start_listener(Type, Name, Conf); +perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) -> + update_listener(Type, Name, ConfOld, ConfNew); +perform_listener_change(stop, {Type, Name, Conf}) -> + stop_listener(Type, Name, Conf). -esockd_opts(ListenerId, Type, Opts0) -> +esockd_opts(ListenerId, Type, Name, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = @@ -579,7 +587,16 @@ esockd_opts(ListenerId, Type, Opts0) -> end, Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])), - tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]} + tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}, + connection_mfargs => + {emqx_connection, start_link, [ + #{ + listener => {Type, Name}, + zone => zone(Opts0), + limiter => Limiter, + enable_authn => enable_authn(Opts0) + } + ]} }, maps:to_list( case Type of @@ -593,20 +610,21 @@ esockd_opts(ListenerId, Type, Opts0) -> end ). -ws_opts(Type, ListenerName, Opts, Limiter) -> - WsPaths = [ - {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ +ws_opts(Type, ListenerName, Opts) -> + WsPath = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), + WsRoutes = [ + {WsPath, emqx_ws_connection, #{ zone => zone(Opts), listener => {Type, ListenerName}, - limiter => Limiter, + limiter => limiter(Opts), enable_authn => enable_authn(Opts) }} ], - Dispatch = cowboy_router:compile([{'_', WsPaths}]), + Dispatch = cowboy_router:compile([{'_', WsRoutes}]), ProxyProto = maps:get(proxy_protocol, Opts, false), #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. -ranch_opts(Type, ListenOn, Opts) -> +ranch_opts(Type, Opts = #{bind := ListenOn}) -> NumAcceptors = maps:get(acceptors, Opts, 4), MaxConnections = maps:get(max_connections, Opts, 1024), SocketOpts = @@ -725,41 +743,47 @@ add_limiter_bucket(Id, Limiter) -> maps:without([client], Limiter) ). -del_limiter_bucket(Id, Conf) -> - case limiter(Conf) of - undefined -> - ok; - Limiter -> - lists:foreach( - fun(Type) -> - emqx_limiter_server:del_bucket(Id, Type) - end, - maps:keys(Limiter) - ) - end. +del_limiter_bucket(_Id, undefined) -> + ok; +del_limiter_bucket(Id, Limiter) -> + maps:foreach( + fun(Type, _) -> + emqx_limiter_server:del_bucket(Id, Type) + end, + Limiter + ). + +update_limiter_bucket(Id, Limiter, undefined) -> + del_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, undefined, Limiter) -> + add_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, OldLimiter, NewLimiter) -> + ok = add_limiter_bucket(Id, NewLimiter), + Outdated = maps:without(maps:keys(NewLimiter), OldLimiter), + del_limiter_bucket(Id, Outdated). diff_confs(NewConfs, OldConfs) -> emqx_utils:diff_lists( flatten_confs(NewConfs), flatten_confs(OldConfs), - fun({Key, _}) -> Key end + fun({Type, Name, _}) -> {Type, Name} end ). -flatten_confs(Conf0) -> +flatten_confs(Confs) -> lists:flatmap( - fun({Type, Conf}) -> - do_flatten_confs(Type, Conf) + fun({Type, Listeners}) -> + do_flatten_confs(Type, Listeners) end, - maps:to_list(Conf0) + maps:to_list(Confs) ). -do_flatten_confs(Type, Conf0) -> +do_flatten_confs(Type, Listeners) -> FilterFun = fun ({_Name, ?TOMBSTONE_TYPE}) -> false; - ({Name, Conf}) -> {true, {{Type, Name}, Conf}} + ({Name, Conf}) -> {true, {Type, Name, Conf}} end, - lists:filtermap(FilterFun, maps:to_list(Conf0)). + lists:filtermap(FilterFun, maps:to_list(Listeners)). enable_authn(Opts) -> maps:get(enable_authn, Opts, true). diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 10ce3ad74..d9c9470eb 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -67,6 +67,11 @@ select_free_port/1 ]). +-export([ + ssl_verify_fun_allow_any_host/0, + ssl_verify_fun_allow_any_host_impl/3 +]). + -export([ emqx_cluster/1, emqx_cluster/2, diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 5e91b92c9..042ef91db 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -58,7 +58,6 @@ -module(emqx_cth_suite). -include_lib("common_test/include/ct.hrl"). --include_lib("emqx/include/emqx_access_control.hrl"). -export([start/2]). -export([stop/1]). diff --git a/apps/emqx/test/emqx_cth_tls.erl b/apps/emqx/test/emqx_cth_tls.erl new file mode 100644 index 000000000..ccec7626d --- /dev/null +++ b/apps/emqx/test/emqx_cth_tls.erl @@ -0,0 +1,339 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cth_tls). + +-include_lib("public_key/include/public_key.hrl"). + +-export([gen_cert/1]). +-export([write_cert/2]). +-export([write_cert/3]). +-export([write_pem/2]). + +%% ------------------------------------------------------------------- +%% Certificate Issuing +%% Heavily inspired by: ${ERL_SRC}/lib/public_key/test/erl_make_certs.erl +%% ------------------------------------------------------------------- + +-type pem_entry() :: public_key:pem_entry(). +-type certificate() :: pem_entry(). +-type private_key() :: pem_entry(). + +-type cert_subject() :: #{ + name => string(), + email => string(), + city => string(), + state => string(), + org => string(), + org_unit => string(), + country => string(), + serial => string(), + title => string(), + dnQualifer => string() +}. + +-type cert_validity() :: + {_From :: calendar:date(), _To :: calendar:date()}. + +-type cert_extensions() :: #{ + basic_constraints => false | ca | _PathLenContraint :: pos_integer(), + key_usage => false | certsign +}. + +%% @doc Generate a certificate and a private key. +%% If you need root (CA) certificate, use `root` as `issuer` option. By default, the +%% generated certificate will have according extensions (constraints, key usage, etc). +%% Once root certificate + private key pair is generated, you can use the result +%% as `issuer` option to generate other certificates signed by this root. +-spec gen_cert(Opts) -> {certificate(), private_key()} when + Opts :: #{ + key := ec | rsa | PrivKeyIn, + issuer := root | {CertificateIn, PrivKeyIn}, + subject => cert_subject(), + validity => cert_validity(), + extensions => cert_extensions() | false + }, + CertificateIn :: certificate() | public_key:der_encoded() | #'OTPCertificate'{}, + PrivKeyIn :: private_key() | _PEM :: binary(). +gen_cert(Opts) -> + SubjectPrivateKey = get_privkey(Opts), + {TBSCert, IssuerKey} = make_tbs(SubjectPrivateKey, Opts), + Cert = public_key:pkix_sign(TBSCert, IssuerKey), + true = verify_signature(Cert, IssuerKey), + {encode_cert(Cert), encode_privkey(SubjectPrivateKey)}. + +get_privkey(#{key := Algo}) when is_atom(Algo) -> + gen_privkey(Algo); +get_privkey(#{key := Key}) -> + decode_privkey(Key). + +make_tbs(SubjectKey, Opts) -> + {Issuer, IssuerKey} = issuer(Opts, SubjectKey), + Subject = + case Opts of + #{issuer := root} -> + Issuer; + #{} -> + subject(Opts) + end, + { + #'OTPTBSCertificate'{ + version = v3, + serialNumber = rand:uniform(1000000000000), + signature = sign_algorithm(IssuerKey, Opts), + issuer = Issuer, + validity = validity(Opts), + subject = Subject, + subjectPublicKeyInfo = publickey(SubjectKey), + extensions = extensions(Opts) + }, + IssuerKey + }. + +issuer(Opts = #{issuer := root}, SubjectKey) -> + %% Self signed + {subject(Opts), SubjectKey}; +issuer(#{issuer := {Issuer, IssuerKey}}, _SubjectKey) -> + {issuer_subject(Issuer), decode_privkey(IssuerKey)}. + +issuer_subject({'Certificate', IssuerDer, _}) when is_binary(IssuerDer) -> + issuer_subject(IssuerDer); +issuer_subject(IssuerDer) when is_binary(IssuerDer) -> + issuer_subject(public_key:pkix_decode_cert(IssuerDer, otp)); +issuer_subject(#'OTPCertificate'{tbsCertificate = #'OTPTBSCertificate'{subject = Subject}}) -> + Subject. + +subject(Opts = #{}) -> + Subject = maps:get(subject, Opts, #{}), + Entries = maps:map( + fun(N, V) -> [subject_entry(N, V)] end, + maps:merge(default_subject(Opts), Subject) + ), + {rdnSequence, maps:values(Entries)}. + +subject_entry(name, Name) -> + typed_attr(?'id-at-commonName', {printableString, Name}); +subject_entry(email, Email) -> + typed_attr(?'id-emailAddress', Email); +subject_entry(city, City) -> + typed_attr(?'id-at-localityName', {printableString, City}); +subject_entry(state, State) -> + typed_attr(?'id-at-stateOrProvinceName', {printableString, State}); +subject_entry(org, Org) -> + typed_attr(?'id-at-organizationName', {printableString, Org}); +subject_entry(org_unit, OrgUnit) -> + typed_attr(?'id-at-organizationalUnitName', {printableString, OrgUnit}); +subject_entry(country, Country) -> + typed_attr(?'id-at-countryName', Country); +subject_entry(serial, Serial) -> + typed_attr(?'id-at-serialNumber', Serial); +subject_entry(title, Title) -> + typed_attr(?'id-at-title', {printableString, Title}); +subject_entry(dnQualifer, DnQ) -> + typed_attr(?'id-at-dnQualifier', DnQ). + +subject_info(Info, Subject, Default) -> + case subject_info(Info, Subject) of + undefined -> Default; + Value -> Value + end. + +subject_info(Info, {rdnSequence, Entries}) -> + subject_info(Info, Entries); +subject_info(name, Entries) when is_list(Entries) -> + get_string(find_subject_entry(?'id-at-commonName', Entries)); +subject_info(org, Entries) when is_list(Entries) -> + get_string(find_subject_entry(?'id-at-organizationName', Entries)); +subject_info(org_unit, Entries) when is_list(Entries) -> + get_string(find_subject_entry(?'id-at-organizationalUnitName', Entries)); +subject_info(country, Entries) when is_list(Entries) -> + find_subject_entry(?'id-at-countryName', Entries). + +find_subject_entry(Oid, Entries) -> + emqx_maybe:from_list([ + Value + || Attrs <- Entries, + #'AttributeTypeAndValue'{type = T, value = Value} <- Attrs, + T =:= Oid + ]). + +get_string({printableString, String}) -> + String; +get_string(undefined) -> + undefined. + +typed_attr(Type, Value) -> + #'AttributeTypeAndValue'{type = Type, value = Value}. + +sign_algorithm(#'ECPrivateKey'{parameters = Parms}, _Opts) -> + #'SignatureAlgorithm'{ + algorithm = ?'ecdsa-with-SHA256', + parameters = Parms + }. + +validity(Opts) -> + {From, To} = maps:get(validity, Opts, default_validity()), + #'Validity'{ + notBefore = {generalTime, format_date(From)}, + notAfter = {generalTime, format_date(To)} + }. + +publickey(#'ECPrivateKey'{parameters = Params, publicKey = PubKey}) -> + #'OTPSubjectPublicKeyInfo'{ + algorithm = #'PublicKeyAlgorithm'{ + algorithm = ?'id-ecPublicKey', + parameters = Params + }, + subjectPublicKey = #'ECPoint'{point = PubKey} + }. + +extensions(#{extensions := false}) -> + asn1_NOVALUE; +extensions(Opts) -> + Exts = maps:get(extensions, Opts, #{}), + Default = default_extensions(Opts), + maps:fold( + fun(Name, Data, Acc) -> Acc ++ extension(Name, Data) end, + [], + maps:merge(Default, Exts) + ). + +extension(basic_constraints, false) -> + []; +extension(basic_constraints, ca) -> + [ + #'Extension'{ + extnID = ?'id-ce-basicConstraints', + extnValue = #'BasicConstraints'{cA = true}, + critical = true + } + ]; +extension(basic_constraints, Len) when is_integer(Len) -> + [ + #'Extension'{ + extnID = ?'id-ce-basicConstraints', + extnValue = #'BasicConstraints'{cA = true, pathLenConstraint = Len}, + critical = true + } + ]; +extension(key_usage, false) -> + []; +extension(key_usage, certsign) -> + [ + #'Extension'{ + extnID = ?'id-ce-keyUsage', + extnValue = [keyCertSign], + critical = true + } + ]. + +default_validity() -> + {shift_date(date(), -1), shift_date(date(), +7)}. + +default_subject(#{issuer := root}) -> + #{ + name => "RootCA", + org => "EMQ", + org_unit => "EMQX", + country => "CN" + }; +default_subject(#{}) -> + #{ + name => "Server", + org => "EMQ", + org_unit => "EMQX", + country => "CN" + }. + +default_extensions(#{issuer := root}) -> + #{ + basic_constraints => ca, + key_usage => certsign + }; +default_extensions(#{}) -> + #{}. + +%% ------------------------------------------------------------------- + +verify_signature(CertDer, #'ECPrivateKey'{parameters = Params, publicKey = PubKey}) -> + public_key:pkix_verify(CertDer, {#'ECPoint'{point = PubKey}, Params}); +verify_signature(CertDer, KeyPem) -> + verify_signature(CertDer, decode_privkey(KeyPem)). + +%% ------------------------------------------------------------------- + +gen_privkey(ec) -> + public_key:generate_key({namedCurve, secp256k1}); +gen_privkey(rsa) -> + public_key:generate_key({rsa, 2048, 17}). + +decode_privkey(#'ECPrivateKey'{} = Key) -> + Key; +decode_privkey(#'RSAPrivateKey'{} = Key) -> + Key; +decode_privkey(PemEntry = {_, _, _}) -> + public_key:pem_entry_decode(PemEntry); +decode_privkey(PemBinary) when is_binary(PemBinary) -> + [KeyInfo] = public_key:pem_decode(PemBinary), + decode_privkey(KeyInfo). + +-spec encode_privkey(#'ECPrivateKey'{} | #'RSAPrivateKey'{}) -> private_key(). +encode_privkey(Key = #'ECPrivateKey'{}) -> + {ok, Der} = 'OTP-PUB-KEY':encode('ECPrivateKey', Key), + {'ECPrivateKey', Der, not_encrypted}; +encode_privkey(Key = #'RSAPrivateKey'{}) -> + {ok, Der} = 'OTP-PUB-KEY':encode('RSAPrivateKey', Key), + {'RSAPrivateKey', Der, not_encrypted}. + +-spec encode_cert(public_key:der_encoded()) -> certificate(). +encode_cert(Der) -> + {'Certificate', Der, not_encrypted}. + +%% ------------------------------------------------------------------- + +shift_date(Date, Offset) -> + calendar:gregorian_days_to_date(calendar:date_to_gregorian_days(Date) + Offset). + +format_date({Y, M, D}) -> + lists:flatten(io_lib:format("~w~2..0w~2..0w000000Z", [Y, M, D])). + +%% ------------------------------------------------------------------- + +%% @doc Write certificate + private key pair to respective files. +%% Files are created in the given directory. The filenames are derived +%% from the subject information in the certificate. +-spec write_cert(_Dir :: file:name(), {certificate(), private_key()}) -> + {file:name(), file:name()}. +write_cert(Dir, {Cert, Key}) -> + Subject = issuer_subject(Cert), + Filename = subject_info(org, Subject, "ORG") ++ "." ++ subject_info(name, Subject, "XXX"), + write_cert(Dir, Filename, {Cert, Key}). + +-spec write_cert(_Dir :: file:name(), _Prefix :: string(), {certificate(), private_key()}) -> + {file:name(), file:name()}. +write_cert(Dir, Filename, {Cert, Key}) -> + Certfile = filename:join(Dir, Filename ++ ".crt"), + Keyfile = filename:join(Dir, Filename ++ ".key"), + ok = write_pem(Certfile, Cert), + ok = write_pem(Keyfile, Key), + {Certfile, Keyfile}. + +-spec write_pem(file:name(), pem_entry() | [pem_entry()]) -> + ok | {error, file:posix()}. +write_pem(Name, Entries = [_ | _]) -> + file:write_file(Name, public_key:pem_encode(Entries)); +write_pem(Name, Entry) -> + write_pem(Name, [Entry]). diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index b8d0c39f6..476f02eb3 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -20,122 +20,46 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CERTS_PATH(CertName), filename:join(["../../lib/emqx/etc/certs/", CertName])). - -define(SERVER_KEY_PASSWORD, "sErve7r8Key$!"). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - NewConfig = generate_config(), - application:ensure_all_started(esockd), - application:ensure_all_started(quicer), - application:ensure_all_started(cowboy), generate_tls_certs(Config), - lists:foreach(fun set_app_env/1, NewConfig), - Config. + WorkDir = emqx_cth_suite:work_dir(Config), + Apps = emqx_cth_suite:start([quicer, emqx], #{work_dir => WorkDir}), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - application:stop(esockd), - application:stop(cowboy). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) when - Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp + Case =:= t_start_stop_listeners; + Case =:= t_restart_listeners; + Case =:= t_restart_listeners_with_hibernate_after_disabled -> - catch emqx_config_handler:stop(), - Port = emqx_common_test_helpers:select_free_port(tcp), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - PureListeners2 = PureListeners#{ - tcp => #{ - listener_test => #{ - bind => {"127.0.0.1", Port}, - max_connections => 4321, - limiter => #{} - } - } - }, - emqx_config:put([listeners], PureListeners2), - - ok = emqx_listeners:start(), - [ - {prev_listener_conf, PrevListeners}, - {tcp_port, Port} - | Config - ]; -init_per_testcase(t_wss_conn, Config) -> - catch emqx_config_handler:stop(), - Port = emqx_common_test_helpers:select_free_port(ssl), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - PureListeners2 = PureListeners#{ - wss => #{ - listener_test => #{ - bind => {{127, 0, 0, 1}, Port}, - limiter => #{}, - ssl_options => #{ - cacertfile => ?CERTS_PATH("cacert.pem"), - certfile => ?CERTS_PATH("cert.pem"), - keyfile => ?CERTS_PATH("key.pem") - } - } - } - }, - emqx_config:put([listeners], PureListeners2), - - ok = emqx_listeners:start(), - [ - {prev_listener_conf, PrevListeners}, - {wss_port, Port} - | Config - ]; + ok = emqx_listeners:stop(), + Config; init_per_testcase(_, Config) -> - catch emqx_config_handler:stop(), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - emqx_config:put([listeners], PureListeners), - [ - {prev_listener_conf, PrevListeners} - | Config - ]. + ok = emqx_listeners:start(), + Config. -end_per_testcase(Case, Config) when - Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp --> - PrevListener = ?config(prev_listener_conf, Config), - emqx_listeners:stop(), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), - ok; -end_per_testcase(t_wss_conn, Config) -> - PrevListener = ?config(prev_listener_conf, Config), - emqx_listeners:stop(), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), - ok; -end_per_testcase(_, Config) -> - PrevListener = ?config(prev_listener_conf, Config), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), +end_per_testcase(_, _Config) -> ok. t_start_stop_listeners(_) -> ok = emqx_listeners:start(), - ?assertException(error, _, emqx_listeners:start_listener({ws, {"127.0.0.1", 8083}, []})), + ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})), ok = emqx_listeners:stop(). t_restart_listeners(_) -> ok = emqx_listeners:start(), ok = emqx_listeners:stop(), - %% flakyness: eaddrinuse - timer:sleep(timer:seconds(2)), ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). @@ -168,77 +92,315 @@ t_restart_listeners_with_hibernate_after_disabled(_Config) -> ), ok = emqx_listeners:start(), ok = emqx_listeners:stop(), - %% flakyness: eaddrinuse - timer:sleep(timer:seconds(2)), ok = emqx_listeners:restart(), ok = emqx_listeners:stop(), emqx_config:put([listeners], OldLConf). -t_max_conns_tcp(Config) -> +t_max_conns_tcp(_Config) -> %% Note: Using a string representation for the bind address like %% "127.0.0.1" does not work - ?assertEqual( - 4321, - emqx_listeners:max_conns('tcp:listener_test', {{127, 0, 0, 1}, ?config(tcp_port, Config)}) - ). + Port = emqx_common_test_helpers:select_free_port(tcp), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"max_connections">> => 4321, + <<"limiter">> => #{} + }, + with_listener(tcp, maxconns, Conf, fun() -> + ?assertEqual( + 4321, + emqx_listeners:max_conns('tcp:maxconns', {{127, 0, 0, 1}, Port}) + ) + end). -t_current_conns_tcp(Config) -> - ?assertEqual( - 0, - emqx_listeners:current_conns('tcp:listener_test', { - {127, 0, 0, 1}, ?config(tcp_port, Config) - }) - ). +t_current_conns_tcp(_Config) -> + Port = emqx_common_test_helpers:select_free_port(tcp), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"max_connections">> => 42, + <<"limiter">> => #{} + }, + with_listener(tcp, curconns, Conf, fun() -> + ?assertEqual( + 0, + emqx_listeners:current_conns('tcp:curconns', {{127, 0, 0, 1}, Port}) + ) + end). t_wss_conn(Config) -> - {ok, Socket} = ssl:connect( - {127, 0, 0, 1}, ?config(wss_port, Config), [{verify, verify_none}], 1000 - ), - ok = ssl:close(Socket). + PrivDir = ?config(priv_dir, Config), + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"limiter">> => #{}, + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }, + with_listener(wss, wssconn, Conf, fun() -> + {ok, Socket} = ssl:connect({127, 0, 0, 1}, Port, [{verify, verify_none}], 1000), + ok = ssl:close(Socket) + end). t_quic_conn(Config) -> + PrivDir = ?config(priv_dir, Config), Port = emqx_common_test_helpers:select_free_port(quic), - DataDir = ?config(data_dir, Config), - SSLOpts = #{ - password => ?SERVER_KEY_PASSWORD, - certfile => filename:join(DataDir, "server-password.pem"), - cacertfile => filename:join(DataDir, "ca.pem"), - keyfile => filename:join(DataDir, "server-password.key") + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"ssl_options">> => #{ + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") + } }, - emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, Port, #{ssl_options => SSLOpts}), - ct:pal("~p", [emqx_listeners:list()]), - {ok, Conn} = quicer:connect( - {127, 0, 0, 1}, - Port, - [ - {verify, verify_none}, - {alpn, ["mqtt"]} - ], - 1000 - ), - ok = quicer:close_connection(Conn), - emqx_listeners:stop_listener(quic, ?FUNCTION_NAME, #{bind => Port}). + with_listener(quic, ?FUNCTION_NAME, Conf, fun() -> + {ok, Conn} = quicer:connect( + {127, 0, 0, 1}, + Port, + [ + {verify, verify_none}, + {alpn, ["mqtt"]} + ], + 1000 + ), + ok = quicer:close_connection(Conn) + end). t_ssl_password_cert(Config) -> + PrivDir = ?config(priv_dir, Config), Port = emqx_common_test_helpers:select_free_port(ssl), - DataDir = ?config(data_dir, Config), SSLOptsPWD = #{ - password => ?SERVER_KEY_PASSWORD, - certfile => filename:join(DataDir, "server-password.pem"), - cacertfile => filename:join(DataDir, "ca.pem"), - keyfile => filename:join(DataDir, "server-password.key") + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") }, LConf = #{ - enable => true, - bind => {{127, 0, 0, 1}, Port}, - mountpoint => <<>>, - zone => default, - ssl_options => SSLOptsPWD + <<"enable">> => true, + <<"bind">> => format_bind({{127, 0, 0, 1}, Port}), + <<"ssl_options">> => SSLOptsPWD }, - ok = emqx_listeners:start_listener(ssl, ?FUNCTION_NAME, LConf), - {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]), - ssl:close(SSLSocket), - emqx_listeners:stop_listener(ssl, ?FUNCTION_NAME, LConf). + with_listener(ssl, ?FUNCTION_NAME, LConf, fun() -> + {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]), + ssl:close(SSLSocket) + end). + +t_ssl_update_opts(Config) -> + PrivDir = ?config(priv_dir, Config), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(ssl, updated, Conf, fun() -> + %% Client connects successfully. + C1 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: another set of cert/key files. + {ok, _} = emqx:update_config( + [listeners, ssl, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + ?assertError( + {tls_alert, {unknown_ca, _}}, + emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]) + ), + + C2 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. + {ok, _} = emqx:update_config( + [listeners, ssl, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + ?assertExceptionOneOf( + {error, {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}}, + {error, closed}, + emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]) + ), + + C3 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + end). + +t_wss_update_opts(Config) -> + PrivDir = ?config(priv_dir, Config), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(wss, updated, Conf, fun() -> + %% Start a client. + C1 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} + | ClientSSLOpts + ]), + + %% Change the listener SSL configuration. + %% 1. Another set of (password protected) cert/key files. + %% 2. Require peer certificate. + {ok, _} = emqx:update_config( + [listeners, wss, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + %% Due to a bug `emqtt` exits with `badmatch` in this case. + ?assertExit( + _Badmatch, + emqtt_connect_wss(Host, Port, ClientSSLOpts) + ), + + C2 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} + | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. + {ok, _} = emqx:update_config( + [listeners, wss, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + %% Due to a bug `emqtt` does not instantly report that socket was closed. + ?assertError( + timeout, + emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} + | ClientSSLOpts + ]) + ), + + C3 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + end). + +with_listener(Type, Name, Config, Then) -> + {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}), + try + Then() + after + emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ) + end. + +emqtt_connect_ssl(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ssl => true, + ssl_opts => SSLOpts + }). + +emqtt_connect_wss(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:ws_connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ws_transport_options => [ + {protocols, [http]}, + {transport, tls}, + {tls_opts, SSLOpts} + ] + }). + +emqtt_connect(Connect, Opts) -> + case emqtt:start_link(Opts) of + {ok, Client} -> + true = erlang:unlink(Client), + case Connect(Client) of + {ok, _} -> Client; + {error, Reason} -> error(Reason, [Opts]) + end; + {error, Reason} -> + error(Reason, [Opts]) + end. t_format_bind(_) -> ?assertEqual( @@ -266,67 +428,15 @@ t_format_bind(_) -> lists:flatten(emqx_listeners:format_bind(":1883")) ). -render_config_file() -> - Path = local_path(["etc", "emqx.conf"]), - {ok, Temp} = file:read_file(Path), - Vars0 = mustache_vars(), - Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], - Targ = bbmustache:render(Temp, Vars), - NewName = Path ++ ".rendered", - ok = file:write_file(NewName, Targ), - NewName. - -mustache_vars() -> - [ - {platform_data_dir, local_path(["data"])}, - {platform_etc_dir, local_path(["etc"])} - ]. - -generate_config() -> - ConfFile = render_config_file(), - {ok, Conf} = hocon:load(ConfFile, #{format => richmap}), - hocon_tconf:generate(emqx_schema, Conf). - -set_app_env({App, Lists}) -> - lists:foreach( - fun - ({authz_file, _Var}) -> - application:set_env(App, authz_file, local_path(["etc", "authz.conf"])); - ({Par, Var}) -> - application:set_env(App, Par, Var) - end, - Lists - ). - -local_path(Components, Module) -> - filename:join([get_base_dir(Module) | Components]). - -local_path(Components) -> - local_path(Components, ?MODULE). - -get_base_dir(Module) -> - {file, Here} = code:is_loaded(Module), - filename:dirname(filename:dirname(Here)). - -get_base_dir() -> - get_base_dir(?MODULE). - -remove_default_limiter(Listeners) -> - maps:map( - fun(_, X) -> - maps:map( - fun(_, E) -> - maps:remove(limiter, E) - end, - X - ) - end, - Listeners - ). - generate_tls_certs(Config) -> - DataDir = ?config(data_dir, Config), - emqx_common_test_helpers:gen_ca(DataDir, "ca"), - emqx_common_test_helpers:gen_host_cert("server-password", "ca", DataDir, #{ + PrivDir = ?config(priv_dir, Config), + emqx_common_test_helpers:gen_ca(PrivDir, "ca"), + emqx_common_test_helpers:gen_ca(PrivDir, "ca-next"), + emqx_common_test_helpers:gen_host_cert("server", "ca-next", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("client", "ca-next", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("server-password", "ca", PrivDir, #{ password => ?SERVER_KEY_PASSWORD }). + +format_bind(Bind) -> + iolist_to_binary(emqx_listeners:format_bind(Bind)). diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index c6304d9f7..e9c51edfa 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -24,6 +24,7 @@ action_type_to_connector_type/1, action_type_to_bridge_v1_type/2, bridge_v1_type_to_action_type/1, + bridge_v1_type_name/1, is_action_type/1, registered_schema_modules/0, connector_action_config_to_bridge_v1_config/2, @@ -144,6 +145,20 @@ get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) -> get_confs(_, _) -> undefined. +%% We need this hack because of the bugs introduced by associating v2/action/source types +%% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or +%% `confluent_producer', which has no v1 equivalent.... +bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) -> + bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin)); +bridge_v1_type_name(ActionType) -> + Module = get_action_info_module(ActionType), + case erlang:function_exported(Module, bridge_v1_type_name, 0) of + true -> + {ok, Module:bridge_v1_type_name()}; + false -> + {error, no_v1_equivalent} + end. + %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index edc8da113..1595f040d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -621,6 +621,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> end. lookup_from_local_node(ActionType, ActionName) -> + %% TODO: BUG: shouldn't accept an action type here, only V1 types.... case emqx_bridge:lookup(ActionType, ActionName) of {ok, Res} -> {ok, format_resource(Res, node())}; Error -> Error diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 00d3ef6ec..9436ac0ea 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1086,7 +1086,8 @@ bridge_v1_lookup_and_transform(ActionType, Name) -> case lookup(ActionType, Name) of {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} -> BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig), - case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of + HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType), + case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of true -> ConnectorType = connector_type(ActionType), case emqx_connector:lookup(ConnectorType, ConnectorName) of @@ -1112,6 +1113,12 @@ bridge_v1_lookup_and_transform(ActionType, Name) -> not_bridge_v1_compatible_error() -> {error, not_bridge_v1_compatible}. +has_bridge_v1_equivalent(ActionType) -> + case emqx_action_info:bridge_v1_type_name(ActionType) of + {ok, _} -> true; + {error, no_v1_equivalent} -> false + end. + connector_raw_config(Connector, ConnectorType) -> get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema). diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index b268c127d..dadc0a09c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -136,6 +136,9 @@ setup_mocks() -> end ), + catch meck:new(emqx_action_info, MeckOpts), + meck:expect(emqx_action_info, bridge_v1_type_name, 1, {ok, bridge_type()}), + ok. con_mod() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index e56ead313..1f5373b70 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -343,6 +343,42 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> ct:pal("bridge probe result: ~p", [Res]), Res. +list_bridges_http_api_v1() -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + ct:pal("list bridges (http v1)"), + Res = request(get, Path, _Params = []), + ct:pal("list bridges (http v1) result:\n ~p", [Res]), + Res. + +list_actions_http_api() -> + Path = emqx_mgmt_api_test_util:api_path(["actions"]), + ct:pal("list actions (http v2)"), + Res = request(get, Path, _Params = []), + ct:pal("list actions (http v2) result:\n ~p", [Res]), + Res. + +list_connectors_http_api() -> + Path = emqx_mgmt_api_test_util:api_path(["connectors"]), + ct:pal("list connectors"), + Res = request(get, Path, _Params = []), + ct:pal("list connectors result:\n ~p", [Res]), + Res. + +update_rule_http(RuleId, Params) -> + Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]), + ct:pal("update rule ~p:\n ~p", [RuleId, Params]), + Res = request(put, Path, Params), + ct:pal("update rule ~p result:\n ~p", [RuleId, Res]), + Res. + +enable_rule_http(RuleId) -> + Params = #{<<"enable">> => true}, + update_rule_http(RuleId, Params). + +is_rule_enabled(RuleId) -> + {ok, #{enable := Enable}} = emqx_rule_engine:get_rule(RuleId), + Enable. + try_decode_error(Body0) -> case emqx_utils_json:safe_decode(Body0, [return_maps]) of {ok, #{<<"message">> := Msg0} = Body1} -> diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 2977f72cf..420da1275 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -10,8 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(BRIDGE_TYPE, confluent_producer). --define(BRIDGE_TYPE_BIN, <<"confluent_producer">>). +-define(ACTION_TYPE, confluent_producer). +-define(ACTION_TYPE_BIN, <<"confluent_producer">>). -define(CONNECTOR_TYPE, confluent_producer). -define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>). -define(KAFKA_BRIDGE_TYPE, kafka_producer). @@ -93,7 +93,7 @@ common_init_per_testcase(TestCase, Config) -> {connector_type, ?CONNECTOR_TYPE}, {connector_name, Name}, {connector_config, ConnectorConfig}, - {bridge_type, ?BRIDGE_TYPE}, + {bridge_type, ?ACTION_TYPE}, {bridge_name, Name}, {bridge_config, BridgeConfig} | Config @@ -212,7 +212,7 @@ serde_roundtrip(InnerConfigMap0) -> InnerConfigMap. parse_and_check_bridge_config(InnerConfigMap, Name) -> - TypeBin = ?BRIDGE_TYPE_BIN, + TypeBin = ?ACTION_TYPE_BIN, RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}}, hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}), InnerConfigMap. @@ -341,3 +341,43 @@ t_same_name_confluent_kafka_bridges(Config) -> end ), ok. + +t_list_v1_bridges(Config) -> + ?check_trace( + begin + {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config), + + ?assertMatch( + {error, no_v1_equivalent}, + emqx_action_info:bridge_v1_type_name(confluent_producer) + ), + + ?assertMatch( + {ok, {{_, 200, _}, _, []}}, emqx_bridge_v2_testlib:list_bridges_http_api_v1() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_actions_http_api() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_connectors_http_api() + ), + + RuleTopic = <<"t/c">>, + {ok, #{<<"id">> := RuleId0}} = + emqx_bridge_v2_testlib:create_rule_and_action_http( + ?ACTION_TYPE_BIN, + RuleTopic, + Config, + #{overrides => #{enable => true}} + ), + ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)), + ?assertMatch( + {ok, {{_, 200, _}, _, _}}, emqx_bridge_v2_testlib:enable_rule_http(RuleId0) + ), + ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 93f8fd8c3..f860e3635 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -566,6 +566,7 @@ do_acknowledge(State0) -> Path = path(State1, ack), Body = body(State1, ack, #{ack_ids => AckIds}), PreparedRequest = {prepared_request, {Method, Path, Body}}, + ?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, Reason} -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index b747e9262..a35a3eecc 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -706,7 +706,9 @@ prop_all_pulled_are_acked(Trace) -> || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace), #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs ], - AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)), + %% we just need to check that it _tries_ to ack each id; the result itself doesn't + %% matter, as it might timeout. + AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_will_acknowledge, Trace)), AckedMsgIds1 = [ MsgId || PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks) @@ -1172,7 +1174,12 @@ t_multiple_topic_mappings(Config) -> ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( - create_bridge(Config), + create_bridge( + Config, + #{ + <<"consumer">> => #{<<"ack_deadline">> => <<"10m">>} + } + ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 40_000 ) @@ -1233,7 +1240,7 @@ t_multiple_topic_mappings(Config) -> ], Published ), - wait_acked(#{n => 2}), + ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 20_000), ?retry( _Interval = 200, _NAttempts = 20, @@ -1275,10 +1282,6 @@ t_multiple_pull_workers(Config) -> <<"ack_deadline">> => <<"10m">>, <<"ack_retry_interval">> => <<"1s">>, <<"consumer_workers_per_topic">> => NConsumers - }, - <<"resource_opts">> => #{ - %% reduce flakiness - <<"request_ttl">> => <<"20s">> } } ), diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index 96b87bbdf..75419570f 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -101,7 +101,14 @@ namespace() -> "bridge_redis". roots() -> []. fields(action_parameters) -> - [{command_template, fun command_template/1}]; + [ + command_template(), + {redis_type, + ?HOCON( + ?ENUM([single, sentinel, cluster]), + #{required => true, desc => ?DESC(redis_type)} + )} + ]; fields("post_single") -> method_fields(post, redis_single); fields("post_sentinel") -> @@ -147,8 +154,8 @@ method_fields(put, ConnectorType) -> redis_bridge_common_fields(Type) -> emqx_bridge_schema:common_bridge_fields() ++ [ - {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})} - | fields(action_parameters) + {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}, + command_template() ] ++ v1_resource_fields(Type). @@ -222,3 +229,6 @@ is_command_template_valid(CommandSegments) -> "the value of the field 'command_template' should be a nonempty " "list of strings (templates for Redis command and arguments)"} end. + +command_template() -> + {command_template, fun command_template/1}. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index 0fb043eda..086332658 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -76,13 +76,7 @@ fields(redis_action) -> ) ), [ResOpts] = emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts), - RedisType = - {redis_type, - ?HOCON( - ?ENUM([single, sentinel, cluster]), - #{required => true, desc => ?DESC(redis_type)} - )}, - [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)]; + lists:keyreplace(resource_opts, 1, Schema, ResOpts); fields(action_resource_opts) -> emqx_bridge_v2_schema:resource_opts_fields([ {batch_size, #{desc => ?DESC(batch_size)}}, @@ -130,7 +124,7 @@ resource_opts_converter(Conf, _Opts) -> maps:map( fun(_Name, SubConf) -> case SubConf of - #{<<"redis_type">> := <<"cluster">>} -> + #{<<"parameters">> := #{<<"redis_type">> := <<"cluster">>}} -> ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}), %% cluster don't support batch SubConf#{ @@ -218,12 +212,12 @@ action_example(RedisType, get) -> ); action_example(RedisType, put) -> #{ - redis_type => RedisType, enable => true, connector => <<"my_connector_name">>, description => <<"My action">>, parameters => #{ - command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>] + command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], + redis_type => RedisType }, resource_opts => #{batch_size => 1} }. diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl index 556e28d1c..18cbc126d 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl @@ -229,7 +229,10 @@ action_config(Name, Path, ConnectorId) -> <<"enable">> => true, <<"connector">> => ConnectorId, <<"parameters">> => - #{<<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]}, + #{ + <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>], + <<"redis_type">> => atom_to_binary(RedisType) + }, <<"local_topic">> => <<"t/redis">>, <<"resource_opts">> => #{ <<"batch_size">> => 1, @@ -246,18 +249,9 @@ action_config(Name, Path, ConnectorId) -> <<"worker_pool_size">> => <<"1">> } }, - PerTypeCfg = per_type_action_config(RedisType), - InnerConfigMap0 = emqx_utils_maps:deep_merge(CommonCfg, PerTypeCfg), - InnerConfigMap = serde_roundtrip(InnerConfigMap0), + InnerConfigMap = serde_roundtrip(CommonCfg), parse_and_check_bridge_config(InnerConfigMap, Name). -per_type_action_config(single) -> - #{<<"redis_type">> => <<"single">>}; -per_type_action_config(sentinel) -> - #{<<"redis_type">> => <<"sentinel">>}; -per_type_action_config(cluster) -> - #{<<"redis_type">> => <<"cluster">>}. - %% check it serializes correctly serde_roundtrip(InnerConfigMap0) -> IOList = hocon_pp:do(InnerConfigMap0, #{}), diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index e452bc58e..140b008d1 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -188,7 +188,7 @@ gen_schema_json(Dir, SchemaModule, Lang) -> gen_preformat_md_json_files(Dir, StructsJsonArray, Lang) -> NestedStruct = reformat_schema_dump(StructsJsonArray), %% write to files - NestedJsonFile = filename:join([Dir, "schmea-v2-" ++ Lang ++ ".json"]), + NestedJsonFile = filename:join([Dir, "schema-v2-" ++ Lang ++ ".json"]), io:format(user, "===< Generating: ~s~n", [NestedJsonFile]), ok = file:write_file( NestedJsonFile, emqx_utils_json:encode(NestedStruct, [pretty, force_utf8]) diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b1970997f..07f5b034d 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -233,7 +233,10 @@ load_config(Bin, Opts) when is_binary(Bin) -> {error, Reason} end. -load_config_from_raw(RawConf, Opts) -> +load_config_from_raw(RawConf0, Opts) -> + SchemaMod = emqx_conf:schema_module(), + RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0), + RawConf = emqx_config:fill_defaults(RawConf1), case check_config(RawConf) of ok -> Error = @@ -452,8 +455,21 @@ sorted_fold(Func, Conf) -> Error -> {error, Error} end. -to_sorted_list(Conf) -> - lists:keysort(1, maps:to_list(Conf)). +to_sorted_list(Conf0) -> + %% connectors > actions/bridges > rule_engine + Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>], + {HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []), + HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)). + +split_high_priority_conf([], Conf0, Acc) -> + {lists:reverse(Acc), Conf0}; +split_high_priority_conf([Key | Keys], Conf0, Acc) -> + case maps:take(Key, Conf0) of + error -> + split_high_priority_conf(Keys, Conf0, Acc); + {Value, Conf1} -> + split_high_priority_conf(Keys, Conf1, [{Key, Value} | Acc]) + end. merge_conf(Key, NewConf) -> OldConf = emqx_conf:get_raw([Key]), diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index e15087c2e..eeb0d6de8 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -596,7 +596,7 @@ fields("node") -> #{ mapping => "mria.shard_transport", importance => ?IMPORTANCE_HIDDEN, - default => gen_rpc, + default => distr, desc => ?DESC(db_default_shard_transport) } )}, diff --git a/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl b/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl index 75acd091e..b3ecdd0cf 100644 --- a/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl @@ -40,7 +40,7 @@ t_load_config(Config) -> ConfBin = hocon_pp:do(#{<<"authorization">> => #{<<"sources">> => []}}, #{}), ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config), ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]), - ?assertEqual(#{<<"sources">> => []}, emqx_conf:get_raw([Authz])), + ?assertMatch(#{<<"sources">> := []}, emqx_conf:get_raw([Authz])), ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}), ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config), @@ -73,6 +73,10 @@ t_conflict_mix_conf(Config) -> AuthNInit = emqx_conf:get_raw([authentication]), Redis = #{ <<"backend">> => <<"redis">>, + <<"database">> => 0, + <<"password_hash_algorithm">> => + #{<<"name">> => <<"sha256">>, <<"salt_position">> => <<"prefix">>}, + <<"pool_size">> => 8, <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt">>, <<"enable">> => false, <<"mechanism">> => <<"password_based">>, @@ -85,10 +89,15 @@ t_conflict_mix_conf(Config) -> ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config), %% init with redis sources ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]), - ?assertMatch([Redis], emqx_conf:get_raw([authentication])), + [RedisRaw] = emqx_conf:get_raw([authentication]), + ?assertEqual( + maps:to_list(Redis), + maps:to_list(maps:remove(<<"ssl">>, RedisRaw)), + {Redis, RedisRaw} + ), %% change redis type from single to cluster %% the server field will become servers field - RedisCluster = maps:remove(<<"server">>, Redis#{ + RedisCluster = maps:without([<<"server">>, <<"database">>], Redis#{ <<"redis_type">> => cluster, <<"servers">> => [<<"127.0.0.1:6379">>] }), diff --git a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl index 45046ab2e..1bb42f324 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl @@ -315,8 +315,6 @@ t_none_ref(_Config) -> ), ok. -namespace() -> undefined. - t_sub_fields(_Config) -> Spec = #{ post => #{ @@ -815,6 +813,9 @@ to_schema(Body) -> post => #{requestBody => Body, responses => #{200 => <<"ok">>}} }. +%% Don't warning hocon callback namespace/0 undef. +namespace() -> atom_to_list(?MODULE). + fields(good_ref) -> [ {'webhook-host', mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:80">>})}, diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index aedb4b0fa..121cb4064 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -197,6 +197,10 @@ subscriptions(get, #{ case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of {error, not_found} -> return_http_error(404, "client process not found"); + {error, ignored} -> + return_http_error( + 400, "get subscriptions failed: unsupported" + ); {error, Reason} -> return_http_error(400, Reason); {ok, Subs} -> @@ -222,7 +226,13 @@ subscriptions(post, #{ ) of {error, not_found} -> - return_http_error(404, "client process not found"); + return_http_error( + 404, "client process not found" + ); + {error, ignored} -> + return_http_error( + 400, "subscribe failed: unsupported" + ); {error, Reason} -> return_http_error(400, Reason); {ok, {NTopic, NSubOpts}} -> @@ -241,8 +251,14 @@ subscriptions(delete, #{ with_gateway(Name0, fun(GwName, _) -> case lookup_topic(GwName, ClientId, Topic) of {ok, _} -> - _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), - {204}; + case emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic) of + {error, ignored} -> + return_http_error( + 400, "unsubscribe failed: unsupported" + ); + _ -> + {204} + end; {error, not_found} -> return_http_error(404, "Resource not found") end diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 677176acc..802bbb689 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -378,6 +378,8 @@ client_call(GwName, ClientId, Req) -> of undefined -> {error, not_found}; + ignored -> + {error, ignored}; Res -> Res catch diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index c0abb48ce..d51bf93a9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -174,7 +174,7 @@ fields(dtls_opts) -> reuse_sessions => true, versions => dtls_all_available }, - false + _IsRanchListener = false ). desc(gateway) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index c8cf979e3..47e1f7583 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -273,7 +273,7 @@ merge_default(Udp, Options) -> udp -> {udp_options, default_udp_options()}; dtls -> - {udp_options, default_udp_options()}; + {dtls_options, default_udp_options()}; tcp -> {tcp_options, default_tcp_options()}; ssl -> @@ -525,9 +525,11 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) -> udp -> Opts2#{udp_options => sock_opts(udp_options, Opts0)}; dtls -> + UDPOpts = sock_opts(udp_options, Opts0), + DTLSOpts = ssl_opts(dtls_options, Opts0), Opts2#{ - udp_options => sock_opts(udp_options, Opts0), - dtls_options => ssl_opts(dtls_options, Opts0) + udp_options => UDPOpts, + dtls_options => DTLSOpts } end ). @@ -541,12 +543,37 @@ sock_opts(Name, Opts) -> ). ssl_opts(Name, Opts) -> - Type = - case Name of - ssl_options -> tls; - dtls_options -> dtls - end, - emqx_tls_lib:to_server_opts(Type, maps:get(Name, Opts, #{})). + SSLOpts = maps:get(Name, Opts, #{}), + emqx_utils:run_fold( + [ + fun ssl_opts_crl_config/2, + fun ssl_opts_drop_unsupported/2, + fun ssl_server_opts/2 + ], + SSLOpts, + Name + ). + +ssl_opts_crl_config(#{enable_crl_check := true} = SSLOpts, _Name) -> + HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)), + NSSLOpts = maps:remove(enable_crl_check, SSLOpts), + NSSLOpts#{ + %% `crl_check => true' doesn't work + crl_check => peer, + crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}} + }; +ssl_opts_crl_config(SSLOpts, _Name) -> + %% NOTE: Removing this because DTLS doesn't like any unknown options. + maps:remove(enable_crl_check, SSLOpts). + +ssl_opts_drop_unsupported(SSLOpts, _Name) -> + %% TODO: Support OCSP stapling + maps:without([ocsp], SSLOpts). + +ssl_server_opts(SSLOpts, ssl_options) -> + emqx_tls_lib:to_server_opts(tls, SSLOpts); +ssl_server_opts(SSLOpts, dtls_options) -> + emqx_tls_lib:to_server_opts(dtls, SSLOpts). ranch_opts(Type, ListenOn, Opts) -> NumAcceptors = maps:get(acceptors, Opts, 4), @@ -635,7 +662,7 @@ default_tcp_options() -> ]. default_udp_options() -> - [binary]. + []. default_subopts() -> %% Retain Handling diff --git a/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl b/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl index 215302105..81ab2f368 100644 --- a/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl +++ b/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl @@ -238,9 +238,12 @@ http_authz_config() -> init_gateway_conf() -> ok = emqx_common_test_helpers:load_config( emqx_gateway_schema, - merge_conf([X:default_config() || X <- ?CONFS], []) + merge_conf(list_gateway_conf(), []) ). +list_gateway_conf() -> + [X:default_config() || X <- ?CONFS]. + merge_conf([Conf | T], Acc) -> case re:run(Conf, "\s*gateway\\.(.*)", [global, {capture, all_but_first, list}, dotall]) of {match, [[Content]]} -> diff --git a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl index 0072447b6..7495c5858 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl @@ -22,7 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]). +-import(emqx_gateway_auth_ct, [with_resource/3]). -define(checkMatch(Guard), (fun(Expr) -> @@ -54,40 +54,37 @@ groups() -> emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS). init_per_group(AuthName, Conf) -> - ct:pal("on group start:~p~n", [AuthName]), - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - emqx_gateway_auth_ct:start_auth(AuthName), - timer:sleep(500), - Conf. + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx_auth, + emqx_auth_http, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}, + {emqx_gateway, emqx_gateway_auth_ct:list_gateway_conf()} + | emqx_gateway_test_utils:all_gateway_apps() + ], + #{work_dir => emqx_cth_suite:work_dir(Conf)} + ), + _ = emqx_common_test_http:create_default_app(), + ok = emqx_gateway_auth_ct:start_auth(AuthName), + [{group_apps, Apps} | Conf]. end_per_group(AuthName, Conf) -> - ct:pal("on group stop:~p~n", [AuthName]), - emqx_gateway_auth_ct:stop_auth(AuthName), + ok = emqx_gateway_auth_ct:stop_auth(AuthName), + _ = emqx_common_test_http:delete_default_app(), + ok = emqx_cth_suite:stop(?config(group_apps, Conf)), Conf. init_per_suite(Config) -> - emqx_gateway_test_utils:load_all_gateway_apps(), - emqx_config:erase(gateway), - init_gateway_conf(), - emqx_mgmt_api_test_util:init_suite([grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway]), - application:ensure_all_started(cowboy), - emqx_gateway_auth_ct:start(), - timer:sleep(500), - Config. + {ok, Apps1} = application:ensure_all_started(grpc), + {ok, Apps2} = application:ensure_all_started(cowboy), + {ok, _} = emqx_gateway_auth_ct:start(), + [{suite_apps, Apps1 ++ Apps2} | Config]. end_per_suite(Config) -> - emqx_gateway_auth_ct:stop(), - emqx_config:erase(gateway), - emqx_mgmt_api_test_util:end_suite([ - cowboy, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway, grpc - ]), - Config. - -init_per_testcase(_Case, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - Config. - -end_per_testcase(_Case, Config) -> + ok = emqx_gateway_auth_ct:stop(), + ok = emqx_cth_suite:stop_apps(?config(suite_apps, Config)), Config. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl index dd149133b..9ae464ebb 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl @@ -22,7 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]). +-import(emqx_gateway_auth_ct, [with_resource/3]). -define(checkMatch(Guard), (fun(Expr) -> @@ -54,44 +54,33 @@ groups() -> emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS). init_per_group(AuthName, Conf) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - ok = emqx_authz_test_lib:reset_authorizers(), - emqx_gateway_auth_ct:start_auth(AuthName), - timer:sleep(500), - Conf. + Apps = emqx_cth_suite:start( + [ + {emqx_conf, "authorization { no_match = deny, cache { enable = false } }"}, + emqx_auth, + emqx_auth_http, + {emqx_gateway, emqx_gateway_auth_ct:list_gateway_conf()} + | emqx_gateway_test_utils:all_gateway_apps() + ], + #{work_dir => emqx_cth_suite:work_dir(Conf)} + ), + ok = emqx_gateway_auth_ct:start_auth(AuthName), + [{group_apps, Apps} | Conf]. end_per_group(AuthName, Conf) -> - emqx_gateway_auth_ct:stop_auth(AuthName), + ok = emqx_gateway_auth_ct:stop_auth(AuthName), + ok = emqx_cth_suite:stop(?config(group_apps, Conf)), Conf. init_per_suite(Config) -> - emqx_config:erase(gateway), - emqx_gateway_test_utils:load_all_gateway_apps(), - init_gateway_conf(), - emqx_mgmt_api_test_util:init_suite([ - grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway - ]), - meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_authz_file, create, fun(S) -> S end), - application:ensure_all_started(cowboy), - emqx_gateway_auth_ct:start(), - Config. + {ok, Apps1} = application:ensure_all_started(grpc), + {ok, Apps2} = application:ensure_all_started(cowboy), + {ok, _} = emqx_gateway_auth_ct:start(), + [{suite_apps, Apps1 ++ Apps2} | Config]. end_per_suite(Config) -> - meck:unload(emqx_authz_file), - emqx_gateway_auth_ct:stop(), - ok = emqx_authz_test_lib:restore_authorizers(), - emqx_config:erase(gateway), - emqx_mgmt_api_test_util:end_suite([ - emqx_gateway, emqx_auth_http, emqx_auth, emqx_conf, grpc - ]), - Config. - -init_per_testcase(_Case, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - Config. - -end_per_testcase(_Case, Config) -> + ok = emqx_gateway_auth_ct:stop(), + ok = emqx_cth_suite:stop_apps(?config(suite_apps, Config)), Config. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl index 950ae1bcf..2e8be5119 100644 --- a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl +++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl @@ -103,12 +103,18 @@ assert_fields_exist(Ks, Map) -> end, Ks ). + load_all_gateway_apps() -> - application:load(emqx_gateway_stomp), - application:load(emqx_gateway_mqttsn), - application:load(emqx_gateway_coap), - application:load(emqx_gateway_lwm2m), - application:load(emqx_gateway_exproto). + emqx_cth_suite:load_apps(all_gateway_apps()). + +all_gateway_apps() -> + [ + emqx_gateway_stomp, + emqx_gateway_mqttsn, + emqx_gateway_coap, + emqx_gateway_lwm2m, + emqx_gateway_exproto + ]. %%-------------------------------------------------------------------- %% http diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl index 83e83f4d6..b6d2a6257 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). --include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -44,14 +43,6 @@ -define(TCPOPTS, [binary, {active, false}]). -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). --define(PORT, 7993). - --define(DEFAULT_CLIENT, #{ - proto_name => <<"demo">>, - proto_ver => <<"v0.1">>, - clientid => <<"test_client_1">> -}). - %%-------------------------------------------------------------------- -define(CONF_DEFAULT, << "\n" @@ -126,15 +117,33 @@ init_per_group(_, Cfg) -> init_per_group(LisType, ServiceName, Scheme, Cfg) -> Svrs = emqx_exproto_echo_svr:start(Scheme), - application:load(emqx_gateway_exproto), - emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_auth, emqx_gateway], - fun(App) -> - set_special_cfg(App, LisType, ServiceName, Scheme) - end + Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])), + GWConfig = #{ + server => #{bind => 9100}, + idle_timeout => 5000, + mountpoint => <<"ct/">>, + handler => #{ + address => Addrs, + service_name => ServiceName, + ssl_options => #{enable => Scheme == https} + }, + listeners => listener_confs(LisType) + }, + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx_auth, + {emqx_gateway, #{ + config => + #{gateway => #{exproto => GWConfig}} + }}, + emqx_gateway_exproto + ], + #{work_dir => emqx_cth_suite:work_dir(Cfg)} ), [ {servers, Svrs}, + {apps, Apps}, {listener_type, LisType}, {service_name, ServiceName}, {grpc_client_scheme, Scheme} @@ -142,8 +151,7 @@ init_per_group(LisType, ServiceName, Scheme, Cfg) -> ]. end_per_group(_, Cfg) -> - emqx_config:erase(gateway), - emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_auth, emqx_conf]), + ok = emqx_cth_suite:stop(proplists:get_value(apps, Cfg)), emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). init_per_testcase(TestCase, Cfg) when @@ -159,28 +167,13 @@ init_per_testcase(_TestCase, Cfg) -> end_per_testcase(_TestCase, _Cfg) -> ok. -set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) -> - Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])), - emqx_config:put( - [gateway, exproto], - #{ - server => #{bind => 9100}, - idle_timeout => 5000, - mountpoint => <<"ct/">>, - handler => #{ - address => Addrs, - service_name => ServiceName, - ssl_options => #{enable => Scheme == https} - }, - listeners => listener_confs(LisType) - } - ); -set_special_cfg(_, _, _, _) -> - ok. - listener_confs(Type) -> - Default = #{bind => 7993, acceptors => 8}, - #{Type => #{'default' => maps:merge(Default, server_socketopts(Type))}}. + Default = #{ + bind => 7993, + max_connections => 64, + access_rules => ["allow all"] + }, + #{Type => #{'default' => maps:merge(Default, socketopts(Type))}}. default_config() -> ?CONF_DEFAULT. @@ -635,24 +628,29 @@ close({dtls, Sock}) -> %%-------------------------------------------------------------------- %% Server-Opts -server_socketopts(tcp) -> - #{tcp_options => server_tcp_opts()}; -server_socketopts(ssl) -> +socketopts(tcp) -> #{ - tcp_options => server_tcp_opts(), - ssl_options => server_ssl_opts() + acceptors => 8, + tcp_options => tcp_opts() }; -server_socketopts(udp) -> - #{udp_options => server_udp_opts()}; -server_socketopts(dtls) -> +socketopts(ssl) -> #{ - udp_options => server_udp_opts(), - dtls_options => server_dtls_opts() + acceptors => 8, + tcp_options => tcp_opts(), + ssl_options => ssl_opts() + }; +socketopts(udp) -> + #{udp_options => udp_opts()}; +socketopts(dtls) -> + #{ + acceptors => 8, + udp_options => udp_opts(), + dtls_options => dtls_opts() }. -server_tcp_opts() -> +tcp_opts() -> maps:merge( - server_udp_opts(), + udp_opts(), #{ send_timeout => 15000, send_timeout_close => true, @@ -661,15 +659,17 @@ server_tcp_opts() -> } ). -server_udp_opts() -> +udp_opts() -> #{ - recbuf => 1024, - sndbuf => 1024, - buffer => 1024, + %% NOTE + %% Making those too small will lead to inability to accept connections. + recbuf => 2048, + sndbuf => 2048, + buffer => 2048, reuseaddr => true }. -server_ssl_opts() -> +ssl_opts() -> Certs = certs("key.pem", "cert.pem", "cacert.pem"), maps:merge( Certs, @@ -684,8 +684,8 @@ server_ssl_opts() -> } ). -server_dtls_opts() -> - maps:merge(server_ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}). +dtls_opts() -> + maps:merge(ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}). %%-------------------------------------------------------------------- %% Client-Opts diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl index 0c4c4e6bf..25d8bcc51 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl @@ -66,7 +66,6 @@ -elvis([{elvis_style, dont_repeat_yourself, disable}]). -define(CONF_DEFAULT, << - "\n" "gateway.mqttsn {\n" " gateway_id = 1\n" " broadcast = true\n" @@ -89,6 +88,20 @@ "}\n" >>). +-define(CONF_DTLS, << + "\n" + "gateway.mqttsn {" + " listeners.dtls.default {\n" + " bind = 1885\n" + " dtls_options {\n" + " cacertfile = \"${cacertfile}\"\n" + " certfile = \"${certfile}\"\n" + " keyfile = \"${keyfile}\"\n" + " }\n" + " }\n" + "}\n" +>>). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -97,9 +110,22 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + PrivDir = ?config(priv_dir, Config), + Root = emqx_cth_tls:gen_cert(#{key => ec, issuer => root}), + Server = emqx_cth_tls:gen_cert(#{key => ec, issuer => Root}), + {CACertfile, _} = emqx_cth_tls:write_cert(PrivDir, Root), + {Certfile, Keyfile} = emqx_cth_tls:write_cert(PrivDir, Server), + Conf = emqx_template:render_strict( + emqx_template:parse([?CONF_DEFAULT, ?CONF_DTLS]), + #{ + cacertfile => CACertfile, + certfile => Certfile, + keyfile => Keyfile + } + ), Apps = emqx_cth_suite:start( [ - {emqx_conf, ?CONF_DEFAULT}, + {emqx_conf, Conf}, emqx_gateway, emqx_auth, emqx_management, @@ -108,7 +134,7 @@ init_per_suite(Config) -> #{work_dir => emqx_cth_suite:work_dir(Config)} ), emqx_common_test_http:create_default_app(), - [{suite_apps, Apps} | Config]. + [{suite_apps, Apps}, {cacertfile, CACertfile} | Config]. end_per_suite(Config) -> {ok, _} = emqx:remove_config([gateway, mqttsn]), @@ -191,6 +217,25 @@ t_first_disconnect(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_connect_dtls(Config) -> + SockName = {'mqttsn:dtls:default', 1885}, + ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), + + ClientOpts = [ + binary, + {active, false}, + {protocol, dtls}, + {cacertfile, ?config(cacertfile, Config)} + | emqx_common_test_helpers:ssl_verify_fun_allow_any_host() + ], + {ok, Socket} = ssl:connect(?HOST, 1885, ClientOpts, 1000), + ok = ssl:send(Socket, make_connect_msg(<<"client_id_test1">>, 1)), + ?assertEqual({ok, <<3, ?SN_CONNACK, 0>>}, ssl:recv(Socket, 0, 1000)), + + ok = ssl:send(Socket, make_disconnect_msg(undefined)), + ?assertEqual({ok, <<2, ?SN_DISCONNECT>>}, ssl:recv(Socket, 0, 1000)), + ssl:close(Socket). + t_subscribe(_) -> Dup = 0, QoS = 0, @@ -2444,10 +2489,7 @@ send_searchgw_msg(Socket) -> Radius = 0, ok = gen_udp:send(Socket, ?HOST, ?PORT, <>). -send_connect_msg(Socket, ClientId) -> - send_connect_msg(Socket, ClientId, 1). - -send_connect_msg(Socket, ClientId, CleanSession) when +make_connect_msg(ClientId, CleanSession) when CleanSession == 0; CleanSession == 1 -> @@ -2460,9 +2502,14 @@ send_connect_msg(Socket, ClientId, CleanSession) when TopicIdType = 0, ProtocolId = 1, Duration = 10, - Packet = - <>, + <>. + +send_connect_msg(Socket, ClientId) -> + send_connect_msg(Socket, ClientId, 1). + +send_connect_msg(Socket, ClientId, CleanSession) -> + Packet = make_connect_msg(ClientId, CleanSession), ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). send_connect_msg_with_will(Socket, Duration, ClientId) -> @@ -2724,15 +2771,17 @@ send_pingreq_msg(Socket, ClientId) -> ?LOG("send_pingreq_msg ClientId=~p", [ClientId]), ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket). -send_disconnect_msg(Socket, Duration) -> +make_disconnect_msg(Duration) -> Length = 2, Length2 = 4, MsgType = ?SN_DISCONNECT, - DisConnectPacket = - case Duration of - undefined -> <>; - Other -> <> - end, + case Duration of + undefined -> <>; + Other -> <> + end. + +send_disconnect_msg(Socket, Duration) -> + DisConnectPacket = make_disconnect_msg(Duration), ?LOG("send_disconnect_msg Duration=~p", [Duration]), ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket). diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index 0b9f864a3..3f5fe8f70 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -186,10 +186,10 @@ info(timers, #channel{timers = Timers}) -> -spec stats(channel()) -> emqx_types:stats(). stats(#channel{mqueue = MQueue}) -> - %% XXX: + %% XXX: A fake stats for managed by emqx_management SessionStats = [ - {subscriptions_cnt, 0}, - {subscriptions_max, 0}, + {subscriptions_cnt, 1}, + {subscriptions_max, 1}, {inflight_cnt, 0}, {inflight_max, 0}, {mqueue_len, queue:len(MQueue)}, @@ -524,9 +524,13 @@ handle_out(Type, Data, Channel) -> %%-------------------------------------------------------------------- apply_frame(Frames, Channel) when is_list(Frames) -> - {Outgoings, NChannel} = lists:foldl(fun apply_frame/2, {[], Channel}, Frames), + {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames), {lists:reverse(Outgoings), NChannel}; -apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> +apply_frame(Frames, Channel) -> + ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}), + Channel. + +do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> case maps:get(<<"status">>, Payload) of <<"Accepted">> -> Intv = maps:get(<<"interval">>, Payload), @@ -535,8 +539,9 @@ apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> _ -> {Outgoings, Channel} end; -apply_frame(_, Channel) -> - Channel. +do_apply_frame(Frame, Acc = {_Outgoings, Channel}) -> + ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}), + Acc. %%-------------------------------------------------------------------- %% Handle call diff --git a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl index f1198fe1f..29f08f78e 100644 --- a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl +++ b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl @@ -33,27 +33,27 @@ -define(HEARTBEAT, <<$\n>>). --define(CONF_DEFAULT, << - "\n" - "gateway.ocpp {\n" - " mountpoint = \"ocpp/\"\n" - " default_heartbeat_interval = \"60s\"\n" - " heartbeat_checking_times_backoff = 1\n" - " message_format_checking = disable\n" - " upstream {\n" - " topic = \"cp/${clientid}\"\n" - " reply_topic = \"cp/${clientid}/Reply\"\n" - " error_topic = \"cp/${clientid}/Reply\"\n" - " }\n" - " dnstream {\n" - " topic = \"cs/${clientid}\"\n" - " }\n" - " listeners.ws.default {\n" - " bind = \"0.0.0.0:33033\"\n" - " websocket.path = \"/ocpp\"\n" - " }\n" - "}\n" ->>). +%% erlfmt-ignore +-define(CONF_DEFAULT, <<" + gateway.ocpp { + mountpoint = \"ocpp/\" + default_heartbeat_interval = \"60s\" + heartbeat_checking_times_backoff = 1 + message_format_checking = disable + upstream { + topic = \"cp/${clientid}\" + reply_topic = \"cp/${clientid}/Reply\" + error_topic = \"cp/${clientid}/Reply\" + } + dnstream { + topic = \"cs/${clientid}\" + } + listeners.ws.default { + bind = \"0.0.0.0:33033\" + websocket.path = \"/ocpp\" + } + } +">>). all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 2c1225c77..ed50ecba4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -32,11 +33,15 @@ end_per_suite(_) -> init_per_testcase(TestCase = t_configs_node, Config) -> ?MODULE:TestCase({'init', Config}); +init_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) -> + ?MODULE:TestCase({'init', Config}); init_per_testcase(_TestCase, Config) -> Config. end_per_testcase(TestCase = t_configs_node, Config) -> ?MODULE:TestCase({'end', Config}); +end_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) -> + ?MODULE:TestCase({'end', Config}); end_per_testcase(_TestCase, Config) -> Config. @@ -372,6 +377,100 @@ t_get_configs_in_different_accept(_Config) -> %% returns error if it set to other type ?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)). +t_create_webhook_v1_bridges_api({'init', Config}) -> + application:ensure_all_started(emqx_connector), + application:ensure_all_started(emqx_bridge), + Config; +t_create_webhook_v1_bridges_api({'end', _}) -> + application:stop(emqx_bridge), + application:stop(emqx_connector), + ok; +t_create_webhook_v1_bridges_api(Config) -> + WebHookFile = filename:join(?config(data_dir, Config), "webhook_v1.conf"), + ?assertMatch({ok, _}, hocon:files([WebHookFile])), + {ok, WebHookBin} = file:read_file(WebHookFile), + ?assertEqual([], update_configs_with_binary(WebHookBin)), + Actions = + #{ + <<"http">> => + #{ + <<"webhook_name">> => + #{ + <<"connector">> => <<"connector_webhook_name">>, + <<"description">> => <<>>, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"body">> => <<"{\"value\": \"${value}\"}">>, + <<"headers">> => #{}, + <<"max_retries">> => 3, + <<"method">> => <<"post">>, + <<"path">> => <<>> + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"async">>, + <<"request_ttl">> => <<"45s">>, + <<"worker_pool_size">> => 4 + } + } + } + }, + ?assertEqual(Actions, emqx_conf:get_raw([<<"actions">>])), + Connectors = + #{ + <<"http">> => + #{ + <<"connector_webhook_name">> => + #{ + <<"connect_timeout">> => <<"15s">>, + <<"description">> => <<>>, + <<"enable">> => true, + <<"enable_pipelining">> => 100, + <<"headers">> => + #{ + <<"Authorization">> => <<"Bearer redacted">>, + <<"content-type">> => <<"application/json">> + }, + <<"pool_size">> => 4, + <<"pool_type">> => <<"random">>, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => true, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"user_lookup_fun">> => + <<"emqx_tls_psk:lookup">>, + <<"verify">> => <<"verify_none">>, + <<"versions">> => + [ + <<"tlsv1.3">>, + <<"tlsv1.2">>, + <<"tlsv1.1">>, + <<"tlsv1">> + ] + }, + <<"url">> => <<"https://127.0.0.1:18083">> + } + } + }, + ?assertEqual(Connectors, emqx_conf:get_raw([<<"connectors">>])), + ?assertEqual(#{<<"webhook">> => #{}}, emqx_conf:get_raw([<<"bridges">>])), + ok. + %% Helpers get_config(Name) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE_data/webhook_v1.conf b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE_data/webhook_v1.conf new file mode 100644 index 000000000..c6fd6d796 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE_data/webhook_v1.conf @@ -0,0 +1,36 @@ +bridges { + webhook { + webhook_name { + body = "{\"value\": \"${value}\"}" + connect_timeout = "15s" + enable = true + enable_pipelining = 100 + headers {Authorization = "Bearer redacted", "content-type" = "application/json"} + max_retries = 3 + method = "post" + pool_size = 4 + pool_type = "random" + request_timeout = "15s" + resource_opts { + async_inflight_window = 100 + auto_restart_interval = "60s" + enable_queue = false + health_check_interval = "15s" + max_queue_bytes = "1GB" + query_mode = "async" + worker_pool_size = 4 + } + ssl { + ciphers = [] + depth = 10 + enable = true + reuse_sessions = true + secure_renegotiate = true + user_lookup_fun = "emqx_tls_psk:lookup" + verify = "verify_none" + versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] + } + url = "https://127.0.0.1:18083" + } +} +} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index afa57dfac..70a7fc32c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -621,24 +621,36 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul BridgeIDs0 = lists:map( fun(BridgeID) -> - emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}) + %% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it + %% returns v1 types when attempting to "upgrade"..... + {Type, Name} = + emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}), + case emqx_action_info:is_action_type(Type) of + true -> {action, Type, Name}; + false -> {bridge_v1, Type, Name} + end end, get_referenced_hookpoints(Froms) ), BridgeIDs1 = lists:filtermap( fun - ({bridge_v2, Type, Name}) -> {true, {Type, Name}}; - ({bridge, Type, Name, _ResId}) -> {true, {Type, Name}}; + ({bridge_v2, Type, Name}) -> {true, {action, Type, Name}}; + ({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}}; (_) -> false end, Actions ), NonExistentBridgeIDs = lists:filter( - fun({Type, Name}) -> + fun({Kind, Type, Name}) -> + LookupFn = + case Kind of + action -> fun emqx_bridge_v2:lookup/2; + bridge_v1 -> fun emqx_bridge:lookup/2 + end, try - case emqx_bridge:lookup(Type, Name) of + case LookupFn(Type, Name) of {ok, _} -> false; {error, _} -> true end diff --git a/changes/ce/feat-12201.en.md b/changes/ce/feat-12201.en.md new file mode 100644 index 000000000..4247ec6e9 --- /dev/null +++ b/changes/ce/feat-12201.en.md @@ -0,0 +1,10 @@ +Support hot update of TCP/SSL/WS/WSS MQTT listeners configuration, which allows changing most of the configuration parameters without restarting the listener and disconnecting the clients. + +In case of TCP/SSL listeners, changes to the following parameters still require full listener restart: + * `bind` + * `tcp_options.backlog` + +In case of WS/WSS listeners, any parameter can be freely changed without losing the connected clients. However, changing transport related parameters will cause listening socket to be re-opened, namely: + * `bind` + * `tcp_options.*` + * `ssl_options.*` diff --git a/changes/ce/fix-12081.en.md b/changes/ce/fix-12081.en.md index 2e75b8dcf..d66a6a6c7 100644 --- a/changes/ce/fix-12081.en.md +++ b/changes/ce/fix-12081.en.md @@ -1,7 +1,9 @@ -Updated `gen_rpc` library to version 3.3.0. The new version includes +Updated `gen_rpc` library to version 3.3.1. The new version includes several performance improvements: - Avoid allocating extra memory for the packets before they are sent to the wire in some cases - Bypass network for the local calls + +- Avoid senstive data leaking in debug logs [#12202](https://github.com/emqx/emqx/pull/12202) diff --git a/changes/ce/fix-12180.en.md b/changes/ce/fix-12180.en.md new file mode 100644 index 000000000..3ded8a507 --- /dev/null +++ b/changes/ce/fix-12180.en.md @@ -0,0 +1 @@ +Fix an issue where DTLS enabled MQTT-SN gateways could not be started, caused by incompatibility of default listener configuration with the DTLS implementation. diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index ce9a1fb99..c60246f69 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.4.0-alpha.2 +version: 5.4.0-rc.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.4.0-alpha.2 +appVersion: 5.4.0-rc.1 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 6856b9651..7d2982c39 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.4.0-alpha.2 +version: 5.4.0-rc.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.4.0-alpha.2 +appVersion: 5.4.0-rc.1 diff --git a/mix.exs b/mix.exs index 0e387884b..dc196b5dd 100644 --- a/mix.exs +++ b/mix.exs @@ -53,10 +53,10 @@ defmodule EMQXUmbrella.MixProject do {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, - {:esockd, github: "emqx/esockd", tag: "5.9.9", override: true}, + {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, {:ekka, github: "emqx/ekka", tag: "0.17.0", override: true}, - {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true}, + {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.15", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true}, diff --git a/rebar.config b/rebar.config index 6e5cad59f..d7283b9f7 100644 --- a/rebar.config +++ b/rebar.config @@ -69,10 +69,10 @@ , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}} - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}} + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}} diff --git a/rel/i18n/emqx_bridge_redis.hocon b/rel/i18n/emqx_bridge_redis.hocon index 03831b02f..5c8a4a941 100644 --- a/rel/i18n/emqx_bridge_redis.hocon +++ b/rel/i18n/emqx_bridge_redis.hocon @@ -1,5 +1,12 @@ emqx_bridge_redis { +redis_type.label: +"""Redis Type""" +redis_type.desc: +"""Single mode. Must be set to 'single' when Redis server is running in single mode. +Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode. +Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode.""" + command_template.desc: """Redis command template used to export messages. Each list element stands for a command name or its argument. For example, to push payloads in a Redis list by key `msgs`, the elements should be the following: diff --git a/rel/i18n/emqx_bridge_redis_schema.hocon b/rel/i18n/emqx_bridge_redis_schema.hocon index 861c0c185..65c8a7ae3 100644 --- a/rel/i18n/emqx_bridge_redis_schema.hocon +++ b/rel/i18n/emqx_bridge_redis_schema.hocon @@ -10,13 +10,6 @@ producer_action.desc: producer_action.label: """Action Parameters""" -redis_type.label: -"""Redis Type""" -redis_type.desc: -"""Single mode. Must be set to 'single' when Redis server is running in single mode. -Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode. -Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode.""" - batch_size.label: """Batch Size""" batch_size.desc: