emqx/test/emqx_client_SUITE.erl

222 lines
8.0 KiB
Erlang

%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_client_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-import(lists, [nth/2]).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>,
<<"/TopicA">>]).
-define(WILD_TOPICS, [<<"TopicA/+">>, <<"+/C">>, <<"#">>, <<"/#">>, <<"/+">>,
<<"+/+">>, <<"TopicA/#">>]).
all() ->
[{group, mqttv4}].
groups() ->
[{mqttv4, [non_parallel_tests],
[basic_test,
will_message_test,
offline_message_queueing_test,
overlapping_subscriptions_test,
%% keepalive_test,
redelivery_on_reconnect_test,
%% subscribe_failure_test,
dollar_topics_test]}].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
Msgs;
receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
_Other ->
receive_messages(Count, Msgs)
after 100 ->
Msgs
end.
basic_test(_Config) ->
Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"),
init_caps(),
{ok, C} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C),
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
{ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
?assertEqual(3, length(receive_messages(3))),
ok = emqx_client:disconnect(C).
will_message_test(_Config) ->
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, true},
{will_topic, nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>},
{keepalive, 2}]),
{ok, _} = emqx_client:connect(C1),
{ok, C2} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C2),
{ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2),
timer:sleep(10),
ok = emqx_client:stop(C1),
timer:sleep(5),
?assertEqual(1, length(receive_messages(1))),
ok = emqx_client:disconnect(C2),
ct:print("Will message test succeeded").
offline_message_queueing_test(_) ->
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
{ok, _} = emqx_client:connect(C1),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1),
{ok, C2} = emqx_client:start_link([{clean_start, true},
{client_id, <<"c2">>}]),
{ok, _} = emqx_client:connect(C2),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
timer:sleep(10),
emqx_client:disconnect(C2),
{ok, C3} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
{ok, _} = emqx_client:connect(C3),
timer:sleep(10),
emqx_client:disconnect(C3),
?assertEqual(3, length(receive_messages(3))).
overlapping_subscriptions_test(_) ->
init_caps(),
{ok, C} = emqx_client:start_link([]),
{ok, _} = emqx_client:connect(C),
{ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
{nth(1, ?WILD_TOPICS), 1}]),
timer:sleep(10),
{ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
timer:sleep(10),
Num = length(receive_messages(2)),
?assert(lists:member(Num, [1, 2])),
if
Num == 1 ->
ct:print("This server is publishing one message for all
matching overlapping subscriptions, not one for each.");
Num == 2 ->
ct:print("This server is publishing one message per each
matching overlapping subscription.");
true -> ok
end,
emqx_client:disconnect(C).
%% keepalive_test(_) ->
%% ct:print("Keepalive test starting"),
%% {ok, C1, _} = emqx_client:start_link([{clean_start, true},
%% {keepalive, 5},
%% {will_flag, true},
%% {will_topic, nth(5, ?TOPICS)},
%% %% {will_qos, 2},
%% {will_payload, <<"keepalive expiry">>}]),
%% ok = emqx_client:pause(C1),
%% {ok, C2, _} = emqx_client:start_link([{clean_start, true},
%% {keepalive, 0}]),
%% {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2),
%% ok = emqx_client:disconnect(C2),
%% ?assertEqual(1, length(receive_messages(1))),
%% ct:print("Keepalive test succeeded").
redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"),
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
{ok, _} = emqx_client:connect(C1),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
timer:sleep(10),
ok = emqx_client:pause(C1),
{ok, _} = emqx_client:publish(C1, nth(2, ?TOPICS), <<>>,
[{qos, 1}, {retain, false}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>,
[{qos, 2}, {retain, false}]),
timer:sleep(10),
ok = emqx_client:disconnect(C1),
?assertEqual(0, length(receive_messages(2))),
{ok, C2} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
{ok, _} = emqx_client:connect(C2),
timer:sleep(10),
ok = emqx_client:disconnect(C2),
?assertEqual(2, length(receive_messages(2))).
%% subscribe_failure_test(_) ->
%% ct:print("Subscribe failure test starting"),
%% {ok, C, _} = emqx_client:start_link([]),
%% {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
%% timer:sleep(10),
%% ct:print("Subscribe failure test succeeded").
dollar_topics_test(_) ->
ct:print("$ topics test starting"),
init_caps(),
{ok, C} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, _} = emqx_client:connect(C),
{ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
{ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
<<"test">>, [{qos, 1}, {retain, false}]),
timer:sleep(10),
?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C),
ct:print("$ topics test succeeded").
init_caps() ->
Caps = #{max_qos_allowed => 2,
max_topic_levels => 0,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true,
max_topic_alias => 0,
mqtt_retain_available => true},
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100).