741 lines
20 KiB
Erlang
741 lines
20 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_exproto_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/asserts.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-import(
|
|
emqx_exproto_echo_svr,
|
|
[
|
|
frame_connect/2,
|
|
frame_connack/1,
|
|
frame_publish/3,
|
|
frame_raw_publish/3,
|
|
frame_puback/1,
|
|
frame_subscribe/2,
|
|
frame_suback/1,
|
|
frame_unsubscribe/1,
|
|
frame_unsuback/1,
|
|
frame_disconnect/0
|
|
]
|
|
).
|
|
|
|
-define(TCPOPTS, [binary, {active, false}]).
|
|
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
-define(CONF_DEFAULT, <<
|
|
"\n"
|
|
"gateway.exproto {\n"
|
|
" server.bind = 9100,\n"
|
|
" handler.address = \"http://127.0.0.1:9001\"\n"
|
|
" handler.service_name = \"ConnectionHandler\"\n"
|
|
" listeners.tcp.default {\n"
|
|
" bind = 7993,\n"
|
|
" acceptors = 8\n"
|
|
" }\n"
|
|
"}\n"
|
|
>>).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Setups
|
|
%%--------------------------------------------------------------------
|
|
|
|
all() ->
|
|
[
|
|
{group, tcp_listener},
|
|
{group, ssl_listener},
|
|
{group, udp_listener},
|
|
{group, dtls_listener},
|
|
{group, https_grpc_server},
|
|
{group, streaming_connection_handler}
|
|
].
|
|
|
|
suite() ->
|
|
[{timetrap, {seconds, 30}}].
|
|
|
|
groups() ->
|
|
MainCases = [
|
|
t_keepalive_timeout,
|
|
t_mountpoint_echo,
|
|
t_raw_publish,
|
|
t_auth_deny,
|
|
t_acl_deny,
|
|
t_auth_expire,
|
|
t_hook_connected_disconnected,
|
|
t_hook_session_subscribed_unsubscribed,
|
|
t_hook_message_delivered
|
|
],
|
|
[
|
|
{tcp_listener, [sequence], MainCases},
|
|
{ssl_listener, [sequence], MainCases},
|
|
{udp_listener, [sequence], MainCases},
|
|
{dtls_listener, [sequence], MainCases},
|
|
{streaming_connection_handler, [sequence], MainCases},
|
|
{https_grpc_server, [sequence], MainCases}
|
|
].
|
|
|
|
init_per_group(GrpName, Cfg) when
|
|
GrpName == tcp_listener;
|
|
GrpName == ssl_listener;
|
|
GrpName == udp_listener;
|
|
GrpName == dtls_listener
|
|
->
|
|
LisType =
|
|
case GrpName of
|
|
tcp_listener -> tcp;
|
|
ssl_listener -> ssl;
|
|
udp_listener -> udp;
|
|
dtls_listener -> dtls
|
|
end,
|
|
init_per_group(LisType, 'ConnectionUnaryHandler', http, Cfg);
|
|
init_per_group(https_grpc_server, Cfg) ->
|
|
init_per_group(tcp, 'ConnectionUnaryHandler', https, Cfg);
|
|
init_per_group(streaming_connection_handler, Cfg) ->
|
|
init_per_group(tcp, 'ConnectionHandler', http, Cfg);
|
|
init_per_group(_, Cfg) ->
|
|
init_per_group(tcp, 'ConnectionUnaryHandler', http, Cfg).
|
|
|
|
init_per_group(LisType, ServiceName, Scheme, Cfg) ->
|
|
Svrs = emqx_exproto_echo_svr:start(Scheme),
|
|
Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
|
|
GWConfig = #{
|
|
server => #{bind => 9100},
|
|
idle_timeout => 5000,
|
|
mountpoint => <<"ct/">>,
|
|
handler => #{
|
|
address => Addrs,
|
|
service_name => ServiceName,
|
|
ssl_options => #{enable => Scheme == https}
|
|
},
|
|
listeners => listener_confs(LisType)
|
|
},
|
|
Apps = emqx_cth_suite:start(
|
|
[
|
|
emqx_conf,
|
|
emqx_auth,
|
|
{emqx_gateway, #{
|
|
config =>
|
|
#{gateway => #{exproto => GWConfig}}
|
|
}},
|
|
emqx_gateway_exproto
|
|
],
|
|
#{work_dir => emqx_cth_suite:work_dir(Cfg)}
|
|
),
|
|
[
|
|
{servers, Svrs},
|
|
{apps, Apps},
|
|
{listener_type, LisType},
|
|
{service_name, ServiceName},
|
|
{grpc_client_scheme, Scheme}
|
|
| Cfg
|
|
].
|
|
|
|
end_per_group(_, Cfg) ->
|
|
ok = emqx_cth_suite:stop(proplists:get_value(apps, Cfg)),
|
|
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
|
|
|
|
init_per_testcase(TestCase, Cfg) when
|
|
TestCase == t_enter_passive_mode
|
|
->
|
|
snabbkaffe:start_trace(),
|
|
case proplists:get_value(listener_type, Cfg) of
|
|
udp -> {skip, ignore};
|
|
_ -> Cfg
|
|
end;
|
|
init_per_testcase(_TestCase, Cfg) ->
|
|
snabbkaffe:start_trace(),
|
|
Cfg.
|
|
|
|
end_per_testcase(_TestCase, _Cfg) ->
|
|
snabbkaffe:stop(),
|
|
ok.
|
|
|
|
listener_confs(Type) ->
|
|
Default = #{
|
|
bind => 7993,
|
|
max_connections => 64,
|
|
access_rules => ["allow all"]
|
|
},
|
|
#{Type => #{'default' => maps:merge(Default, socketopts(Type))}}.
|
|
|
|
default_config() ->
|
|
?CONF_DEFAULT.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Tests cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_mountpoint_echo(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>,
|
|
%% deperated since v5.1.0, and this value will be ignored
|
|
mountpoint => <<"deperated/">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SubBin = frame_subscribe(<<"t/dn">>, 1),
|
|
SubAckBin = frame_suback(0),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
|
|
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>),
|
|
{ok, PubBin1} = recv(Sock, 5000),
|
|
|
|
PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>),
|
|
PubAckBin = frame_puback(0),
|
|
|
|
emqx:subscribe(<<"ct/t/up">>),
|
|
|
|
send(Sock, PubBin2),
|
|
{ok, PubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
{deliver, _, _} -> ok
|
|
after 1000 ->
|
|
error(echo_not_running)
|
|
end,
|
|
close(Sock).
|
|
|
|
t_raw_publish(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>,
|
|
mountpoint => <<>>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
PubBin2 = frame_raw_publish(<<"t/up">>, 0, <<"echo">>),
|
|
PubAckBin = frame_puback(0),
|
|
|
|
%% mountpoint is not used in raw publish
|
|
emqx:subscribe(<<"t/up">>),
|
|
|
|
send(Sock, PubBin2),
|
|
{ok, PubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
{deliver, _, _} -> ok
|
|
after 1000 ->
|
|
error(echo_not_running)
|
|
end,
|
|
close(Sock).
|
|
|
|
t_auth_deny(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ok = meck:new(emqx_gateway_ctx, [passthrough, no_history, no_link]),
|
|
ok = meck:expect(
|
|
emqx_gateway_ctx,
|
|
authenticate,
|
|
fun(_, _) -> {error, ?RC_NOT_AUTHORIZED} end
|
|
),
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(1),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SockType =/= udp andalso
|
|
begin
|
|
{error, closed} = recv(Sock, 5000)
|
|
end,
|
|
meck:unload([emqx_gateway_ctx]).
|
|
|
|
t_auth_expire(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ok = meck:new(emqx_access_control, [passthrough, no_history]),
|
|
ok = meck:expect(
|
|
emqx_access_control,
|
|
authenticate,
|
|
fun(_) ->
|
|
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
|
|
end
|
|
),
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
?assertWaitEvent(
|
|
begin
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000)
|
|
end,
|
|
#{
|
|
?snk_kind := conn_process_terminated,
|
|
clientid := <<"test_client_1">>,
|
|
reason := {shutdown, expired}
|
|
},
|
|
5000
|
|
).
|
|
|
|
t_acl_deny(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ok = meck:new(emqx_gateway_ctx, [passthrough, no_history, no_link]),
|
|
ok = meck:expect(emqx_gateway_ctx, authorize, fun(_, _, _, _) -> deny end),
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SubBin = frame_subscribe(<<"t/#">>, 1),
|
|
SubAckBin = frame_suback(1),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
|
|
|
|
PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
|
|
PubBinFailedAck = frame_puback(1),
|
|
PubBinSuccesAck = frame_puback(0),
|
|
|
|
send(Sock, PubBin),
|
|
{ok, PubBinFailedAck} = recv(Sock, 5000),
|
|
|
|
meck:unload([emqx_gateway_ctx]),
|
|
|
|
send(Sock, PubBin),
|
|
{ok, PubBinSuccesAck} = recv(Sock, 5000),
|
|
close(Sock).
|
|
|
|
t_keepalive_timeout(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
ClientId1 = <<"keepalive_test_client1">>,
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => ClientId1,
|
|
keepalive => 5
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock),
|
|
|
|
case SockType of
|
|
udp ->
|
|
%% another udp client should not affect the first
|
|
%% udp client keepalive check
|
|
timer:sleep(4000),
|
|
Sock2 = open(SockType),
|
|
ConnBin2 = frame_connect(
|
|
Client#{clientid => <<"keepalive_test_client2">>},
|
|
Password
|
|
),
|
|
send(Sock2, ConnBin2),
|
|
%% first client will be keepalive timeouted in 6s
|
|
?assertMatch(
|
|
{ok, #{
|
|
clientid := ClientId1,
|
|
reason := {shutdown, keepalive_timeout}
|
|
}},
|
|
?block_until(#{?snk_kind := conn_process_terminated}, 8000)
|
|
);
|
|
_ ->
|
|
?assertMatch(
|
|
{ok, #{
|
|
clientid := ClientId1,
|
|
reason := {shutdown, keepalive_timeout}
|
|
}},
|
|
?block_until(#{?snk_kind := conn_process_terminated}, 12000)
|
|
),
|
|
Trace = snabbkaffe:collect_trace(),
|
|
%% conn process should be terminated
|
|
?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
|
|
%% socket port should be closed
|
|
?assertEqual({error, closed}, recv(Sock, 5000))
|
|
end.
|
|
|
|
t_hook_connected_disconnected(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
Parent = self(),
|
|
emqx_hooks:add('client.connected', {?MODULE, hook_fun1, [Parent]}, 1000),
|
|
emqx_hooks:add('client.disconnected', {?MODULE, hook_fun2, [Parent]}, 1000),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
connected -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
DisconnectBin = frame_disconnect(),
|
|
send(Sock, DisconnectBin),
|
|
|
|
receive
|
|
disconnected -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
SockType =/= udp andalso
|
|
begin
|
|
{error, closed} = recv(Sock, 5000)
|
|
end,
|
|
emqx_hooks:del('client.connected', {?MODULE, hook_fun1}),
|
|
emqx_hooks:del('client.disconnected', {?MODULE, hook_fun2}).
|
|
|
|
t_hook_session_subscribed_unsubscribed(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
Parent = self(),
|
|
emqx_hooks:add('session.subscribed', {?MODULE, hook_fun3, [Parent]}, 1000),
|
|
emqx_hooks:add('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}, 1000),
|
|
|
|
SubBin = frame_subscribe(<<"t/#">>, 1),
|
|
SubAckBin = frame_suback(0),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
subscribed -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
UnsubBin = frame_unsubscribe(<<"t/#">>),
|
|
UnsubAckBin = frame_unsuback(0),
|
|
|
|
send(Sock, UnsubBin),
|
|
{ok, UnsubAckBin} = recv(Sock, 5000),
|
|
|
|
receive
|
|
unsubscribed -> ok
|
|
after 1000 ->
|
|
error(hook_is_not_running)
|
|
end,
|
|
|
|
send(Sock, frame_disconnect()),
|
|
|
|
close(Sock),
|
|
emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}),
|
|
emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4}).
|
|
|
|
t_hook_message_delivered(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => <<"test_client_1">>
|
|
},
|
|
Password = <<"123456">>,
|
|
|
|
ConnBin = frame_connect(Client, Password),
|
|
ConnAckBin = frame_connack(0),
|
|
|
|
send(Sock, ConnBin),
|
|
{ok, ConnAckBin} = recv(Sock, 5000),
|
|
|
|
SubBin = frame_subscribe(<<"t/#">>, 1),
|
|
SubAckBin = frame_suback(0),
|
|
|
|
send(Sock, SubBin),
|
|
{ok, SubAckBin} = recv(Sock, 5000),
|
|
|
|
emqx_hooks:add('message.delivered', {?MODULE, hook_fun5, []}, 1000),
|
|
|
|
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"1">>)),
|
|
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
|
|
{ok, PubBin1} = recv(Sock, 5000),
|
|
|
|
close(Sock),
|
|
emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}).
|
|
|
|
t_idle_timeout(Cfg) ->
|
|
SockType = proplists:get_value(listener_type, Cfg),
|
|
Sock = open(SockType),
|
|
|
|
%% need to create udp client by sending something
|
|
case SockType of
|
|
udp ->
|
|
%% nothing to do
|
|
ok = meck:new(emqx_exproto_gcli, [passthrough, no_history]),
|
|
ok = meck:expect(
|
|
emqx_exproto_gcli,
|
|
async_call,
|
|
fun(FunName, _Req, _GClient) ->
|
|
self() ! {hreply, FunName, ok},
|
|
ok
|
|
end
|
|
),
|
|
%% send request, but nobody can respond to it
|
|
ClientId = <<"idle_test_client1">>,
|
|
Client = #{
|
|
proto_name => <<"demo">>,
|
|
proto_ver => <<"v0.1">>,
|
|
clientid => ClientId,
|
|
keepalive => 5
|
|
},
|
|
Password = <<"123456">>,
|
|
ConnBin = frame_connect(Client, Password),
|
|
send(Sock, ConnBin),
|
|
?assertMatch(
|
|
{ok, #{reason := {shutdown, idle_timeout}}},
|
|
?block_until(#{?snk_kind := conn_process_terminated}, 10000)
|
|
),
|
|
ok = meck:unload(emqx_exproto_gcli);
|
|
_ ->
|
|
?assertMatch(
|
|
{ok, #{reason := {shutdown, idle_timeout}}},
|
|
?block_until(#{?snk_kind := conn_process_terminated}, 10000)
|
|
)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Utils
|
|
|
|
hook_fun1(_, _, Parent) ->
|
|
Parent ! connected,
|
|
ok.
|
|
hook_fun2(_, _, _, Parent) ->
|
|
Parent ! disconnected,
|
|
ok.
|
|
|
|
hook_fun3(_, _, _, Parent) ->
|
|
Parent ! subscribed,
|
|
ok.
|
|
hook_fun4(_, _, _, Parent) ->
|
|
Parent ! unsubscribed,
|
|
ok.
|
|
|
|
hook_fun5(_, Msg) -> {ok, Msg#message{payload = <<"2">>}}.
|
|
|
|
rand_bytes() ->
|
|
crypto:strong_rand_bytes(rand:uniform(256)).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Sock funcs
|
|
|
|
open(tcp) ->
|
|
{ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS),
|
|
{tcp, Sock};
|
|
open(udp) ->
|
|
{ok, Sock} = gen_udp:open(0, ?TCPOPTS),
|
|
{udp, Sock};
|
|
open(ssl) ->
|
|
SslOpts = client_ssl_opts(),
|
|
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts),
|
|
{ssl, SslSock};
|
|
open(dtls) ->
|
|
SslOpts = client_ssl_opts(),
|
|
{ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts),
|
|
{dtls, SslSock}.
|
|
|
|
send({tcp, Sock}, Bin) ->
|
|
gen_tcp:send(Sock, Bin);
|
|
send({udp, Sock}, Bin) ->
|
|
gen_udp:send(Sock, "127.0.0.1", 7993, Bin);
|
|
send({ssl, Sock}, Bin) ->
|
|
ssl:send(Sock, Bin);
|
|
send({dtls, Sock}, Bin) ->
|
|
ssl:send(Sock, Bin).
|
|
|
|
recv(Sock) ->
|
|
recv(Sock, infinity).
|
|
|
|
recv({tcp, Sock}, Ts) ->
|
|
gen_tcp:recv(Sock, 0, Ts);
|
|
recv({udp, Sock}, Ts) ->
|
|
{ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts),
|
|
{ok, Bin};
|
|
recv({ssl, Sock}, Ts) ->
|
|
ssl:recv(Sock, 0, Ts);
|
|
recv({dtls, Sock}, Ts) ->
|
|
ssl:recv(Sock, 0, Ts).
|
|
|
|
close({tcp, Sock}) ->
|
|
gen_tcp:close(Sock);
|
|
close({udp, Sock}) ->
|
|
gen_udp:close(Sock);
|
|
close({ssl, Sock}) ->
|
|
ssl:close(Sock);
|
|
close({dtls, Sock}) ->
|
|
ssl:close(Sock).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Server-Opts
|
|
|
|
socketopts(tcp) ->
|
|
#{
|
|
acceptors => 8,
|
|
tcp_options => tcp_opts()
|
|
};
|
|
socketopts(ssl) ->
|
|
#{
|
|
acceptors => 8,
|
|
tcp_options => tcp_opts(),
|
|
ssl_options => ssl_opts()
|
|
};
|
|
socketopts(udp) ->
|
|
#{udp_options => udp_opts()};
|
|
socketopts(dtls) ->
|
|
#{
|
|
acceptors => 8,
|
|
udp_options => udp_opts(),
|
|
dtls_options => dtls_opts()
|
|
}.
|
|
|
|
tcp_opts() ->
|
|
maps:merge(
|
|
udp_opts(),
|
|
#{
|
|
send_timeout => 15000,
|
|
send_timeout_close => true,
|
|
backlog => 100,
|
|
nodelay => true
|
|
}
|
|
).
|
|
|
|
udp_opts() ->
|
|
#{
|
|
%% NOTE
|
|
%% Making those too small will lead to inability to accept connections.
|
|
recbuf => 2048,
|
|
sndbuf => 2048,
|
|
buffer => 2048,
|
|
reuseaddr => true
|
|
}.
|
|
|
|
ssl_opts() ->
|
|
Certs = certs("key.pem", "cert.pem", "cacert.pem"),
|
|
maps:merge(
|
|
Certs,
|
|
#{
|
|
versions => emqx_tls_lib:available_versions(tls),
|
|
ciphers => [],
|
|
verify => verify_peer,
|
|
fail_if_no_peer_cert => true,
|
|
secure_renegotiate => false,
|
|
reuse_sessions => true,
|
|
honor_cipher_order => true
|
|
}
|
|
).
|
|
|
|
dtls_opts() ->
|
|
maps:merge(ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client-Opts
|
|
|
|
client_ssl_opts() ->
|
|
OptsWithCerts = certs("client-key.pem", "client-cert.pem", "cacert.pem"),
|
|
[{verify, verify_none} | maps:to_list(OptsWithCerts)].
|
|
|
|
certs(Key, Cert, CACert) ->
|
|
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
|
|
#{
|
|
keyfile => filename:join([CertsPath, Key]),
|
|
certfile => filename:join([CertsPath, Cert]),
|
|
cacertfile => filename:join([CertsPath, CACert])
|
|
}.
|