Merge pull request #12180 from keynslug/fix/gw-dtls-opts

fix(gw): use more conservative set of DTLS options
This commit is contained in:
Andrew Mayorov 2023-12-20 15:24:56 +01:00 committed by GitHub
commit f39af14524
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 566 additions and 132 deletions

View File

@ -65,6 +65,11 @@
select_free_port/1 select_free_port/1
]). ]).
-export([
ssl_verify_fun_allow_any_host/0,
ssl_verify_fun_allow_any_host_impl/3
]).
-export([ -export([
emqx_cluster/1, emqx_cluster/1,
emqx_cluster/2, emqx_cluster/2,
@ -1421,3 +1426,24 @@ group_path(Config) ->
_:_ -> _:_ ->
[] []
end. end.
%% almost verify_none equivalent, but only ignores 'hostname_check_failed'
ssl_verify_fun_allow_any_host_impl(_Cert, Event, State) ->
case Event of
valid ->
{valid, State};
valid_peer ->
{valid, State};
{bad_cert, hostname_check_failed} ->
{valid, State};
{bad_cert, _} ->
{fail, Event};
{extension, _} ->
{unknown, State}
end.
ssl_verify_fun_allow_any_host() ->
[
{verify, verify_peer},
{verify_fun, {fun ?MODULE:ssl_verify_fun_allow_any_host_impl/3, _State = #{}}}
].

View File

@ -58,7 +58,6 @@
-module(emqx_cth_suite). -module(emqx_cth_suite).
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_access_control.hrl").
-export([start/2]). -export([start/2]).
-export([stop/1]). -export([stop/1]).

View File

@ -0,0 +1,339 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cth_tls).
-include_lib("public_key/include/public_key.hrl").
-export([gen_cert/1]).
-export([write_cert/2]).
-export([write_cert/3]).
-export([write_pem/2]).
%% -------------------------------------------------------------------
%% Certificate Issuing
%% Heavily inspired by: ${ERL_SRC}/lib/public_key/test/erl_make_certs.erl
%% -------------------------------------------------------------------
-type pem_entry() :: public_key:pem_entry().
-type certificate() :: pem_entry().
-type private_key() :: pem_entry().
-type cert_subject() :: #{
name => string(),
email => string(),
city => string(),
state => string(),
org => string(),
org_unit => string(),
country => string(),
serial => string(),
title => string(),
dnQualifer => string()
}.
-type cert_validity() ::
{_From :: calendar:date(), _To :: calendar:date()}.
-type cert_extensions() :: #{
basic_constraints => false | ca | _PathLenContraint :: pos_integer(),
key_usage => false | certsign
}.
%% @doc Generate a certificate and a private key.
%% If you need root (CA) certificate, use `root` as `issuer` option. By default, the
%% generated certificate will have according extensions (constraints, key usage, etc).
%% Once root certificate + private key pair is generated, you can use the result
%% as `issuer` option to generate other certificates signed by this root.
-spec gen_cert(Opts) -> {certificate(), private_key()} when
Opts :: #{
key := ec | rsa | PrivKeyIn,
issuer := root | {CertificateIn, PrivKeyIn},
subject => cert_subject(),
validity => cert_validity(),
extensions => cert_extensions() | false
},
CertificateIn :: certificate() | public_key:der_encoded() | #'OTPCertificate'{},
PrivKeyIn :: private_key() | _PEM :: binary().
gen_cert(Opts) ->
SubjectPrivateKey = get_privkey(Opts),
{TBSCert, IssuerKey} = make_tbs(SubjectPrivateKey, Opts),
Cert = public_key:pkix_sign(TBSCert, IssuerKey),
true = verify_signature(Cert, IssuerKey),
{encode_cert(Cert), encode_privkey(SubjectPrivateKey)}.
get_privkey(#{key := Algo}) when is_atom(Algo) ->
gen_privkey(Algo);
get_privkey(#{key := Key}) ->
decode_privkey(Key).
make_tbs(SubjectKey, Opts) ->
{Issuer, IssuerKey} = issuer(Opts, SubjectKey),
Subject =
case Opts of
#{issuer := root} ->
Issuer;
#{} ->
subject(Opts)
end,
{
#'OTPTBSCertificate'{
version = v3,
serialNumber = rand:uniform(1000000000000),
signature = sign_algorithm(IssuerKey, Opts),
issuer = Issuer,
validity = validity(Opts),
subject = Subject,
subjectPublicKeyInfo = publickey(SubjectKey),
extensions = extensions(Opts)
},
IssuerKey
}.
issuer(Opts = #{issuer := root}, SubjectKey) ->
%% Self signed
{subject(Opts), SubjectKey};
issuer(#{issuer := {Issuer, IssuerKey}}, _SubjectKey) ->
{issuer_subject(Issuer), decode_privkey(IssuerKey)}.
issuer_subject({'Certificate', IssuerDer, _}) when is_binary(IssuerDer) ->
issuer_subject(IssuerDer);
issuer_subject(IssuerDer) when is_binary(IssuerDer) ->
issuer_subject(public_key:pkix_decode_cert(IssuerDer, otp));
issuer_subject(#'OTPCertificate'{tbsCertificate = #'OTPTBSCertificate'{subject = Subject}}) ->
Subject.
subject(Opts = #{}) ->
Subject = maps:get(subject, Opts, #{}),
Entries = maps:map(
fun(N, V) -> [subject_entry(N, V)] end,
maps:merge(default_subject(Opts), Subject)
),
{rdnSequence, maps:values(Entries)}.
subject_entry(name, Name) ->
typed_attr(?'id-at-commonName', {printableString, Name});
subject_entry(email, Email) ->
typed_attr(?'id-emailAddress', Email);
subject_entry(city, City) ->
typed_attr(?'id-at-localityName', {printableString, City});
subject_entry(state, State) ->
typed_attr(?'id-at-stateOrProvinceName', {printableString, State});
subject_entry(org, Org) ->
typed_attr(?'id-at-organizationName', {printableString, Org});
subject_entry(org_unit, OrgUnit) ->
typed_attr(?'id-at-organizationalUnitName', {printableString, OrgUnit});
subject_entry(country, Country) ->
typed_attr(?'id-at-countryName', Country);
subject_entry(serial, Serial) ->
typed_attr(?'id-at-serialNumber', Serial);
subject_entry(title, Title) ->
typed_attr(?'id-at-title', {printableString, Title});
subject_entry(dnQualifer, DnQ) ->
typed_attr(?'id-at-dnQualifier', DnQ).
subject_info(Info, Subject, Default) ->
case subject_info(Info, Subject) of
undefined -> Default;
Value -> Value
end.
subject_info(Info, {rdnSequence, Entries}) ->
subject_info(Info, Entries);
subject_info(name, Entries) when is_list(Entries) ->
get_string(find_subject_entry(?'id-at-commonName', Entries));
subject_info(org, Entries) when is_list(Entries) ->
get_string(find_subject_entry(?'id-at-organizationName', Entries));
subject_info(org_unit, Entries) when is_list(Entries) ->
get_string(find_subject_entry(?'id-at-organizationalUnitName', Entries));
subject_info(country, Entries) when is_list(Entries) ->
find_subject_entry(?'id-at-countryName', Entries).
find_subject_entry(Oid, Entries) ->
emqx_maybe:from_list([
Value
|| Attrs <- Entries,
#'AttributeTypeAndValue'{type = T, value = Value} <- Attrs,
T =:= Oid
]).
get_string({printableString, String}) ->
String;
get_string(undefined) ->
undefined.
typed_attr(Type, Value) ->
#'AttributeTypeAndValue'{type = Type, value = Value}.
sign_algorithm(#'ECPrivateKey'{parameters = Parms}, _Opts) ->
#'SignatureAlgorithm'{
algorithm = ?'ecdsa-with-SHA256',
parameters = Parms
}.
validity(Opts) ->
{From, To} = maps:get(validity, Opts, default_validity()),
#'Validity'{
notBefore = {generalTime, format_date(From)},
notAfter = {generalTime, format_date(To)}
}.
publickey(#'ECPrivateKey'{parameters = Params, publicKey = PubKey}) ->
#'OTPSubjectPublicKeyInfo'{
algorithm = #'PublicKeyAlgorithm'{
algorithm = ?'id-ecPublicKey',
parameters = Params
},
subjectPublicKey = #'ECPoint'{point = PubKey}
}.
extensions(#{extensions := false}) ->
asn1_NOVALUE;
extensions(Opts) ->
Exts = maps:get(extensions, Opts, #{}),
Default = default_extensions(Opts),
maps:fold(
fun(Name, Data, Acc) -> Acc ++ extension(Name, Data) end,
[],
maps:merge(Default, Exts)
).
extension(basic_constraints, false) ->
[];
extension(basic_constraints, ca) ->
[
#'Extension'{
extnID = ?'id-ce-basicConstraints',
extnValue = #'BasicConstraints'{cA = true},
critical = true
}
];
extension(basic_constraints, Len) when is_integer(Len) ->
[
#'Extension'{
extnID = ?'id-ce-basicConstraints',
extnValue = #'BasicConstraints'{cA = true, pathLenConstraint = Len},
critical = true
}
];
extension(key_usage, false) ->
[];
extension(key_usage, certsign) ->
[
#'Extension'{
extnID = ?'id-ce-keyUsage',
extnValue = [keyCertSign],
critical = true
}
].
default_validity() ->
{shift_date(date(), -1), shift_date(date(), +7)}.
default_subject(#{issuer := root}) ->
#{
name => "RootCA",
org => "EMQ",
org_unit => "EMQX",
country => "CN"
};
default_subject(#{}) ->
#{
name => "Server",
org => "EMQ",
org_unit => "EMQX",
country => "CN"
}.
default_extensions(#{issuer := root}) ->
#{
basic_constraints => ca,
key_usage => certsign
};
default_extensions(#{}) ->
#{}.
%% -------------------------------------------------------------------
verify_signature(CertDer, #'ECPrivateKey'{parameters = Params, publicKey = PubKey}) ->
public_key:pkix_verify(CertDer, {#'ECPoint'{point = PubKey}, Params});
verify_signature(CertDer, KeyPem) ->
verify_signature(CertDer, decode_privkey(KeyPem)).
%% -------------------------------------------------------------------
gen_privkey(ec) ->
public_key:generate_key({namedCurve, secp256k1});
gen_privkey(rsa) ->
public_key:generate_key({rsa, 2048, 17}).
decode_privkey(#'ECPrivateKey'{} = Key) ->
Key;
decode_privkey(#'RSAPrivateKey'{} = Key) ->
Key;
decode_privkey(PemEntry = {_, _, _}) ->
public_key:pem_entry_decode(PemEntry);
decode_privkey(PemBinary) when is_binary(PemBinary) ->
[KeyInfo] = public_key:pem_decode(PemBinary),
decode_privkey(KeyInfo).
-spec encode_privkey(#'ECPrivateKey'{} | #'RSAPrivateKey'{}) -> private_key().
encode_privkey(Key = #'ECPrivateKey'{}) ->
{ok, Der} = 'OTP-PUB-KEY':encode('ECPrivateKey', Key),
{'ECPrivateKey', Der, not_encrypted};
encode_privkey(Key = #'RSAPrivateKey'{}) ->
{ok, Der} = 'OTP-PUB-KEY':encode('RSAPrivateKey', Key),
{'RSAPrivateKey', Der, not_encrypted}.
-spec encode_cert(public_key:der_encoded()) -> certificate().
encode_cert(Der) ->
{'Certificate', Der, not_encrypted}.
%% -------------------------------------------------------------------
shift_date(Date, Offset) ->
calendar:gregorian_days_to_date(calendar:date_to_gregorian_days(Date) + Offset).
format_date({Y, M, D}) ->
lists:flatten(io_lib:format("~w~2..0w~2..0w000000Z", [Y, M, D])).
%% -------------------------------------------------------------------
%% @doc Write certificate + private key pair to respective files.
%% Files are created in the given directory. The filenames are derived
%% from the subject information in the certificate.
-spec write_cert(_Dir :: file:name(), {certificate(), private_key()}) ->
{file:name(), file:name()}.
write_cert(Dir, {Cert, Key}) ->
Subject = issuer_subject(Cert),
Filename = subject_info(org, Subject, "ORG") ++ "." ++ subject_info(name, Subject, "XXX"),
write_cert(Dir, Filename, {Cert, Key}).
-spec write_cert(_Dir :: file:name(), _Prefix :: string(), {certificate(), private_key()}) ->
{file:name(), file:name()}.
write_cert(Dir, Filename, {Cert, Key}) ->
Certfile = filename:join(Dir, Filename ++ ".crt"),
Keyfile = filename:join(Dir, Filename ++ ".key"),
ok = write_pem(Certfile, Cert),
ok = write_pem(Keyfile, Key),
{Certfile, Keyfile}.
-spec write_pem(file:name(), pem_entry() | [pem_entry()]) ->
ok | {error, file:posix()}.
write_pem(Name, Entries = [_ | _]) ->
file:write_file(Name, public_key:pem_encode(Entries));
write_pem(Name, Entry) ->
write_pem(Name, [Entry]).

View File

@ -174,7 +174,7 @@ fields(dtls_opts) ->
reuse_sessions => true, reuse_sessions => true,
versions => dtls_all_available versions => dtls_all_available
}, },
false _IsRanchListener = false
). ).
desc(gateway) -> desc(gateway) ->

View File

@ -273,7 +273,7 @@ merge_default(Udp, Options) ->
udp -> udp ->
{udp_options, default_udp_options()}; {udp_options, default_udp_options()};
dtls -> dtls ->
{udp_options, default_udp_options()}; {dtls_options, default_udp_options()};
tcp -> tcp ->
{tcp_options, default_tcp_options()}; {tcp_options, default_tcp_options()};
ssl -> ssl ->
@ -525,9 +525,11 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) ->
udp -> udp ->
Opts2#{udp_options => sock_opts(udp_options, Opts0)}; Opts2#{udp_options => sock_opts(udp_options, Opts0)};
dtls -> dtls ->
UDPOpts = sock_opts(udp_options, Opts0),
DTLSOpts = ssl_opts(dtls_options, Opts0),
Opts2#{ Opts2#{
udp_options => sock_opts(udp_options, Opts0), udp_options => UDPOpts,
dtls_options => ssl_opts(dtls_options, Opts0) dtls_options => DTLSOpts
} }
end end
). ).
@ -541,12 +543,37 @@ sock_opts(Name, Opts) ->
). ).
ssl_opts(Name, Opts) -> ssl_opts(Name, Opts) ->
Type = SSLOpts = maps:get(Name, Opts, #{}),
case Name of emqx_utils:run_fold(
ssl_options -> tls; [
dtls_options -> dtls fun ssl_opts_crl_config/2,
end, fun ssl_opts_drop_unsupported/2,
emqx_tls_lib:to_server_opts(Type, maps:get(Name, Opts, #{})). fun ssl_server_opts/2
],
SSLOpts,
Name
).
ssl_opts_crl_config(#{enable_crl_check := true} = SSLOpts, _Name) ->
HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
NSSLOpts = maps:remove(enable_crl_check, SSLOpts),
NSSLOpts#{
%% `crl_check => true' doesn't work
crl_check => peer,
crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
};
ssl_opts_crl_config(SSLOpts, _Name) ->
%% NOTE: Removing this because DTLS doesn't like any unknown options.
maps:remove(enable_crl_check, SSLOpts).
ssl_opts_drop_unsupported(SSLOpts, _Name) ->
%% TODO: Support OCSP stapling
maps:without([ocsp], SSLOpts).
ssl_server_opts(SSLOpts, ssl_options) ->
emqx_tls_lib:to_server_opts(tls, SSLOpts);
ssl_server_opts(SSLOpts, dtls_options) ->
emqx_tls_lib:to_server_opts(dtls, SSLOpts).
ranch_opts(Type, ListenOn, Opts) -> ranch_opts(Type, ListenOn, Opts) ->
NumAcceptors = maps:get(acceptors, Opts, 4), NumAcceptors = maps:get(acceptors, Opts, 4),
@ -635,7 +662,7 @@ default_tcp_options() ->
]. ].
default_udp_options() -> default_udp_options() ->
[binary]. [].
default_subopts() -> default_subopts() ->
%% Retain Handling %% Retain Handling

View File

@ -238,9 +238,12 @@ http_authz_config() ->
init_gateway_conf() -> init_gateway_conf() ->
ok = emqx_common_test_helpers:load_config( ok = emqx_common_test_helpers:load_config(
emqx_gateway_schema, emqx_gateway_schema,
merge_conf([X:default_config() || X <- ?CONFS], []) merge_conf(list_gateway_conf(), [])
). ).
list_gateway_conf() ->
[X:default_config() || X <- ?CONFS].
merge_conf([Conf | T], Acc) -> merge_conf([Conf | T], Acc) ->
case re:run(Conf, "\s*gateway\\.(.*)", [global, {capture, all_but_first, list}, dotall]) of case re:run(Conf, "\s*gateway\\.(.*)", [global, {capture, all_but_first, list}, dotall]) of
{match, [[Content]]} -> {match, [[Content]]} ->

View File

@ -22,7 +22,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]). -import(emqx_gateway_auth_ct, [with_resource/3]).
-define(checkMatch(Guard), -define(checkMatch(Guard),
(fun(Expr) -> (fun(Expr) ->
@ -54,40 +54,37 @@ groups() ->
emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS). emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS).
init_per_group(AuthName, Conf) -> init_per_group(AuthName, Conf) ->
ct:pal("on group start:~p~n", [AuthName]), Apps = emqx_cth_suite:start(
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), [
emqx_gateway_auth_ct:start_auth(AuthName), emqx_conf,
timer:sleep(500), emqx_auth,
Conf. emqx_auth_http,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
{emqx_gateway, emqx_gateway_auth_ct:list_gateway_conf()}
| emqx_gateway_test_utils:all_gateway_apps()
],
#{work_dir => emqx_cth_suite:work_dir(Conf)}
),
_ = emqx_common_test_http:create_default_app(),
ok = emqx_gateway_auth_ct:start_auth(AuthName),
[{group_apps, Apps} | Conf].
end_per_group(AuthName, Conf) -> end_per_group(AuthName, Conf) ->
ct:pal("on group stop:~p~n", [AuthName]), ok = emqx_gateway_auth_ct:stop_auth(AuthName),
emqx_gateway_auth_ct:stop_auth(AuthName), _ = emqx_common_test_http:delete_default_app(),
ok = emqx_cth_suite:stop(?config(group_apps, Conf)),
Conf. Conf.
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_gateway_test_utils:load_all_gateway_apps(), {ok, Apps1} = application:ensure_all_started(grpc),
emqx_config:erase(gateway), {ok, Apps2} = application:ensure_all_started(cowboy),
init_gateway_conf(), {ok, _} = emqx_gateway_auth_ct:start(),
emqx_mgmt_api_test_util:init_suite([grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway]), [{suite_apps, Apps1 ++ Apps2} | Config].
application:ensure_all_started(cowboy),
emqx_gateway_auth_ct:start(),
timer:sleep(500),
Config.
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_gateway_auth_ct:stop(), ok = emqx_gateway_auth_ct:stop(),
emqx_config:erase(gateway), ok = emqx_cth_suite:stop_apps(?config(suite_apps, Config)),
emqx_mgmt_api_test_util:end_suite([
cowboy, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway, grpc
]),
Config.
init_per_testcase(_Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_Case, Config) ->
Config. Config.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -22,7 +22,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]). -import(emqx_gateway_auth_ct, [with_resource/3]).
-define(checkMatch(Guard), -define(checkMatch(Guard),
(fun(Expr) -> (fun(Expr) ->
@ -54,44 +54,33 @@ groups() ->
emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS). emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS).
init_per_group(AuthName, Conf) -> init_per_group(AuthName, Conf) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Apps = emqx_cth_suite:start(
ok = emqx_authz_test_lib:reset_authorizers(), [
emqx_gateway_auth_ct:start_auth(AuthName), {emqx_conf, "authorization { no_match = deny, cache { enable = false } }"},
timer:sleep(500), emqx_auth,
Conf. emqx_auth_http,
{emqx_gateway, emqx_gateway_auth_ct:list_gateway_conf()}
| emqx_gateway_test_utils:all_gateway_apps()
],
#{work_dir => emqx_cth_suite:work_dir(Conf)}
),
ok = emqx_gateway_auth_ct:start_auth(AuthName),
[{group_apps, Apps} | Conf].
end_per_group(AuthName, Conf) -> end_per_group(AuthName, Conf) ->
emqx_gateway_auth_ct:stop_auth(AuthName), ok = emqx_gateway_auth_ct:stop_auth(AuthName),
ok = emqx_cth_suite:stop(?config(group_apps, Conf)),
Conf. Conf.
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_config:erase(gateway), {ok, Apps1} = application:ensure_all_started(grpc),
emqx_gateway_test_utils:load_all_gateway_apps(), {ok, Apps2} = application:ensure_all_started(cowboy),
init_gateway_conf(), {ok, _} = emqx_gateway_auth_ct:start(),
emqx_mgmt_api_test_util:init_suite([ [{suite_apps, Apps1 ++ Apps2} | Config].
grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway
]),
meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_authz_file, create, fun(S) -> S end),
application:ensure_all_started(cowboy),
emqx_gateway_auth_ct:start(),
Config.
end_per_suite(Config) -> end_per_suite(Config) ->
meck:unload(emqx_authz_file), ok = emqx_gateway_auth_ct:stop(),
emqx_gateway_auth_ct:stop(), ok = emqx_cth_suite:stop_apps(?config(suite_apps, Config)),
ok = emqx_authz_test_lib:restore_authorizers(),
emqx_config:erase(gateway),
emqx_mgmt_api_test_util:end_suite([
emqx_gateway, emqx_auth_http, emqx_auth, emqx_conf, grpc
]),
Config.
init_per_testcase(_Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_Case, Config) ->
Config. Config.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -103,12 +103,18 @@ assert_fields_exist(Ks, Map) ->
end, end,
Ks Ks
). ).
load_all_gateway_apps() -> load_all_gateway_apps() ->
application:load(emqx_gateway_stomp), emqx_cth_suite:load_apps(all_gateway_apps()).
application:load(emqx_gateway_mqttsn),
application:load(emqx_gateway_coap), all_gateway_apps() ->
application:load(emqx_gateway_lwm2m), [
application:load(emqx_gateway_exproto). emqx_gateway_stomp,
emqx_gateway_mqttsn,
emqx_gateway_coap,
emqx_gateway_lwm2m,
emqx_gateway_exproto
].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% http %% http

View File

@ -20,7 +20,6 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -44,14 +43,6 @@
-define(TCPOPTS, [binary, {active, false}]). -define(TCPOPTS, [binary, {active, false}]).
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
-define(PORT, 7993).
-define(DEFAULT_CLIENT, #{
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(CONF_DEFAULT, << -define(CONF_DEFAULT, <<
"\n" "\n"
@ -126,15 +117,33 @@ init_per_group(_, Cfg) ->
init_per_group(LisType, ServiceName, Scheme, Cfg) -> init_per_group(LisType, ServiceName, Scheme, Cfg) ->
Svrs = emqx_exproto_echo_svr:start(Scheme), Svrs = emqx_exproto_echo_svr:start(Scheme),
application:load(emqx_gateway_exproto), Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
emqx_common_test_helpers:start_apps( GWConfig = #{
[emqx_conf, emqx_auth, emqx_gateway], server => #{bind => 9100},
fun(App) -> idle_timeout => 5000,
set_special_cfg(App, LisType, ServiceName, Scheme) mountpoint => <<"ct/">>,
end handler => #{
address => Addrs,
service_name => ServiceName,
ssl_options => #{enable => Scheme == https}
},
listeners => listener_confs(LisType)
},
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx_auth,
{emqx_gateway, #{
config =>
#{gateway => #{exproto => GWConfig}}
}},
emqx_gateway_exproto
],
#{work_dir => emqx_cth_suite:work_dir(Cfg)}
), ),
[ [
{servers, Svrs}, {servers, Svrs},
{apps, Apps},
{listener_type, LisType}, {listener_type, LisType},
{service_name, ServiceName}, {service_name, ServiceName},
{grpc_client_scheme, Scheme} {grpc_client_scheme, Scheme}
@ -142,8 +151,7 @@ init_per_group(LisType, ServiceName, Scheme, Cfg) ->
]. ].
end_per_group(_, Cfg) -> end_per_group(_, Cfg) ->
emqx_config:erase(gateway), ok = emqx_cth_suite:stop(proplists:get_value(apps, Cfg)),
emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_auth, emqx_conf]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
init_per_testcase(TestCase, Cfg) when init_per_testcase(TestCase, Cfg) when
@ -159,27 +167,12 @@ init_per_testcase(_TestCase, Cfg) ->
end_per_testcase(_TestCase, _Cfg) -> end_per_testcase(_TestCase, _Cfg) ->
ok. ok.
set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) ->
Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
emqx_config:put(
[gateway, exproto],
#{
server => #{bind => 9100},
idle_timeout => 5000,
mountpoint => <<"ct/">>,
handler => #{
address => Addrs,
service_name => ServiceName,
ssl_options => #{enable => Scheme == https}
},
listeners => listener_confs(LisType)
}
);
set_special_cfg(_, _, _, _) ->
ok.
listener_confs(Type) -> listener_confs(Type) ->
Default = #{bind => 7993, acceptors => 8}, Default = #{
bind => 7993,
max_connections => 64,
access_rules => ["allow all"]
},
#{Type => #{'default' => maps:merge(Default, socketopts(Type))}}. #{Type => #{'default' => maps:merge(Default, socketopts(Type))}}.
default_config() -> default_config() ->
@ -636,9 +629,13 @@ close({dtls, Sock}) ->
%% Server-Opts %% Server-Opts
socketopts(tcp) -> socketopts(tcp) ->
#{tcp_options => tcp_opts()}; #{
acceptors => 8,
tcp_options => tcp_opts()
};
socketopts(ssl) -> socketopts(ssl) ->
#{ #{
acceptors => 8,
tcp_options => tcp_opts(), tcp_options => tcp_opts(),
ssl_options => ssl_opts() ssl_options => ssl_opts()
}; };
@ -646,6 +643,7 @@ socketopts(udp) ->
#{udp_options => udp_opts()}; #{udp_options => udp_opts()};
socketopts(dtls) -> socketopts(dtls) ->
#{ #{
acceptors => 8,
udp_options => udp_opts(), udp_options => udp_opts(),
dtls_options => dtls_opts() dtls_options => dtls_opts()
}. }.

View File

@ -66,7 +66,6 @@
-elvis([{elvis_style, dont_repeat_yourself, disable}]). -elvis([{elvis_style, dont_repeat_yourself, disable}]).
-define(CONF_DEFAULT, << -define(CONF_DEFAULT, <<
"\n"
"gateway.mqttsn {\n" "gateway.mqttsn {\n"
" gateway_id = 1\n" " gateway_id = 1\n"
" broadcast = true\n" " broadcast = true\n"
@ -89,6 +88,20 @@
"}\n" "}\n"
>>). >>).
-define(CONF_DTLS, <<
"\n"
"gateway.mqttsn {"
" listeners.dtls.default {\n"
" bind = 1885\n"
" dtls_options {\n"
" cacertfile = \"${cacertfile}\"\n"
" certfile = \"${certfile}\"\n"
" keyfile = \"${keyfile}\"\n"
" }\n"
" }\n"
"}\n"
>>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -97,9 +110,22 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
PrivDir = ?config(priv_dir, Config),
Root = emqx_cth_tls:gen_cert(#{key => ec, issuer => root}),
Server = emqx_cth_tls:gen_cert(#{key => ec, issuer => Root}),
{CACertfile, _} = emqx_cth_tls:write_cert(PrivDir, Root),
{Certfile, Keyfile} = emqx_cth_tls:write_cert(PrivDir, Server),
Conf = emqx_template:render_strict(
emqx_template:parse([?CONF_DEFAULT, ?CONF_DTLS]),
#{
cacertfile => CACertfile,
certfile => Certfile,
keyfile => Keyfile
}
),
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[ [
{emqx_conf, ?CONF_DEFAULT}, {emqx_conf, Conf},
emqx_gateway, emqx_gateway,
emqx_auth, emqx_auth,
emqx_management, emqx_management,
@ -108,7 +134,7 @@ init_per_suite(Config) ->
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
emqx_common_test_http:create_default_app(), emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config]. [{suite_apps, Apps}, {cacertfile, CACertfile} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
{ok, _} = emqx:remove_config([gateway, mqttsn]), {ok, _} = emqx:remove_config([gateway, mqttsn]),
@ -191,6 +217,25 @@ t_first_disconnect(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
t_connect_dtls(Config) ->
SockName = {'mqttsn:dtls:default', 1885},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
ClientOpts = [
binary,
{active, false},
{protocol, dtls},
{cacertfile, ?config(cacertfile, Config)}
| emqx_common_test_helpers:ssl_verify_fun_allow_any_host()
],
{ok, Socket} = ssl:connect(?HOST, 1885, ClientOpts, 1000),
ok = ssl:send(Socket, make_connect_msg(<<"client_id_test1">>, 1)),
?assertEqual({ok, <<3, ?SN_CONNACK, 0>>}, ssl:recv(Socket, 0, 1000)),
ok = ssl:send(Socket, make_disconnect_msg(undefined)),
?assertEqual({ok, <<2, ?SN_DISCONNECT>>}, ssl:recv(Socket, 0, 1000)),
ssl:close(Socket).
t_subscribe(_) -> t_subscribe(_) ->
Dup = 0, Dup = 0,
QoS = 0, QoS = 0,
@ -2444,10 +2489,7 @@ send_searchgw_msg(Socket) ->
Radius = 0, Radius = 0,
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>). ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
send_connect_msg(Socket, ClientId) -> make_connect_msg(ClientId, CleanSession) when
send_connect_msg(Socket, ClientId, 1).
send_connect_msg(Socket, ClientId, CleanSession) when
CleanSession == 0; CleanSession == 0;
CleanSession == 1 CleanSession == 1
-> ->
@ -2460,9 +2502,14 @@ send_connect_msg(Socket, ClientId, CleanSession) when
TopicIdType = 0, TopicIdType = 0,
ProtocolId = 1, ProtocolId = 1,
Duration = 10, Duration = 10,
Packet = <<Length:8, MsgType:8, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
<<Length:8, MsgType:8, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2, ProtocolId:8, Duration:16, ClientId/binary>>.
ProtocolId:8, Duration:16, ClientId/binary>>,
send_connect_msg(Socket, ClientId) ->
send_connect_msg(Socket, ClientId, 1).
send_connect_msg(Socket, ClientId, CleanSession) ->
Packet = make_connect_msg(ClientId, CleanSession),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
send_connect_msg_with_will(Socket, Duration, ClientId) -> send_connect_msg_with_will(Socket, Duration, ClientId) ->
@ -2724,15 +2771,17 @@ send_pingreq_msg(Socket, ClientId) ->
?LOG("send_pingreq_msg ClientId=~p", [ClientId]), ?LOG("send_pingreq_msg ClientId=~p", [ClientId]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket). ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket).
send_disconnect_msg(Socket, Duration) -> make_disconnect_msg(Duration) ->
Length = 2, Length = 2,
Length2 = 4, Length2 = 4,
MsgType = ?SN_DISCONNECT, MsgType = ?SN_DISCONNECT,
DisConnectPacket = case Duration of
case Duration of undefined -> <<Length:8, MsgType:8>>;
undefined -> <<Length:8, MsgType:8>>; Other -> <<Length2:8, MsgType:8, Other:16>>
Other -> <<Length2:8, MsgType:8, Other:16>> end.
end,
send_disconnect_msg(Socket, Duration) ->
DisConnectPacket = make_disconnect_msg(Duration),
?LOG("send_disconnect_msg Duration=~p", [Duration]), ?LOG("send_disconnect_msg Duration=~p", [Duration]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket). ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket).

View File

@ -0,0 +1 @@
Fix an issue where DTLS enabled MQTT-SN gateways could not be started, caused by incompatibility of default listener configuration with the DTLS implementation.