Merge pull request #8148 from zhongwencool/listener-options

feat: make tcp/ssl options more straightforward
This commit is contained in:
zhongwencool 2022-06-08 09:56:35 +08:00 committed by GitHub
commit f35f62ba87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 122 additions and 105 deletions

View File

@ -1,39 +1,39 @@
listeners.tcp.default { listeners.tcp.default {
bind: "0.0.0.0:1883" bind = "0.0.0.0:1883"
max_connections: 1024000 max_connections = 1024000
} }
listeners.ssl.default { listeners.ssl.default {
bind: "0.0.0.0:8883" bind = "0.0.0.0:8883"
max_connections: 512000 max_connections = 512000
ssl { ssl_options {
keyfile: "{{ platform_etc_dir }}/certs/key.pem" keyfile = "{{ platform_etc_dir }}/certs/key.pem"
certfile: "{{ platform_etc_dir }}/certs/cert.pem" certfile = "{{ platform_etc_dir }}/certs/cert.pem"
cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
} }
} }
listeners.ws.default { listeners.ws.default {
bind: "0.0.0.0:8083" bind = "0.0.0.0:8083"
max_connections: 1024000 max_connections = 1024000
websocket.mqtt_path: "/mqtt" websocket.mqtt_path = "/mqtt"
} }
listeners.wss.default { listeners.wss.default {
bind: "0.0.0.0:8084" bind = "0.0.0.0:8084"
max_connections: 512000 max_connections = 512000
websocket.mqtt_path: "/mqtt" websocket.mqtt_path = "/mqtt"
ssl { ssl_options {
keyfile: "{{ platform_etc_dir }}/certs/key.pem" keyfile = "{{ platform_etc_dir }}/certs/key.pem"
certfile: "{{ platform_etc_dir }}/certs/cert.pem" certfile = "{{ platform_etc_dir }}/certs/cert.pem"
cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
} }
} }
# listeners.quic.default { # listeners.quic.default {
# enabled: false # enabled = false
# bind: "0.0.0.0:14567" # bind = "0.0.0.0:14567"
# max_connections: 1024000 # max_connections = 1024000
# keyfile: "{{ platform_etc_dir }}/certs/key.pem" # keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile: "{{ platform_etc_dir }}/certs/cert.pem" # certfile = "{{ platform_etc_dir }}/certs/cert.pem"
#} #}

View File

@ -1176,5 +1176,7 @@ get_state(Pid) ->
) )
). ).
get_active_n(quic, _Listener) -> ?ACTIVE_N; get_active_n(quic, _Listener) ->
get_active_n(Type, Listener) -> emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). ?ACTIVE_N;
get_active_n(Type, Listener) ->
emqx_config:get_listener_conf(Type, Listener, [tcp_options, active_n]).

View File

@ -406,7 +406,7 @@ esockd_opts(Type, Opts0) ->
ws_opts(Type, ListenerName, Opts) -> ws_opts(Type, ListenerName, Opts) ->
WsPaths = [ WsPaths = [
{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection, #{ {emqx_map_lib:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
zone => zone(Opts), zone => zone(Opts),
listener => {Type, ListenerName}, listener => {Type, ListenerName},
limiter => limiter(Opts) limiter => limiter(Opts)
@ -497,7 +497,7 @@ limiter(Opts) ->
ssl_opts(Opts) -> ssl_opts(Opts) ->
maps:to_list( maps:to_list(
emqx_tls_lib:drop_tls13_for_old_otp( emqx_tls_lib:drop_tls13_for_old_otp(
maps:get(ssl, Opts, #{}) maps:get(ssl_options, Opts, #{})
) )
). ).
@ -505,7 +505,7 @@ tcp_opts(Opts) ->
maps:to_list( maps:to_list(
maps:without( maps:without(
[active_n], [active_n],
maps:get(tcp, Opts, #{}) maps:get(tcp_options, Opts, #{})
) )
). ).
@ -557,18 +557,18 @@ certs_dir(Type, Name) ->
iolist_to_binary(filename:join(["listeners", Type, Name])). iolist_to_binary(filename:join(["listeners", Type, Name])).
convert_certs(CertsDir, Conf) -> convert_certs(CertsDir, Conf) ->
case emqx_tls_lib:ensure_ssl_files(CertsDir, maps:get(<<"ssl">>, Conf, undefined)) of case emqx_tls_lib:ensure_ssl_files(CertsDir, maps:get(<<"ssl_options">>, Conf, undefined)) of
{ok, undefined} -> {ok, undefined} ->
Conf; Conf;
{ok, SSL} -> {ok, SSL} ->
Conf#{<<"ssl">> => SSL}; Conf#{<<"ssl_options">> => SSL};
{error, Reason} -> {error, Reason} ->
?SLOG(error, Reason#{msg => "bad_ssl_config"}), ?SLOG(error, Reason#{msg => "bad_ssl_config"}),
throw({bad_ssl_config, Reason}) throw({bad_ssl_config, Reason})
end. end.
clear_certs(CertsDir, Conf) -> clear_certs(CertsDir, Conf) ->
OldSSL = maps:get(<<"ssl">>, Conf, undefined), OldSSL = maps:get(<<"ssl_options">>, Conf, undefined),
emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL). emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).
filter_stacktrace({Reason, _Stacktrace}) -> Reason; filter_stacktrace({Reason, _Stacktrace}) -> Reason;

View File

@ -787,29 +787,32 @@ fields("listeners") ->
)} )}
]; ];
fields("mqtt_tcp_listener") -> fields("mqtt_tcp_listener") ->
mqtt_listener(1883) ++
[ [
{"tcp", {"tcp_options",
sc( sc(
ref("tcp_opts"), ref("tcp_opts"),
#{} #{}
)} )}
] ++ mqtt_listener(1883); ];
fields("mqtt_ssl_listener") -> fields("mqtt_ssl_listener") ->
mqtt_listener(8883) ++
[ [
{"tcp", {"tcp_options",
sc( sc(
ref("tcp_opts"), ref("tcp_opts"),
#{} #{}
)}, )},
{"ssl", {"ssl_options",
sc( sc(
ref("listener_ssl_opts"), ref("listener_ssl_opts"),
#{} #{}
)} )}
] ++ mqtt_listener(8883); ];
fields("mqtt_ws_listener") -> fields("mqtt_ws_listener") ->
mqtt_listener(8083) ++
[ [
{"tcp", {"tcp_options",
sc( sc(
ref("tcp_opts"), ref("tcp_opts"),
#{} #{}
@ -819,15 +822,16 @@ fields("mqtt_ws_listener") ->
ref("ws_opts"), ref("ws_opts"),
#{} #{}
)} )}
] ++ mqtt_listener(8083); ];
fields("mqtt_wss_listener") -> fields("mqtt_wss_listener") ->
mqtt_listener(8084) ++
[ [
{"tcp", {"tcp_options",
sc( sc(
ref("tcp_opts"), ref("tcp_opts"),
#{} #{}
)}, )},
{"ssl", {"ssl_options",
sc( sc(
ref("listener_wss_opts"), ref("listener_wss_opts"),
#{} #{}
@ -837,7 +841,7 @@ fields("mqtt_wss_listener") ->
ref("ws_opts"), ref("ws_opts"),
#{} #{}
)} )}
] ++ mqtt_listener(8084); ];
fields("mqtt_quic_listener") -> fields("mqtt_quic_listener") ->
[ [
{"enabled", {"enabled",

View File

@ -1046,4 +1046,4 @@ get_ws_opts(Type, Listener, Key) ->
emqx_config:get_listener_conf(Type, Listener, [websocket, Key]). emqx_config:get_listener_conf(Type, Listener, [websocket, Key]).
get_active_n(Type, Listener) -> get_active_n(Type, Listener) ->
emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). emqx_config:get_listener_conf(Type, Listener, [tcp_options, active_n]).

View File

@ -106,7 +106,7 @@ listener_mqtt_tcp_conf() ->
mountpoint => <<>>, mountpoint => <<>>,
proxy_protocol => false, proxy_protocol => false,
proxy_protocol_timeout => 3000, proxy_protocol_timeout => 3000,
tcp => #{ tcp_options => #{
active_n => 100, active_n => 100,
backlog => 1024, backlog => 1024,
buffer => 4096, buffer => 4096,
@ -128,7 +128,7 @@ listener_mqtt_ws_conf() ->
mountpoint => <<>>, mountpoint => <<>>,
proxy_protocol => false, proxy_protocol => false,
proxy_protocol_timeout => 3000, proxy_protocol_timeout => 3000,
tcp => tcp_options =>
#{ #{
active_n => 100, active_n => 100,
backlog => 1024, backlog => 1024,

View File

@ -78,7 +78,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
emqx_config:put_listener_conf(ssl, default, [ssl, verify], verify_peer), emqx_config:put_listener_conf(ssl, default, [ssl_options, verify], verify_peer),
emqx_listeners:restart_listener('ssl:default'), emqx_listeners:restart_listener('ssl:default'),
Config. Config.

View File

@ -256,9 +256,9 @@ t_handle_msg_deliver(_) ->
t_handle_msg_inet_reply(_) -> t_handle_msg_inet_reply(_) ->
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
emqx_config:put_listener_conf(tcp, default, [tcp, active_n], 0), emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 0),
?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())), ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())),
emqx_config:put_listener_conf(tcp, default, [tcp, active_n], 100), emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 100),
?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())), ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())),
?assertMatch( ?assertMatch(
{stop, {shutdown, for_testing}, _St}, {stop, {shutdown, for_testing}, _St},

View File

@ -78,7 +78,7 @@ init_per_testcase(t_wss_conn, Config) ->
listener_test => #{ listener_test => #{
bind => {{127, 0, 0, 1}, 9998}, bind => {{127, 0, 0, 1}, 9998},
limiter => #{}, limiter => #{},
ssl => #{ ssl_options => #{
cacertfile => ?CERTS_PATH("cacert.pem"), cacertfile => ?CERTS_PATH("cacert.pem"),
certfile => ?CERTS_PATH("cert.pem"), certfile => ?CERTS_PATH("cert.pem"),
keyfile => ?CERTS_PATH("key.pem") keyfile => ?CERTS_PATH("key.pem")

View File

@ -70,6 +70,9 @@ init_per_group(_, Config) ->
emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:stop_apps([]),
Config. Config.
end_per_group(quic, _Config) ->
emqx_config:put([listeners, quic], #{}),
ok;
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.

View File

@ -33,11 +33,12 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_channel_SUITE:set_test_listener_confs(),
?check_trace( ?check_trace(
?wait_async_action( ?wait_async_action(
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
#{?snk_kind := listener_started, bind := 1883}, #{?snk_kind := listener_started, bind := 1883},
timer:seconds(5) timer:seconds(10)
), ),
fun(Trace) -> fun(Trace) ->
%% more than one listener %% more than one listener

View File

@ -685,7 +685,7 @@ tcp_schema_example() ->
proxy_protocol => false, proxy_protocol => false,
proxy_protocol_timeout => <<"3s">>, proxy_protocol_timeout => <<"3s">>,
running => true, running => true,
tcp => #{ tcp_options => #{
active_n => 100, active_n => 100,
backlog => 1024, backlog => 1024,
buffer => <<"4KB">>, buffer => <<"4KB">>,

View File

@ -96,7 +96,7 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) ->
MinConf = MinConf =
case OriginListener of case OriginListener of
#{ #{
<<"ssl">> := <<"ssl_options">> :=
#{ #{
<<"cacertfile">> := CaCertFile, <<"cacertfile">> := CaCertFile,
<<"certfile">> := CertFile, <<"certfile">> := CertFile,
@ -107,7 +107,7 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) ->
<<"id">> => MinListenerId, <<"id">> => MinListenerId,
<<"bind">> => <<"0.0.0.0:3883">>, <<"bind">> => <<"0.0.0.0:3883">>,
<<"type">> => Type, <<"type">> => Type,
<<"ssl">> => #{ <<"ssl_options">> => #{
<<"cacertfile">> => CaCertFile, <<"cacertfile">> => CaCertFile,
<<"certfile">> => CertFile, <<"certfile">> => CertFile,
<<"keyfile">> => KeyFile <<"keyfile">> => KeyFile

View File

@ -30,11 +30,18 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(BASE_CONF, #{
<<"dealyed">> => <<"true">>,
<<"max_delayed_messages">> => <<"0">>
}).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true
}),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]), emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
Config. Config.

View File

@ -32,7 +32,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),

View File

@ -160,7 +160,7 @@ t_rewrite_re_error(_Config) ->
ok. ok.
t_list(_Config) -> t_list(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?REWRITE), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE, #{
raw_with_default => true raw_with_default => true
}), }),
Expect = maps:get(<<"rewrite">>, ?REWRITE), Expect = maps:get(<<"rewrite">>, ?REWRITE),
@ -168,7 +168,7 @@ t_list(_Config) ->
ok. ok.
t_update(_Config) -> t_update(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?REWRITE), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE, #{
raw_with_default => true raw_with_default => true
}), }),
Init = emqx_rewrite:list(), Init = emqx_rewrite:list(),
@ -186,7 +186,7 @@ t_update(_Config) ->
ok. ok.
t_update_disable(_Config) -> t_update_disable(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?REWRITE), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE, #{
raw_with_default => true raw_with_default => true
}), }),
?assertEqual(ok, emqx_rewrite:update([])), ?assertEqual(ok, emqx_rewrite:update([])),
@ -203,7 +203,7 @@ t_update_disable(_Config) ->
ok. ok.
t_update_re_failed(_Config) -> t_update_re_failed(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?REWRITE), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE, #{
raw_with_default => true raw_with_default => true
}), }),
Re = <<"*^test/*">>, Re = <<"*^test/*">>,
@ -260,7 +260,7 @@ receive_publish(Timeout) ->
end. end.
init() -> init() ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?REWRITE), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE, #{
raw_with_default => true raw_with_default => true
}), }),
ok = emqx_rewrite:enable(), ok = emqx_rewrite:enable(),

View File

@ -33,7 +33,7 @@ init_per_testcase(_, Config) ->
Config. Config.
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),

View File

@ -41,7 +41,7 @@ init_per_suite(Config) ->
emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf") emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf")
end end
), ),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),
emqx_common_test_helpers:start_apps( emqx_common_test_helpers:start_apps(
@ -176,7 +176,7 @@ init_per_testcase(t_uuid_restored_from_file, Config) ->
%% clear the UUIDs in the DB %% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry), {atomic, ok} = mria:clear_table(emqx_telemetry),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]), emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),
emqx_common_test_helpers:start_apps( emqx_common_test_helpers:start_apps(
@ -332,7 +332,7 @@ t_uuid_saved_to_file(_Config) ->
%% clear the UUIDs in the DB %% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry), {atomic, ok} = mria:clear_table(emqx_telemetry),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]), emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),
emqx_common_test_helpers:start_apps( emqx_common_test_helpers:start_apps(
@ -861,7 +861,7 @@ setup_slave(Node) ->
Node, Node,
emqx_common_test_helpers, emqx_common_test_helpers,
load_config, load_config,
[emqx_modules_schema, jsx:encode(?BASE_CONF), #{raw_with_default => true}] [emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true}]
), ),
ok = rpc:call( ok = rpc:call(
Node, Node,

View File

@ -29,7 +29,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),

View File

@ -28,7 +28,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?TOPIC), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?TOPIC, #{
raw_with_default => true raw_with_default => true
}), }),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]), emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),

View File

@ -40,7 +40,7 @@ init_per_testcase(_, Config) ->
Config. Config.
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, jsx:encode(?BASE_CONF), #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true raw_with_default => true
}), }),

View File

@ -84,7 +84,7 @@ t_psk_lookup(_) ->
reuseaddr => true, reuseaddr => true,
user_lookup_fun => {fun emqx_tls_psk:lookup/3, undefined} user_lookup_fun => {fun emqx_tls_psk:lookup/3, undefined}
}, },
emqx_config:put([listeners, ssl, default, ssl], ServerTLSOpts), emqx_config:put([listeners, ssl, default, ssl_options], ServerTLSOpts),
emqx_listeners:restart_listener('ssl:default'), emqx_listeners:restart_listener('ssl:default'),
{ok, Socket} = ssl:connect("127.0.0.1", 8883, maps:to_list(ClientTLSOpts)), {ok, Socket} = ssl:connect("127.0.0.1", 8883, maps:to_list(ClientTLSOpts)),