Merge pull request #12201 from keynslug/feat/EMQX-11527/tls-hot-update

feat(listen): support hot config update of MQTT listeners
This commit is contained in:
Andrew Mayorov 2023-12-20 09:45:12 +01:00 committed by GitHub
commit d3b32b64e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 516 additions and 348 deletions

View File

@ -83,6 +83,28 @@
end)()
).
-define(assertExceptionOneOf(CT1, CT2, EXPR),
(fun() ->
X__Attrs = [
{module, ?MODULE},
{line, ?LINE},
{expression, (??EXPR)},
{pattern, "[ " ++ (??CT1) ++ ", " ++ (??CT2) ++ " ]"}
],
X__Exc =
try (EXPR) of
X__V -> erlang:error({assertException, [{unexpected_success, X__V} | X__Attrs]})
catch
X__C:X__T:X__S -> {X__C, X__T, X__S}
end,
case {element(1, X__Exc), element(2, X__Exc)} of
CT1 -> ok;
CT2 -> ok;
_ -> erlang:error({assertException, [{unexpected_exception, X__Exc} | X__Attrs]})
end
end)()
).
-define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
__TEST_CASE = ?FUNCTION_NAME,
(fun

View File

@ -27,7 +27,7 @@
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.3"}}},

View File

@ -55,7 +55,6 @@
]).
-export([pre_config_update/3, post_config_update/5]).
-export([create_listener/3, remove_listener/3, update_listener/3]).
-export([format_bind/1]).
@ -66,6 +65,11 @@
-export_type([listener_id/0]).
-type listener_id() :: atom() | binary().
-type listener_type() :: tcp | ssl | ws | wss | quic | dtls.
-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)).
-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)).
-define(ROOT_KEY, listeners).
-define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']).
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) ->
-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
is_running(ListenerId) ->
case
[
Running
|| {Id, #{running := Running}} <- list(),
Id =:= ListenerId
]
of
[] -> {error, not_found};
[IsRunning] -> IsRunning
case lists:keyfind(ListenerId, 1, list()) of
{_Id, #{running := Running}} -> Running;
false -> {error, not_found}
end.
is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
@ -229,24 +227,26 @@ start() ->
start_listener(ListenerId) ->
apply_on_listener(ListenerId, fun start_listener/3).
-spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
case do_start_listener(Type, ListenerName, Conf) of
-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) ->
ListenerId = listener_id(Type, Name),
Limiter = limiter(Conf),
ok = add_limiter_bucket(ListenerId, Limiter),
case do_start_listener(Type, Name, ListenerId, Conf) of
{ok, {skipped, Reason}} when
Reason =:= listener_disabled;
Reason =:= quic_app_missing
->
?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
console_print(
"Listener ~ts is NOT started due to: ~p.~n",
[listener_id(Type, ListenerName), Reason]
[ListenerId, Reason]
),
ok;
{ok, _} ->
?tp(listener_started, #{type => Type, bind => Bind}),
console_print(
"Listener ~ts on ~ts started.~n",
[listener_id(Type, ListenerName), format_bind(Bind)]
[ListenerId, format_bind(Bind)]
),
ok;
{error, {already_started, Pid}} ->
@ -255,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
}),
{error, {already_started, Pid}};
{error, Reason} ->
ok = del_limiter_bucket(ListenerId, Limiter),
?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
ListenerId = listener_id(Type, ListenerName),
BindStr = format_bind(Bind),
?ELOG(
"Failed to start listener ~ts on ~ts: ~0p.~n",
@ -269,7 +269,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
)
),
{error, {failed_to_start, Msg}}
end.
end;
start_listener(Type, Name, #{enable := false}) ->
console_print(
"Listener ~ts is NOT started due to: disabled.~n",
[listener_id(Type, Name)]
),
ok.
%% @doc Restart all listeners
-spec restart() -> ok.
@ -280,16 +286,33 @@ restart() ->
restart_listener(ListenerId) ->
apply_on_listener(ListenerId, fun restart_listener/3).
-spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}.
restart_listener(Type, ListenerName, {OldConf, NewConf}) ->
restart_listener(Type, ListenerName, OldConf, NewConf);
-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
restart_listener(Type, ListenerName, Conf) ->
restart_listener(Type, ListenerName, Conf, Conf).
restart_listener(Type, ListenerName, OldConf, NewConf) ->
case stop_listener(Type, ListenerName, OldConf) of
ok -> start_listener(Type, ListenerName, NewConf);
{error, Reason} -> {error, Reason}
update_listener(_Type, _Name, #{enable := false}, #{enable := false}) ->
ok;
update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) ->
stop_listener(Type, Name, Conf);
update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) ->
start_listener(Type, Name, Conf);
update_listener(Type, Name, OldConf, NewConf) ->
Id = listener_id(Type, Name),
ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)),
case do_update_listener(Type, Name, OldConf, NewConf) of
ok ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
ok;
{error, _Reason} ->
restart_listener(Type, Name, OldConf, NewConf)
end.
restart_listener(Type, Name, OldConf, NewConf) ->
case stop_listener(Type, Name, OldConf) of
ok ->
start_listener(Type, Name, NewConf);
{error, Reason} ->
{error, Reason}
end.
%% @doc Stop all listeners.
@ -305,9 +328,10 @@ stop() ->
stop_listener(ListenerId) ->
apply_on_listener(ListenerId, fun stop_listener/3).
stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
Id = listener_id(Type, ListenerName),
ok = del_limiter_bucket(Id, Conf),
stop_listener(Type, Name, #{bind := Bind} = Conf) ->
Id = listener_id(Type, Name),
ok = del_limiter_bucket(Id, limiter(Conf)),
ok = unregister_ocsp_stapling_refresh(Type, Name),
case do_stop_listener(Type, Id, Conf) of
ok ->
console_print(
@ -325,11 +349,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
{error, Reason}
end.
-spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) ->
esockd:close(Id, ListenOn);
do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss ->
do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
case cowboy:stop_listener(Id) of
ok ->
wait_listener_stopped(ListenOn);
@ -369,45 +392,25 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
console_print(_Fmt, _Args) -> ok.
-endif.
%% Start MQTT/TCP listener
-spec do_start_listener(atom(), atom(), map()) ->
-spec do_start_listener(listener_type(), atom(), listener_id(), map()) ->
{ok, pid() | {skipped, atom()}} | {error, term()}.
do_start_listener(_Type, _ListenerName, #{enable := false}) ->
{ok, {skipped, listener_disabled}};
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
Type == tcp; Type == ssl
->
Id = listener_id(Type, ListenerName),
Limiter = limiter(Opts),
add_limiter_bucket(Id, Limiter),
%% Start MQTT/TCP listener
do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
esockd:open(
Id,
ListenOn,
merge_default(esockd_opts(Id, Type, Opts)),
{emqx_connection, start_link, [
#{
listener => {Type, ListenerName},
zone => zone(Opts),
limiter => Limiter,
enable_authn => enable_authn(Opts)
}
]}
merge_default(esockd_opts(Id, Type, Name, Opts))
);
%% Start MQTT/WS listener
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
Type == ws; Type == wss
->
Id = listener_id(Type, ListenerName),
Limiter = limiter(Opts),
add_limiter_bucket(Id, Limiter),
RanchOpts = ranch_opts(Type, ListenOn, Opts),
WsOpts = ws_opts(Type, ListenerName, Opts, Limiter),
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
RanchOpts = ranch_opts(Type, Opts),
WsOpts = ws_opts(Type, Name, Opts),
case Type of
ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
end;
%% Start MQTT/QUIC listener
do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
ListenOn =
case Bind of
{Addr, Port} when tuple_size(Addr) == 4 ->
@ -457,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
zone => zone(Opts),
listener => {quic, ListenerName},
listener => {quic, Name},
limiter => Limiter
},
StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},
Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Limiter),
quicer:spawn_listener(
Id,
ListenOn,
@ -476,6 +476,39 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
{ok, {skipped, quic_app_missing}}
end.
do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
?ESOCKD_LISTENER(Type)
->
Id = listener_id(Type, Name),
case maps:get(bind, OldConf) of
ListenOn ->
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
{error, not_supported}
end;
do_update_listener(Type, Name, OldConf, NewConf) when
?COWBOY_LISTENER(Type)
->
Id = listener_id(Type, Name),
RanchOpts = ranch_opts(Type, NewConf),
WsOpts = ws_opts(Type, Name, NewConf),
case ranch_opts(Type, OldConf) of
RanchOpts ->
%% Transport options did not change, no need to touch the listener.
ok;
_Different ->
%% Transport options changed, we need to tear down the listener.
ok = ranch:suspend_listener(Id),
ok = ranch:set_transport_options(Id, RanchOpts)
end,
ok = ranch:set_protocol_options(Id, WsOpts),
%% No-op if the listener was not suspended.
ranch:resume_listener(Id);
do_update_listener(_Type, _Name, _OldConf, _NewConf) ->
{error, not_supported}.
%% Update the listeners at runtime
pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when
V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
@ -501,69 +534,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when
OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE
->
create_listener(Type, Name, NewConf);
post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
update_listener(Type, Name, {OldConf, NewConf});
post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) ->
remove_listener(Type, Name, OldConf);
post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
#{enable := NewEnabled} = NewConf,
#{enable := OldEnabled} = OldConf,
case {NewEnabled, OldEnabled} of
{true, true} ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
restart_listener(Type, Name, {OldConf, NewConf});
{true, false} ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
start_listener(Type, Name, NewConf);
{false, true} ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
update_listener(Type, Name, OldConf, NewConf);
post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) ->
stop_listener(Type, Name, OldConf);
{false, false} ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
stop_listener(Type, Name, OldConf)
end;
post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
update_listener(Type, Name, OldConf, NewConf);
post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) ->
ok;
post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf),
Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed),
perform_listener_changes([
{fun ?MODULE:remove_listener/3, Removed},
{fun ?MODULE:update_listener/3, Updated},
{fun ?MODULE:create_listener/3, Added}
]);
%% TODO
%% This currently lacks transactional semantics. If one of the changes fails,
%% previous changes will not be rolled back.
perform_listener_changes(
[{update, L} || L <- Changed] ++
[{stop, L} || L <- Removed] ++
[{start, L} || L <- Added]
);
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
ok.
create_listener(Type, Name, NewConf) ->
start_listener(Type, Name, NewConf).
remove_listener(Type, Name, OldConf) ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
stop_listener(Type, Name, OldConf).
update_listener(Type, Name, {OldConf, NewConf}) ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
restart_listener(Type, Name, {OldConf, NewConf}).
perform_listener_changes([]) ->
ok;
perform_listener_changes([{Action, ConfL} | Tasks]) ->
case perform_listener_changes(Action, ConfL) of
ok -> perform_listener_changes(Tasks);
perform_listener_changes([{Action, Listener} | Rest]) ->
case perform_listener_change(Action, Listener) of
ok -> perform_listener_changes(Rest);
{error, Reason} -> {error, Reason}
end.
perform_listener_changes(_Action, []) ->
ok;
perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) ->
case Action(Type, Name, Diff) of
ok -> perform_listener_changes(Action, MapConf);
{error, Reason} -> {error, Reason}
end.
perform_listener_change(start, {Type, Name, Conf}) ->
start_listener(Type, Name, Conf);
perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
update_listener(Type, Name, ConfOld, ConfNew);
perform_listener_change(stop, {Type, Name, Conf}) ->
stop_listener(Type, Name, Conf).
esockd_opts(ListenerId, Type, Opts0) ->
esockd_opts(ListenerId, Type, Name, Opts0) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
Opts2 =
@ -579,7 +587,16 @@ esockd_opts(ListenerId, Type, Opts0) ->
end,
Opts3 = Opts2#{
access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])),
tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]},
connection_mfargs =>
{emqx_connection, start_link, [
#{
listener => {Type, Name},
zone => zone(Opts0),
limiter => Limiter,
enable_authn => enable_authn(Opts0)
}
]}
},
maps:to_list(
case Type of
@ -593,20 +610,21 @@ esockd_opts(ListenerId, Type, Opts0) ->
end
).
ws_opts(Type, ListenerName, Opts, Limiter) ->
WsPaths = [
{emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
ws_opts(Type, ListenerName, Opts) ->
WsPath = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"),
WsRoutes = [
{WsPath, emqx_ws_connection, #{
zone => zone(Opts),
listener => {Type, ListenerName},
limiter => Limiter,
limiter => limiter(Opts),
enable_authn => enable_authn(Opts)
}}
],
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
Dispatch = cowboy_router:compile([{'_', WsRoutes}]),
ProxyProto = maps:get(proxy_protocol, Opts, false),
#{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
ranch_opts(Type, ListenOn, Opts) ->
ranch_opts(Type, Opts = #{bind := ListenOn}) ->
NumAcceptors = maps:get(acceptors, Opts, 4),
MaxConnections = maps:get(max_connections, Opts, 1024),
SocketOpts =
@ -725,41 +743,47 @@ add_limiter_bucket(Id, Limiter) ->
maps:without([client], Limiter)
).
del_limiter_bucket(Id, Conf) ->
case limiter(Conf) of
undefined ->
del_limiter_bucket(_Id, undefined) ->
ok;
Limiter ->
lists:foreach(
fun(Type) ->
del_limiter_bucket(Id, Limiter) ->
maps:foreach(
fun(Type, _) ->
emqx_limiter_server:del_bucket(Id, Type)
end,
maps:keys(Limiter)
)
end.
Limiter
).
update_limiter_bucket(Id, Limiter, undefined) ->
del_limiter_bucket(Id, Limiter);
update_limiter_bucket(Id, undefined, Limiter) ->
add_limiter_bucket(Id, Limiter);
update_limiter_bucket(Id, OldLimiter, NewLimiter) ->
ok = add_limiter_bucket(Id, NewLimiter),
Outdated = maps:without(maps:keys(NewLimiter), OldLimiter),
del_limiter_bucket(Id, Outdated).
diff_confs(NewConfs, OldConfs) ->
emqx_utils:diff_lists(
flatten_confs(NewConfs),
flatten_confs(OldConfs),
fun({Key, _}) -> Key end
fun({Type, Name, _}) -> {Type, Name} end
).
flatten_confs(Conf0) ->
flatten_confs(Confs) ->
lists:flatmap(
fun({Type, Conf}) ->
do_flatten_confs(Type, Conf)
fun({Type, Listeners}) ->
do_flatten_confs(Type, Listeners)
end,
maps:to_list(Conf0)
maps:to_list(Confs)
).
do_flatten_confs(Type, Conf0) ->
do_flatten_confs(Type, Listeners) ->
FilterFun =
fun
({_Name, ?TOMBSTONE_TYPE}) -> false;
({Name, Conf}) -> {true, {{Type, Name}, Conf}}
({Name, Conf}) -> {true, {Type, Name, Conf}}
end,
lists:filtermap(FilterFun, maps:to_list(Conf0)).
lists:filtermap(FilterFun, maps:to_list(Listeners)).
enable_authn(Opts) ->
maps:get(enable_authn, Opts, true).

View File

@ -20,122 +20,46 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_schema.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CERTS_PATH(CertName), filename:join(["../../lib/emqx/etc/certs/", CertName])).
-define(SERVER_KEY_PASSWORD, "sErve7r8Key$!").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
NewConfig = generate_config(),
application:ensure_all_started(esockd),
application:ensure_all_started(quicer),
application:ensure_all_started(cowboy),
generate_tls_certs(Config),
lists:foreach(fun set_app_env/1, NewConfig),
Config.
WorkDir = emqx_cth_suite:work_dir(Config),
Apps = emqx_cth_suite:start([quicer, emqx], #{work_dir => WorkDir}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
application:stop(esockd),
application:stop(cowboy).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) when
Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp
Case =:= t_start_stop_listeners;
Case =:= t_restart_listeners;
Case =:= t_restart_listeners_with_hibernate_after_disabled
->
catch emqx_config_handler:stop(),
Port = emqx_common_test_helpers:select_free_port(tcp),
{ok, _} = emqx_config_handler:start_link(),
PrevListeners = emqx_config:get([listeners], #{}),
PureListeners = remove_default_limiter(PrevListeners),
PureListeners2 = PureListeners#{
tcp => #{
listener_test => #{
bind => {"127.0.0.1", Port},
max_connections => 4321,
limiter => #{}
}
}
},
emqx_config:put([listeners], PureListeners2),
ok = emqx_listeners:start(),
[
{prev_listener_conf, PrevListeners},
{tcp_port, Port}
| Config
];
init_per_testcase(t_wss_conn, Config) ->
catch emqx_config_handler:stop(),
Port = emqx_common_test_helpers:select_free_port(ssl),
{ok, _} = emqx_config_handler:start_link(),
PrevListeners = emqx_config:get([listeners], #{}),
PureListeners = remove_default_limiter(PrevListeners),
PureListeners2 = PureListeners#{
wss => #{
listener_test => #{
bind => {{127, 0, 0, 1}, Port},
limiter => #{},
ssl_options => #{
cacertfile => ?CERTS_PATH("cacert.pem"),
certfile => ?CERTS_PATH("cert.pem"),
keyfile => ?CERTS_PATH("key.pem")
}
}
}
},
emqx_config:put([listeners], PureListeners2),
ok = emqx_listeners:start(),
[
{prev_listener_conf, PrevListeners},
{wss_port, Port}
| Config
];
ok = emqx_listeners:stop(),
Config;
init_per_testcase(_, Config) ->
catch emqx_config_handler:stop(),
{ok, _} = emqx_config_handler:start_link(),
PrevListeners = emqx_config:get([listeners], #{}),
PureListeners = remove_default_limiter(PrevListeners),
emqx_config:put([listeners], PureListeners),
[
{prev_listener_conf, PrevListeners}
| Config
].
ok = emqx_listeners:start(),
Config.
end_per_testcase(Case, Config) when
Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp
->
PrevListener = ?config(prev_listener_conf, Config),
emqx_listeners:stop(),
emqx_config:put([listeners], PrevListener),
_ = emqx_config_handler:stop(),
ok;
end_per_testcase(t_wss_conn, Config) ->
PrevListener = ?config(prev_listener_conf, Config),
emqx_listeners:stop(),
emqx_config:put([listeners], PrevListener),
_ = emqx_config_handler:stop(),
ok;
end_per_testcase(_, Config) ->
PrevListener = ?config(prev_listener_conf, Config),
emqx_config:put([listeners], PrevListener),
_ = emqx_config_handler:stop(),
end_per_testcase(_, _Config) ->
ok.
t_start_stop_listeners(_) ->
ok = emqx_listeners:start(),
?assertException(error, _, emqx_listeners:start_listener({ws, {"127.0.0.1", 8083}, []})),
?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})),
ok = emqx_listeners:stop().
t_restart_listeners(_) ->
ok = emqx_listeners:start(),
ok = emqx_listeners:stop(),
%% flakyness: eaddrinuse
timer:sleep(timer:seconds(2)),
ok = emqx_listeners:restart(),
ok = emqx_listeners:stop().
@ -168,45 +92,70 @@ t_restart_listeners_with_hibernate_after_disabled(_Config) ->
),
ok = emqx_listeners:start(),
ok = emqx_listeners:stop(),
%% flakyness: eaddrinuse
timer:sleep(timer:seconds(2)),
ok = emqx_listeners:restart(),
ok = emqx_listeners:stop(),
emqx_config:put([listeners], OldLConf).
t_max_conns_tcp(Config) ->
t_max_conns_tcp(_Config) ->
%% Note: Using a string representation for the bind address like
%% "127.0.0.1" does not work
Port = emqx_common_test_helpers:select_free_port(tcp),
Conf = #{
<<"bind">> => format_bind({"127.0.0.1", Port}),
<<"max_connections">> => 4321,
<<"limiter">> => #{}
},
with_listener(tcp, maxconns, Conf, fun() ->
?assertEqual(
4321,
emqx_listeners:max_conns('tcp:listener_test', {{127, 0, 0, 1}, ?config(tcp_port, Config)})
).
emqx_listeners:max_conns('tcp:maxconns', {{127, 0, 0, 1}, Port})
)
end).
t_current_conns_tcp(Config) ->
t_current_conns_tcp(_Config) ->
Port = emqx_common_test_helpers:select_free_port(tcp),
Conf = #{
<<"bind">> => format_bind({"127.0.0.1", Port}),
<<"max_connections">> => 42,
<<"limiter">> => #{}
},
with_listener(tcp, curconns, Conf, fun() ->
?assertEqual(
0,
emqx_listeners:current_conns('tcp:listener_test', {
{127, 0, 0, 1}, ?config(tcp_port, Config)
})
).
emqx_listeners:current_conns('tcp:curconns', {{127, 0, 0, 1}, Port})
)
end).
t_wss_conn(Config) ->
{ok, Socket} = ssl:connect(
{127, 0, 0, 1}, ?config(wss_port, Config), [{verify, verify_none}], 1000
),
ok = ssl:close(Socket).
PrivDir = ?config(priv_dir, Config),
Port = emqx_common_test_helpers:select_free_port(ssl),
Conf = #{
<<"bind">> => format_bind({"127.0.0.1", Port}),
<<"limiter">> => #{},
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"certfile">> => filename:join(PrivDir, "server.pem"),
<<"keyfile">> => filename:join(PrivDir, "server.key")
}
},
with_listener(wss, wssconn, Conf, fun() ->
{ok, Socket} = ssl:connect({127, 0, 0, 1}, Port, [{verify, verify_none}], 1000),
ok = ssl:close(Socket)
end).
t_quic_conn(Config) ->
PrivDir = ?config(priv_dir, Config),
Port = emqx_common_test_helpers:select_free_port(quic),
DataDir = ?config(data_dir, Config),
SSLOpts = #{
password => ?SERVER_KEY_PASSWORD,
certfile => filename:join(DataDir, "server-password.pem"),
cacertfile => filename:join(DataDir, "ca.pem"),
keyfile => filename:join(DataDir, "server-password.key")
Conf = #{
<<"bind">> => format_bind({"127.0.0.1", Port}),
<<"ssl_options">> => #{
<<"password">> => ?SERVER_KEY_PASSWORD,
<<"certfile">> => filename:join(PrivDir, "server-password.pem"),
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key")
}
},
emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, Port, #{ssl_options => SSLOpts}),
ct:pal("~p", [emqx_listeners:list()]),
with_listener(quic, ?FUNCTION_NAME, Conf, fun() ->
{ok, Conn} = quicer:connect(
{127, 0, 0, 1},
Port,
@ -216,29 +165,242 @@ t_quic_conn(Config) ->
],
1000
),
ok = quicer:close_connection(Conn),
emqx_listeners:stop_listener(quic, ?FUNCTION_NAME, #{bind => Port}).
ok = quicer:close_connection(Conn)
end).
t_ssl_password_cert(Config) ->
PrivDir = ?config(priv_dir, Config),
Port = emqx_common_test_helpers:select_free_port(ssl),
DataDir = ?config(data_dir, Config),
SSLOptsPWD = #{
password => ?SERVER_KEY_PASSWORD,
certfile => filename:join(DataDir, "server-password.pem"),
cacertfile => filename:join(DataDir, "ca.pem"),
keyfile => filename:join(DataDir, "server-password.key")
<<"password">> => ?SERVER_KEY_PASSWORD,
<<"certfile">> => filename:join(PrivDir, "server-password.pem"),
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key")
},
LConf = #{
enable => true,
bind => {{127, 0, 0, 1}, Port},
mountpoint => <<>>,
zone => default,
ssl_options => SSLOptsPWD
<<"enable">> => true,
<<"bind">> => format_bind({{127, 0, 0, 1}, Port}),
<<"ssl_options">> => SSLOptsPWD
},
ok = emqx_listeners:start_listener(ssl, ?FUNCTION_NAME, LConf),
with_listener(ssl, ?FUNCTION_NAME, LConf, fun() ->
{ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]),
ssl:close(SSLSocket),
emqx_listeners:stop_listener(ssl, ?FUNCTION_NAME, LConf).
ssl:close(SSLSocket)
end).
t_ssl_update_opts(Config) ->
PrivDir = ?config(priv_dir, Config),
Host = "127.0.0.1",
Port = emqx_common_test_helpers:select_free_port(ssl),
Conf = #{
<<"enable">> => true,
<<"bind">> => format_bind({Host, Port}),
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"password">> => ?SERVER_KEY_PASSWORD,
<<"certfile">> => filename:join(PrivDir, "server-password.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key"),
<<"verify">> => verify_none
}
},
ClientSSLOpts = [
{verify, verify_peer},
{customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
],
with_listener(ssl, updated, Conf, fun() ->
%% Client connects successfully.
C1 = emqtt_connect_ssl(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
]),
%% Change the listener SSL configuration: another set of cert/key files.
{ok, _} = emqx:update_config(
[listeners, ssl, updated],
{update, #{
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
<<"certfile">> => filename:join(PrivDir, "server.pem"),
<<"keyfile">> => filename:join(PrivDir, "server.key")
}
}}
),
%% Unable to connect with old SSL options, server's cert is signed by another CA.
?assertError(
{tls_alert, {unknown_ca, _}},
emqtt_connect_ssl(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
])
),
C2 = emqtt_connect_ssl(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
]),
%% Change the listener SSL configuration: require peer certificate.
{ok, _} = emqx:update_config(
[listeners, ssl, updated],
{update, #{
<<"ssl_options">> => #{
<<"verify">> => verify_peer,
<<"fail_if_no_peer_cert">> => true
}
}}
),
%% Unable to connect with old SSL options, certificate is now required.
?assertExceptionOneOf(
{error, {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}},
{error, closed},
emqtt_connect_ssl(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
])
),
C3 = emqtt_connect_ssl(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")},
{certfile, filename:join(PrivDir, "client.pem")},
{keyfile, filename:join(PrivDir, "client.key")}
| ClientSSLOpts
]),
%% Both pre- and post-update clients should be alive.
?assertEqual(pong, emqtt:ping(C1)),
?assertEqual(pong, emqtt:ping(C2)),
?assertEqual(pong, emqtt:ping(C3)),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2),
ok = emqtt:stop(C3)
end).
t_wss_update_opts(Config) ->
PrivDir = ?config(priv_dir, Config),
Host = "127.0.0.1",
Port = emqx_common_test_helpers:select_free_port(ssl),
Conf = #{
<<"enable">> => true,
<<"bind">> => format_bind({Host, Port}),
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"certfile">> => filename:join(PrivDir, "server-password.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key"),
<<"password">> => ?SERVER_KEY_PASSWORD,
<<"verify">> => verify_none
}
},
ClientSSLOpts = [
{verify, verify_peer},
{customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
],
with_listener(wss, updated, Conf, fun() ->
%% Start a client.
C1 = emqtt_connect_wss(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")}
| ClientSSLOpts
]),
%% Change the listener SSL configuration.
%% 1. Another set of (password protected) cert/key files.
%% 2. Require peer certificate.
{ok, _} = emqx:update_config(
[listeners, wss, updated],
{update, #{
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
<<"certfile">> => filename:join(PrivDir, "server.pem"),
<<"keyfile">> => filename:join(PrivDir, "server.key")
}
}}
),
%% Unable to connect with old SSL options, server's cert is signed by another CA.
%% Due to a bug `emqtt` exits with `badmatch` in this case.
?assertExit(
_Badmatch,
emqtt_connect_wss(Host, Port, ClientSSLOpts)
),
C2 = emqtt_connect_wss(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")}
| ClientSSLOpts
]),
%% Change the listener SSL configuration: require peer certificate.
{ok, _} = emqx:update_config(
[listeners, wss, updated],
{update, #{
<<"ssl_options">> => #{
<<"verify">> => verify_peer,
<<"fail_if_no_peer_cert">> => true
}
}}
),
%% Unable to connect with old SSL options, certificate is now required.
%% Due to a bug `emqtt` does not instantly report that socket was closed.
?assertError(
timeout,
emqtt_connect_wss(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")}
| ClientSSLOpts
])
),
C3 = emqtt_connect_wss(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")},
{certfile, filename:join(PrivDir, "client.pem")},
{keyfile, filename:join(PrivDir, "client.key")}
| ClientSSLOpts
]),
%% Both pre- and post-update clients should be alive.
?assertEqual(pong, emqtt:ping(C1)),
?assertEqual(pong, emqtt:ping(C2)),
?assertEqual(pong, emqtt:ping(C3)),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2),
ok = emqtt:stop(C3)
end).
with_listener(Type, Name, Config, Then) ->
{ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}),
try
Then()
after
emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ)
end.
emqtt_connect_ssl(Host, Port, SSLOpts) ->
emqtt_connect(fun emqtt:connect/1, #{
hosts => [{Host, Port}],
connect_timeout => 1,
ssl => true,
ssl_opts => SSLOpts
}).
emqtt_connect_wss(Host, Port, SSLOpts) ->
emqtt_connect(fun emqtt:ws_connect/1, #{
hosts => [{Host, Port}],
connect_timeout => 1,
ws_transport_options => [
{protocols, [http]},
{transport, tls},
{tls_opts, SSLOpts}
]
}).
emqtt_connect(Connect, Opts) ->
case emqtt:start_link(Opts) of
{ok, Client} ->
true = erlang:unlink(Client),
case Connect(Client) of
{ok, _} -> Client;
{error, Reason} -> error(Reason, [Opts])
end;
{error, Reason} ->
error(Reason, [Opts])
end.
t_format_bind(_) ->
?assertEqual(
@ -266,67 +428,15 @@ t_format_bind(_) ->
lists:flatten(emqx_listeners:format_bind(":1883"))
).
render_config_file() ->
Path = local_path(["etc", "emqx.conf"]),
{ok, Temp} = file:read_file(Path),
Vars0 = mustache_vars(),
Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0],
Targ = bbmustache:render(Temp, Vars),
NewName = Path ++ ".rendered",
ok = file:write_file(NewName, Targ),
NewName.
mustache_vars() ->
[
{platform_data_dir, local_path(["data"])},
{platform_etc_dir, local_path(["etc"])}
].
generate_config() ->
ConfFile = render_config_file(),
{ok, Conf} = hocon:load(ConfFile, #{format => richmap}),
hocon_tconf:generate(emqx_schema, Conf).
set_app_env({App, Lists}) ->
lists:foreach(
fun
({authz_file, _Var}) ->
application:set_env(App, authz_file, local_path(["etc", "authz.conf"]));
({Par, Var}) ->
application:set_env(App, Par, Var)
end,
Lists
).
local_path(Components, Module) ->
filename:join([get_base_dir(Module) | Components]).
local_path(Components) ->
local_path(Components, ?MODULE).
get_base_dir(Module) ->
{file, Here} = code:is_loaded(Module),
filename:dirname(filename:dirname(Here)).
get_base_dir() ->
get_base_dir(?MODULE).
remove_default_limiter(Listeners) ->
maps:map(
fun(_, X) ->
maps:map(
fun(_, E) ->
maps:remove(limiter, E)
end,
X
)
end,
Listeners
).
generate_tls_certs(Config) ->
DataDir = ?config(data_dir, Config),
emqx_common_test_helpers:gen_ca(DataDir, "ca"),
emqx_common_test_helpers:gen_host_cert("server-password", "ca", DataDir, #{
PrivDir = ?config(priv_dir, Config),
emqx_common_test_helpers:gen_ca(PrivDir, "ca"),
emqx_common_test_helpers:gen_ca(PrivDir, "ca-next"),
emqx_common_test_helpers:gen_host_cert("server", "ca-next", PrivDir, #{}),
emqx_common_test_helpers:gen_host_cert("client", "ca-next", PrivDir, #{}),
emqx_common_test_helpers:gen_host_cert("server-password", "ca", PrivDir, #{
password => ?SERVER_KEY_PASSWORD
}).
format_bind(Bind) ->
iolist_to_binary(emqx_listeners:format_bind(Bind)).

View File

@ -663,9 +663,11 @@ tcp_opts() ->
udp_opts() ->
#{
recbuf => 1024,
sndbuf => 1024,
buffer => 1024,
%% NOTE
%% Making those too small will lead to inability to accept connections.
recbuf => 2048,
sndbuf => 2048,
buffer => 2048,
reuseaddr => true
}.

View File

@ -0,0 +1,10 @@
Support hot update of TCP/SSL/WS/WSS MQTT listeners configuration, which allows changing most of the configuration parameters without restarting the listener and disconnecting the clients.
In case of TCP/SSL listeners, changes to the following parameters still require full listener restart:
* `bind`
* `tcp_options.backlog`
In case of WS/WSS listeners, any parameter can be freely changed without losing the connected clients. However, changing transport related parameters will cause listening socket to be re-opened, namely:
* `bind`
* `tcp_options.*`
* `ssl_options.*`

View File

@ -53,7 +53,7 @@ defmodule EMQXUmbrella.MixProject do
{:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.9", override: true},
{:esockd, github: "emqx/esockd", tag: "5.11.1", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
{:ekka, github: "emqx/ekka", tag: "0.17.0", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},

View File

@ -69,7 +69,7 @@
, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}