refactor(gw): keep listeners conf tree compliance with core

This commit is contained in:
JianBo He 2021-09-07 16:17:58 +08:00
parent 718dd80b48
commit e94e09075c
4 changed files with 198 additions and 122 deletions

View File

@ -42,7 +42,32 @@ gateway.stomp {
acceptors = 16 acceptors = 16
max_connections = 1024000 max_connections = 1024000
max_conn_rate = 1000 max_conn_rate = 1000
active_n = 100
## TCP options
## See ${example_common_tcp_options} for more information
tcp.active_n = 100
tcp.backlog = 1024
tcp.buffer = 4KB
}
listeners.ssl.default {
bind = 61614
acceptors = 16
max_connections = 1024000
max_conn_rate = 1000
## TCP options
## See ${example_common_tcp_options} for more information
tcp.active_n = 100
tcp.backlog = 1024
tcp.buffer = 4KB
## SSL options
## See ${example_common_ssl_options} for more information
ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
} }
} }
@ -68,6 +93,29 @@ gateway.coap {
listeners.udp.default { listeners.udp.default {
bind = 5683 bind = 5683
acceptors = 4
max_connections = 102400
max_conn_rate = 1000
## UDP Options
## See ${example_common_udp_options} for more information
udp.active_n = 100
udp.buffer = 16KB
}
listeners.dtls.default {
bind = 5684
acceptors = 4
max_connections = 102400
max_conn_rate = 1000
## UDP Options
## See ${example_common_udp_options} for more information
udp.active_n = 100
udp.buffer = 16KB
## DTLS Options
## See #{example_common_dtls_options} for more information
dtls.versions = ["dtlsv1"]
} }
} }

View File

@ -63,9 +63,9 @@ fields(stomp_structs) ->
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(stomp_frame) -> fields(stomp_frame) ->
[ {max_headers, sc(integer(), undefined, 10)} [ {max_headers, sc(integer(), 10)}
, {max_headers_length, sc(integer(), undefined, 1024)} , {max_headers_length, sc(integer(), 1024)}
, {max_body_length, sc(integer(), undefined, 8192)} , {max_body_length, sc(integer(), 8192)}
]; ];
fields(mqttsn_structs) -> fields(mqttsn_structs) ->
@ -82,11 +82,11 @@ fields(mqttsn_predefined) ->
]; ];
fields(coap_structs) -> fields(coap_structs) ->
[ {heartbeat, sc(duration(), undefined, <<"30s">>)} [ {heartbeat, sc(duration(), <<"30s">>)}
, {connection_required, sc(boolean(), undefined, false)} , {connection_required, sc(boolean(), false)}
, {notify_type, sc(union([non, con, qos]), undefined, qos)} , {notify_type, sc(union([non, con, qos]), qos)}
, {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)} , {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), coap)}
, {publish_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)} , {publish_qos, sc(union([qos0, qos1, qos2, coap]), coap)}
, {listeners, sc(ref(udp_listener_group))} , {listeners, sc(ref(udp_listener_group))}
] ++ gateway_common_options(); ] ++ gateway_common_options();
@ -166,58 +166,53 @@ fields(udp_listener) ->
fields(dtls_listener) -> fields(dtls_listener) ->
[ {"$name", sc(ref(dtls_listener_settings))}]; [ {"$name", sc(ref(dtls_listener_settings))}];
fields(listener_settings) ->
[ {enable, sc(boolean(), undefined, true)}
, {bind, sc(union(ip_port(), integer()))}
, {acceptors, sc(integer(), undefined, 8)}
, {max_connections, sc(integer(), undefined, 1024)}
, {max_conn_rate, sc(integer())}
, {active_n, sc(integer(), undefined, 100)}
%, {rate_limit, sc(comma_separated_list())}
, {access, sc(ref(access))}
, {proxy_protocol, sc(boolean())}
, {proxy_protocol_timeout, sc(duration())}
, {backlog, sc(integer(), undefined, 1024)}
, {send_timeout, sc(duration(), undefined, <<"15s">>)}
, {send_timeout_close, sc(boolean(), undefined, true)}
, {recbuf, sc(bytesize())}
, {sndbuf, sc(bytesize())}
, {buffer, sc(bytesize())}
, {high_watermark, sc(bytesize(), undefined, <<"1MB">>)}
, {tune_buffer, sc(boolean())}
, {nodelay, sc(boolean())}
, {reuseaddr, sc(boolean())}
];
fields(tcp_listener_settings) -> fields(tcp_listener_settings) ->
[ [
%% some special confs for tcp listener %% some special confs for tcp listener
] ++ fields(listener_settings); ] ++ tcp_opts()
++ proxy_protocol_opts()
++ common_listener_opts();
fields(ssl_listener_settings) -> fields(ssl_listener_settings) ->
[ [
%% some special confs for ssl listener %% some special confs for ssl listener
] ++ ] ++ tcp_opts()
ssl(undefined, #{handshake_timeout => <<"15s">> ++ ssl_opts()
, depth => 10 ++ proxy_protocol_opts()
, reuse_sessions => true}) ++ fields(listener_settings); ++ common_listener_opts();
fields(udp_listener_settings) -> fields(udp_listener_settings) ->
[ [
%% some special confs for udp listener %% some special confs for udp listener
] ++ fields(listener_settings); ] ++ udp_opts()
++ common_listener_opts();
fields(dtls_listener_settings) -> fields(dtls_listener_settings) ->
[ [
%% some special confs for dtls listener %% some special confs for dtls listener
] ++ ] ++ udp_opts()
ssl(undefined, #{handshake_timeout => <<"15s">> ++ dtls_opts()
, depth => 10 ++ common_listener_opts();
, reuse_sessions => true}) ++ fields(listener_settings);
fields(access) -> fields(udp_opts) ->
[ {"$id", #{type => binary(), [ {active_n, sc(integer(), 100)}
nullable => true}}]; , {recbuf, sc(bytesize())}
, {sndbuf, sc(bytesize())}
, {buffer, sc(bytesize())}
, {reuseaddr, sc(boolean(), true)}
];
fields(dtls_listener_ssl_opts) ->
Base = emqx_schema:fields("listener_ssl_opts"),
%% XXX: ciphers ???
DtlsVers = hoconsc:mk(
typerefl:alias("string", list(atom())),
#{ default => default_dtls_vsns(),
converter => fun (Vsns) ->
[dtls_vsn(iolist_to_binary(V)) || V <- Vsns]
end
}),
lists:keyreplace("versions", 1, Base, {"versions", DtlsVers});
fields(ExtraField) -> fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"), Mod = list_to_atom(ExtraField++"_schema"),
@ -244,14 +239,47 @@ fields(ExtraField) ->
% ]). % ]).
gateway_common_options() -> gateway_common_options() ->
[ {enable, sc(boolean(), undefined, true)} [ {enable, sc(boolean(), true)}
, {enable_stats, sc(boolean(), undefined, true)} , {enable_stats, sc(boolean(), true)}
, {idle_timeout, sc(duration(), undefined, <<"30s">>)} , {idle_timeout, sc(duration(), <<"30s">>)}
, {mountpoint, sc(binary())} , {mountpoint, sc(binary())}
, {clientinfo_override, sc(ref(clientinfo_override))} , {clientinfo_override, sc(ref(clientinfo_override))}
, {authentication, sc(hoconsc:lazy(map()))} , {authentication, sc(hoconsc:lazy(map()))}
]. ].
common_listener_opts() ->
[ {enable, sc(boolean(), true)}
, {bind, sc(union(ip_port(), integer()))}
, {acceptors, sc(integer(), 16)}
, {max_connections, sc(integer(), 1024)}
, {max_conn_rate, sc(integer())}
%, {rate_limit, sc(comma_separated_list())}
, {access_rules, sc(hoconsc:array(string()), [])}
].
tcp_opts() ->
[{tcp, sc(ref(emqx_schema, "tcp_opts"), #{})}].
udp_opts() ->
[{udp, sc(ref(udp_opts), #{})}].
ssl_opts() ->
[{ssl, sc(ref(emqx_schema, "listener_ssl_opts"), #{})}].
dtls_opts() ->
[{dtls, sc(ref(dtls_listener_ssl_opts), #{})}].
proxy_protocol_opts() ->
[ {proxy_protocol, sc(boolean())}
, {proxy_protocol_timeout, sc(duration())}
].
default_dtls_vsns() ->
[<<"dtlsv1.2">>, <<"dtlsv1">>].
dtls_vsn(<<"dtlsv1.2">>) -> 'dtlsv1.2';
dtls_vsn(<<"dtlsv1">>) -> 'dtlsv1'.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
@ -259,47 +287,11 @@ gateway_common_options() ->
sc(Type) -> #{type => Type}. sc(Type) -> #{type => Type}.
sc(Type, Mapping, Default) -> sc(Type, Default) ->
hoconsc:mk(Type, #{mapping => Mapping, default => Default}). hoconsc:mk(Type, #{default => Default}).
ref(Field) -> ref(Field) ->
hoconsc:ref(?MODULE, Field). hoconsc:ref(?MODULE, Field).
%% utils ref(Mod, Field) ->
hoconsc:ref(Mod, Field).
%% generate a ssl field.
%% ssl("emqx", #{"verify" => verify_peer}) will return
%% [ {"cacertfile", sc(string(), "emqx.cacertfile", undefined)}
%% , {"certfile", sc(string(), "emqx.certfile", undefined)}
%% , {"keyfile", sc(string(), "emqx.keyfile", undefined)}
%% , {"verify", sc(union(verify_peer, verify_none), "emqx.verify", verify_peer)}
%% , {"server_name_indication", "emqx.server_name_indication", undefined)}
%% ...
ssl(Mapping, Defaults) ->
M = fun (Field) ->
case (Mapping) of
undefined -> undefined;
_ -> Mapping ++ "." ++ Field
end end,
D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end,
[ {"enable", sc(boolean(), M("enable"), D("enable"))}
, {"cacertfile", sc(binary(), M("cacertfile"), D("cacertfile"))}
, {"certfile", sc(binary(), M("certfile"), D("certfile"))}
, {"keyfile", sc(binary(), M("keyfile"), D("keyfile"))}
, {"verify", sc(union(verify_peer, verify_none), M("verify"), D("verify"))}
, {"fail_if_no_peer_cert", sc(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))}
, {"secure_renegotiate", sc(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))}
, {"reuse_sessions", sc(boolean(), M("reuse_sessions"), D("reuse_sessions"))}
, {"honor_cipher_order", sc(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))}
, {"handshake_timeout", sc(duration(), M("handshake_timeout"), D("handshake_timeout"))}
, {"depth", sc(integer(), M("depth"), D("depth"))}
, {"password", hoconsc:mk(binary(), #{ mapping => M("key_password")
, default => D("key_password")
, sensitive => true
})}
, {"dhfile", sc(binary(), M("dhfile"), D("dhfile"))}
, {"server_name_indication", sc(union(disable, binary()), M("server_name_indication"),
D("server_name_indication"))}
, {"tls_versions", sc(comma_separated_list(), M("tls_versions"), D("tls_versions"))}
, {"ciphers", sc(comma_separated_list(), M("ciphers"), D("ciphers"))}
, {"psk_ciphers", sc(comma_separated_list(), M("ciphers"), D("ciphers"))}].

View File

@ -143,16 +143,47 @@ normalize_config(RawConf) ->
Listeners = Listeners =
maps:fold(fun(Name, Confs, AccIn2) -> maps:fold(fun(Name, Confs, AccIn2) ->
ListenOn = maps:get(bind, Confs), ListenOn = maps:get(bind, Confs),
SocketOpts = esockd:parse_opt(maps:to_list(Confs)), SocketOpts = esockd_opts(Type, Confs),
RemainCfgs = maps:without( RemainCfgs = maps:without(
[bind] ++ proplists:get_keys(SocketOpts), [bind, tcp, ssl, udp, dtls]
Confs), ++ proplists:get_keys(SocketOpts), Confs),
Cfg = maps:merge(Cfg0, RemainCfgs), Cfg = maps:merge(Cfg0, RemainCfgs),
[{Type, Name, ListenOn, SocketOpts, Cfg}|AccIn2] [{Type, Name, ListenOn, SocketOpts, Cfg}|AccIn2]
end, [], Liss), end, [], Liss),
[Listeners|AccIn1] [Listeners|AccIn1]
end, [], LisMap)). end, [], LisMap)).
esockd_opts(Type, Opts0) ->
Opts1 = maps:with([acceptors, max_connections, max_conn_rate,
proxy_protocol, proxy_protocol_timeout], Opts0),
Opts2 = Opts1#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))},
maps:to_list(case Type of
tcp -> Opts2#{tcp_options => sock_opts(tcp, Opts0)};
ssl -> Opts2#{tcp_options => sock_opts(tcp, Opts0),
ssl_options => ssl_opts(ssl, Opts0)};
udp -> Opts2#{udp_options => sock_opts(udp, Opts0)};
dtls -> Opts2#{udp_options => sock_opts(udp, Opts0),
dtls_options => ssl_opts(dtls, Opts0)}
end).
esockd_access_rules(StrRules) ->
Access = fun(S) ->
[A, CIDR] = string:tokens(S, " "),
{list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end}
end,
[Access(R) || R <- StrRules].
ssl_opts(Name, Opts) ->
maps:to_list(
emqx_tls_lib:drop_tls13_for_old_otp(
maps:without([enable],
maps:get(Name, Opts, #{})))).
sock_opts(Name, Opts) ->
maps:to_list(
maps:without([active_n],
maps:get(Name, Opts, #{}))).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Envs %% Envs

View File

@ -76,7 +76,7 @@ set_special_cfg(_App) ->
listener_confs(Type) -> listener_confs(Type) ->
Default = #{bind => 7993, acceptors => 8}, Default = #{bind => 7993, acceptors => 8},
#{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}. #{Type => #{'default' => maps:merge(Default, socketopts(Type))}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Tests cases %% Tests cases
@ -360,11 +360,11 @@ open(udp) ->
{ok, Sock} = gen_udp:open(0, ?TCPOPTS), {ok, Sock} = gen_udp:open(0, ?TCPOPTS),
{udp, Sock}; {udp, Sock};
open(ssl) -> open(ssl) ->
SslOpts = client_ssl_opts(), SslOpts = maps:to_list(client_ssl_opts()),
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts), {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts),
{ssl, SslSock}; {ssl, SslSock};
open(dtls) -> open(dtls) ->
SslOpts = client_ssl_opts(), SslOpts = maps:to_list(client_ssl_opts()),
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts), {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts),
{dtls, SslSock}. {dtls, SslSock}.
@ -400,51 +400,56 @@ close({dtls, Sock}) ->
%% Server-Opts %% Server-Opts
socketopts(tcp) -> socketopts(tcp) ->
[{tcp_options, tcp_opts()}]; #{tcp => tcp_opts()};
socketopts(ssl) -> socketopts(ssl) ->
[{tcp_options, tcp_opts()}, #{tcp => tcp_opts(),
{ssl_options, ssl_opts()}]; ssl => ssl_opts()};
socketopts(udp) -> socketopts(udp) ->
[{udp_options, udp_opts()}]; #{udp => udp_opts()};
socketopts(dtls) -> socketopts(dtls) ->
[{udp_options, udp_opts()}, #{udp => udp_opts(),
{dtls_options, dtls_opts()}]. dtls => dtls_opts()}.
tcp_opts() -> tcp_opts() ->
[{send_timeout, 15000}, maps:merge(
{send_timeout_close, true}, udp_opts(),
{backlog, 100}, #{send_timeout => 15000,
{nodelay, true} | udp_opts()]. send_timeout_close => true,
backlog => 100,
nodelay => true}
).
udp_opts() -> udp_opts() ->
[{recbuf, 1024}, #{recbuf => 1024,
{sndbuf, 1024}, sndbuf => 1024,
{buffer, 1024}, buffer => 1024,
{reuseaddr, true}]. reuseaddr => true}.
ssl_opts() -> ssl_opts() ->
Certs = certs("key.pem", "cert.pem", "cacert.pem"), Certs = certs("key.pem", "cert.pem", "cacert.pem"),
[{versions, emqx_tls_lib:default_versions()}, maps:merge(
{ciphers, emqx_tls_lib:default_ciphers()}, Certs,
{verify, verify_peer}, #{versions => emqx_tls_lib:default_versions(),
{fail_if_no_peer_cert, true}, ciphers => emqx_tls_lib:default_ciphers(),
{secure_renegotiate, false}, verify => verify_peer,
{reuse_sessions, true}, fail_if_no_peer_cert => true,
{honor_cipher_order, true}]++Certs. secure_renegotiate => false,
reuse_sessions => true,
honor_cipher_order => true}
).
dtls_opts() -> dtls_opts() ->
Opts = ssl_opts(), maps:merge(ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}).
lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client-Opts %% Client-Opts
client_ssl_opts() -> client_ssl_opts() ->
certs( "client-key.pem", "client-cert.pem", "cacert.pem" ). certs("client-key.pem", "client-cert.pem", "cacert.pem").
certs( Key, Cert, CACert ) -> certs(Key, Cert, CACert) ->
CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"), CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"),
[ { keyfile, filename:join([ CertsPath, Key ]) }, #{keyfile => filename:join([ CertsPath, Key ]),
{ certfile, filename:join([ CertsPath, Cert ]) }, certfile => filename:join([ CertsPath, Cert ]),
{ cacertfile, filename:join([ CertsPath, CACert ]) } ]. cacertfile => filename:join([ CertsPath, CACert])}.