554 lines
20 KiB
Erlang
554 lines
20 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_client_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-import(lists, [nth/2]).
|
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
-define(TOPICS, [<<"TopicA">>,
|
|
<<"TopicA/B">>,
|
|
<<"Topic/C">>,
|
|
<<"TopicA/C">>,
|
|
<<"/TopicA">>
|
|
]).
|
|
|
|
-define(WILD_TOPICS, [<<"TopicA/+">>,
|
|
<<"+/C">>,
|
|
<<"#">>,
|
|
<<"/#">>,
|
|
<<"/+">>,
|
|
<<"+/+">>,
|
|
<<"TopicA/#">>
|
|
]).
|
|
|
|
-define(do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
|
|
receive
|
|
{tcp, SOCKET, PACKET} -> EXPRESS
|
|
after TIMEOUT ->
|
|
ct:fail({receive_timeout, TIMEOUT})
|
|
end).
|
|
|
|
-define(receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
|
|
fun() ->
|
|
?do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS)
|
|
end()).
|
|
|
|
all() ->
|
|
[{group, mqttv3},
|
|
{group, mqttv4},
|
|
{group, mqttv5},
|
|
{group, others},
|
|
{group, bugfixes}
|
|
].
|
|
|
|
groups() ->
|
|
[{mqttv3, [non_parallel_tests],
|
|
[t_basic_v3
|
|
]},
|
|
{mqttv4, [non_parallel_tests],
|
|
[t_basic_v4,
|
|
t_cm,
|
|
t_cm_registry,
|
|
%% t_will_message,
|
|
%% t_offline_message_queueing,
|
|
t_overlapping_subscriptions,
|
|
%% t_keepalive,
|
|
%% t_redelivery_on_reconnect,
|
|
%% subscribe_failure_test,
|
|
t_dollar_topics
|
|
]},
|
|
{mqttv5, [non_parallel_tests],
|
|
[t_basic_with_props_v5
|
|
]},
|
|
{others, [non_parallel_tests],
|
|
[t_username_as_clientid,
|
|
t_certcn_as_clientid_default_config_tls,
|
|
t_certcn_as_clientid_tlsv1_3,
|
|
t_certcn_as_clientid_tlsv1_2
|
|
]},
|
|
{bugfixes, [non_parallel_tests],
|
|
[ t_qos2_no_pubrel_received
|
|
, t_retry_timer_after_session_taken_over
|
|
]}
|
|
].
|
|
|
|
init_per_suite(Config) ->
|
|
emqx_ct_helpers:boot_modules(all),
|
|
emqx_ct_helpers:start_apps([], fun set_special_confs/1),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
emqx_ct_helpers:stop_apps([]).
|
|
|
|
init_per_testcase(Func, Cfg) ->
|
|
maybe_run_fun(setup, Func, Cfg).
|
|
|
|
end_per_testcase(Func, Cfg) ->
|
|
maybe_run_fun(teardown, Func, Cfg).
|
|
|
|
maybe_run_fun(Tag, Func, Cfg) ->
|
|
try ?MODULE:Func(Tag, Cfg)
|
|
catch
|
|
error:undef -> Cfg
|
|
end.
|
|
|
|
set_special_confs(emqx) ->
|
|
emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]);
|
|
set_special_confs(_) ->
|
|
ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for MQTT v3
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_basic_v3(_) ->
|
|
t_basic([{proto_ver, v3}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for MQTT v4
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_basic_v4(_Config) ->
|
|
t_basic([{proto_ver, v4}]).
|
|
|
|
t_cm(_) ->
|
|
IdleTimeout = emqx_zone:get_env(external, idle_timeout, 30000),
|
|
emqx_zone:set_env(external, idle_timeout, 1000),
|
|
ClientId = <<"myclient">>,
|
|
{ok, C} = emqtt:start_link([{clientid, ClientId}]),
|
|
{ok, _} = emqtt:connect(C),
|
|
ct:sleep(500),
|
|
#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId),
|
|
emqtt:subscribe(C, <<"mytopic">>, 0),
|
|
ct:sleep(1200),
|
|
Stats = emqx_cm:get_chan_stats(ClientId),
|
|
?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)),
|
|
emqx_zone:set_env(external, idle_timeout, IdleTimeout).
|
|
|
|
t_cm_registry(_) ->
|
|
Info = supervisor:which_children(emqx_cm_sup),
|
|
{_, Pid, _, _} = lists:keyfind(registry, 1, Info),
|
|
ignored = gen_server:call(Pid, <<"Unexpected call">>),
|
|
gen_server:cast(Pid, <<"Unexpected cast">>),
|
|
Pid ! <<"Unexpected info">>.
|
|
|
|
t_will_message(_Config) ->
|
|
{ok, C1} = emqtt:start_link([{clean_start, true},
|
|
{will_topic, nth(3, ?TOPICS)},
|
|
{will_payload, <<"client disconnected">>},
|
|
{keepalive, 1}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
|
{ok, C2} = emqtt:start_link(),
|
|
{ok, _} = emqtt:connect(C2),
|
|
|
|
{ok, _, [2]} = emqtt:subscribe(C2, nth(3, ?TOPICS), 2),
|
|
timer:sleep(5),
|
|
ok = emqtt:stop(C1),
|
|
timer:sleep(5),
|
|
?assertEqual(1, length(recv_msgs(1))),
|
|
ok = emqtt:disconnect(C2),
|
|
ct:pal("Will message test succeeded").
|
|
|
|
t_offline_message_queueing(_) ->
|
|
{ok, C1} = emqtt:start_link([{clean_start, false},
|
|
{clientid, <<"c1">>}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
|
{ok, _, [2]} = emqtt:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
|
|
ok = emqtt:disconnect(C1),
|
|
{ok, C2} = emqtt:start_link([{clean_start, true},
|
|
{clientid, <<"c2">>}]),
|
|
{ok, _} = emqtt:connect(C2),
|
|
|
|
ok = emqtt:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
|
|
{ok, _} = emqtt:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
|
|
{ok, _} = emqtt:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
|
|
timer:sleep(10),
|
|
emqtt:disconnect(C2),
|
|
{ok, C3} = emqtt:start_link([{clean_start, false}, {clientid, <<"c1">>}]),
|
|
{ok, _} = emqtt:connect(C3),
|
|
|
|
timer:sleep(10),
|
|
emqtt:disconnect(C3),
|
|
?assertEqual(3, length(recv_msgs(3))).
|
|
|
|
t_overlapping_subscriptions(_) ->
|
|
{ok, C} = emqtt:start_link([]),
|
|
{ok, _} = emqtt:connect(C),
|
|
|
|
{ok, _, [2, 1]} = emqtt:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
|
|
{nth(1, ?WILD_TOPICS), 1}]),
|
|
timer:sleep(10),
|
|
{ok, _} = emqtt:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
|
|
timer:sleep(10),
|
|
|
|
Num = length(recv_msgs(2)),
|
|
?assert(lists:member(Num, [1, 2])),
|
|
if
|
|
Num == 1 ->
|
|
ct:pal("This server is publishing one message for all
|
|
matching overlapping subscriptions, not one for each.");
|
|
Num == 2 ->
|
|
ct:pal("This server is publishing one message per each
|
|
matching overlapping subscription.");
|
|
true -> ok
|
|
end,
|
|
emqtt:disconnect(C).
|
|
|
|
%% t_keepalive_test(_) ->
|
|
%% ct:print("Keepalive test starting"),
|
|
%% {ok, C1, _} = emqtt:start_link([{clean_start, true},
|
|
%% {keepalive, 5},
|
|
%% {will_flag, true},
|
|
%% {will_topic, nth(5, ?TOPICS)},
|
|
%% %% {will_qos, 2},
|
|
%% {will_payload, <<"keepalive expiry">>}]),
|
|
%% ok = emqtt:pause(C1),
|
|
%% {ok, C2, _} = emqtt:start_link([{clean_start, true},
|
|
%% {keepalive, 0}]),
|
|
%% {ok, _, [2]} = emqtt:subscribe(C2, nth(5, ?TOPICS), 2),
|
|
%% ok = emqtt:disconnect(C2),
|
|
%% ?assertEqual(1, length(recv_msgs(1))),
|
|
%% ct:print("Keepalive test succeeded").
|
|
|
|
t_redelivery_on_reconnect(_) ->
|
|
ct:pal("Redelivery on reconnect test starting"),
|
|
{ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, <<"c">>}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
|
{ok, _, [2]} = emqtt:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
|
|
timer:sleep(10),
|
|
ok = emqtt:pause(C1),
|
|
{ok, _} = emqtt:publish(C1, nth(2, ?TOPICS), <<>>,
|
|
[{qos, 1}, {retain, false}]),
|
|
{ok, _} = emqtt:publish(C1, nth(4, ?TOPICS), <<>>,
|
|
[{qos, 2}, {retain, false}]),
|
|
timer:sleep(10),
|
|
ok = emqtt:disconnect(C1),
|
|
?assertEqual(0, length(recv_msgs(2))),
|
|
{ok, C2} = emqtt:start_link([{clean_start, false}, {clientid, <<"c">>}]),
|
|
{ok, _} = emqtt:connect(C2),
|
|
|
|
timer:sleep(10),
|
|
ok = emqtt:disconnect(C2),
|
|
?assertEqual(2, length(recv_msgs(2))).
|
|
|
|
%% t_subscribe_sys_topics(_) ->
|
|
%% ct:print("Subscribe failure test starting"),
|
|
%% {ok, C, _} = emqtt:start_link([]),
|
|
%% {ok, _, [2]} = emqtt:subscribe(C, <<"$SYS/#">>, 2),
|
|
%% timer:sleep(10),
|
|
%% ct:print("Subscribe failure test succeeded").
|
|
|
|
t_dollar_topics(_) ->
|
|
ct:pal("$ topics test starting"),
|
|
{ok, C} = emqtt:start_link([{clean_start, true},
|
|
{keepalive, 0}]),
|
|
{ok, _} = emqtt:connect(C),
|
|
|
|
{ok, _, [1]} = emqtt:subscribe(C, nth(6, ?WILD_TOPICS), 1),
|
|
{ok, _} = emqtt:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
|
|
<<"test">>, [{qos, 1}, {retain, false}]),
|
|
timer:sleep(10),
|
|
?assertEqual(0, length(recv_msgs(1))),
|
|
ok = emqtt:disconnect(C),
|
|
ct:pal("$ topics test succeeded").
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for MQTT v5
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_basic_with_props_v5(_) ->
|
|
t_basic([{proto_ver, v5},
|
|
{properties, #{'Receive-Maximum' => 4}}
|
|
]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% General test cases.
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_basic(_Opts) ->
|
|
Topic = nth(1, ?TOPICS),
|
|
{ok, C} = emqtt:start_link([{proto_ver, v4}]),
|
|
{ok, _} = emqtt:connect(C),
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
|
{ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
|
?assertEqual(3, length(recv_msgs(3))),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
t_username_as_clientid(_) ->
|
|
emqx_zone:set_env(external, use_username_as_clientid, true),
|
|
Username = <<"usera">>,
|
|
{ok, C} = emqtt:start_link([{username, Username}]),
|
|
{ok, _} = emqtt:connect(C),
|
|
#{clientinfo := #{clientid := Username}} = emqx_cm:get_chan_info(Username),
|
|
emqtt:disconnect(C),
|
|
erlang:process_flag(trap_exit, true),
|
|
{ok, C1} = emqtt:start_link([{username, <<>>}]),
|
|
?assertEqual({error, {client_identifier_not_valid, undefined}}, emqtt:connect(C1)),
|
|
receive
|
|
{'EXIT', _, {shutdown, client_identifier_not_valid}} -> ok
|
|
after 100 ->
|
|
throw({error, "expect_client_identifier_not_valid"})
|
|
end.
|
|
|
|
t_certcn_as_clientid_default_config_tls(_) ->
|
|
tls_certcn_as_clientid(default).
|
|
|
|
t_certcn_as_clientid_tlsv1_3(_) ->
|
|
tls_certcn_as_clientid('tlsv1.3').
|
|
|
|
t_certcn_as_clientid_tlsv1_2(_) ->
|
|
tls_certcn_as_clientid('tlsv1.2').
|
|
|
|
t_qos2_no_pubrel_received(setup, Cfg) ->
|
|
OldMaxAwaitRel = emqx_zone:get_env(external, max_awaiting_rel),
|
|
OldWaitTimeout = emqx_zone:get_env(external, await_rel_timeout),
|
|
emqx_zone:set_env(external, max_awaiting_rel, 2),
|
|
emqx_zone:set_env(external, await_rel_timeout, 1),
|
|
[{old_max_await_rel, OldMaxAwaitRel},
|
|
{old_wait_timeout, OldWaitTimeout} | Cfg];
|
|
t_qos2_no_pubrel_received(teardown, Cfg) ->
|
|
OldMaxAwaitRel = ?config(old_max_await_rel, Cfg),
|
|
OldWaitTimeout = ?config(old_wait_timeout, Cfg),
|
|
emqx_zone:set_env(external, max_awaiting_rel, OldMaxAwaitRel),
|
|
emqx_zone:set_env(external, await_rel_timeout, OldWaitTimeout).
|
|
t_qos2_no_pubrel_received(_) ->
|
|
%% [The Scenario]:
|
|
%% Client --- CONN: clean_session=false --> EMQX
|
|
%% Client --- PUB: QoS2 ---> EMQX
|
|
%% Client --- PUB: QoS2 ---> EMQX
|
|
%% Client --- ... ---> EMQX
|
|
%% Client <---- PUBREC ---- EMQX
|
|
%% Client <---- PUBREC ---- EMQX
|
|
%% Client <---- ... ---- EMQX
|
|
%% Client --- PUBREL --X--> EMQX (PUBREL not received)
|
|
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX
|
|
%%
|
|
%% [A few hours later..]:
|
|
%% Client --- CONN: clean_session=false --> EMQX
|
|
%% Client --- PUB: QoS2 ---> EMQX
|
|
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX (we should clear the awaiting_rel queue but it is still full).
|
|
ct:pal("1. reconnect after awaiting_rel is cleared"),
|
|
qos2_no_pubrel_received(fun
|
|
(1) -> timer:sleep(1500); %% reconnect 1.5s later, ensure the await_rel_timeout triggered
|
|
(2) -> ok
|
|
end),
|
|
ct:pal("2. reconnect before awaiting_rel is cleared"),
|
|
qos2_no_pubrel_received(fun
|
|
(1) -> ok; %% reconnect as fast as possiable, ensure the await_rel_timeout NOT triggered
|
|
(2) -> timer:sleep(1500) %% send msgs 1.5s later, ensure the await_rel_timeout triggered
|
|
end).
|
|
|
|
qos2_no_pubrel_received(Actions) ->
|
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
Topic = <<"t/foo">>,
|
|
{ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
|
|
gen_tcp:send(Sock, make_connect_packet(ClientId, false)),
|
|
timer:sleep(100),
|
|
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 100, <<"foo">>)),
|
|
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 101, <<"foo">>)),
|
|
%% Send the 3rd publish with id 103 will got disconnected:
|
|
%% - "Dropped the qos2 packet 103 due to awaiting_rel is full".
|
|
?assertMatch(ok, gen_tcp:send(Sock, make_publish_packet(Topic, 2, 103, <<"foo">>))),
|
|
%% not the connections should be closed by the server due to ?RC_RECEIVE_MAXIMUM_EXCEEDED
|
|
receive
|
|
{tcp_closed, Sock} -> ok
|
|
after 500 ->
|
|
ct:fail({wait_tcp_close_timeout, 500})
|
|
end,
|
|
|
|
%% before reconnecting
|
|
action_point(Actions, 1),
|
|
|
|
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
|
|
gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
|
|
|
|
%% after connected and before sending any msgs
|
|
action_point(Actions, 2),
|
|
|
|
ok = gen_tcp:send(Sock1, make_publish_packet(Topic, 2, 104, <<"foo">>)),
|
|
|
|
receive
|
|
{tcp_closed, Sock1} ->
|
|
%% this is the bug, the wait_rel queue should have expired but not cleared
|
|
ct:fail(unexpected_disconnect)
|
|
after 500 -> ok
|
|
end,
|
|
gen_tcp:close(Sock1),
|
|
ok.
|
|
|
|
t_retry_timer_after_session_taken_over(setup, Cfg) ->
|
|
%emqx_logger:set_log_level(debug),
|
|
OldRetryInterval = emqx_zone:get_env(external, retry_interval),
|
|
emqx_zone:set_env(external, retry_interval, 1),
|
|
[{old_retry_interval, OldRetryInterval} | Cfg];
|
|
t_retry_timer_after_session_taken_over(teardown, Cfg) ->
|
|
%emqx_logger:set_log_level(warning),
|
|
OldRetryInterval = ?config(old_retry_interval, Cfg),
|
|
emqx_zone:set_env(external, retry_interval, OldRetryInterval).
|
|
t_retry_timer_after_session_taken_over(_) ->
|
|
%% [The Scenario]:
|
|
%% Client --- CONN: clean_session=false --> EMQX
|
|
%% Client <--- PUB: QoS1 --- EMQX
|
|
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
|
|
%% Client <---- PUB: QoS1 ---- EMQX (resend the PUBLISH msg)
|
|
%% Client --- DISCONN ---> EMQX
|
|
%% [A few seconds later..]:
|
|
%% Client --- CONN: clean_session=false --> EMQX
|
|
%% Client <--- PUB: QoS1 --- EMQX (resume session and resend the inflight messages)
|
|
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
|
|
TcpOpts = [binary, {active, true}],
|
|
|
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
Topic = <<"t/foo">>,
|
|
Payload = <<"DO NOT REPLY!">>,
|
|
|
|
%% CONNECT
|
|
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
|
|
ok = gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
|
|
?receive_tcp_packet(1000, Sock1, _, ok),
|
|
|
|
%% SUBSCRIBE
|
|
ok = gen_tcp:send(Sock1, make_subscribe_packet(Topic, 2, 1)),
|
|
?receive_tcp_packet(200, Sock1, _, ok),
|
|
|
|
emqx_broker:publish(emqx_message:make(<<"publisher">>, 1, Topic, Payload)),
|
|
%% note that here we don't reply the publish with puback
|
|
?receive_tcp_packet(200, Sock1, PubPacket1,
|
|
begin
|
|
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
|
|
emqx_frame:parse(PubPacket1))
|
|
end),
|
|
|
|
ok = gen_tcp:close(Sock1),
|
|
timer:sleep(100),
|
|
|
|
%% CONNECT again
|
|
{ok, Sock2} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
|
|
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
|
|
ConnAckRem = ?do_receive_tcp_packet(200, Sock2, ConnPack2, begin
|
|
ConnAck = iolist_to_binary(make_connack_packet(?CONNACK_ACCEPT, 1)),
|
|
ct:pal("--- connack: ~p, got: ~p", [ConnAck, ConnPack2]),
|
|
<<ConnAck:4/binary, Rem/binary>> = ConnPack2,
|
|
Rem
|
|
end),
|
|
|
|
%% emqx should resend the non-ACKed messages now
|
|
PubPacket2 = case ConnAckRem of
|
|
<<>> ->
|
|
?receive_tcp_packet(200, Sock2, Packet, Packet);
|
|
_ ->
|
|
ConnAckRem
|
|
end,
|
|
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
|
|
emqx_frame:parse(PubPacket2)),
|
|
|
|
%% ... and emqx should resend the message 1s later, as we didn't ACK it.
|
|
?receive_tcp_packet(1200, Sock2, Packet,
|
|
begin
|
|
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
|
|
emqx_frame:parse(Packet))
|
|
end),
|
|
|
|
%% we ACK it now, then emqx should stop the resending
|
|
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
|
|
receive
|
|
{tcp, Sock2, _PACKET} ->
|
|
{ok, MqttPacket, <<>>, _} = emqx_frame:parse(_PACKET),
|
|
?assertNotMatch(?PUBLISH_PACKET(_), MqttPacket)
|
|
after 1200 -> ok
|
|
end,
|
|
ok = gen_tcp:close(Sock1).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
action_point(Action, N) ->
|
|
Action(N).
|
|
|
|
make_connect_packet(ClientId, CleanStart) ->
|
|
emqx_frame:serialize(?CONNECT_PACKET(
|
|
#mqtt_packet_connect{
|
|
proto_name = <<"MQTT">>,
|
|
clean_start = CleanStart,
|
|
keepalive = 0,
|
|
clientid = ClientId
|
|
}
|
|
)).
|
|
make_connack_packet(Code, SP) ->
|
|
emqx_frame:serialize(?CONNACK_PACKET(Code, SP)).
|
|
|
|
make_publish_packet(Topic, QoS, PacketId, Payload) ->
|
|
emqx_frame:serialize(
|
|
?PUBLISH_PACKET(QoS, Topic, PacketId, Payload)).
|
|
|
|
make_subscribe_packet(TopicFileter, QoS, PacketId) ->
|
|
emqx_frame:serialize(
|
|
?SUBSCRIBE_PACKET(PacketId, [{TopicFileter, #{rh => 0, rap => 0, nl => 0, qos => QoS}}])).
|
|
|
|
recv_msgs(Count) ->
|
|
recv_msgs(Count, []).
|
|
|
|
recv_msgs(0, Msgs) ->
|
|
Msgs;
|
|
recv_msgs(Count, Msgs) ->
|
|
receive
|
|
{publish, Msg} ->
|
|
recv_msgs(Count-1, [Msg|Msgs]);
|
|
_Other -> recv_msgs(Count, Msgs) %%TODO:: remove the branch?
|
|
after 100 ->
|
|
Msgs
|
|
end.
|
|
|
|
|
|
confirm_tls_version( Client, RequiredProtocol ) ->
|
|
Info = emqtt:info(Client),
|
|
SocketInfo = proplists:get_value( socket, Info ),
|
|
%% emqtt_sock has #ssl_socket.ssl
|
|
SSLSocket = element( 3, SocketInfo ),
|
|
{ ok, SSLInfo } = ssl:connection_information(SSLSocket),
|
|
Protocol = proplists:get_value( protocol, SSLInfo ),
|
|
RequiredProtocol = Protocol.
|
|
|
|
|
|
tls_certcn_as_clientid(default = TLSVsn) ->
|
|
tls_certcn_as_clientid(TLSVsn, 'tlsv1.3');
|
|
tls_certcn_as_clientid(TLSVsn) ->
|
|
tls_certcn_as_clientid(TLSVsn, TLSVsn).
|
|
|
|
tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) ->
|
|
CN = <<"Client">>,
|
|
emqx_zone:set_env(external, use_username_as_clientid, true),
|
|
SslConf = emqx_ct_helpers:client_ssl_twoway(TLSVsn),
|
|
{ok, Client} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
#{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN),
|
|
confirm_tls_version( Client, RequiredTLSVsn ),
|
|
emqtt:disconnect(Client).
|