emqx/test/emqttd_SUITE.erl

449 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_SUITE).
-compile(export_all).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
all() ->
[{group, protocol},
{group, pubsub},
{group, router},
{group, session},
{group, retainer},
{group, broker},
{group, metrics},
{group, stats},
{group, hook},
{group, backend},
{group, cli}].
groups() ->
[{protocol, [sequence],
[mqtt_connect]},
{pubsub, [sequence],
[create_topic,
create_subscription,
subscribe_unsubscribe,
publish, pubsub,
'pubsub#', 'pubsub+',
pubsub_queue]},
{router, [sequence],
[router_add_del,
router_print,
router_unused]},
{session, [sequence],
[start_session]},
{broker, [sequence],
[hook_unhook]},
{metrics, [sequence],
[inc_dec_metric]},
{stats, [sequence],
[set_get_stat]},
{hook, [sequence],
[add_delete_hook,
run_hooks]},
{retainer, [sequence],
[retain_messages,
dispatch_retained_messages,
expire_retained_messages]},
{backend, [sequence],
[backend_subscription]},
{cli, [sequence],
[ctl_register_cmd,
cli_status,
cli_broker,
cli_clients,
cli_sessions,
cli_routes,
cli_topics,
cli_subscriptions,
cli_bridges,
cli_plugins,
cli_listeners,
cli_vm]}].
init_per_suite(Config) ->
application:start(lager),
application:ensure_all_started(emqttd),
Config.
end_per_suite(_Config) ->
application:stop(emqttd),
application:stop(esockd),
application:stop(gproc),
emqttd_mnesia:ensure_stopped().
%%--------------------------------------------------------------------
%% Protocol Test
%%--------------------------------------------------------------------
mqtt_connect(_) ->
%% Issue #599
%% Empty clientId and clean_session = false
?assertEqual(<<32,2,0,2>>, connect_broker_(<<16,12,0,4,77,81,84,84,4,0,0,90,0,0>>, 4)),
%% Empty clientId and clean_session = true
?assertEqual(<<32,2,0,0>>, connect_broker_(<<16,12,0,4,77,81,84,84,4,2,0,90,0,0>>, 4)).
connect_broker_(Packet, RecvSize) ->
{ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]),
gen_tcp:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, RecvSize, 3000),
gen_tcp:close(Sock),
Data.
%%--------------------------------------------------------------------
%% PubSub Test
%%--------------------------------------------------------------------
create_topic(_) ->
ok = emqttd:create(topic, <<"topic/create">>),
ok = emqttd:create(topic, <<"topic/create2">>),
[#mqtt_topic{topic = <<"topic/create">>, flags = [static]}]
= emqttd:lookup(topic, <<"topic/create">>).
create_subscription(_) ->
ok = emqttd:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}),
[#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
= emqttd_backend:lookup_subscriptions(<<"clientId">>),
ok = emqttd_backend:del_subscriptions(<<"clientId">>),
?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)).
subscribe_unsubscribe(_) ->
ok = emqttd:subscribe(<<"topic/subunsub">>),
ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub1">>, 1),
ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub2">>, 2),
ok = emqttd:unsubscribe(<<"topic/subunsub">>),
ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub1">>, 1),
ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub2">>, 2).
publish(_) ->
Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
ok = emqttd:subscribe(<<"test/+">>),
timer:sleep(10),
emqttd:publish(Msg),
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
pubsub(_) ->
Self = self(),
emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}),
emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}),
timer:sleep(10),
[{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self),
[{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
spawn(fun() ->
emqttd:subscribe(<<"a/b/c">>),
emqttd:subscribe(<<"c/d/e">>),
timer:sleep(10),
emqttd:unsubscribe(<<"a/b/c">>)
end),
timer:sleep(20),
emqttd:unsubscribe(<<"a/b/c">>).
'pubsub#'(_) ->
emqttd:subscribe(<<"a/#">>),
timer:sleep(10),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
emqttd:unsubscribe(<<"a/#">>).
'pubsub+'(_) ->
emqttd:subscribe(<<"a/+/+">>),
timer:sleep(10),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
emqttd:unsubscribe(<<"a/+/+">>).
pubsub_queue(_) ->
Self = self(), Q = <<"$queue/abc">>,
SubFun = fun() ->
emqttd:subscribe(Q),
timer:sleep(1),
{ok, Msgs} = loop_recv(Q, 10),
Self ! {recv, self(), Msgs}
end,
Sub1 = spawn(SubFun), Sub2 = spawn(SubFun),
timer:sleep(5),
emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)),
emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)),
emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)),
?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end),
?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end).
loop_recv(Topic, Timeout) ->
loop_recv(Topic, Timeout, []).
loop_recv(Topic, Timeout, Acc) ->
receive
{dispatch, Topic, Msg} ->
loop_recv(Topic, Timeout, [Msg|Acc])
after
Timeout -> {ok, Acc}
end.
%%--------------------------------------------------------------------
%% Router Test
%%--------------------------------------------------------------------
router_add_del(_) ->
%% Add
emqttd_router:add_route(<<"#">>),
emqttd_router:add_route(<<"a/b/c">>),
emqttd_router:add_route(<<"+/#">>, node()),
Routes = [R1, R2 | _] = [
#mqtt_route{topic = <<"#">>, node = node()},
#mqtt_route{topic = <<"+/#">>, node = node()},
#mqtt_route{topic = <<"a/b/c">>, node = node()}],
Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
%% Batch Add
emqttd_router:add_routes(Routes),
Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
%% Del
emqttd_router:del_route(<<"a/b/c">>),
[R1, R2] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
{atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]),
%% Batch Del
R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
emqttd_router:add_route(R3),
emqttd_router:del_routes([R1, R2]),
emqttd_router:del_route(R3),
[] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)).
router_print(_) ->
Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()},
#mqtt_route{topic = <<"#">>, node = node()},
#mqtt_route{topic = <<"+/#">>, node = node()}],
emqttd_router:add_routes(Routes),
emqttd_router:print(<<"a/b/c">>).
router_unused(_) ->
gen_server:call(emqttd_router, bad_call),
gen_server:cast(emqttd_router, bad_msg),
emqttd_router ! bad_info.
recv_loop(Msgs) ->
receive
{dispatch, _Topic, Msg} ->
recv_loop([Msg|Msgs])
after
100 -> lists:reverse(Msgs)
end.
%%--------------------------------------------------------------------
%% Session Group
%%--------------------------------------------------------------------
start_session(_) ->
{ok, ClientPid} = emqttd_mock_client:start_link(<<"clientId">>),
{ok, SessPid} = emqttd_mock_client:start_session(ClientPid),
Message = emqttd_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
Message1 = Message#mqtt_message{pktid = 1},
emqttd_session:publish(SessPid, Message1),
emqttd_session:pubrel(SessPid, 1),
emqttd_session:subscribe(SessPid, [{<<"topic/session">>, 2}]),
Message2 = emqttd_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
emqttd_session:publish(SessPid, Message2),
emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
emqttd_mock_client:stop(ClientPid).
%%--------------------------------------------------------------------
%% Broker Group
%%--------------------------------------------------------------------
hook_unhook(_) ->
ok.
%%--------------------------------------------------------------------
%% Metric Group
%%--------------------------------------------------------------------
inc_dec_metric(_) ->
emqttd_metrics:inc(gauge, 'messages/retained', 10),
emqttd_metrics:dec(gauge, 'messages/retained', 10).
%%--------------------------------------------------------------------
%% Stats Group
%%--------------------------------------------------------------------
set_get_stat(_) ->
emqttd_stats:setstat('retained/max', 99),
99 = emqttd_stats:getstat('retained/max').
%%--------------------------------------------------------------------
%% Hook Test
%%--------------------------------------------------------------------
add_delete_hook(_) ->
emqttd:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
{error, already_hooked} = emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
Callbacks = [{callback, fun ?MODULE:hook_fun1/1, [], 0},
{callback, fun ?MODULE:hook_fun2/1, [], 0}],
Callbacks = emqttd_hook:lookup(test_hook),
emqttd:unhook(test_hook, fun ?MODULE:hook_fun1/1),
emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1),
ok = emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1),
{error, not_found} = emqttd:unhook(test_hook1, fun ?MODULE:hook_fun2/1),
[] = emqttd_hook:lookup(test_hook),
emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun1/1, [], 9),
emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun2/1, [], 8),
Callbacks2 = [{callback, fun ?MODULE:hook_fun2/1, [], 8},
{callback, fun ?MODULE:hook_fun1/1, [], 9}],
Callbacks2 = emqttd_hook:lookup(emqttd_hook),
emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun1/1),
emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun2/1),
[] = emqttd_hook:lookup(emqttd_hook).
run_hooks(_) ->
emqttd:hook(test_hook, fun ?MODULE:hook_fun3/4, [init]),
emqttd:hook(test_hook, fun ?MODULE:hook_fun4/4, [init]),
emqttd:hook(test_hook, fun ?MODULE:hook_fun5/4, [init]),
{stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []),
{ok, []} = emqttd:run_hooks(unknown_hook, [], []).
hook_fun1([]) -> ok.
hook_fun2([]) -> {ok, []}.
hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
%%--------------------------------------------------------------------
%% Retainer Test
%%--------------------------------------------------------------------
retain_messages(_) ->
Msg = emqttd_message:make(<<"clientId">>, <<"topic">>, <<"payload">>),
emqttd_backend:retain_message(Msg),
[Msg] = emqttd_backend:read_messages(<<"topic">>),
[Msg] = emqttd_backend:match_messages(<<"topic/#">>),
emqttd_backend:delete_message(<<"topic">>),
0 = emqttd_backend:retained_count().
dispatch_retained_messages(_) ->
Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
payload = <<"payload">>},
emqttd_retainer:retain(Msg),
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end),
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
[] = emqttd_backend:read_messages(<<"a/b/c">>).
expire_retained_messages(_) ->
Msg1 = emqttd_message:make(<<"clientId1">>, qos1, <<"topic/1">>, <<"payload1">>),
Msg2 = emqttd_message:make(<<"clientId2">>, qos2, <<"topic/2">>, <<"payload2">>),
emqttd_backend:retain_message(Msg1),
emqttd_backend:retain_message(Msg2),
timer:sleep(2000),
emqttd_backend:expire_messages(emqttd_time:now_to_secs()),
0 = emqttd_backend:retained_count().
%%--------------------------------------------------------------------
%% Backend Test
%%--------------------------------------------------------------------
backend_subscription(_) ->
Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"#">>, qos = 2},
emqttd_backend:add_subscription(Sub1),
emqttd_backend:add_subscription(Sub2),
[Sub1, Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
[Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
emqttd_backend:del_subscriptions(<<"clientId">>),
[] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
%%--------------------------------------------------------------------
%% CLI Group
%%--------------------------------------------------------------------
ctl_register_cmd(_) ->
emqttd_ctl:register_cmd(test_cmd, {?MODULE, test_cmd}),
erlang:yield(),
timer:sleep(5),
[{?MODULE, test_cmd}] = emqttd_ctl:lookup(test_cmd),
emqttd_ctl:run(["test_cmd", "arg1", "arg2"]),
emqttd_ctl:unregister_cmd(test_cmd).
test_cmd(["arg1", "arg2"]) ->
ct:print("test_cmd is called");
test_cmd([]) ->
io:format("test command").
cli_status(_) ->
emqttd_cli:status([]).
cli_broker(_) ->
emqttd_cli:broker([]),
emqttd_cli:broker(["stats"]),
emqttd_cli:broker(["metrics"]),
emqttd_cli:broker(["pubsub"]).
cli_clients(_) ->
emqttd_cli:clients(["list"]),
emqttd_cli:clients(["show", "clientId"]),
emqttd_cli:clients(["kick", "clientId"]).
cli_sessions(_) ->
emqttd_cli:sessions(["list"]),
emqttd_cli:sessions(["list", "persistent"]),
emqttd_cli:sessions(["list", "transient"]),
emqttd_cli:sessions(["show", "clientId"]).
cli_routes(_) ->
emqttd:subscribe(<<"topic/route">>),
emqttd_cli:routes(["list"]),
emqttd_cli:routes(["show", "topic/route"]),
emqttd:unsubscribe(<<"topic/route">>).
cli_topics(_) ->
emqttd:subscribe(<<"topic">>),
emqttd_cli:topics(["list"]),
emqttd_cli:topics(["show", "topic"]),
emqttd:unsubscribe(<<"topic">>).
cli_subscriptions(_) ->
emqttd_cli:subscriptions(["list"]),
emqttd_cli:subscriptions(["show", "clientId"]),
emqttd_cli:subscriptions(["add", "clientId", "topic", "2"]),
emqttd_cli:subscriptions(["del", "clientId", "topic"]).
cli_plugins(_) ->
emqttd_cli:plugins(["list"]),
emqttd_cli:plugins(["load", "emqttd_plugin_template"]),
emqttd_cli:plugins(["unload", "emqttd_plugin_template"]).
cli_bridges(_) ->
emqttd_cli:bridges(["list"]),
emqttd_cli:bridges(["start", "a@127.0.0.1", "topic"]),
emqttd_cli:bridges(["stop", "a@127.0.0.1", "topic"]).
cli_listeners(_) ->
emqttd_cli:listeners([]).
cli_vm(_) ->
emqttd_cli:vm([]),
emqttd_cli:vm(["ports"]).