emqx/test/emqx_protocol_SUITE.erl

612 lines
34 KiB
Erlang

%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_protocol_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_mqtt.hrl").
-define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>,
<<"/TopicA">>]).
-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
username = <<"emqx">>,
password = <<"public">>})).
all() ->
[{group, mqtt_common},
{group, mqttv4},
{group, mqttv5},
{group, acl},
{group, frame_partial}].
groups() ->
[{mqtt_common, [sequence],
[will_topic_check,
will_acl_check]},
{mqttv4, [sequence],
[connect_v4,
subscribe_v4]},
{mqttv5, [sequence],
[connect_v5,
subscribe_v5]},
{acl, [sequence],
[acl_deny_action_ct]},
{frame_partial, [sequence],
[handle_followed_packet]}].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(MqttCaps#{max_topic_alias => 20})],
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
batch_connect(NumberOfConnections) ->
batch_connect([], NumberOfConnections).
batch_connect(Socks, 0) ->
Socks;
batch_connect(Socks, NumberOfConnections) ->
{ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw}, {active, false}],
3000),
batch_connect([Sock | Socks], NumberOfConnections - 1).
with_connection(DoFun, NumberOfConnections) ->
Socks = batch_connect(NumberOfConnections),
try
DoFun(Socks)
after
lists:foreach(fun(Sock) ->
emqx_client_sock:close(Sock)
end, Socks)
end.
with_connection(DoFun) ->
with_connection(DoFun, 1).
handle_followed_packet(_Config) ->
ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>,
PartialPkt2 = <<48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>,
%% This is a PUBLISH message (Qos=1)
PubPkt = <<PartialPkt1/binary, PartialPkt2/binary>>,
ComplexPkt = <<PubPkt/binary, PubPkt/binary, PubPkt/binary, PartialPkt1/binary>>,
AssertConnAck = fun(R) -> ?assertEqual({ok, <<32,2,0,0>>}, R) end,
AssertPubAck = fun(R) -> ?assertEqual({ok, <<64,2,0,1>>}, R) end,
{ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [{active, false}, binary]),
%% CONNECT
ok = gen_tcp:send(Sock, ConnPkt),
AssertConnAck(gen_tcp:recv(Sock, 4, 500)),
%% Once Publish
ok = gen_tcp:send(Sock, PubPkt),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
%% Complex Packet
ok = gen_tcp:send(Sock, ComplexPkt),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
ok = gen_tcp:send(Sock, PartialPkt2),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
gen_tcp:close(Sock).
connect_v4(_) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))),
{error, closed} =gen_tcp:recv(Sock, 0)
end),
with_connection(fun([Sock]) ->
ConnectPacket = raw_send_serialize(?CONNECT_PACKET
(#mqtt_packet_connect{
client_id = <<"mqttv4_client">>,
username = <<"admin">>,
password = <<"public">>,
proto_ver = ?MQTT_PROTO_V4
})),
emqx_client_sock:send(Sock, ConnectPacket),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4),
emqx_client_sock:send(Sock, ConnectPacket),
{error, closed} = gen_tcp:recv(Sock, 0)
end),
ok.
connect_v5(_) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Request-Response-Information' => -1}}))),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Request-Problem-Information' => 2}}))),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Request-Response-Information' => 1}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), <<>>, _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),
?assertNot(maps:is_key('Response-Information', Props))
end),
% topic alias = 0
with_connection(fun([Sock]) ->
%% ct:log("emqx_protocol: ~p~n", [emqx_zone:get_zone(external, max_topic_alias)]),
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
client_id = "hello",
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Topic-Alias-Maximum' => 10}}),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Topic-Alias-Maximum' := 20}), <<>>, _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 0}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5)
end),
% topic alias maximum
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Topic-Alias-Maximum' => 10}}),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Topic-Alias-Maximum' := 20}), <<>>, _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]), #{version => ?MQTT_PROTO_V5})),
{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 15}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data3} = gen_tcp:recv(Sock, 6),
{ok, ?PUBACK_PACKET(1, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
{ok, Data4} = gen_tcp:recv(Sock, 0),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 2, #{'Topic-Alias' => 21}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data5} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5)
end),
% test clean start
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = true,
client_id = <<"myclient">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(
?DISCONNECT_PACKET(?RC_SUCCESS)
))
end),
timer:sleep(1000),
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = false,
client_id = <<"myclient">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
% test will message publish and cancel
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = true,
client_id = <<"myclient">>,
will_flag = true,
will_qos = ?QOS_1,
will_retain = false,
will_props = #{'Will-Delay-Interval' => 5},
will_topic = <<"TopicA">>,
will_payload = <<"will message">>,
properties = #{'Session-Expiry-Interval' => 0}
}
)
)
),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
{ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw},
{active, false}], 3000),
do_connect(Sock2, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]), #{version => ?MQTT_PROTO_V5})),
{ok, SubData} = gen_tcp:recv(Sock2, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(
?DISCONNECT_PACKET(?RC_SUCCESS))),
{error, timeout} = gen_tcp:recv(Sock2, 0, 2000),
% session resumed
{ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw},
{active, false}], 3000),
emqx_client_sock:send(Sock3,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = false,
client_id = <<"myclient">>,
will_flag = true,
will_qos = ?QOS_1,
will_retain = false,
will_props = #{'Will-Delay-Interval' => 5},
will_topic = <<"TopicA">>,
will_payload = <<"will message 2">>,
properties = #{'Session-Expiry-Interval' => 3}
}
),
#{version => ?MQTT_PROTO_V5}
)
),
{ok, Data3} = gen_tcp:recv(Sock3, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock3, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE),
#{version => ?MQTT_PROTO_V5}
)
),
{ok, WillData} = gen_tcp:recv(Sock2, 0, 5000),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), <<>>, _}
= raw_recv_parse(WillData, ?MQTT_PROTO_V5)
end),
% duplicate client id
with_connection(fun([Sock, Sock1]) ->
emqx_zone:set_env(external, use_username_as_clientid, true),
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = true,
client_id = <<"myclient">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock1,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = false,
client_id = <<"myclient">>,
username = <<"admin">>,
password = <<"public">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data1} = gen_tcp:recv(Sock1, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
{ok, SubData} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock1, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
{ok, SubData1} = gen_tcp:recv(Sock1, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5)
end, 2),
ok.
do_connect(Sock, ProtoVer) ->
emqx_client_sock:send(Sock, raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
proto_ver = ProtoVer
}))),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ProtoVer).
subscribe_v4(_) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V4),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(15,
[{<<"topic">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}])),
emqx_client_sock:send(Sock, SubPacket),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(15, _), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4)
end),
ok.
subscribe_v5(_) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},[]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), <<>>, _} =
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
end),
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(0, #{}, [{<<"TopicQos0">>,
#{rh => 1, qos => ?QOS_2,
rap => 0, nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
?assertMatch(
{ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), <<>>, _},
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5))
end),
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 0},
[{<<"TopicQos0">>,
#{rh => 1, qos => ?QOS_2,
rap => 0, nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
?assertMatch(
{ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), <<>>, _},
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5))
end),
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1},
[{<<"TopicQos0">>,
#{rh => 1, qos => ?QOS_2,
rap => 0, nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, SubData} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _}
= raw_recv_parse(SubData, ?MQTT_PROTO_V5)
end),
ok.
publish_v4(_) ->
ok.
publish_v5(_) ->
ok.
raw_send_serialize(Packet) ->
emqx_frame:serialize(Packet).
raw_send_serialize(Packet, Opts) ->
emqx_frame:serialize(Packet, Opts).
raw_recv_parse(Bin, ProtoVer) ->
emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ProtoVer})).
acl_deny_action_ct(_) ->
emqx_zone:set_env(external, acl_deny_action, disconnect),
process_flag(trap_exit, true),
[acl_deny_do_disconnect(subscribe, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
[acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
emqx_zone:set_env(external, acl_deny_action, ignore),
ok.
will_topic_check(_) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
{will_flag, true},
{will_topic, <<"aaa">>},
{will_payload, <<"I have died">>},
{will_qos, 0}]),
{ok, _} = emqx_client:connect(Client),
{ok, T} = emqx_client:start_link([{client_id, <<"client">>}]),
emqx_client:connect(T),
emqx_client:subscribe(T, <<"aaa">>),
ct:sleep(200),
emqx_client:stop(Client),
ct:sleep(100),
false = is_process_alive(Client),
emqx_ct_helpers:wait_mqtt_payload(<<"I have died">>),
emqx_client:stop(T).
will_acl_check(_) ->
%% The connection will be rejected if publishing of the will message is not allowed by
%% ACL rules
process_flag(trap_exit, true),
{ok, Client} = emqx_client:start_link([{username, <<"pub_deny">>},
{will_flag, true},
{will_topic, <<"pub_deny">>},
{will_payload, <<"I have died">>},
{will_qos, 0}]),
?assertMatch({error,{_,_}}, emqx_client:connect(Client)).
acl_deny_do_disconnect(publish, QoS, Topic) ->
process_flag(trap_exit, true),
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
{ok, _} = emqx_client:connect(Client),
emqx_client:publish(Client, Topic, <<"test">>, QoS),
receive
{disconnected, shutdown, tcp_closed} ->
ct:pal(info, "[OK] after publish, client got disconnected: tcp_closed", []);
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);
{'EXIT', Client, Reason} ->
ct:pal(info, "[OK] after publish, client got disconnected: ~p", [Reason])
after 1000 -> ct:fail({timeout, wait_tcp_closed})
end;
acl_deny_do_disconnect(subscribe, QoS, Topic) ->
process_flag(trap_exit, true),
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
{ok, _} = emqx_client:connect(Client),
{ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS),
receive
{disconnected, shutdown, tcp_closed} ->
ct:pal(info, "[OK] after subscribe, client got disconnected: tcp_closed", []);
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);
{'EXIT', Client, Reason} ->
ct:pal(info, "[OK] after subscribe, client got disconnected: ~p", [Reason])
after 1000 -> ct:fail({timeout, wait_tcp_closed})
end.
set_special_configs(emqx) ->
application:set_env(emqx, enable_acl_cache, false),
application:set_env(emqx, plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")),
application:set_env(emqx, acl_deny_action, disconnect),
application:set_env(emqx, acl_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
set_special_configs(_App) ->
ok.