427 lines
15 KiB
Erlang
427 lines
15 KiB
Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqttd_SUITE).
|
|
|
|
-compile(export_all).
|
|
|
|
-include("emqttd.hrl").
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
%% @doc Common Test callback: lists every group of this suite, in the order
%% in which the groups are to be run.
all() ->
    GroupNames = [pubsub, router, session, retainer, broker,
                  metrics, stats, hook, backend, cli],
    [{group, Name} || Name <- GroupNames].
|
|
|
|
%% @doc Common Test callback: maps each group name to its test cases.
%% Every group is declared with the `sequence' property.
groups() ->
    CasesByGroup =
        [{pubsub,   [create_topic,
                     create_subscription,
                     subscribe_unsubscribe,
                     publish,
                     pubsub,
                     'pubsub#',
                     'pubsub+',
                     pubsub_queue]},
         {router,   [router_add_del,
                     router_print,
                     router_unused]},
         {session,  [start_session]},
         {broker,   [hook_unhook]},
         {metrics,  [inc_dec_metric]},
         {stats,    [set_get_stat]},
         {hook,     [add_delete_hook,
                     run_hooks]},
         {retainer, [retain_messages,
                     dispatch_retained_messages,
                     expire_retained_messages]},
         {backend,  [backend_subscription]},
         {cli,      [ctl_register_cmd,
                     cli_status,
                     cli_broker,
                     cli_clients,
                     cli_sessions,
                     cli_routes,
                     cli_topics,
                     cli_subscriptions,
                     cli_bridges,
                     cli_plugins,
                     cli_listeners,
                     cli_vm]}],
    [{Group, [sequence], Cases} || {Group, Cases} <- CasesByGroup].
|
|
|
|
%% @doc Common Test callback: starts the applications the suite needs once,
%% before any group runs.
%%
%% Returns `Config' unchanged. Fails with `badmatch' if emqttd cannot be
%% started, so a broken boot is reported here rather than as unrelated
%% failures in every later test case.
init_per_suite(Config) ->
    %% Best-effort early start of the logger: the result is intentionally not
    %% asserted, because application:start/1 also returns an error tuple when
    %% the application is already running, which is harmless here.
    _ = application:start(lager),
    {ok, _Started} = application:ensure_all_started(emqttd),
    Config.
|
|
|
|
%% @doc Common Test callback: stops emqttd, esockd and gproc (in that order,
%% ignoring each result) and then asks emqttd_mnesia to make sure it is
%% stopped.
end_per_suite(_Config) ->
    lists:foreach(fun application:stop/1, [emqttd, esockd, gproc]),
    emqttd_mnesia:ensure_stopped().
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Creates two topics and expects a lookup of the first one to return a
%% single `#mqtt_topic{}' record flagged `static'.
create_topic(_) ->
    Topic = <<"topic/create">>,
    ok = emqttd:create(topic, Topic),
    ok = emqttd:create(topic, <<"topic/create2">>),
    [#mqtt_topic{topic = Topic, flags = [static]}] =
        emqttd:lookup(topic, Topic).
|
|
|
|
%% @doc Creates a `qos2' subscription, expects the backend to report it with
%% `qos = 2', then deletes the client's subscriptions and expects none left.
create_subscription(_) ->
    ClientId = <<"clientId">>,
    Topic = <<"topic/sub">>,
    ok = emqttd:create(subscription, {ClientId, Topic, qos2}),
    [#mqtt_subscription{subid = ClientId, topic = Topic, qos = 2}] =
        emqttd_backend:lookup_subscriptions(ClientId),
    ok = emqttd_backend:del_subscriptions(ClientId),
    ?assertEqual([], emqttd_backend:lookup_subscriptions(ClientId)).
|
|
|
|
%% @doc Subscribes to three topics (one anonymously, two on behalf of a client
%% id with QoS 1 and 2) and then unsubscribes from each; every call must
%% return `ok'.
subscribe_unsubscribe(_) ->
    ClientId = <<"clientId">>,
    Plain = <<"topic/subunsub">>,
    Qos1Topic = <<"topic/subunsub1">>,
    Qos2Topic = <<"topic/subunsub2">>,
    ok = emqttd:subscribe(Plain),
    ok = emqttd:subscribe(ClientId, Qos1Topic, 1),
    ok = emqttd:subscribe(ClientId, Qos2Topic, 2),
    ok = emqttd:unsubscribe(Plain),
    ok = emqttd:unsubscribe(ClientId, Qos1Topic, 1),
    ok = emqttd:unsubscribe(ClientId, Qos2Topic, 2).
|
|
|
|
%% @doc Subscribes to the `test/+' filter, publishes one message on
%% `test/pubsub' and expects that exact message to be dispatched to this
%% process within 5 ms.
publish(_) ->
    Topic = <<"test/pubsub">>,
    Payload = <<"hello">>,
    Msg = emqttd_message:make(ct, Topic, Payload),
    ok = emqttd:subscribe(<<"test/+">>),
    %% Short pause before publishing, presumably to let the subscription
    %% take effect.
    timer:sleep(10),
    emqttd:publish(Msg),
    ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
|
|
|
|
%% @doc Subscribes this process to `a/b/c' twice (QoS 1, then QoS 2), checks
%% that the `subscribed' and `subscriber' ETS tables each hold exactly one
%% entry for it, and expects a published message to be dispatched back.
%% Afterwards a second, short-lived process subscribes and unsubscribes on
%% the same topic before this process unsubscribes as well.
pubsub(_) ->
    Self = self(),
    ClientId = <<"clientId">>,
    Topic = <<"a/b/c">>,
    emqttd:subscribe({ClientId, Topic, 1}),
    emqttd:subscribe({ClientId, Topic, 2}),
    timer:sleep(10),
    [{Self, Topic}] = ets:lookup(subscribed, Self),
    [{Topic, Self}] = ets:lookup(subscriber, Topic),
    Hello = emqttd_message:make(ct, Topic, <<"hello">>),
    emqttd:publish(Hello),
    ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
    OtherSubscriber =
        fun() ->
            emqttd:subscribe(Topic),
            emqttd:subscribe(<<"c/d/e">>),
            timer:sleep(10),
            emqttd:unsubscribe(Topic)
        end,
    spawn(OtherSubscriber),
    %% Outlast the 10 ms sleep inside the spawned process before
    %% unsubscribing here.
    timer:sleep(20),
    emqttd:unsubscribe(Topic).
|
|
|
|
%% @doc Subscribes to the multi-level wildcard filter `a/#', publishes on
%% `a/b/c' and expects a dispatch tagged with the filter within 2 ms.
'pubsub#'(_) ->
    Filter = <<"a/#">>,
    emqttd:subscribe(Filter),
    timer:sleep(10),
    Hello = emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>),
    emqttd:publish(Hello),
    ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
    emqttd:unsubscribe(Filter).
|
|
|
|
%% @doc Subscribes to the single-level wildcard filter `a/+/+', publishes on
%% `a/b/c' and expects a dispatch tagged with the filter within 1 ms.
'pubsub+'(_) ->
    Filter = <<"a/+/+">>,
    emqttd:subscribe(Filter),
    timer:sleep(10),
    Hello = emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>),
    emqttd:publish(Hello),
    ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
    emqttd:unsubscribe(Filter).
|
|
|
|
%% @doc Starts two subscriber processes on the same `$queue/abc' topic,
%% publishes three messages and expects each subscriber to report fewer than
%% three of them.
%%
%% Each subscriber collects dispatches with loop_recv/2 and sends
%% `{recv, Pid, Msgs}' back to the test process. Waiting for those reports is
%% bounded by a timeout: if a subscriber crashes or never reports, the
%% assertion fails instead of blocking the test case until the Common Test
%% timetrap fires.
pubsub_queue(_) ->
    Self = self(),
    Q = <<"$queue/abc">>,
    %% Upper bound (ms) on waiting for a subscriber report; generous compared
    %% to the 10 ms idle timeout each subscriber passes to loop_recv/2.
    ReportTimeout = 1000,
    SubFun = fun() ->
                 emqttd:subscribe(Q),
                 {ok, Msgs} = loop_recv(Q, 10),
                 Self ! {recv, self(), Msgs}
             end,
    Sub1 = spawn(SubFun),
    Sub2 = spawn(SubFun),
    timer:sleep(5),
    emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)),
    emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)),
    emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)),
    ?assert(receive
                {recv, Sub1, Msgs1} -> length(Msgs1) < 3
            after ReportTimeout ->
                false
            end),
    ?assert(receive
                {recv, Sub2, Msgs2} -> length(Msgs2) < 3
            after ReportTimeout ->
                false
            end).
|
|
|
|
%% @doc Collects `{dispatch, Topic, Msg}' messages for the given `Topic' from
%% the calling process' mailbox until no further one arrives within `Timeout'
%% milliseconds. Returns `{ok, Msgs}' with the messages in arrival order.
%% Messages that do not match are left in the mailbox.
loop_recv(Topic, Timeout) ->
    loop_recv(Topic, Timeout, []).

%% Worker for loop_recv/2. `Acc' holds the messages received so far, newest
%% first; it is reversed once on exit so callers see arrival order.
loop_recv(Topic, Timeout, Acc) ->
    receive
        {dispatch, Topic, Msg} ->
            loop_recv(Topic, Timeout, [Msg | Acc])
    after
        Timeout -> {ok, lists:reverse(Acc)}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Router Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Exercises adding and deleting routes, one at a time and in batches.
%%
%% After adding routes for `#', `a/b/c' and `+/#' on the local node, a lookup
%% of `a/b/c' must return all three. Re-adding them as a batch must not change
%% that result. Deleting `a/b/c' must leave the two wildcard routes and an
%% empty trie lookup for `a/b/c'. Finally a route for `#' on another node is
%% added and everything is deleted again, leaving no route for `a/b/c'.
router_add_del(_) ->
    Node = node(),
    Topic = <<"a/b/c">>,

    %% Add
    emqttd_router:add_route(<<"#">>),
    emqttd_router:add_route(Topic),
    emqttd_router:add_route(<<"+/#">>, Node),
    HashRoute = #mqtt_route{topic = <<"#">>, node = Node},
    PlusRoute = #mqtt_route{topic = <<"+/#">>, node = Node},
    TopicRoute = #mqtt_route{topic = Topic, node = Node},
    Expected = [HashRoute, PlusRoute, TopicRoute],
    Expected = lists:sort(emqttd_router:lookup(Topic)),

    %% Batch Add
    emqttd_router:add_routes(Expected),
    Expected = lists:sort(emqttd_router:lookup(Topic)),

    %% Del
    emqttd_router:del_route(Topic),
    [HashRoute, PlusRoute] = lists:sort(emqttd_router:lookup(Topic)),
    {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [Topic]),

    %% Batch Del
    RemoteRoute = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
    emqttd_router:add_route(RemoteRoute),
    emqttd_router:del_routes([HashRoute, PlusRoute]),
    emqttd_router:del_route(RemoteRoute),
    [] = lists:sort(emqttd_router:lookup(Topic)).
|
|
|
|
%% @doc Adds three local routes (`a/b/c', `#', `+/#') as a batch and calls
%% emqttd_router:print/1 for `a/b/c'. The result is not checked.
router_print(_) ->
    Node = node(),
    Topics = [<<"a/b/c">>, <<"#">>, <<"+/#">>],
    Routes = [#mqtt_route{topic = Topic, node = Node} || Topic <- Topics],
    emqttd_router:add_routes(Routes),
    emqttd_router:print(<<"a/b/c">>).
|
|
|
|
%% @doc Sends the router an unexpected call, cast and plain message, in that
%% order. The case only fails if the call itself raises.
router_unused(_) ->
    Router = emqttd_router,
    gen_server:call(Router, bad_call),
    gen_server:cast(Router, bad_msg),
    erlang:send(Router, bad_info).
|
|
|
|
%% @doc Drains `{dispatch, Topic, Msg}' messages (any topic) from the calling
%% process' mailbox until 100 ms pass without one. `Msgs' is the accumulator,
%% newest first; the result is the accumulated messages in arrival order.
recv_loop(Msgs) ->
    receive
        {dispatch, _AnyTopic, Received} ->
            recv_loop([Received | Msgs])
    after 100 ->
        lists:reverse(Msgs)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Session Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Starts a mock client with a session and drives the session through a
%% QoS 2 publish followed by pubrel for packet id 1, a subscribe, a QoS 1
%% publish on the subscribed topic and an unsubscribe, then stops the client.
%% No results are asserted beyond the two start calls returning `{ok, Pid}'.
start_session(_) ->
    ClientId = <<"clientId">>,
    SessionTopic = <<"topic/session">>,
    PacketId = 1,
    {ok, ClientPid} = emqttd_mock_client:start_link(ClientId),
    {ok, SessPid} = emqttd_mock_client:start_session(ClientPid),

    Qos2Base = emqttd_message:make(ClientId, 2, <<"topic">>, <<"hello">>),
    Qos2Msg = Qos2Base#mqtt_message{pktid = PacketId},
    emqttd_session:publish(SessPid, Qos2Msg),
    emqttd_session:pubrel(SessPid, PacketId),

    emqttd_session:subscribe(SessPid, [{SessionTopic, 2}]),
    Qos1Msg = emqttd_message:make(ClientId, 1, SessionTopic, <<"test">>),
    emqttd_session:publish(SessPid, Qos1Msg),
    emqttd_session:unsubscribe(SessPid, [SessionTopic]),

    emqttd_mock_client:stop(ClientPid).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Broker Group
|
|
%%--------------------------------------------------------------------
|
|
%% @doc Placeholder test case for the broker group; always succeeds.
hook_unhook(_Config) ->
    ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Metric Group
|
|
%%--------------------------------------------------------------------
|
|
%% @doc Increments and then decrements the `messages/retained' gauge by 10.
%% The results are not checked.
inc_dec_metric(_) ->
    Metric = 'messages/retained',
    Delta = 10,
    emqttd_metrics:inc(gauge, Metric, Delta),
    emqttd_metrics:dec(gauge, Metric, Delta).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Stats Group
|
|
%%--------------------------------------------------------------------
|
|
%% @doc Sets the `retained/max' stat to 99 and expects to read the same value
%% back.
set_get_stat(_) ->
    Stat = 'retained/max',
    Value = 99,
    emqttd_stats:setstat(Stat, Value),
    Value = emqttd_stats:getstat(Stat).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Hook Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Registers and removes hook callbacks and checks what
%% emqttd_hook:lookup/1 reports at each stage.
%%
%% Stage 1 (`test_hook'): two callbacks are added via hook/3; adding the
%% second one again must yield `{error, already_hooked}'. Lookup must list
%% both, in registration order, each with `0' as last tuple element. Removing
%% an already removed callback must still return `ok', while unhooking from
%% the unknown hook `test_hook1' must return `{error, not_found}'.
%%
%% Stage 2 (`emqttd_hook'): the callbacks are added via hook/4 with 9 and 8
%% as last argument (presumably a priority); lookup must list the one
%% registered with 8 before the one registered with 9.
add_delete_hook(_) ->
    Fun1 = fun ?MODULE:hook_fun1/1,
    Fun2 = fun ?MODULE:hook_fun2/1,

    emqttd:hook(test_hook, Fun1, []),
    emqttd:hook(test_hook, Fun2, []),
    {error, already_hooked} = emqttd:hook(test_hook, Fun2, []),
    Callbacks = [{callback, Fun1, [], 0},
                 {callback, Fun2, [], 0}],
    Callbacks = emqttd_hook:lookup(test_hook),
    emqttd:unhook(test_hook, Fun1),
    emqttd:unhook(test_hook, Fun2),
    ok = emqttd:unhook(test_hook, Fun2),
    {error, not_found} = emqttd:unhook(test_hook1, Fun2),
    [] = emqttd_hook:lookup(test_hook),

    emqttd:hook(emqttd_hook, Fun1, [], 9),
    emqttd:hook(emqttd_hook, Fun2, [], 8),
    Callbacks2 = [{callback, Fun2, [], 8},
                  {callback, Fun1, [], 9}],
    Callbacks2 = emqttd_hook:lookup(emqttd_hook),
    emqttd:unhook(emqttd_hook, Fun1),
    emqttd:unhook(emqttd_hook, Fun2),
    [] = emqttd_hook:lookup(emqttd_hook).
|
|
|
|
%% @doc Registers hook_fun3/4, hook_fun4/4 and hook_fun5/4 on `test_hook' with
%% init args `[init]' and expects running the hook with `[arg1, arg2]' and an
%% empty accumulator to end in `{stop, [r3, r2]}'. Running an unregistered
%% hook must return `{ok, []}'.
run_hooks(_) ->
    HookFuns = [fun ?MODULE:hook_fun3/4,
                fun ?MODULE:hook_fun4/4,
                fun ?MODULE:hook_fun5/4],
    lists:foreach(fun(HookFun) ->
                      emqttd:hook(test_hook, HookFun, [init])
                  end, HookFuns),
    {stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []),
    {ok, []} = emqttd:run_hooks(unknown_hook, [], []).
|
|
|
|
%% Single-argument hook callbacks used by add_delete_hook/1. Both accept only
%% the empty list.

%% @doc Returns `ok'.
hook_fun1([]) ->
    ok.

%% @doc Returns `{ok, []}'.
hook_fun2([]) ->
    {ok, []}.
|
|
|
|
%% Four-argument hook callbacks used by run_hooks/1. Each accepts only
%% `arg1', `arg2' as leading arguments and `init' as last argument.

%% @doc Ignores the accumulator and returns `ok'.
hook_fun3(arg1, arg2, _Acc, init) ->
    ok.

%% @doc Prepends `r2' to the accumulator and returns it tagged `ok'.
hook_fun4(arg1, arg2, Acc, init) ->
    {ok, [r2 | Acc]}.

%% @doc Prepends `r3' to the accumulator and returns it tagged `stop'.
hook_fun5(arg1, arg2, Acc, init) ->
    {stop, [r3 | Acc]}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Retainer Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Stores one retained message in the backend and expects to get exactly
%% that message back both by reading its topic and by matching `topic/#'.
%% After deleting it, the retained count must be 0.
retain_messages(_) ->
    Topic = <<"topic">>,
    Msg = emqttd_message:make(<<"clientId">>, Topic, <<"payload">>),
    emqttd_backend:retain_message(Msg),
    [Msg] = emqttd_backend:read_messages(Topic),
    [Msg] = emqttd_backend:match_messages(<<"topic/#">>),
    emqttd_backend:delete_message(Topic),
    0 = emqttd_backend:retained_count().
|
|
|
|
%% @doc Retains a message on `a/b/c', asks the retainer to dispatch for the
%% filter `a/b/+' to this process and expects that message within 10 ms.
%% Then retains a message with an empty payload on the same topic and expects
%% the backend to hold no message for `a/b/c' afterwards.
dispatch_retained_messages(_) ->
    Topic = <<"a/b/c">>,
    Msg = #mqtt_message{retain = true,
                        topic = Topic,
                        payload = <<"payload">>},
    emqttd_retainer:retain(Msg),
    emqttd_retainer:dispatch(<<"a/b/+">>, self()),
    ?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end),
    EmptyPayloadMsg = #mqtt_message{retain = true,
                                    topic = Topic,
                                    payload = <<>>},
    emqttd_retainer:retain(EmptyPayloadMsg),
    [] = emqttd_backend:read_messages(Topic).
|
|
|
|
%% @doc Retains two messages, waits two seconds, expires messages using the
%% current time in seconds as the cutoff and expects the retained count to
%% be 0.
expire_retained_messages(_) ->
    Msg1 = emqttd_message:make(<<"clientId1">>, qos1, <<"topic/1">>, <<"payload1">>),
    Msg2 = emqttd_message:make(<<"clientId2">>, qos2, <<"topic/2">>, <<"payload2">>),
    lists:foreach(fun emqttd_backend:retain_message/1, [Msg1, Msg2]),
    %% Presumably ensures both messages are older than the cutoff taken below.
    timer:sleep(2000),
    Cutoff = emqttd_time:now_to_secs(),
    emqttd_backend:expire_messages(Cutoff),
    0 = emqttd_backend:retained_count().
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Backend Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Adds two QoS 2 subscriptions (`topic' and `#') for one client directly
%% in the backend and expects lookup to return both, in that order. Deleting
%% the `topic' subscription must leave only `#'; deleting all of the client's
%% subscriptions must leave none.
backend_subscription(_) ->
    ClientId = <<"clientId">>,
    Topic = <<"topic">>,
    TopicSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = 2},
    HashSub = #mqtt_subscription{subid = ClientId, topic = <<"#">>, qos = 2},
    emqttd_backend:add_subscription(TopicSub),
    emqttd_backend:add_subscription(HashSub),
    [TopicSub, HashSub] = emqttd_backend:lookup_subscriptions(ClientId),
    emqttd_backend:del_subscription(ClientId, Topic),
    [HashSub] = emqttd_backend:lookup_subscriptions(ClientId),
    emqttd_backend:del_subscriptions(ClientId),
    [] = emqttd_backend:lookup_subscriptions(ClientId).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% CLI Group
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Registers `test_cmd' with this module's test_cmd/1 as handler, expects
%% lookup to return that handler, runs the command with two arguments and
%% unregisters it again.
ctl_register_cmd(_) ->
    Cmd = test_cmd,
    Handler = {?MODULE, test_cmd},
    emqttd_ctl:register_cmd(Cmd, Handler),
    %% Yield and pause briefly before the lookup, presumably because the
    %% registration is processed asynchronously.
    erlang:yield(),
    timer:sleep(5),
    [Handler] = emqttd_ctl:lookup(Cmd),
    emqttd_ctl:run(["test_cmd", "arg1", "arg2"]),
    emqttd_ctl:unregister_cmd(Cmd).
|
|
|
|
%% @doc Command handler registered by ctl_register_cmd/1. Prints a marker via
%% ct:print/1 when called with `["arg1", "arg2"]' and a short text via
%% io:format/1 when called with no arguments.
test_cmd(["arg1", "arg2"]) ->
    ct:print("test_cmd is called");
test_cmd([]) ->
    io:format("test command").
|
|
|
|
%% @doc Runs the CLI `status' command without arguments. The result is not
%% checked; the case fails only if the call raises.
cli_status(_Config) ->
    NoArgs = [],
    emqttd_cli:status(NoArgs).
|
|
|
|
%% @doc Runs the CLI `broker' command without arguments and with the `stats',
%% `metrics' and `pubsub' sub-commands. Results are not checked.
cli_broker(_Config) ->
    %% No sub-command.
    emqttd_cli:broker([]),
    %% One call per sub-command.
    emqttd_cli:broker(["stats"]),
    emqttd_cli:broker(["metrics"]),
    emqttd_cli:broker(["pubsub"]).
|
|
|
|
%% @doc Runs the CLI `clients' command: `list', then `show' and `kick' for
%% the client id "clientId". Results are not checked.
cli_clients(_Config) ->
    ClientId = "clientId",
    emqttd_cli:clients(["list"]),
    emqttd_cli:clients(["show", ClientId]),
    emqttd_cli:clients(["kick", ClientId]).
|
|
|
|
%% @doc Runs the CLI `sessions' command: `list' (plain, `persistent' and
%% `transient') and `show' for the client id "clientId". Results are not
%% checked.
cli_sessions(_Config) ->
    emqttd_cli:sessions(["list"]),
    emqttd_cli:sessions(["list", "persistent"]),
    emqttd_cli:sessions(["list", "transient"]),
    ClientId = "clientId",
    emqttd_cli:sessions(["show", ClientId]).
|
|
|
|
%% @doc Subscribes to `topic/route', runs the CLI `routes' command (`list' and
%% `show' for that topic) and unsubscribes again. Results are not checked.
cli_routes(_Config) ->
    Topic = <<"topic/route">>,
    emqttd:subscribe(Topic),
    emqttd_cli:routes(["list"]),
    emqttd_cli:routes(["show", "topic/route"]),
    emqttd:unsubscribe(Topic).
|
|
|
|
%% @doc Subscribes to `topic', runs the CLI `topics' command (`list' and
%% `show' for that topic) and unsubscribes again. Results are not checked.
cli_topics(_Config) ->
    Topic = <<"topic">>,
    emqttd:subscribe(Topic),
    emqttd_cli:topics(["list"]),
    emqttd_cli:topics(["show", "topic"]),
    emqttd:unsubscribe(Topic).
|
|
|
|
%% @doc Runs the CLI `subscriptions' command: `list', `show' for the client id
%% "clientId", then `add' and `del' of the topic "topic" (added with "2" as
%% last argument) for that client. Results are not checked.
cli_subscriptions(_Config) ->
    ClientId = "clientId",
    Topic = "topic",
    emqttd_cli:subscriptions(["list"]),
    emqttd_cli:subscriptions(["show", ClientId]),
    emqttd_cli:subscriptions(["add", ClientId, Topic, "2"]),
    emqttd_cli:subscriptions(["del", ClientId, Topic]).
|
|
|
|
%% @doc Runs the CLI `plugins' command: `list', then `load' and `unload' of
%% "emqttd_plugin_template". Results are not checked.
cli_plugins(_Config) ->
    Plugin = "emqttd_plugin_template",
    emqttd_cli:plugins(["list"]),
    emqttd_cli:plugins(["load", Plugin]),
    emqttd_cli:plugins(["unload", Plugin]).
|
|
|
|
%% @doc Runs the CLI `bridges' command: `list', then `start' and `stop' with
%% the arguments "a@127.0.0.1" and "topic". Results are not checked.
cli_bridges(_Config) ->
    Node = "a@127.0.0.1",
    Topic = "topic",
    emqttd_cli:bridges(["list"]),
    emqttd_cli:bridges(["start", Node, Topic]),
    emqttd_cli:bridges(["stop", Node, Topic]).
|
|
|
|
%% @doc Runs the CLI `listeners' command without arguments. The result is not
%% checked; the case fails only if the call raises.
cli_listeners(_Config) ->
    NoArgs = [],
    emqttd_cli:listeners(NoArgs).
|
|
|
|
%% @doc Runs the CLI `vm' command without arguments and with the `ports'
%% sub-command. Results are not checked.
cli_vm(_Config) ->
    %% No sub-command.
    emqttd_cli:vm([]),
    %% `ports' sub-command.
    emqttd_cli:vm(["ports"]).
|
|
|