emqx/apps/emqx_gateway/test/emqx_exproto_SUITE.erl

456 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exproto_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_exproto_echo_svr,
[ frame_connect/2
, frame_connack/1
, frame_publish/3
, frame_puback/1
, frame_subscribe/2
, frame_suback/1
, frame_unsubscribe/1
, frame_unsuback/1
, frame_disconnect/0
]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(TCPOPTS, [binary, {active, false}]).
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
[{group, Name} || Name <- metrics()].
groups() ->
Cases = emqx_common_test_helpers:all(?MODULE),
[{Name, Cases} || Name <- metrics()].
%% @private
metrics() ->
[tcp, ssl, udp, dtls].
init_per_group(GrpName, Cfg) ->
put(grpname, GrpName),
Svrs = emqx_exproto_echo_svr:start(),
emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
emqx_logger:set_log_level(debug),
[{servers, Svrs}, {listener_type, GrpName} | Cfg].
end_per_group(_, Cfg) ->
emqx_common_test_helpers:stop_apps([emqx_gateway]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
set_special_cfg(emqx_gateway) ->
LisType = get(grpname),
emqx_config:put(
[gateway, exproto],
#{server => #{bind => 9100},
handler => #{address => "http://127.0.0.1:9001"},
listeners => listener_confs(LisType)
});
set_special_cfg(_App) ->
ok.
listener_confs(Type) ->
Default = #{bind => 7993, acceptors => 8},
#{Type => #{'default' => maps:merge(Default, socketopts(Type))}}.
%%--------------------------------------------------------------------
%% Tests cases
%%--------------------------------------------------------------------
t_start_stop(_) ->
ok.
t_mountpoint_echo(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
mountpoint => <<"ct/">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/dn">>, 1),
SubAckBin = frame_suback(0),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>),
{ok, PubBin1} = recv(Sock, 5000),
PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>),
PubAckBin = frame_puback(0),
emqx:subscribe(<<"ct/t/up">>),
send(Sock, PubBin2),
{ok, PubAckBin} = recv(Sock, 5000),
receive
{deliver, _, _} -> ok
after 1000 ->
error(echo_not_running)
end,
close(Sock).
t_auth_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ok = meck:new(emqx_gateway_ctx, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_gateway_ctx, authenticate,
fun(_, _) -> {error, ?RC_NOT_AUTHORIZED} end),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(1),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end,
meck:unload([emqx_gateway_ctx]).
t_acl_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ok = meck:new(emqx_gateway_ctx, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_gateway_ctx, authorize, fun(_, _, _, _) -> deny end),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(1),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)),
PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
PubBinFailedAck = frame_puback(1),
PubBinSuccesAck = frame_puback(0),
send(Sock, PubBin),
{ok, PubBinFailedAck} = recv(Sock, 5000),
meck:unload([emqx_gateway_ctx]),
send(Sock, PubBin),
{ok, PubBinSuccesAck} = recv(Sock, 5000),
close(Sock).
t_keepalive_timeout(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
keepalive => 2
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
DisconnectBin = frame_disconnect(),
{ok, DisconnectBin} = recv(Sock, 10000),
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end, ok.
t_hook_connected_disconnected(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
Parent = self(),
emqx:hook('client.connected', {?MODULE, hook_fun1, [Parent]}),
emqx:hook('client.disconnected',{?MODULE, hook_fun2, [Parent]}),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
receive
connected -> ok
after 1000 ->
error(hook_is_not_running)
end,
DisconnectBin = frame_disconnect(),
send(Sock, DisconnectBin),
receive
disconnected -> ok
after 1000 ->
error(hook_is_not_running)
end,
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end,
emqx:unhook('client.connected', {?MODULE, hook_fun1}),
emqx:unhook('client.disconnected', {?MODULE, hook_fun2}).
t_hook_session_subscribed_unsubscribed(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
Parent = self(),
emqx:hook('session.subscribed', {?MODULE, hook_fun3, [Parent]}),
emqx:hook('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(0),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
receive
subscribed -> ok
after 1000 ->
error(hook_is_not_running)
end,
UnsubBin = frame_unsubscribe(<<"t/#">>),
UnsubAckBin = frame_unsuback(0),
send(Sock, UnsubBin),
{ok, UnsubAckBin} = recv(Sock, 5000),
receive
unsubscribed -> ok
after 1000 ->
error(hook_is_not_running)
end,
close(Sock),
emqx:unhook('session.subscribed', {?MODULE, hook_fun3}),
emqx:unhook('session.unsubscribed', {?MODULE, hook_fun4}).
t_hook_message_delivered(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(0),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
emqx:hook('message.delivered', {?MODULE, hook_fun5, []}),
emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)),
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
{ok, PubBin1} = recv(Sock, 5000),
close(Sock),
emqx:unhook('message.delivered', {?MODULE, hook_fun5}).
%%--------------------------------------------------------------------
%% Utils
hook_fun1(_, _, Parent) -> Parent ! connected, ok.
hook_fun2(_, _, _, Parent) -> Parent ! disconnected, ok.
hook_fun3(_, _, _, Parent) -> Parent ! subscribed, ok.
hook_fun4(_, _, _, Parent) -> Parent ! unsubscribed, ok.
hook_fun5(_, Msg) -> {ok, Msg#message{payload = <<"2">>}}.
rand_bytes() ->
crypto:strong_rand_bytes(rand:uniform(256)).
%%--------------------------------------------------------------------
%% Sock funcs
open(tcp) ->
{ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS),
{tcp, Sock};
open(udp) ->
{ok, Sock} = gen_udp:open(0, ?TCPOPTS),
{udp, Sock};
open(ssl) ->
SslOpts = maps:to_list(client_ssl_opts()),
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts),
{ssl, SslSock};
open(dtls) ->
SslOpts = maps:to_list(client_ssl_opts()),
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts),
{dtls, SslSock}.
send({tcp, Sock}, Bin) ->
gen_tcp:send(Sock, Bin);
send({udp, Sock}, Bin) ->
gen_udp:send(Sock, "127.0.0.1", 7993, Bin);
send({ssl, Sock}, Bin) ->
ssl:send(Sock, Bin);
send({dtls, Sock}, Bin) ->
ssl:send(Sock, Bin).
recv({tcp, Sock}, Ts) ->
gen_tcp:recv(Sock, 0, Ts);
recv({udp, Sock}, Ts) ->
{ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts),
{ok, Bin};
recv({ssl, Sock}, Ts) ->
ssl:recv(Sock, 0, Ts);
recv({dtls, Sock}, Ts) ->
ssl:recv(Sock, 0, Ts).
close({tcp, Sock}) ->
gen_tcp:close(Sock);
close({udp, Sock}) ->
gen_udp:close(Sock);
close({ssl, Sock}) ->
ssl:close(Sock);
close({dtls, Sock}) ->
ssl:close(Sock).
%%--------------------------------------------------------------------
%% Server-Opts
socketopts(tcp) ->
#{tcp => tcp_opts()};
socketopts(ssl) ->
#{tcp => tcp_opts(),
ssl => ssl_opts()};
socketopts(udp) ->
#{udp => udp_opts()};
socketopts(dtls) ->
#{udp => udp_opts(),
dtls => dtls_opts()}.
tcp_opts() ->
maps:merge(
udp_opts(),
#{send_timeout => 15000,
send_timeout_close => true,
backlog => 100,
nodelay => true}
).
udp_opts() ->
#{recbuf => 1024,
sndbuf => 1024,
buffer => 1024,
reuseaddr => true}.
ssl_opts() ->
Certs = certs("key.pem", "cert.pem", "cacert.pem"),
maps:merge(
Certs,
#{versions => emqx_tls_lib:default_versions(),
ciphers => emqx_tls_lib:default_ciphers(),
verify => verify_peer,
fail_if_no_peer_cert => true,
secure_renegotiate => false,
reuse_sessions => true,
honor_cipher_order => true}
).
dtls_opts() ->
maps:merge(ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}).
%%--------------------------------------------------------------------
%% Client-Opts
client_ssl_opts() ->
certs("client-key.pem", "client-cert.pem", "cacert.pem").
certs(Key, Cert, CACert) ->
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
#{keyfile => filename:join([ CertsPath, Key ]),
certfile => filename:join([ CertsPath, Cert ]),
cacertfile => filename:join([ CertsPath, CACert])}.