emqx/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl

741 lines
20 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exproto_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(
emqx_exproto_echo_svr,
[
frame_connect/2,
frame_connack/1,
frame_publish/3,
frame_raw_publish/3,
frame_puback/1,
frame_subscribe/2,
frame_suback/1,
frame_unsubscribe/1,
frame_unsuback/1,
frame_disconnect/0
]
).
%% Client-side socket options used by the helpers in this suite: binary
%% payloads and passive mode, so data is only read through explicit recv calls.
%% Despite its name, TCPOPTS is also used for the UDP and TLS client sockets.
-define(TCPOPTS, [binary, {active, false}]).
%% Same options plus `{protocol, dtls}`; used by open(dtls) with ssl:connect/3.
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
%%--------------------------------------------------------------------
%% Textual gateway configuration returned by default_config/0.
%% It binds the exproto server to port 9100, points the handler at
%% http://127.0.0.1:9001 and declares one TCP listener on port 7993.
-define(CONF_DEFAULT, <<
"\n"
"gateway.exproto {\n"
" server.bind = 9100,\n"
" handler.address = \"http://127.0.0.1:9001\"\n"
" handler.service_name = \"ConnectionHandler\"\n"
" listeners.tcp.default {\n"
" bind = 7993,\n"
" acceptors = 8\n"
" }\n"
"}\n"
>>).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
%% Common Test entry point: lists every group that has to be executed.
all() ->
    GroupNames = [
        tcp_listener,
        ssl_listener,
        udp_listener,
        dtls_listener,
        https_grpc_server,
        streaming_connection_handler
    ],
    [{group, Name} || Name <- GroupNames].
%% Suite-wide Common Test settings: every test case is aborted after 30 seconds.
suite() ->
    Timetrap = {timetrap, {seconds, 30}},
    [Timetrap].
%% Group definitions: each group runs the same list of cases in sequence;
%% the groups differ only in how init_per_group/2 sets up the gateway.
groups() ->
    Cases = [
        t_keepalive_timeout,
        t_mountpoint_echo,
        t_raw_publish,
        t_auth_deny,
        t_acl_deny,
        t_auth_expire,
        t_hook_connected_disconnected,
        t_hook_session_subscribed_unsubscribed,
        t_hook_message_delivered
    ],
    GroupNames = [
        tcp_listener,
        ssl_listener,
        udp_listener,
        dtls_listener,
        streaming_connection_handler,
        https_grpc_server
    ],
    [{Name, [sequence], Cases} || Name <- GroupNames].
%% Maps a group name to the (listener type, gRPC service, handler scheme)
%% triple and delegates the actual setup to init_per_group/4.
%% The four *_listener groups vary the listener type, https_grpc_server
%% switches the handler scheme to https, and streaming_connection_handler
%% selects the 'ConnectionHandler' service. Any other name falls back to
%% tcp / 'ConnectionUnaryHandler' / http.
init_per_group(tcp_listener, Cfg) ->
    init_per_group(tcp, 'ConnectionUnaryHandler', http, Cfg);
init_per_group(ssl_listener, Cfg) ->
    init_per_group(ssl, 'ConnectionUnaryHandler', http, Cfg);
init_per_group(udp_listener, Cfg) ->
    init_per_group(udp, 'ConnectionUnaryHandler', http, Cfg);
init_per_group(dtls_listener, Cfg) ->
    init_per_group(dtls, 'ConnectionUnaryHandler', http, Cfg);
init_per_group(https_grpc_server, Cfg) ->
    init_per_group(tcp, 'ConnectionUnaryHandler', https, Cfg);
init_per_group(streaming_connection_handler, Cfg) ->
    init_per_group(tcp, 'ConnectionHandler', http, Cfg);
init_per_group(_Other, Cfg) ->
    init_per_group(tcp, 'ConnectionUnaryHandler', http, Cfg).
%% Starts the echo gRPC server and the applications under test for one group.
%%
%% LisType     - listener type handed to listener_confs/1 (tcp | ssl | udp | dtls)
%% ServiceName - gRPC service name put into the gateway handler config
%% Scheme      - http | https; used for the echo server and the handler address
%%
%% Returns Cfg extended with servers, apps, listener_type, service_name and
%% grpc_client_scheme entries.
init_per_group(LisType, ServiceName, Scheme, Cfg) ->
    %% The echo server is started first, exactly as the gateway expects to
    %% reach it on 127.0.0.1:9001.
    Svrs = emqx_exproto_echo_svr:start(Scheme),
    HandlerAddr = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
    HandlerConf = #{
        address => HandlerAddr,
        service_name => ServiceName,
        ssl_options => #{enable => Scheme == https}
    },
    GWConfig = #{
        server => #{bind => 9100},
        idle_timeout => 5000,
        mountpoint => <<"ct/">>,
        handler => HandlerConf,
        listeners => listener_confs(LisType)
    },
    GatewayApp = {emqx_gateway, #{config => #{gateway => #{exproto => GWConfig}}}},
    AppSpecs = [emqx_conf, emqx_auth, GatewayApp, emqx_gateway_exproto],
    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => emqx_cth_suite:work_dir(Cfg)}),
    GroupProps = [
        {servers, Svrs},
        {apps, Apps},
        {listener_type, LisType},
        {service_name, ServiceName},
        {grpc_client_scheme, Scheme}
    ],
    GroupProps ++ Cfg.
%% Tears down what init_per_group/4 started: first the applications, then
%% the echo gRPC server.
end_per_group(_Group, Cfg) ->
    Apps = proplists:get_value(apps, Cfg),
    ok = emqx_cth_suite:stop(Apps),
    Servers = proplists:get_value(servers, Cfg),
    emqx_exproto_echo_svr:stop(Servers).
%% Starts a snabbkaffe trace for every test case.
%% t_enter_passive_mode is additionally skipped when the group runs on a
%% UDP listener.
init_per_testcase(t_enter_passive_mode, Cfg) ->
    snabbkaffe:start_trace(),
    case proplists:get_value(listener_type, Cfg) =:= udp of
        true -> {skip, ignore};
        false -> Cfg
    end;
init_per_testcase(_Other, Cfg) ->
    snabbkaffe:start_trace(),
    Cfg.
%% Stops the snabbkaffe trace started in init_per_testcase/2.
end_per_testcase(_Case, _Config) ->
    snabbkaffe:stop(),
    ok.
%% Builds the `listeners` section of the gateway config: a single listener
%% named `default` of the given Type, bound to port 7993, with the
%% type-specific socket options from socketopts/1 merged on top.
listener_confs(Type) ->
    Common = #{
        access_rules => ["allow all"],
        max_connections => 64,
        bind => 7993
    },
    Listener = maps:merge(Common, socketopts(Type)),
    #{Type => #{default => Listener}}.
%% Returns the textual default gateway configuration (the CONF_DEFAULT macro).
default_config() ->
    Conf = ?CONF_DEFAULT,
    Conf.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
%% Checks that the gateway-level mountpoint (`ct/`, set in init_per_group/4)
%% is applied in both directions: a broker message on `ct/t/dn` reaches the
%% client as `t/dn`, and a client publish to `t/up` is delivered to a broker
%% subscriber of `ct/t/up`.
t_mountpoint_echo(Cfg) ->
    Sock = open(proplists:get_value(listener_type, Cfg)),
    ClientInfo = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => <<"test_client_1">>,
        %% deprecated since v5.1.0; this value will be ignored
        mountpoint => <<"deperated/">>
    },

    %% connect
    ConnAck = frame_connack(0),
    send(Sock, frame_connect(ClientInfo, <<"123456">>)),
    {ok, ConnAck} = recv(Sock, 5000),

    %% subscribe
    SubAck = frame_suback(0),
    send(Sock, frame_subscribe(<<"t/dn">>, 1)),
    {ok, SubAck} = recv(Sock, 5000),

    %% broker -> client: the mountpoint is stripped on delivery
    emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
    Downstream = frame_publish(<<"t/dn">>, 0, <<"echo">>),
    {ok, Downstream} = recv(Sock, 5000),

    %% client -> broker: the mountpoint is prepended on publish
    Upstream = frame_publish(<<"t/up">>, 0, <<"echo">>),
    PubAck = frame_puback(0),
    emqx:subscribe(<<"ct/t/up">>),
    send(Sock, Upstream),
    {ok, PubAck} = recv(Sock, 5000),
    receive
        {deliver, _Topic, _Msg} -> ok
    after 1000 ->
        error(echo_not_running)
    end,
    close(Sock).
%% Checks that a raw publish bypasses the gateway mountpoint: the message
%% sent to `t/up` is delivered to a subscriber of plain `t/up`.
t_raw_publish(Cfg) ->
    Sock = open(proplists:get_value(listener_type, Cfg)),
    ClientInfo = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => <<"test_client_1">>,
        mountpoint => <<>>
    },

    %% connect
    ConnAck = frame_connack(0),
    send(Sock, frame_connect(ClientInfo, <<"123456">>)),
    {ok, ConnAck} = recv(Sock, 5000),

    RawPublish = frame_raw_publish(<<"t/up">>, 0, <<"echo">>),
    PubAck = frame_puback(0),

    %% the mountpoint is not used in raw publish, so subscribe without it
    emqx:subscribe(<<"t/up">>),
    send(Sock, RawPublish),
    {ok, PubAck} = recv(Sock, 5000),
    receive
        {deliver, _Topic, _Msg} -> ok
    after 1000 ->
        error(echo_not_running)
    end,
    close(Sock).
%% Checks that a rejected authentication yields a CONNACK with code 1 and,
%% on connection-oriented transports, that the server closes the socket.
%%
%% emqx_gateway_ctx:authenticate/2 is mocked to reject every client. The
%% mock is created with `no_link`, so it is not tied to the test process
%% and has to be unloaded explicitly. The `after` section guarantees this
%% (and the socket close) even when one of the assertions fails, so a
%% failure here cannot make the following test cases fail as well.
t_auth_deny(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),

    Client = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => <<"test_client_1">>
    },
    Password = <<"123456">>,

    ok = meck:new(emqx_gateway_ctx, [passthrough, no_history, no_link]),
    try
        ok = meck:expect(
            emqx_gateway_ctx,
            authenticate,
            fun(_, _) -> {error, ?RC_NOT_AUTHORIZED} end
        ),

        ConnBin = frame_connect(Client, Password),
        ConnAckBin = frame_connack(1),

        send(Sock, ConnBin),
        {ok, ConnAckBin} = recv(Sock, 5000),

        %% UDP has no connection to close, so only the other transports
        %% are expected to report a closed socket.
        SockType =/= udp andalso
            begin
                {error, closed} = recv(Sock, 5000)
            end
    after
        meck:unload([emqx_gateway_ctx]),
        close(Sock)
    end.
%% Checks that a connection whose authentication result carries an
%% `expire_at` 500 ms in the future is shut down with reason
%% `{shutdown, expired}`.
%%
%% emqx_access_control:authenticate/1 is mocked for this. The mock is
%% unloaded and the client socket is closed in the `after` section, so
%% neither outlives this test case, whether it passes or fails.
t_auth_expire(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),

    Client = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => <<"test_client_1">>
    },
    Password = <<"123456">>,

    ok = meck:new(emqx_access_control, [passthrough, no_history]),
    try
        ok = meck:expect(
            emqx_access_control,
            authenticate,
            fun(_) ->
                {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
            end
        ),

        ConnBin = frame_connect(Client, Password),
        ConnAckBin = frame_connack(0),

        ?assertWaitEvent(
            begin
                send(Sock, ConnBin),
                {ok, ConnAckBin} = recv(Sock, 5000)
            end,
            #{
                ?snk_kind := conn_process_terminated,
                clientid := <<"test_client_1">>,
                reason := {shutdown, expired}
            },
            5000
        )
    after
        meck:unload(emqx_access_control),
        close(Sock)
    end.
%% Checks authorization handling in two phases:
%%   1. emqx_gateway_ctx:authorize/4 is mocked to return `deny`:
%%      subscribe and publish are both acknowledged with code 1.
%%   2. the mock is removed: the same publish is acknowledged with code 0.
%%
%% The mock is created with `no_link` and therefore has to be unloaded
%% explicitly. The inner `after` does that even when a phase-1 assertion
%% fails, and the outer `after` always closes the client socket.
t_acl_deny(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),

    Client = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => <<"test_client_1">>
    },
    Password = <<"123456">>,

    %% All frames are built up front so that they can be used both inside
    %% and after the inner try block.
    ConnBin = frame_connect(Client, Password),
    ConnAckBin = frame_connack(0),
    SubBin = frame_subscribe(<<"t/#">>, 1),
    SubAckBin = frame_suback(1),
    PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
    PubBinFailedAck = frame_puback(1),
    PubBinSuccessAck = frame_puback(0),

    try
        ok = meck:new(emqx_gateway_ctx, [passthrough, no_history, no_link]),
        try
            ok = meck:expect(emqx_gateway_ctx, authorize, fun(_, _, _, _) -> deny end),

            send(Sock, ConnBin),
            {ok, ConnAckBin} = recv(Sock, 5000),

            send(Sock, SubBin),
            {ok, SubAckBin} = recv(Sock, 5000),

            emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),

            send(Sock, PubBin),
            {ok, PubBinFailedAck} = recv(Sock, 5000)
        after
            meck:unload([emqx_gateway_ctx])
        end,

        %% With the mock gone the very same publish must succeed.
        send(Sock, PubBin),
        {ok, PubBinSuccessAck} = recv(Sock, 5000)
    after
        close(Sock)
    end.
%% Checks that a client connecting with `keepalive => 5` and then staying
%% silent is terminated with reason `{shutdown, keepalive_timeout}`.
%%
%% On UDP a second client is started shortly before the timeout to verify
%% that it does not disturb the keepalive check of the first one. On the
%% other transports the test additionally checks that exactly one
%% connection process terminated and that the socket was closed.
%%
%% The CONNACK is awaited with the same bounded 5 s timeout as in the other
%% test cases, so a missing reply fails at this line instead of waiting for
%% the suite timetrap. The UDP branch closes both sockets it opened.
t_keepalive_timeout(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),

    ClientId1 = <<"keepalive_test_client1">>,
    Client = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => ClientId1,
        keepalive => 5
    },
    Password = <<"123456">>,

    ConnBin = frame_connect(Client, Password),
    ConnAckBin = frame_connack(0),

    send(Sock, ConnBin),
    {ok, ConnAckBin} = recv(Sock, 5000),

    case SockType of
        udp ->
            %% another udp client should not affect the first
            %% udp client keepalive check
            timer:sleep(4000),
            Sock2 = open(SockType),
            ConnBin2 = frame_connect(
                Client#{clientid => <<"keepalive_test_client2">>},
                Password
            ),
            send(Sock2, ConnBin2),
            %% the first client is expected to hit its keepalive timeout
            %% within the remaining wait
            ?assertMatch(
                {ok, #{
                    clientid := ClientId1,
                    reason := {shutdown, keepalive_timeout}
                }},
                ?block_until(#{?snk_kind := conn_process_terminated}, 8000)
            ),
            close(Sock2),
            close(Sock);
        _ ->
            ?assertMatch(
                {ok, #{
                    clientid := ClientId1,
                    reason := {shutdown, keepalive_timeout}
                }},
                ?block_until(#{?snk_kind := conn_process_terminated}, 12000)
            ),
            Trace = snabbkaffe:collect_trace(),
            %% conn process should be terminated
            ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
            %% socket port should be closed
            ?assertEqual({error, closed}, recv(Sock, 5000))
    end.
%% Checks that the 'client.connected' and 'client.disconnected' hooks are
%% run for an exproto client: hook_fun1/3 and hook_fun2/4 forward a message
%% to this process when they are invoked.
%%
%% Hook removal and the socket close live in the `after` section, so a
%% failed assertion does not leave the hooks registered or the socket open.
t_hook_connected_disconnected(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),

    Client = #{
        proto_name => <<"demo">>,
        proto_ver => <<"v0.1">>,
        clientid => <<"test_client_1">>
    },
    Password = <<"123456">>,

    ConnBin = frame_connect(Client, Password),
    ConnAckBin = frame_connack(0),

    Parent = self(),
    emqx_hooks:add('client.connected', {?MODULE, hook_fun1, [Parent]}, 1000),
    emqx_hooks:add('client.disconnected', {?MODULE, hook_fun2, [Parent]}, 1000),
    try
        send(Sock, ConnBin),
        {ok, ConnAckBin} = recv(Sock, 5000),

        receive
            connected -> ok
        after 1000 ->
            error(hook_is_not_running)
        end,

        DisconnectBin = frame_disconnect(),
        send(Sock, DisconnectBin),

        receive
            disconnected -> ok
        after 1000 ->
            error(hook_is_not_running)
        end,

        %% UDP has no connection to close, so only the other transports
        %% are expected to report a closed socket.
        SockType =/= udp andalso
            begin
                {error, closed} = recv(Sock, 5000)
            end
    after
        emqx_hooks:del('client.connected', {?MODULE, hook_fun1}),
        emqx_hooks:del('client.disconnected', {?MODULE, hook_fun2}),
        close(Sock)
    end.
%% Checks that the 'session.subscribed' and 'session.unsubscribed' hooks
%% are run for an exproto client: hook_fun3/4 and hook_fun4/4 forward a
%% message to this process when they are invoked.
%%
%% The hooks are removed in the inner `after` and the socket is closed in
%% the outer one, so neither leaks when an assertion fails.
t_hook_session_subscribed_unsubscribed(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),
    try
        Client = #{
            proto_name => <<"demo">>,
            proto_ver => <<"v0.1">>,
            clientid => <<"test_client_1">>
        },
        Password = <<"123456">>,

        ConnBin = frame_connect(Client, Password),
        ConnAckBin = frame_connack(0),

        send(Sock, ConnBin),
        {ok, ConnAckBin} = recv(Sock, 5000),

        Parent = self(),
        emqx_hooks:add('session.subscribed', {?MODULE, hook_fun3, [Parent]}, 1000),
        emqx_hooks:add('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}, 1000),
        try
            SubBin = frame_subscribe(<<"t/#">>, 1),
            SubAckBin = frame_suback(0),

            send(Sock, SubBin),
            {ok, SubAckBin} = recv(Sock, 5000),

            receive
                subscribed -> ok
            after 1000 ->
                error(hook_is_not_running)
            end,

            UnsubBin = frame_unsubscribe(<<"t/#">>),
            UnsubAckBin = frame_unsuback(0),

            send(Sock, UnsubBin),
            {ok, UnsubAckBin} = recv(Sock, 5000),

            receive
                unsubscribed -> ok
            after 1000 ->
                error(hook_is_not_running)
            end,

            send(Sock, frame_disconnect())
        after
            emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}),
            emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4})
        end
    after
        close(Sock)
    end.
%% Checks that the 'message.delivered' hook can modify a message before it
%% reaches the client: the broker publishes payload <<"1">>, hook_fun5/2
%% rewrites it to <<"2">>, and the client must receive <<"2">>.
%%
%% The hook rewrites the payload of every delivered message while it is
%% installed, so it is removed in an `after` section even when an assertion
%% fails. The socket is closed in the outer `after` for the same reason.
t_hook_message_delivered(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),
    try
        Client = #{
            proto_name => <<"demo">>,
            proto_ver => <<"v0.1">>,
            clientid => <<"test_client_1">>
        },
        Password = <<"123456">>,

        ConnBin = frame_connect(Client, Password),
        ConnAckBin = frame_connack(0),

        send(Sock, ConnBin),
        {ok, ConnAckBin} = recv(Sock, 5000),

        SubBin = frame_subscribe(<<"t/#">>, 1),
        SubAckBin = frame_suback(0),

        send(Sock, SubBin),
        {ok, SubAckBin} = recv(Sock, 5000),

        emqx_hooks:add('message.delivered', {?MODULE, hook_fun5, []}, 1000),
        try
            emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"1">>)),
            PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
            {ok, PubBin1} = recv(Sock, 5000)
        after
            emqx_hooks:del('message.delivered', {?MODULE, hook_fun5})
        end
    after
        close(Sock)
    end.
%% Checks that a connection which never completes its handshake is shut
%% down with reason `{shutdown, idle_timeout}`.
%%
%% On connection-oriented transports opening the socket is enough. On UDP
%% there is no connection until a datagram arrives, so a CONNECT frame is
%% sent while emqx_exproto_gcli:async_call/3 is mocked.
%% NOTE(review): presumably the mock keeps the real gRPC handler from ever
%% completing the connect, so the channel stays idle — confirm against
%% emqx_exproto_gcli.
%%
%% The mock is unloaded and the socket is closed in `after` sections so
%% that a failed assertion does not leak either of them.
t_idle_timeout(Cfg) ->
    SockType = proplists:get_value(listener_type, Cfg),
    Sock = open(SockType),
    try
        case SockType of
            udp ->
                %% need to create the udp client by sending something
                ok = meck:new(emqx_exproto_gcli, [passthrough, no_history]),
                try
                    ok = meck:expect(
                        emqx_exproto_gcli,
                        async_call,
                        fun(FunName, _Req, _GClient) ->
                            self() ! {hreply, FunName, ok},
                            ok
                        end
                    ),
                    %% send request, but nobody can respond to it
                    ClientId = <<"idle_test_client1">>,
                    Client = #{
                        proto_name => <<"demo">>,
                        proto_ver => <<"v0.1">>,
                        clientid => ClientId,
                        keepalive => 5
                    },
                    Password = <<"123456">>,
                    ConnBin = frame_connect(Client, Password),
                    send(Sock, ConnBin),
                    ?assertMatch(
                        {ok, #{reason := {shutdown, idle_timeout}}},
                        ?block_until(#{?snk_kind := conn_process_terminated}, 10000)
                    )
                after
                    meck:unload(emqx_exproto_gcli)
                end;
            _ ->
                %% nothing to send: the open connection simply stays idle
                ?assertMatch(
                    {ok, #{reason := {shutdown, idle_timeout}}},
                    ?block_until(#{?snk_kind := conn_process_terminated}, 10000)
                )
        end
    after
        close(Sock)
    end.
%%--------------------------------------------------------------------
%% Utils
%% Callback registered for 'client.connected': notifies Parent with the
%% atom `connected` and returns ok.
hook_fun1(_Arg1, _Arg2, Parent) ->
    erlang:send(Parent, connected),
    ok.
%% Callback registered for 'client.disconnected': notifies Parent with the
%% atom `disconnected` and returns ok.
hook_fun2(_Arg1, _Arg2, _Arg3, Parent) ->
    erlang:send(Parent, disconnected),
    ok.
%% Callback registered for 'session.subscribed': notifies Parent with the
%% atom `subscribed` and returns ok.
hook_fun3(_Arg1, _Arg2, _Arg3, Parent) ->
    erlang:send(Parent, subscribed),
    ok.
%% Callback registered for 'session.unsubscribed': notifies Parent with the
%% atom `unsubscribed` and returns ok.
hook_fun4(_Arg1, _Arg2, _Arg3, Parent) ->
    erlang:send(Parent, unsubscribed),
    ok.
%% Callback registered for 'message.delivered': returns the message with
%% its payload replaced by <<"2">>.
hook_fun5(_Arg, Msg) ->
    Rewritten = Msg#message{payload = <<"2">>},
    {ok, Rewritten}.
%% Returns a binary of random bytes whose length is picked uniformly
%% from 1..256.
rand_bytes() ->
    Size = rand:uniform(256),
    crypto:strong_rand_bytes(Size).
%%--------------------------------------------------------------------
%% Sock funcs
%% Opens a client socket of the given transport towards the gateway
%% listener on 127.0.0.1:7993 and returns it tagged with the transport, so
%% that send/2, recv/2 and close/1 can dispatch on the tag.
%% For UDP only a local socket on an ephemeral port is opened; the peer
%% address is supplied on every send/2.
open(tcp) ->
    {ok, TcpSock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS),
    {tcp, TcpSock};
open(udp) ->
    {ok, UdpSock} = gen_udp:open(0, ?TCPOPTS),
    {udp, UdpSock};
open(ssl) ->
    ConnectOpts = ?TCPOPTS ++ client_ssl_opts(),
    {ok, TlsSock} = ssl:connect("127.0.0.1", 7993, ConnectOpts),
    {ssl, TlsSock};
open(dtls) ->
    ConnectOpts = ?DTLSOPTS ++ client_ssl_opts(),
    {ok, DtlsSock} = ssl:connect("127.0.0.1", 7993, ConnectOpts),
    {dtls, DtlsSock}.
%% Sends Bin over a socket returned by open/1.
%% UDP datagrams are addressed to the gateway listener on 127.0.0.1:7993;
%% TLS and DTLS sockets both go through ssl:send/2.
send({tcp, Sock}, Bin) ->
    gen_tcp:send(Sock, Bin);
send({udp, Sock}, Bin) ->
    gen_udp:send(Sock, "127.0.0.1", 7993, Bin);
send({Secure, Sock}, Bin) when Secure =:= ssl; Secure =:= dtls ->
    ssl:send(Sock, Bin).
%% Receives from a socket returned by open/1 without any timeout.
recv(Sock) ->
    Timeout = infinity,
    recv(Sock, Timeout).
%% Receives one chunk of data from a socket returned by open/1, waiting at
%% most Ts milliseconds (or forever when Ts is `infinity`).
%%
%% Returns `{ok, Binary}` or `{error, Reason}` for every transport. For
%% UDP the sender address and port are dropped, and errors such as
%% `{error, timeout}` are returned as-is, exactly like the other
%% transports, instead of crashing inside this helper. Callers that assert
%% `{ok, _} = recv(...)` still fail on an error, now at the call site.
recv({tcp, Sock}, Ts) ->
    gen_tcp:recv(Sock, 0, Ts);
recv({udp, Sock}, Ts) ->
    case gen_udp:recv(Sock, 0, Ts) of
        {ok, {_Addr, _Port, Bin}} -> {ok, Bin};
        {error, _Reason} = Error -> Error
    end;
recv({ssl, Sock}, Ts) ->
    ssl:recv(Sock, 0, Ts);
recv({dtls, Sock}, Ts) ->
    ssl:recv(Sock, 0, Ts).
%% Closes a socket returned by open/1; TLS and DTLS sockets both go
%% through ssl:close/1.
close({tcp, Sock}) ->
    gen_tcp:close(Sock);
close({udp, Sock}) ->
    gen_udp:close(Sock);
close({Secure, Sock}) when Secure =:= ssl; Secure =:= dtls ->
    ssl:close(Sock).
%%--------------------------------------------------------------------
%% Server-Opts
%% Listener options specific to each transport; merged into the common
%% listener settings by listener_confs/1.
%% The ssl options are the tcp ones plus `ssl_options`; the dtls options
%% are the udp ones plus `acceptors` and `dtls_options`.
socketopts(tcp) ->
    #{tcp_options => tcp_opts(), acceptors => 8};
socketopts(ssl) ->
    (socketopts(tcp))#{ssl_options => ssl_opts()};
socketopts(udp) ->
    #{udp_options => udp_opts()};
socketopts(dtls) ->
    (socketopts(udp))#{acceptors => 8, dtls_options => dtls_opts()}.
%% TCP listener socket options: the buffer settings from udp_opts/0 plus
%% the TCP-only send timeout, backlog and nodelay settings.
tcp_opts() ->
    Base = udp_opts(),
    Base#{
        send_timeout => 15000,
        send_timeout_close => true,
        backlog => 100,
        nodelay => true
    }.
%% UDP listener socket options (also the base for tcp_opts/0).
udp_opts() ->
    maps:from_list([
        %% NOTE
        %% Making the buffer sizes too small will lead to inability to
        %% accept connections.
        {recbuf, 2048},
        {sndbuf, 2048},
        {buffer, 2048},
        {reuseaddr, true}
    ]).
%% Server-side TLS options for the ssl listener: the server key and
%% certificate files from certs/3 plus the verification settings. The
%% listener verifies the peer and rejects clients without a certificate.
ssl_opts() ->
    Certs = certs("key.pem", "cert.pem", "cacert.pem"),
    Certs#{
        versions => emqx_tls_lib:available_versions(tls),
        ciphers => [],
        verify => verify_peer,
        fail_if_no_peer_cert => true,
        secure_renegotiate => false,
        reuse_sessions => true,
        honor_cipher_order => true
    }.
%% Server-side DTLS options: identical to ssl_opts/0 except for the
%% protocol versions.
dtls_opts() ->
    TlsOpts = ssl_opts(),
    TlsOpts#{versions => ['dtlsv1.2', 'dtlsv1']}.
%%--------------------------------------------------------------------
%% Client-Opts
%% Client-side TLS/DTLS options as a proplist: the client key and
%% certificate files from certs/3, with server verification disabled.
client_ssl_opts() ->
    CertOpts = maps:to_list(certs("client-key.pem", "client-cert.pem", "cacert.pem")),
    [{verify, verify_none}] ++ CertOpts.
%% Builds the keyfile/certfile/cacertfile option map for the given file
%% names, resolved inside the `etc/certs` directory that
%% emqx_common_test_helpers:deps_path/2 returns for the emqx application.
certs(Key, Cert, CACert) ->
    Dir = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
    InDir = fun(File) -> filename:join([Dir, File]) end,
    #{
        keyfile => InDir(Key),
        certfile => InDir(Cert),
        cacertfile => InDir(CACert)
    }.