452 lines
13 KiB
Erlang
452 lines
13 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_exproto_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-import(emqx_exproto_echo_svr,
|
|
[ frame_connect/2
|
|
, frame_connack/1
|
|
, frame_publish/3
|
|
, frame_puback/1
|
|
, frame_subscribe/2
|
|
, frame_suback/1
|
|
, frame_unsubscribe/1
|
|
, frame_unsuback/1
|
|
, frame_disconnect/0
|
|
]).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
-define(TCPOPTS, [binary, {active, false}]).
|
|
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Setups
|
|
%%--------------------------------------------------------------------
|
|
|
|
all() ->
|
|
[{group, Name} || Name <- metrics()].
|
|
|
|
groups() ->
|
|
Cases = emqx_ct:all(?MODULE),
|
|
[{Name, Cases} || Name <- metrics()].
|
|
|
|
%% @private
|
|
metrics() ->
|
|
[tcp, ssl, udp, dtls].
|
|
|
|
init_per_group(GrpName, Cfg) ->
|
|
put(grpname, GrpName),
|
|
Svrs = emqx_exproto_echo_svr:start(),
|
|
emqx_ct_helpers:start_apps([emqx_exproto], fun set_sepecial_cfg/1),
|
|
emqx_logger:set_log_level(debug),
|
|
[{servers, Svrs}, {listener_type, GrpName} | Cfg].
|
|
|
|
end_per_group(_, Cfg) ->
|
|
emqx_ct_helpers:stop_apps([emqx_exproto]),
|
|
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
|
|
|
|
set_sepecial_cfg(emqx_exproto) ->
|
|
LisType = get(grpname),
|
|
Listeners = application:get_env(emqx_exproto, listeners, []),
|
|
SockOpts = socketopts(LisType),
|
|
UpgradeOpts = fun(Opts) ->
|
|
Opts2 = lists:keydelete(tcp_options, 1, Opts),
|
|
Opts3 = lists:keydelete(ssl_options, 1, Opts2),
|
|
Opts4 = lists:keydelete(udp_options, 1, Opts3),
|
|
Opts5 = lists:keydelete(dtls_options, 1, Opts4),
|
|
SockOpts ++ Opts5
|
|
end,
|
|
NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)}
|
|
|| {Proto, _Type, LisOn, Opts} <- Listeners],
|
|
application:set_env(emqx_exproto, listeners, NListeners);
|
|
set_sepecial_cfg(emqx) ->
|
|
application:set_env(emqx, allow_anonymous, true),
|
|
application:set_env(emqx, enable_acl_cache, false),
|
|
ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Tests cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_start_stop(_) ->
|
|
ok.
|
|
|
|
t_mountpoint_echo(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>,
|
|
mountpoint => <<"ct/">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SubBin = frame_subscribe(<<"t/dn">>, 1),
|
|
SubAckBin = frame_suback(0),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
|
|
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>),
|
|
{ok, PubBin1} = recv(Sock, 5000),
|
|
|
|
PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>),
|
|
PubAckBin = frame_puback(0),
|
|
|
|
emqx:subscribe(<<"ct/t/up">>),
|
|
|
|
send(Sock, PubBin2),
|
|
{ok, PubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
{deliver, _, _} -> ok
|
|
after 1000 ->
|
|
error(echo_not_running)
|
|
end,
|
|
close(Sock).
|
|
|
|
t_auth_deny(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
|
ok = meck:expect(emqx_access_control, authenticate,
|
|
fun(_) -> {error, ?RC_NOT_AUTHORIZED} end),
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(1),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SockType =/= udp andalso begin
|
|
{error, closed} = recv(Sock, 5000)
|
|
end,
|
|
meck:unload([emqx_access_control]).
|
|
|
|
t_acl_deny(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
|
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> deny end),
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SubBin = frame_subscribe(<<"t/#">>, 1),
|
|
SubAckBin = frame_suback(1),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)),
|
|
|
|
PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
|
|
PubBinFailedAck = frame_puback(1),
|
|
PubBinSuccesAck = frame_puback(0),
|
|
|
|
send(Sock, PubBin),
|
|
{ok, PubBinFailedAck} = recv(Sock, 5000),
|
|
|
|
meck:unload([emqx_access_control]),
|
|
|
|
send(Sock, PubBin),
|
|
{ok, PubBinSuccesAck} = recv(Sock, 5000),
|
|
close(Sock).
|
|
|
|
t_keepalive_timeout(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>,
|
|
keepalive => 2
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
DisconnectBin = frame_disconnect(),
|
|
{ok, DisconnectBin} = recv(Sock, 10000),
|
|
|
|
SockType =/= udp andalso begin
|
|
{error, closed} = recv(Sock, 5000)
|
|
end, ok.
|
|
|
|
t_hook_connected_disconnected(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
Parent = self(),
|
|
HookFun1 = fun(_, _) -> Parent ! connected, ok end,
|
|
HookFun2 = fun(_, _, _) -> Parent ! disconnected, ok end,
|
|
emqx:hook('client.connected', HookFun1),
|
|
emqx:hook('client.disconnected', HookFun2),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
connected -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
DisconnectBin = frame_disconnect(),
|
|
send(Sock, DisconnectBin),
|
|
|
|
receive
|
|
disconnected -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
SockType =/= udp andalso begin
|
|
{error, closed} = recv(Sock, 5000)
|
|
end,
|
|
emqx:unhook('client.connected', HookFun1),
|
|
emqx:unhook('client.disconnected', HookFun2).
|
|
|
|
t_hook_session_subscribed_unsubscribed(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
Parent = self(),
|
|
HookFun1 = fun(_, _, _) -> Parent ! subscribed, ok end,
|
|
HookFun2 = fun(_, _, _) -> Parent ! unsubscribed, ok end,
|
|
emqx:hook('session.subscribed', HookFun1),
|
|
emqx:hook('session.unsubscribed', HookFun2),
|
|
|
|
SubBin = frame_subscribe(<<"t/#">>, 1),
|
|
SubAckBin = frame_suback(0),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
subscribed -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
UnsubBin = frame_unsubscribe(<<"t/#">>),
|
|
UnsubAckBin = frame_unsuback(0),
|
|
|
|
send(Sock, UnsubBin),
|
|
{ok, UnsubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
unsubscribed -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
close(Sock),
|
|
emqx:unhook('session.subscribed', HookFun1),
|
|
emqx:unhook('session.unsubscribed', HookFun2).
|
|
|
|
t_hook_message_delivered(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SubBin = frame_subscribe(<<"t/#">>, 1),
|
|
SubAckBin = frame_suback(0),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
HookFun1 = fun(_, Msg) -> {ok, Msg#message{payload = <<"2">>}} end,
|
|
emqx:hook('message.delivered', HookFun1),
|
|
|
|
emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)),
|
|
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
|
|
{ok, PubBin1} = recv(Sock, 5000),
|
|
|
|
close(Sock),
|
|
emqx:unhook('message.delivered', HookFun1).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Utils
|
|
|
|
rand_bytes() ->
|
|
crypto:strong_rand_bytes(rand:uniform(256)).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Sock funcs
|
|
|
|
open(tcp) ->
|
|
{ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS),
|
|
{tcp, Sock};
|
|
open(udp) ->
|
|
{ok, Sock} = gen_udp:open(0, ?TCPOPTS),
|
|
{udp, Sock};
|
|
open(ssl) ->
|
|
SslOpts = client_ssl_opts(),
|
|
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts),
|
|
{ssl, SslSock};
|
|
open(dtls) ->
|
|
SslOpts = client_ssl_opts(),
|
|
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts),
|
|
{dtls, SslSock}.
|
|
|
|
send({tcp, Sock}, Bin) ->
|
|
gen_tcp:send(Sock, Bin);
|
|
send({udp, Sock}, Bin) ->
|
|
gen_udp:send(Sock, "127.0.0.1", 7993, Bin);
|
|
send({ssl, Sock}, Bin) ->
|
|
ssl:send(Sock, Bin);
|
|
send({dtls, Sock}, Bin) ->
|
|
ssl:send(Sock, Bin).
|
|
|
|
recv({tcp, Sock}, Ts) ->
|
|
gen_tcp:recv(Sock, 0, Ts);
|
|
recv({udp, Sock}, Ts) ->
|
|
{ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts),
|
|
{ok, Bin};
|
|
recv({ssl, Sock}, Ts) ->
|
|
ssl:recv(Sock, 0, Ts);
|
|
recv({dtls, Sock}, Ts) ->
|
|
ssl:recv(Sock, 0, Ts).
|
|
|
|
close({tcp, Sock}) ->
|
|
gen_tcp:close(Sock);
|
|
close({udp, Sock}) ->
|
|
gen_udp:close(Sock);
|
|
close({ssl, Sock}) ->
|
|
ssl:close(Sock);
|
|
close({dtls, Sock}) ->
|
|
ssl:close(Sock).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Server-Opts
|
|
|
|
socketopts(tcp) ->
|
|
[{tcp_options, tcp_opts()}];
|
|
socketopts(ssl) ->
|
|
[{tcp_options, tcp_opts()},
|
|
{ssl_options, ssl_opts()}];
|
|
socketopts(udp) ->
|
|
[{udp_options, udp_opts()}];
|
|
socketopts(dtls) ->
|
|
[{udp_options, udp_opts()},
|
|
{dtls_options, dtls_opts()}].
|
|
|
|
tcp_opts() ->
|
|
[{send_timeout, 15000},
|
|
{send_timeout_close, true},
|
|
{backlog, 100},
|
|
{nodelay, true} | udp_opts()].
|
|
|
|
udp_opts() ->
|
|
[{recbuf, 1024},
|
|
{sndbuf, 1024},
|
|
{buffer, 1024},
|
|
{reuseaddr, true}].
|
|
|
|
ssl_opts() ->
|
|
Certs = certs("key.pem", "cert.pem", "cacert.pem"),
|
|
[{versions, emqx_tls_lib:default_versions()},
|
|
{ciphers, emqx_tls_lib:default_ciphers()},
|
|
{verify, verify_peer},
|
|
{fail_if_no_peer_cert, true},
|
|
{secure_renegotiate, false},
|
|
{reuse_sessions, true},
|
|
{honor_cipher_order, true}]++Certs.
|
|
|
|
dtls_opts() ->
|
|
Opts = ssl_opts(),
|
|
lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client-Opts
|
|
|
|
client_ssl_opts() ->
|
|
certs( "client-key.pem", "client-cert.pem", "cacert.pem" ).
|
|
|
|
certs( Key, Cert, CACert ) ->
|
|
CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"),
|
|
[ { keyfile, filename:join([ CertsPath, Key ]) },
|
|
{ certfile, filename:join([ CertsPath, Cert ]) },
|
|
{ cacertfile, filename:join([ CertsPath, CACert ]) } ].
|
|
|