171 lines
6.3 KiB
Erlang
171 lines
6.3 KiB
Erlang
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
|
|
-module(emqx_broker_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-define(APP, emqx).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
-include("emqx.hrl").
|
|
-include("emqx_mqtt.hrl").
|
|
|
|
all() ->
|
|
[{group, pubsub},
|
|
{group, session},
|
|
{group, metrics},
|
|
{group, stats}].
|
|
|
|
groups() ->
|
|
[{pubsub, [sequence], [subscribe_unsubscribe,
|
|
publish, pubsub,
|
|
t_shared_subscribe,
|
|
dispatch_with_no_sub,
|
|
'pubsub#', 'pubsub+']},
|
|
{session, [sequence], [start_session]},
|
|
{metrics, [sequence], [inc_dec_metric]},
|
|
{stats, [sequence], [set_get_stat]}].
|
|
|
|
init_per_suite(Config) ->
|
|
emqx_ct_helpers:start_apps([]),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
emqx_ct_helpers:stop_apps([]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
subscribe_unsubscribe(_) ->
|
|
ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
|
|
ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
|
|
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
|
|
true = emqx:subscribed(<<"clientId">>, <<"topic">>),
|
|
Topics = emqx:topics(),
|
|
lists:foreach(fun(Topic) ->
|
|
?assert(lists:member(Topic, Topics))
|
|
end, Topics),
|
|
ok = emqx:unsubscribe(<<"topic">>),
|
|
ok = emqx:unsubscribe(<<"topic/1">>),
|
|
ok = emqx:unsubscribe(<<"topic/2">>).
|
|
|
|
publish(_) ->
|
|
Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
|
|
ok = emqx:subscribe(<<"test/+">>),
|
|
timer:sleep(10),
|
|
emqx:publish(Msg),
|
|
?assert(receive {dispatch, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 100 -> false end).
|
|
|
|
dispatch_with_no_sub(_) ->
|
|
Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
|
|
Delivery = #delivery{sender = self(), message = Msg},
|
|
?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}],
|
|
emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
|
|
|
|
pubsub(_) ->
|
|
true = emqx:is_running(node()),
|
|
Self = self(),
|
|
Subscriber = <<"clientId">>,
|
|
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
|
|
#{qos := 1} = ets:lookup_element(emqx_suboption, {Self, <<"a/b/c">>}, 2),
|
|
#{qos := 1} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
|
|
true = emqx_broker:set_subopts(<<"a/b/c">>, #{qos => 0}),
|
|
#{qos := 0} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
|
|
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
|
|
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
|
|
timer:sleep(10),
|
|
[Self] = emqx_broker:subscribers(<<"a/b/c">>),
|
|
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
?assert(
|
|
receive {dispatch, <<"a/b/c">>, _ } ->
|
|
true;
|
|
P ->
|
|
ct:log("Receive Message: ~p~n",[P])
|
|
after 100 ->
|
|
false
|
|
end),
|
|
spawn(fun() ->
|
|
emqx:subscribe(<<"a/b/c">>),
|
|
emqx:subscribe(<<"c/d/e">>),
|
|
timer:sleep(10),
|
|
emqx:unsubscribe(<<"a/b/c">>)
|
|
end),
|
|
timer:sleep(20),
|
|
emqx:unsubscribe(<<"a/b/c">>).
|
|
|
|
t_shared_subscribe(_) ->
|
|
emqx:subscribe("$share/group2/topic2"),
|
|
emqx:subscribe("$queue/topic3"),
|
|
timer:sleep(10),
|
|
ct:log("share subscriptions: ~p~n", [emqx:subscriptions(self())]),
|
|
?assertEqual(2, length(emqx:subscriptions(self()))),
|
|
emqx:unsubscribe("$share/group2/topic2"),
|
|
emqx:unsubscribe("$queue/topic3"),
|
|
?assertEqual(0, length(emqx:subscriptions(self()))).
|
|
|
|
'pubsub#'(_) ->
|
|
emqx:subscribe(<<"a/#">>),
|
|
timer:sleep(10),
|
|
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
?assert(receive {dispatch, <<"a/#">>, _} -> true after 100 -> false end),
|
|
emqx:unsubscribe(<<"a/#">>).
|
|
|
|
'pubsub+'(_) ->
|
|
emqx:subscribe(<<"a/+/+">>),
|
|
timer:sleep(10),
|
|
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 100 -> false end),
|
|
emqx:unsubscribe(<<"a/+/+">>).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Session Group
|
|
%%--------------------------------------------------------------------
|
|
start_session(_) ->
|
|
ClientId = <<"clientId">>,
|
|
{ok, ClientPid} = emqx_mock_client:start_link(ClientId),
|
|
{ok, SessPid} = emqx_mock_client:open_session(ClientPid, ClientId, internal),
|
|
Message1 = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
|
|
emqx_session:publish(SessPid, 1, Message1),
|
|
emqx_session:pubrel(SessPid, 2, reasoncode),
|
|
emqx_session:subscribe(SessPid, [{<<"topic/session">>, #{qos => 2}}]),
|
|
Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
|
|
emqx_session:publish(SessPid, 3, Message2),
|
|
emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
|
|
%% emqx_mock_client:stop(ClientPid).
|
|
emqx_mock_client:close_session(ClientPid).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Broker Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Metric Group
|
|
%%--------------------------------------------------------------------
|
|
inc_dec_metric(_) ->
|
|
emqx_metrics:inc('messages.retained', 10),
|
|
emqx_metrics:dec('messages.retained', 10).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Stats Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
set_get_stat(_) ->
|
|
emqx_stats:setstat('retained.max', 99),
|
|
99 = emqx_stats:getstat('retained.max').
|