215 lines
8.1 KiB
Erlang
215 lines
8.1 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqttd_SUITE).
|
|
|
|
-compile(export_all).
|
|
|
|
-include("emqttd.hrl").
|
|
|
|
all() ->
|
|
[{group, pubsub},
|
|
{group, router},
|
|
{group, session},
|
|
{group, retainer},
|
|
{group, broker},
|
|
{group, metrics},
|
|
{group, stats}].
|
|
|
|
groups() ->
|
|
[{pubsub, [sequence],
|
|
[create_topic,
|
|
create_subscription,
|
|
subscribe_unsubscribe,
|
|
publish_message]},
|
|
{router, [sequence],
|
|
[add_delete_routes,
|
|
add_delete_route,
|
|
route_message]},
|
|
{session, [sequence],
|
|
[start_session]},
|
|
{retainer, [sequence],
|
|
[retain_message]},
|
|
{broker, [sequence],
|
|
[hook_unhook]},
|
|
{metrics, [sequence],
|
|
[inc_dec_metric]},
|
|
{stats, [sequence],
|
|
[set_get_stat]}].
|
|
|
|
init_per_suite(Config) ->
|
|
application:start(lager),
|
|
application:ensure_all_started(emqttd),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
application:stop(emqttd),
|
|
application:stop(esockd),
|
|
application:stop(gproc),
|
|
emqttd_mnesia:ensure_stopped().
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
create_topic(_) ->
|
|
Node = node(),
|
|
ok = emqttd_pubsub:create(topic, <<"topic/create">>),
|
|
ok = emqttd_pubsub:create(topic, <<"topic/create2">>),
|
|
[#mqtt_topic{topic = <<"topic/create">>, node = Node}]
|
|
= emqttd_pubsub:lookup(topic, <<"topic/create">>).
|
|
|
|
create_subscription(_) ->
|
|
ok = emqttd_pubsub:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}),
|
|
[#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
|
|
= emqttd_pubsub:lookup(subscription, <<"clientId">>),
|
|
ok = emqttd_pubsub:delete(subscription, <<"clientId">>),
|
|
[] = emqttd_pubsub:lookup(subscription, <<"clientId">>).
|
|
|
|
subscribe_unsubscribe(_) ->
|
|
{ok, [1]} = emqttd_pubsub:subscribe({<<"topic/subunsub">>, 1}),
|
|
{ok, [1, 2]} = emqttd_pubsub:subscribe([{<<"topic/subunsub1">>, 1}, {<<"topic/subunsub2">>, 2}]),
|
|
ok = emqttd_pubsub:unsubscribe(<<"topic/subunsub">>),
|
|
ok = emqttd_pubsub:unsubscribe([<<"topic/subunsub1">>, <<"topic/subunsub2">>]),
|
|
|
|
{ok, [1]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, {<<"topic/subunsub">>, 1}),
|
|
{ok, [1,2]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, [{<<"topic/subunsub1">>, 1},
|
|
{<<"topic/subunsub2">>, 2}]),
|
|
ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, <<"topic/subunsub">>),
|
|
ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, [<<"topic/subunsub1">>,
|
|
<<"topic/subunsub2">>]).
|
|
|
|
publish_message(_) ->
|
|
Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
|
|
{ok, [1]} = emqttd_pubsub:subscribe({<<"test/+">>, qos1}),
|
|
emqttd_pubsub:publish(Msg),
|
|
true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Route Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
add_delete_route(_) ->
|
|
Self = self(),
|
|
emqttd_router:add_route(<<"topic1">>, Self),
|
|
true = emqttd_router:has_route(<<"topic1">>),
|
|
emqttd_router:add_route(<<"topic2">>, Self),
|
|
true = emqttd_router:has_route(<<"topic2">>),
|
|
[Self] = emqttd_router:lookup_routes(<<"topic1">>),
|
|
[Self] = emqttd_router:lookup_routes(<<"topic2">>),
|
|
%% Del topic1
|
|
emqttd_router:delete_route(<<"topic1">>, Self),
|
|
erlang:yield(),
|
|
timer:sleep(10),
|
|
false = emqttd_router:has_route(<<"topic1">>),
|
|
%% Del topic2
|
|
emqttd_router:delete_route(<<"topic2">>, Self),
|
|
erlang:yield(),
|
|
timer:sleep(10),
|
|
false = emqttd_router:has_route(<<"topic2">>).
|
|
|
|
add_delete_routes(_) ->
|
|
Self = self(),
|
|
emqttd_router:add_routes([], Self),
|
|
emqttd_router:add_routes([<<"t0">>], Self),
|
|
emqttd_router:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self),
|
|
true = emqttd_router:has_route(<<"t1">>),
|
|
[Self] = emqttd_router:lookup_routes(<<"t1">>),
|
|
[Self] = emqttd_router:lookup_routes(<<"t2">>),
|
|
[Self] = emqttd_router:lookup_routes(<<"t3">>),
|
|
|
|
emqttd_router:delete_routes([<<"t3">>], Self),
|
|
emqttd_router:delete_routes([<<"t0">>, <<"t1">>], Self),
|
|
erlang:yield(),
|
|
timer:sleep(10),
|
|
false = emqttd_router:has_route(<<"t0">>),
|
|
false = emqttd_router:has_route(<<"t1">>),
|
|
true = emqttd_router:has_route(<<"t2">>),
|
|
false = emqttd_router:has_route(<<"t3">>).
|
|
|
|
route_message(_) ->
|
|
Self = self(),
|
|
Pid = spawn_link(fun() -> timer:sleep(1000) end),
|
|
emqttd_router:add_routes([<<"$Q/1">>,<<"t/2">>,<<"t/3">>], Self),
|
|
emqttd_router:add_routes([<<"t/2">>], Pid),
|
|
Msg1 = #mqtt_message{topic = <<"$Q/1">>, payload = <<"q">>},
|
|
Msg2 = #mqtt_message{topic = <<"t/2">>, payload = <<"t2">>},
|
|
Msg3 = #mqtt_message{topic = <<"t/3">>, payload = <<"t3">>},
|
|
emqttd_router:route(<<"$Q/1">>, Msg1),
|
|
emqttd_router:route(<<"t/2">>, Msg2),
|
|
emqttd_router:route(<<"t/3">>, Msg3),
|
|
[Msg1, Msg2, Msg3] = recv_loop([]),
|
|
emqttd_router:add_route(<<"$Q/1">>, Self),
|
|
emqttd_router:route(<<"$Q/1">>, Msg1).
|
|
|
|
recv_loop(Msgs) ->
|
|
receive
|
|
{dispatch, _Topic, Msg} ->
|
|
recv_loop([Msg|Msgs])
|
|
after
|
|
100 -> lists:reverse(Msgs)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Session Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
start_session(_) ->
|
|
{ok, ClientPid} = emqttd_mock_client:start_link(<<"clientId">>),
|
|
{ok, SessPid} = emqttd_mock_client:start_session(ClientPid),
|
|
Message = emqttd_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
|
|
Message1 = Message#mqtt_message{pktid = 1},
|
|
emqttd_session:publish(SessPid, Message1),
|
|
emqttd_session:pubrel(SessPid, 1),
|
|
emqttd_session:subscribe(SessPid, [{<<"topic/session">>, 2}]),
|
|
Message2 = emqttd_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
|
|
emqttd_session:publish(SessPid, Message2),
|
|
emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
|
|
emqttd_mock_client:stop(ClientPid).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Retainer Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
retain_message(_) ->
|
|
Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
|
|
payload = <<"payload">>},
|
|
emqttd_retainer:retain(Msg),
|
|
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
|
|
true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
|
|
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
|
|
[] = mnesia:dirty_read({retained, <<"a/b/c">>}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Broker Group
|
|
%%--------------------------------------------------------------------
|
|
hook_unhook(_) ->
|
|
ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Metric Group
|
|
%%--------------------------------------------------------------------
|
|
inc_dec_metric(_) ->
|
|
emqttd_metrics:inc(gauge, 'messages/retained', 10),
|
|
emqttd_metrics:dec(gauge, 'messages/retained', 10).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Stats Group
|
|
%%--------------------------------------------------------------------
|
|
set_get_stat(_) ->
|
|
emqttd_stats:setstat('retained/max', 99),
|
|
99 = emqttd_stats:getstat('retained/max').
|