common tests

This commit is contained in:
Feng 2016-02-16 02:20:29 +08:00
parent 8ba6c5bbb6
commit 928780f322
29 changed files with 1233 additions and 1168 deletions

7
.gitignore vendored
View File

@ -19,3 +19,10 @@ log/
*.so
examples
docs/build/*
.erlang.mk/
cover/
emqttd.d
eunit.coverdata
test/ct.cover.spec
logs
ct.coverdata

View File

@ -26,7 +26,10 @@ clean:
@$(REBAR) clean
test:
@ERL_FLAGS="-args_file rel/files/vm.args -config rel/files/test.config" $(REBAR) skip_deps=true eunit
$(REBAR) skip_deps=true eunit
ct:
ERL_FLAGS="-config rel/files/test.config" $(REBAR) -v skip_deps=true ct
edoc:
@$(REBAR) doc

View File

@ -11,17 +11,26 @@
{i, "include"},
{src_dirs, ["src"]}]}.
{eunit_opts, []}. %%verbose
{xref_checks, [undefined_function_calls]}.
{cover_enabled, true}.
{validate_app_modules, true}.
{erl_first_files, ["src/gen_server2.erl",
"src/emqttd_auth_mod.erl",
"src/emqttd_acl_mod.erl"]}.
{eunit_opts, []}. %%verbose
{ct_dir, "test"}.
{ct_log_dir, "logs"}.
{ct_extra_params, "-name ct_emqttd@127.0.0.1 -config rel/files/test.config"}.
{ct_use_short_names, false}.
{xref_checks, [undefined_function_calls]}.
{cover_enabled, true}.
%% plugins cannot find emqttd.hrl without ".." lib dirs:(
%% but this setting will make deps apps collision
%% comment in 0.13.0 release

View File

@ -5,7 +5,7 @@
{start_pg2, true}
]},
{sasl, [
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
{sasl_error_logger, {file, "emqttd_sasl.log"}}
]},
{ssl, [
%{versions, ['tlsv1.2', 'tlsv1.1']}
@ -69,7 +69,7 @@
%% ACL config
{acl, [
%% Internal ACL module
{internal, [{file, "testdata/test_acl.config"}, {nomatch, allow}]}
%% {internal, [{file, "testdata/test_acl.config"}, {nomatch, allow}]}
]}
]},
%% MQTT Protocol Options

214
test/emqttd_SUITE.erl Normal file
View File

@ -0,0 +1,214 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_SUITE).
-compile(export_all).
-include("emqttd.hrl").
all() ->
[{group, pubsub},
{group, router},
{group, session},
{group, retainer},
{group, broker},
{group, metrics},
{group, stats}].
groups() ->
[{pubsub, [sequence],
[create_topic,
create_subscription,
subscribe_unsubscribe,
publish_message]},
{router, [sequence],
[add_delete_routes,
add_delete_route,
route_message]},
{session, [sequence],
[start_session]},
{retainer, [sequence],
[retain_message]},
{broker, [sequence],
[hook_unhook]},
{metrics, [sequence],
[inc_dec_metric]},
{stats, [sequence],
[set_get_stat]}].
init_per_suite(Config) ->
application:start(lager),
application:ensure_all_started(emqttd),
Config.
end_per_suite(_Config) ->
application:stop(emqttd),
application:stop(esockd),
application:stop(gproc),
emqttd_mnesia:ensure_stopped().
%%--------------------------------------------------------------------
%% PubSub Group
%%--------------------------------------------------------------------
create_topic(_) ->
Node = node(),
ok = emqttd_pubsub:create(topic, <<"topic/create">>),
ok = emqttd_pubsub:create(topic, <<"topic/create2">>),
[#mqtt_topic{topic = <<"topic/create">>, node = Node}]
= emqttd_pubsub:lookup(topic, <<"topic/create">>).
create_subscription(_) ->
ok = emqttd_pubsub:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}),
[#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
= emqttd_pubsub:lookup(subscription, <<"clientId">>),
ok = emqttd_pubsub:delete(subscription, <<"clientId">>),
[] = emqttd_pubsub:lookup(subscription, <<"clientId">>).
subscribe_unsubscribe(_) ->
{ok, [1]} = emqttd_pubsub:subscribe({<<"topic/subunsub">>, 1}),
{ok, [1, 2]} = emqttd_pubsub:subscribe([{<<"topic/subunsub1">>, 1}, {<<"topic/subunsub2">>, 2}]),
ok = emqttd_pubsub:unsubscribe(<<"topic/subunsub">>),
ok = emqttd_pubsub:unsubscribe([<<"topic/subunsub1">>, <<"topic/subunsub2">>]),
{ok, [1]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, {<<"topic/subunsub">>, 1}),
{ok, [1,2]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, [{<<"topic/subunsub1">>, 1},
{<<"topic/subunsub2">>, 2}]),
ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, <<"topic/subunsub">>),
ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, [<<"topic/subunsub1">>,
<<"topic/subunsub2">>]).
publish_message(_) ->
Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
{ok, [1]} = emqttd_pubsub:subscribe({<<"test/+">>, qos1}),
emqttd_pubsub:publish(Msg),
true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end.
%%--------------------------------------------------------------------
%% Route Group
%%--------------------------------------------------------------------
add_delete_route(_) ->
Self = self(),
emqttd_router:add_route(<<"topic1">>, Self),
true = emqttd_router:has_route(<<"topic1">>),
emqttd_router:add_route(<<"topic2">>, Self),
true = emqttd_router:has_route(<<"topic2">>),
[Self] = emqttd_router:lookup_routes(<<"topic1">>),
[Self] = emqttd_router:lookup_routes(<<"topic2">>),
%% Del topic1
emqttd_router:delete_route(<<"topic1">>, Self),
erlang:yield(),
timer:sleep(10),
false = emqttd_router:has_route(<<"topic1">>),
%% Del topic2
emqttd_router:delete_route(<<"topic2">>, Self),
erlang:yield(),
timer:sleep(10),
false = emqttd_router:has_route(<<"topic2">>).
add_delete_routes(_) ->
Self = self(),
emqttd_router:add_routes([], Self),
emqttd_router:add_routes([<<"t0">>], Self),
emqttd_router:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self),
true = emqttd_router:has_route(<<"t1">>),
[Self] = emqttd_router:lookup_routes(<<"t1">>),
[Self] = emqttd_router:lookup_routes(<<"t2">>),
[Self] = emqttd_router:lookup_routes(<<"t3">>),
emqttd_router:delete_routes([<<"t3">>], Self),
emqttd_router:delete_routes([<<"t0">>, <<"t1">>], Self),
erlang:yield(),
timer:sleep(10),
false = emqttd_router:has_route(<<"t0">>),
false = emqttd_router:has_route(<<"t1">>),
true = emqttd_router:has_route(<<"t2">>),
false = emqttd_router:has_route(<<"t3">>).
route_message(_) ->
Self = self(),
Pid = spawn_link(fun() -> timer:sleep(1000) end),
emqttd_router:add_routes([<<"$Q/1">>,<<"t/2">>,<<"t/3">>], Self),
emqttd_router:add_routes([<<"t/2">>], Pid),
Msg1 = #mqtt_message{topic = <<"$Q/1">>, payload = <<"q">>},
Msg2 = #mqtt_message{topic = <<"t/2">>, payload = <<"t2">>},
Msg3 = #mqtt_message{topic = <<"t/3">>, payload = <<"t3">>},
emqttd_router:route(<<"$Q/1">>, Msg1),
emqttd_router:route(<<"t/2">>, Msg2),
emqttd_router:route(<<"t/3">>, Msg3),
[Msg1, Msg2, Msg3] = recv_loop([]),
emqttd_router:add_route(<<"$Q/1">>, Self),
emqttd_router:route(<<"$Q/1">>, Msg1).
recv_loop(Msgs) ->
receive
{dispatch, _Topic, Msg} ->
recv_loop([Msg|Msgs])
after
100 -> lists:reverse(Msgs)
end.
%%--------------------------------------------------------------------
%% Session Group
%%--------------------------------------------------------------------
start_session(_) ->
{ok, ClientPid} = emqttd_mock_client:start_link(<<"clientId">>),
{ok, SessPid} = emqttd_mock_client:start_session(ClientPid),
Message = emqttd_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
Message1 = Message#mqtt_message{pktid = 1},
emqttd_session:publish(SessPid, Message1),
emqttd_session:pubrel(SessPid, 1),
emqttd_session:subscribe(SessPid, [{<<"topic/session">>, 2}]),
Message2 = emqttd_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
emqttd_session:publish(SessPid, Message2),
emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
emqttd_mock_client:stop(ClientPid).
%%--------------------------------------------------------------------
%% Retainer Group
%%--------------------------------------------------------------------
retain_message(_) ->
Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
payload = <<"payload">>},
emqttd_retainer:retain(Msg),
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
[] = mnesia:dirty_read({retained, <<"a/b/c">>}).
%%--------------------------------------------------------------------
%% Broker Group
%%--------------------------------------------------------------------
hook_unhook(_) ->
ok.
%%--------------------------------------------------------------------
%% Metric Group
%%--------------------------------------------------------------------
inc_dec_metric(_) ->
emqttd_metrics:inc(gauge, 'messages/retained', 10),
emqttd_metrics:dec(gauge, 'messages/retained', 10).
%%--------------------------------------------------------------------
%% Stats Group
%%--------------------------------------------------------------------
set_get_stat(_) ->
emqttd_stats:setstat('retained/max', 99),
99 = emqttd_stats:getstat('retained/max').

View File

@ -0,0 +1,166 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_access_SUITE).
-compile(export_all).
-include("emqttd.hrl").
-define(AC, emqttd_access_control).
-import(emqttd_access_rule, [compile/1, match/3]).
all() ->
[{group, access_control},
{group, access_rule}].
groups() ->
[{access_control, [sequence],
[reload_acl,
register_mod,
unregister_mod,
check_acl]},
{access_rule, [],
[compile_rule,
match_rule]}].
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, Config) ->
Config.
init_per_testcase(TestCase, Config) when TestCase =:= reload_acl;
TestCase =:= register_mod;
TestCase =:= unregister_mod;
TestCase =:= check_acl ->
DataDir = proplists:get_value(data_dir, Config),
AclOpts = [
{auth, [{anonymous, []}]},
{acl, [{internal, [{file, filename:join([DataDir, "test_acl.config"])},
{nomatch, allow}]}]}
],
{ok, _Pid} = ?AC:start_link(AclOpts),
Config;
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(TestCase, _Config) when TestCase =:= reload_acl;
TestCase =:= register_mod;
TestCase =:= unregister_mod;
TestCase =:= check_acl ->
?AC:stop();
end_per_testcase(_TestCase, _Config) ->
ok.
%%--------------------------------------------------------------------
%% emqttd_access_control
%%--------------------------------------------------------------------
reload_acl(_) ->
ct:print("~p~n", [whereis(?AC)]),
[ok] = ?AC:reload_acl().
register_mod(_) ->
ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
[{emqttd_acl_test_mod, _, 0},
{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
ok = ?AC:register_mod(auth, emqttd_auth_dashboard, [], 99),
[{emqttd_auth_dashboard, _, 99},
{emqttd_auth_anonymous_test_mod, _, 0},
{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth).
unregister_mod(_) ->
ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
[{emqttd_acl_test_mod, _, 0},
{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
ok = ?AC:unregister_mod(acl, emqttd_acl_test_mod),
timer:sleep(5),
[{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
[{emqttd_auth_anonymous_test_mod, _, 0},
{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth),
ok = ?AC:unregister_mod(auth, emqttd_auth_anonymous_test_mod),
timer:sleep(5),
[{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth).
check_acl(_) ->
User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>},
User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>},
allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>),
deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
%%--------------------------------------------------------------------
%% emqttd_access_rule
%%--------------------------------------------------------------------
compile_rule(_) ->
{allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}),
{allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}),
{allow, {user, <<"admin">>}, pubsub, [ [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}),
{allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} =
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}),
{allow, all, pubsub, [{pattern, [<<"clients">>, <<"$c">>]}]} =
compile({allow, all, pubsub, ["clients/$c"]}),
{allow, all, subscribe, [{pattern, [<<"users">>, <<"$u">>, '#']}]} =
compile({allow, all, subscribe, ["users/$u/#"]}),
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
{allow, all} = compile({allow, all}),
{deny, all} = compile({deny, all}).
match_rule(_) ->
User = #mqtt_client{peername = {{127,0,0,1}, 2948}, client_id = <<"testClient">>, username = <<"TestUser">>},
User2 = #mqtt_client{peername = {{192,168,0,10}, 3028}, client_id = <<"testClient">>, username = <<"TestUser">>},
{matched, allow} = match(User, <<"Test/Topic">>, {allow, all}),
{matched, deny} = match(User, <<"Test/Topic">>, {deny, all}),
{matched, allow} = match(User, <<"Test/Topic">>, compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
{matched, allow} = match(User2, <<"Test/Topic">>, compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]})),
{matched, allow} = match(User, <<"d/e/f/x">>, compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})),
nomatch = match(User, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
{matched, allow} = match(User, <<"testTopics/testClient">>, compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
{matched, allow} = match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/$c"]})),
{matched, allow} = match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>,
compile({allow, all, subscribe, ["users/$u/#"]})),
{matched, deny} = match(User, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
nomatch = match(User, <<"Topic">>, Rule),
AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}),
{matched, allow} = match(User, <<"Topic">>, AndRule),
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
{matched, allow} = match(User, <<"Topic">>, OrRule).

View File

@ -1,93 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_access_control_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(AC, emqttd_access_control).
acl_test_() ->
{foreach,
fun setup/0,
fun teardown/1,
[?_test(t_reload_acl()),
?_test(t_register_mod()),
?_test(t_unregister_mod()),
?_test(t_check_acl())
]}.
setup() ->
AclOpts = [
{auth, [{anonymous, []}]},
{acl, [ %% ACL config
%% Internal ACL module
{internal, [{file, "../testdata/test_acl.config"}, {nomatch, allow}]}
]}
],
?AC:start_link(AclOpts).
teardown({ok, _Pid}) ->
?AC:stop().
t_reload_acl() ->
?assertEqual([ok], ?AC:reload_acl()).
t_register_mod() ->
?AC:register_mod(acl, emqttd_acl_test_mod, []),
?assertMatch([{emqttd_acl_test_mod, _, 0},
{emqttd_acl_internal, _, 0}],
?AC:lookup_mods(acl)),
?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
?AC:register_mod(auth, emqttd_auth_dashboard, [], 99),
?assertMatch([{emqttd_auth_dashboard, _, 99},
{emqttd_auth_anonymous_test_mod, _, 0},
{emqttd_auth_anonymous, _, 0}],
?AC:lookup_mods(auth)).
t_unregister_mod() ->
?AC:register_mod(acl, emqttd_acl_test_mod, []),
?assertMatch([{emqttd_acl_test_mod, _, 0}, {emqttd_acl_internal, _, 0}],
?AC:lookup_mods(acl)),
?AC:unregister_mod(acl, emqttd_acl_test_mod),
timer:sleep(5),
?assertMatch([{emqttd_acl_internal, _, 0}], ?AC:lookup_mods(acl)),
?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
?assertMatch([{emqttd_auth_anonymous_test_mod, _, 0},
{emqttd_auth_anonymous, _, 0}],
?AC:lookup_mods(auth)),
?AC:unregister_mod(auth, emqttd_auth_anonymous_test_mod),
timer:sleep(5),
?assertMatch([{emqttd_auth_anonymous, _, 0}], ?AC:lookup_mods(auth)).
t_check_acl() ->
User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>},
User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>},
?assertEqual(allow, ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>)),
?assertEqual(allow, ?AC:check_acl(User1, subscribe, <<"clients/client1">>)),
?assertEqual(deny, ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>)),
?assertEqual(allow, ?AC:check_acl(User1, publish, <<"users/testuser/1">>)),
?assertEqual(allow, ?AC:check_acl(User1, subscribe, <<"a/b/c">>)),
?assertEqual(deny, ?AC:check_acl(User2, subscribe, <<"a/b/c">>)).
-endif.

View File

@ -1,82 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_access_rule_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqttd_access_rule, [compile/1, match/3]).
compile_test() ->
?assertMatch({allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]})),
?assertMatch({allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]})),
?assertMatch({allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
?assertMatch({allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]},
compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]})),
?assertEqual({allow, {user, <<"admin">>}, pubsub, [ [<<"d">>, <<"e">>, <<"f">>, '#'] ]},
compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
?assertEqual({allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]},
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
?assertEqual({allow, all, pubsub, [{pattern, [<<"clients">>, <<"$c">>]}]},
compile({allow, all, pubsub, ["clients/$c"]})),
?assertEqual({allow, all, subscribe, [{pattern, [<<"users">>, <<"$u">>, '#']}]},
compile({allow, all, subscribe, ["users/$u/#"]})),
?assertEqual({deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
compile({deny, all, subscribe, ["$SYS/#", "#"]})),
?assertEqual({allow, all}, compile({allow, all})),
?assertEqual({deny, all}, compile({deny, all})).
match_test() ->
User = #mqtt_client{peername = {{127,0,0,1}, 2948}, client_id = <<"testClient">>, username = <<"TestUser">>},
User2 = #mqtt_client{peername = {{192,168,0,10}, 3028}, client_id = <<"testClient">>, username = <<"TestUser">>},
?assertEqual({matched, allow}, match(User, <<"Test/Topic">>, {allow, all})),
?assertEqual({matched, deny}, match(User, <<"Test/Topic">>, {deny, all})),
?assertMatch({matched, allow}, match(User, <<"Test/Topic">>,
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}))),
?assertMatch({matched, allow}, match(User2, <<"Test/Topic">>,
compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]}))),
?assertMatch({matched, allow}, match(User, <<"d/e/f/x">>, compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]}))),
?assertEqual(nomatch, match(User, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}))),
?assertMatch({matched, allow}, match(User, <<"testTopics/testClient">>,
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}))),
?assertMatch({matched, allow}, match(User, <<"clients/testClient">>,
compile({allow, all, pubsub, ["clients/$c"]}))),
?assertMatch({matched, allow}, match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>,
compile({allow, all, subscribe, ["users/$u/#"]}))),
?assertMatch({matched, deny}, match(User, <<"d/e/f">>,
compile({deny, all, subscribe, ["$SYS/#", "#"]}))),
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
?assertMatch(nomatch, match(User, <<"Topic">>, Rule)),
AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}),
?assertMatch({matched, allow}, match(User, <<"Topic">>, AndRule)),
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
?assertMatch({matched, allow}, match(User, <<"Topic">>, OrRule)).
-endif.

View File

@ -14,21 +14,23 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_guid_tests).
-module(emqttd_backend_SUITE).
-ifdef(TEST).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> [{group, retainer}].
gen_test() ->
Guid1 = emqttd_guid:gen(),
Guid2 = emqttd_guid:gen(),
?assertMatch(<<_:128>>, Guid1),
?assertEqual(true, Guid2 >= Guid1),
{Ts, _, 0} = Tup = emqttd_guid:new(),
?assertEqual(Ts, emqttd_guid:timestamp(emqttd_guid:bin(Tup))).
groups() -> [{retainer, [], [t_retain]}].
-endif.
init_per_group(retainer, _Config) ->
ok = emqttd_mnesia:ensure_started(),
emqttd_retainer:mnesia(boot),
emqttd_retainer:mnesia(copy).
end_per_group(retainer, _Config) ->
ok;
end_per_group(_Group, _Config) ->
ok.
t_retain(_) -> ok.

146
test/emqttd_lib_SUITE.erl Normal file
View File

@ -0,0 +1,146 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_lib_SUITE).
-compile(export_all).
-define(SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}
]).
-define(PQ, priority_queue).
all() -> [{group, guid}, {group, opts},
{group, ?PQ}, {group, time},
{group, node}].
groups() ->
[{guid, [], [guid_gen]},
{opts, [], [opts_merge]},
{?PQ, [], [priority_queue_plen,
priority_queue_out2]},
{time, [], [time_now_to_]},
{node, [], [node_is_aliving, node_parse_name]}].
%%--------------------------------------------------------------------
%% emqttd_guid
%%--------------------------------------------------------------------
guid_gen(_) ->
Guid1 = emqttd_guid:gen(),
Guid2 = emqttd_guid:gen(),
<<_:128>> = Guid1,
true = (Guid2 >= Guid1),
{Ts1, _, 0} = emqttd_guid:new(),
Ts2 = emqttd_guid:timestamp(emqttd_guid:gen()),
true = Ts2 > Ts1.
%%--------------------------------------------------------------------
%% emqttd_opts
%%--------------------------------------------------------------------
opts_merge(_) ->
Opts = emqttd_opts:merge(?SOCKOPTS, [raw,
binary,
{backlog, 1024},
{nodelay, false},
{max_clients, 1024},
{acceptors, 16}]),
1024 = proplists:get_value(backlog, Opts),
1024 = proplists:get_value(max_clients, Opts),
[binary, raw,
{acceptors, 16},
{backlog, 1024},
{max_clients, 1024},
{nodelay, false},
{packet, raw},
{reuseaddr, true}] = lists:sort(Opts).
%%--------------------------------------------------------------------
%% priority_queue
%%--------------------------------------------------------------------
priority_queue_plen(_) ->
Q = ?PQ:new(),
0 = ?PQ:plen(0, Q),
Q0 = ?PQ:in(z, Q),
1 = ?PQ:plen(0, Q0),
Q1 = ?PQ:in(x, 1, Q0),
1 = ?PQ:plen(1, Q1),
Q2 = ?PQ:in(y, 2, Q1),
1 = ?PQ:plen(2, Q2),
Q3 = ?PQ:in(z, 2, Q2),
2 = ?PQ:plen(2, Q3),
{_, Q4} = ?PQ:out(1, Q3),
0 = ?PQ:plen(1, Q4),
{_, Q5} = ?PQ:out(Q4),
1 = ?PQ:plen(2, Q5),
{_, Q6} = ?PQ:out(Q5),
0 = ?PQ:plen(2, Q6),
1 = ?PQ:len(Q6),
{_, Q7} = ?PQ:out(Q6),
0 = ?PQ:len(Q7).
priority_queue_out2(_) ->
Els = [a, {b, 1}, {c, 1}, {d, 2}, {e, 2}, {f, 2}],
Q = ?PQ:new(),
Q0 = lists:foldl(
fun({El, P}, Acc) ->
?PQ:in(El, P, Acc);
(El, Acc) ->
?PQ:in(El, Acc)
end, Q, Els),
{Val, Q1} = ?PQ:out(Q0),
{value, d} = Val,
{Val1, Q2} = ?PQ:out(2, Q1),
{value, e} = Val1,
{Val2, Q3} = ?PQ:out(1, Q2),
{value, b} = Val2,
{Val3, Q4} = ?PQ:out(Q3),
{value, f} = Val3,
{Val4, Q5} = ?PQ:out(Q4),
{value, c} = Val4,
{Val5, Q6} = ?PQ:out(Q5),
{value, a} = Val5,
{empty, _Q7} = ?PQ:out(Q6).
%%--------------------------------------------------------------------
%% emqttd_time
%%--------------------------------------------------------------------
time_now_to_(_) ->
emqttd_time:seed(),
emqttd_time:now_to_secs(),
emqttd_time:now_to_ms().
%%--------------------------------------------------------------------
%% emqttd_node
%%--------------------------------------------------------------------
node_is_aliving(_) ->
io:format("Node: ~p~n", [node()]),
true = emqttd_node:is_aliving(node()),
false = emqttd_node:is_aliving('x@127.0.0.1').
node_parse_name(_) ->
'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"),
'b@127.0.0.1' = emqttd_node:parse_name("b").

View File

@ -1,70 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_message_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-include("emqttd_protocol.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(M, emqttd_message).
make_test() ->
Msg = ?M:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg#mqtt_message.qos),
?assertEqual(undefined, Msg#mqtt_message.msgid),
Msg1 = ?M:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#mqtt_message.msgid)),
?assertEqual(2, Msg1#mqtt_message.qos).
from_packet_test() ->
Msg = ?M:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
?assertEqual(1, Msg#mqtt_message.qos),
?assertEqual(10, Msg#mqtt_message.pktid),
?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
WillMsg = ?M:from_packet(#mqtt_packet_connect{will_flag = true,
will_topic = <<"WillTopic">>,
will_msg = <<"WillMsg">>}),
?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic),
?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload),
Msg2 = ?M:from_packet(<<"username">>, <<"clientid">>,
?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
?assertEqual(<<"clientid">>, Msg2#mqtt_message.from),
?assertEqual(<<"username">>, Msg2#mqtt_message.sender),
?debugFmt("~s", [?M:format(Msg2)]).
flag_test() ->
Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>),
Msg2 = ?M:from_packet(<<"clientid">>, Pkt),
Msg3 = ?M:set_flag(retain, Msg2),
Msg4 = ?M:set_flag(dup, Msg3),
?assert(Msg4#mqtt_message.dup),
?assert(Msg4#mqtt_message.retain),
Msg5 = ?M:set_flag(Msg4),
Msg6 = ?M:unset_flag(dup, Msg5),
Msg7 = ?M:unset_flag(retain, Msg6),
?assertNot(Msg7#mqtt_message.dup),
?assertNot(Msg7#mqtt_message.retain),
?M:unset_flag(Msg7),
?M:to_packet(Msg7).
-endif.

View File

@ -0,0 +1,63 @@
-module(emqttd_mock_client).
-behaviour(gen_server).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/1, start_session/1, stop/1]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {clientid, session}).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link(ClientId) ->
gen_server:start_link(?MODULE, [ClientId], []).
start_session(CPid) ->
gen_server:call(CPid, start_session).
stop(CPid) ->
gen_server:call(CPid, stop).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init([ClientId]) ->
{ok, #state{clientid = ClientId}}.
handle_call(start_session, _From, State = #state{clientid = ClientId}) ->
{ok, SessPid, _} = emqttd_sm:start_session(true, ClientId),
{reply, {ok, SessPid}, State#state{session = SessPid}};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -14,22 +14,22 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_node_tests).
-module(emqttd_mod_SUITE).
-author("Feng Lee <feng@emqtt.io>").
-compile(export_all).
-ifdef(TEST).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [mod_subscription_rep].
is_aliving_test() ->
?debugFmt("Node: ~p~n", [node()]),
?assert(emqttd_node:is_aliving(node())),
?assertNot(emqttd_node:is_aliving('x@127.0.0.1')).
mod_subscription_rep(_) -> ok.
%% <<"topic/clientId">> = emqttd_mod_subscription:rep(
%% <<"$c">>, <<"clientId">>, <<"topic/$c">>),
%% <<"topic/username">> = emqttd_mod_subscription:rep(
%% <<"$u">>, <<"username">>, <<"topic/$u">>),
%% <<"topic/username/clientId">> = emqttd_mod_subscription:rep(
%% <<"$c">>, <<"clientId">>, emqttd_mod_subscription:rep(
%% <<"$u">>, <<"username">>, <<"topic/$u/$c">>)).
parse_name_test() ->
?assertEqual('a@127.0.0.1', emqttd_node:parse_name("a@127.0.0.1")),
?assertEqual('b@127.0.0.1', emqttd_node:parse_name("b")).
-endif.

View File

@ -1,36 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_mod_subscription_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(M, emqttd_mod_subscription).
rep_test() ->
?assertEqual(<<"topic/clientId">>,
?M:rep(<<"$c">>, <<"clientId">>, <<"topic/$c">>)),
?assertEqual(<<"topic/username">>,
?M:rep(<<"$u">>, <<"username">>, <<"topic/$u">>)),
?assertEqual(<<"topic/username/clientId">>,
?M:rep(<<"$c">>, <<"clientId">>,
?M:rep(<<"$u">>, <<"username">>, <<"topic/$u/$c">>))).
-endif.

View File

@ -14,87 +14,89 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_mqueue_tests).
-module(emqttd_mqueue_SUITE).
-ifdef(TEST).
-compile(export_all).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(Q, emqttd_mqueue).
in_test() ->
all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_priority_mqueue,
t_priority_mqueue2, t_infinity_priority_mqueue,
t_infinity_simple_mqueue].
t_in(_) ->
Opts = [{max_length, 5},
{queue_qos0, true}],
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
?assertEqual(true, ?Q:is_empty(Q)),
true = ?Q:is_empty(Q),
Q1 = ?Q:in(#mqtt_message{}, Q),
?assertEqual(1, ?Q:len(Q1)),
1 = ?Q:len(Q1),
Q2 = ?Q:in(#mqtt_message{qos = 1}, Q1),
?assertEqual(2, ?Q:len(Q2)),
2 = ?Q:len(Q2),
Q3 = ?Q:in(#mqtt_message{qos = 2}, Q2),
Q4 = ?Q:in(#mqtt_message{}, Q3),
Q5 = ?Q:in(#mqtt_message{}, Q4),
?assertEqual(5, ?Q:len(Q5)).
in_qos0_test() ->
5 = ?Q:len(Q5).
t_in_qos0(_) ->
Opts = [{max_length, 5},
{queue_qos0, false}],
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
Q1 = ?Q:in(#mqtt_message{}, Q),
?assertEqual(true, ?Q:is_empty(Q1)),
true = ?Q:is_empty(Q1),
Q2 = ?Q:in(#mqtt_message{qos = 0}, Q1),
?assertEqual(true, ?Q:is_empty(Q2)).
true = ?Q:is_empty(Q2).
out_test() ->
t_out(_) ->
Opts = [{max_length, 5},
{queue_qos0, true}],
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
?assertMatch({empty, Q}, ?Q:out(Q)),
{empty, Q} = ?Q:out(Q),
Q1 = ?Q:in(#mqtt_message{}, Q),
{Value, Q2} = ?Q:out(Q1),
?assertEqual(0, ?Q:len(Q2)),
?assertMatch({value, #mqtt_message{}}, Value).
0 = ?Q:len(Q2),
{value, #mqtt_message{}} = Value.
simple_mqueue_test() ->
t_simple_mqueue(_) ->
Opts = [{type, simple},
{max_length, 3},
{low_watermark, 0.2},
{high_watermark, 0.6},
{queue_qos0, false}],
Q = ?Q:new("simple_queue", Opts, alarm_fun()),
?assertEqual(simple, ?Q:type(Q)),
?assertEqual(3, ?Q:max_len(Q)),
?assertEqual(<<"simple_queue">>, ?Q:name(Q)),
?assert(?Q:is_empty(Q)),
simple = ?Q:type(Q),
3 = ?Q:max_len(Q),
<<"simple_queue">> = ?Q:name(Q),
true = ?Q:is_empty(Q),
Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q),
Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1),
Q3 = ?Q:in(#mqtt_message{qos = 1, payload = <<"3">>}, Q2),
Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3),
?assertEqual(3, ?Q:len(Q4)),
3 = ?Q:len(Q4),
{{value, Msg}, Q5} = ?Q:out(Q4),
?assertMatch(<<"2">>, Msg#mqtt_message.payload),
?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
<<"2">> = Msg#mqtt_message.payload,
[{len, 2}, {max_len, 3}, {dropped, 1}] = ?Q:stats(Q5).
infinity_simple_mqueue_test() ->
t_infinity_simple_mqueue(_) ->
Opts = [{type, simple},
{max_length, infinity},
{low_watermark, 0.2},
{high_watermark, 0.6},
{queue_qos0, false}],
Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()),
?assert(?Q:is_empty(Q)),
?assertEqual(infinity, ?Q:max_len(Q)),
true = ?Q:is_empty(Q),
infinity = ?Q:max_len(Q),
Qx = lists:foldl(fun(I, AccQ) ->
?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ)
end, Q, lists:seq(1, 255)),
?assertEqual(255, ?Q:len(Qx)),
?assertEqual([{len, 255}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)),
{{value, V}, Qy} = ?Q:out(Qx),
?assertEqual(<<1>>, V#mqtt_message.payload).
255 = ?Q:len(Qx),
[{len, 255}, {max_len, infinity}, {dropped, 0}] = ?Q:stats(Qx),
{{value, V}, _Qy} = ?Q:out(Qx),
<<1>> = V#mqtt_message.payload.
priority_mqueue_test() ->
t_priority_mqueue(_) ->
Opts = [{type, priority},
{priority, [{<<"t">>, 10}]},
{max_length, 3},
@ -102,56 +104,54 @@ priority_mqueue_test() ->
{high_watermark, 0.6},
{queue_qos0, false}],
Q = ?Q:new("priority_queue", Opts, alarm_fun()),
?assertEqual(priority, ?Q:type(Q)),
?assertEqual(3, ?Q:max_len(Q)),
?assertEqual(<<"priority_queue">>, ?Q:name(Q)),
priority = ?Q:type(Q),
3 = ?Q:max_len(Q),
<<"priority_queue">> = ?Q:name(Q),
?assert(?Q:is_empty(Q)),
true = ?Q:is_empty(Q),
Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q),
Q2 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t">>}, Q1),
Q3 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t2">>}, Q2),
?assertEqual(3, ?Q:len(Q3)),
3 = ?Q:len(Q3),
Q4 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q3),
?assertEqual(4, ?Q:len(Q4)),
4 = ?Q:len(Q4),
Q5 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q4),
?assertEqual(5, ?Q:len(Q5)),
5 = ?Q:len(Q5),
Q6 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q5),
?assertEqual(5, ?Q:len(Q6)),
{{value, Msg}, Q7} = ?Q:out(Q6),
?assertMatch(<<"t">>, Msg#mqtt_message.topic).
5 = ?Q:len(Q6),
{{value, Msg}, _Q7} = ?Q:out(Q6),
<<"t">> = Msg#mqtt_message.topic.
infinity_priority_mqueue_test() ->
t_infinity_priority_mqueue(_) ->
Opts = [{type, priority},
{priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]},
{max_length, infinity},
{queue_qos0, false}],
Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()),
?assertEqual(infinity, ?Q:max_len(Q)),
infinity = ?Q:max_len(Q),
Qx = lists:foldl(fun(I, AccQ) ->
AccQ1 =
?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
end, Q, lists:seq(1, 255)),
?assertEqual(510, ?Q:len(Qx)),
?assertEqual([{len, 510}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)).
510 = ?Q:len(Qx),
[{len, 510}, {max_len, infinity}, {dropped, 0}] = ?Q:stats(Qx).
priority_mqueue2_test() ->
t_priority_mqueue2(_) ->
Opts = [{type, priority},
{max_length, 2},
{low_watermark, 0.2},
{high_watermark, 0.6},
{queue_qos0, false}],
Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()),
?assertEqual(2, ?Q:max_len(Q)),
2 = ?Q:max_len(Q),
Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
Q2 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
Q3 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
?assertEqual(4, ?Q:len(Q4)),
{{value, Val}, Q5} = ?Q:out(Q4),
?assertEqual(3, ?Q:len(Q5)).
4 = ?Q:len(Q4),
{{value, _Val}, Q5} = ?Q:out(Q4),
3 = ?Q:len(Q5).
alarm_fun() -> fun(_, _) -> alarm_fun() end.
-endif.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,25 +14,31 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_keepalive_tests).
-module(emqttd_net_SUITE).
-ifdef(TEST).
%% CT
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> [{group, keepalive}].
keepalive_test() ->
groups() -> [{keepalive, [], [t_keepalive]}].
%%--------------------------------------------------------------------
%% Keepalive
%%--------------------------------------------------------------------
t_keepalive(_) ->
KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
?assertEqual([resumed, timeout], lists:reverse(loop(KA, []))).
[resumed, timeout] = lists:reverse(keepalive_recv(KA, [])).
loop(KA, Acc) ->
keepalive_recv(KA, Acc) ->
receive
{keepalive, timeout} ->
case emqttd_keepalive:check(KA) of
{ok, KA1} -> loop(KA1, [resumed | Acc]);
{ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]);
{error, timeout} -> [timeout | Acc]
end
after 4000 ->
Acc
end.
-endif.

View File

@ -1,49 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_opts_tests).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}
]).
merge_test() ->
Opts = emqttd_opts:merge(?SOCKOPTS, [raw,
binary,
{backlog, 1024},
{nodelay, false},
{max_clients, 1024},
{acceptors, 16}]),
?assertEqual(1024, proplists:get_value(backlog, Opts)),
?assertEqual(1024, proplists:get_value(max_clients, Opts)),
?assertEqual(lists:sort(Opts), [binary, raw,
{acceptors, 16},
{backlog, 1024},
{max_clients, 1024},
{nodelay, false},
{packet, raw},
{reuseaddr, true}]).
-endif.

View File

@ -1,56 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_packet_tests).
-ifdef(TEST).
-include("emqttd_protocol.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(P, emqttd_packet).
protocol_name_test() ->
?assertEqual(<<"MQIsdp">>, ?P:protocol_name(3)),
?assertEqual(<<"MQTT">>, ?P:protocol_name(4)).
type_name_test() ->
?assertEqual( 'CONNECT', ?P:type_name(?CONNECT) ),
?assertEqual( 'UNSUBSCRIBE', ?P:type_name(?UNSUBSCRIBE) ).
connack_name_test() ->
?assertEqual( 'CONNACK_ACCEPT', ?P:connack_name(?CONNACK_ACCEPT) ),
?assertEqual( 'CONNACK_PROTO_VER', ?P:connack_name(?CONNACK_PROTO_VER) ),
?assertEqual( 'CONNACK_INVALID_ID', ?P:connack_name(?CONNACK_INVALID_ID) ),
?assertEqual( 'CONNACK_SERVER', ?P:connack_name(?CONNACK_SERVER) ),
?assertEqual( 'CONNACK_CREDENTIALS', ?P:connack_name(?CONNACK_CREDENTIALS) ),
?assertEqual( 'CONNACK_AUTH', ?P:connack_name(?CONNACK_AUTH) ).
format_test() ->
?debugFmt("~s", [?P:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
?debugFmt("~s", [?P:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
?debugFmt("~s", [?P:format(?PUBLISH_PACKET(?QOS_1, 1))]),
?debugFmt("~s", [?P:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
?debugFmt("~s", [?P:format(?PUBACK_PACKET(?PUBACK, 98))]),
?debugFmt("~s", [?P:format(?PUBREL_PACKET(99))]),
?debugFmt("~s", [?P:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
?debugFmt("~s", [?P:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
?debugFmt("~s", [?P:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
?debugFmt("~s", [?P:format(?UNSUBACK_PACKET(90))]).
-endif.

View File

@ -1,147 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_parser_tests).
-ifdef(TEST).
-include("emqttd_protocol.hrl").
-include_lib("eunit/include/eunit.hrl").
parse_connect_test() ->
Parser = emqttd_parser:new([]),
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 3,
proto_name = <<"MQIsdp">>,
client_id = <<"mosqpub/10451-iMac.loca">>,
clean_sess = true,
keep_alive = 60}}, <<>>}, Parser(V31ConnBin)),
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 4,
proto_name = <<"MQTT">>,
client_id = <<"mosqpub/10451-iMac.loca">>,
clean_sess = true,
keep_alive = 60 } }, <<>>}, Parser(V311ConnBin)),
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 4,
proto_name = <<"MQTT">>,
client_id = <<>>,
clean_sess = true,
keep_alive = 60 } }, <<>>}, Parser(V311ConnWithoutClientId)),
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 3,
proto_name = <<"MQIsdp">>,
client_id = <<"mosqpub/10452-iMac.loca">>,
clean_sess = true,
keep_alive = 60,
will_retain = false,
will_qos = 1,
will_flag = true,
will_topic = <<"/will">>,
will_msg = <<"willmsg">> ,
username = <<"test">>,
password = <<"public">>}},
<<>> }, Parser(ConnBinWithWill)),
ok.
parse_publish_test() ->
Parser = emqttd_parser:new([]),
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = 1,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
packet_id = 1},
payload = <<"hahah">> }, <<>>}, Parser(PubBin)),
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
%DISCONNECT(Qos=0, Retain=false, Dup=false)
PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>,
?assertMatch({ok, #mqtt_packet {
header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
packet_id = undefined},
payload = <<"hello">> }, <<224,0>>}, Parser(PubBin1)),
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?DISCONNECT,
dup = false,
qos = 0,
retain = false}
}, <<>>}, Parser(<<224, 0>>)).
parse_puback_test() ->
Parser = emqttd_parser:new([]),
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
PubAckBin = <<64,2,0,1>>,
?assertMatch({ok, #mqtt_packet {
header = #mqtt_packet_header{type = ?PUBACK,
dup = false,
qos = 0,
retain = false }
}, <<>>}, Parser(PubAckBin)),
ok.
parse_subscribe_test() ->
ok.
parse_pingreq_test() ->
ok.
parse_disconnect_test() ->
Parser = emqttd_parser:new([]),
%DISCONNECT(Qos=0, Retain=false, Dup=false)
Bin = <<224, 0>>,
?assertMatch({ok, #mqtt_packet{
header = #mqtt_packet_header{type = ?DISCONNECT,
dup = false,
qos = 0,
retain = false}
}, <<>>}, Parser(Bin)).
-endif.

View File

@ -0,0 +1,308 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_protocol_SUITE).
-compile(export_all).
-import(emqttd_serializer, [serialize/1]).
-include("emqttd.hrl").
-include("emqttd_protocol.hrl").
all() ->
[{group, parser},
{group, serializer},
{group, packet},
{group, message}].
groups() ->
[{parser, [],
[parse_connect,
parse_publish,
parse_puback,
parse_subscribe,
parse_pingreq,
parse_disconnect]},
{serializer, [],
[serialize_connect,
serialize_connack,
serialize_publish,
serialize_puback,
serialize_pubrel,
serialize_subscribe,
serialize_suback,
serialize_unsubscribe,
serialize_unsuback,
serialize_pingreq,
serialize_pingresp,
serialize_disconnect]},
{packet, [],
[packet_proto_name,
packet_type_name,
packet_connack_name,
packet_format]},
{message, [],
[message_make,
message_from_packet,
message_flag]}].
%%--------------------------------------------------------------------
%% Parse Cases
%%--------------------------------------------------------------------
parse_connect(_) ->
Parser = emqttd_parser:new([]),
%% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 3,
proto_name = <<"MQIsdp">>,
client_id = <<"mosqpub/10451-iMac.loca">>,
clean_sess = true,
keep_alive = 60}}, <<>>} = Parser(V31ConnBin),
%% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 4,
proto_name = <<"MQTT">>,
client_id = <<"mosqpub/10451-iMac.loca">>,
clean_sess = true,
keep_alive = 60 } }, <<>>} = Parser(V311ConnBin),
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 4,
proto_name = <<"MQTT">>,
client_id = <<>>,
clean_sess = true,
keep_alive = 60 } }, <<>>} = Parser(V311ConnWithoutClientId),
%%CONNECT(Q0, R0, D0, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60,
%% Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 3,
proto_name = <<"MQIsdp">>,
client_id = <<"mosqpub/10452-iMac.loca">>,
clean_sess = true,
keep_alive = 60,
will_retain = false,
will_qos = 1,
will_flag = true,
will_topic = <<"/will">>,
will_msg = <<"willmsg">>,
username = <<"test">>,
password = <<"public">>}}, <<>>} = Parser(ConnBinWithWill),
ok.
parse_publish(_) ->
Parser = emqttd_parser:new([]),
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = 1,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
packet_id = 1},
payload = <<"hahah">> }, <<>>} = Parser(PubBin),
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
%DISCONNECT(Qos=0, Retain=false, Dup=false)
PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
packet_id = undefined},
payload = <<"hello">> }, <<224,0>>} = Parser(PubBin1),
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT,
dup = false,
qos = 0,
retain = false}}, <<>>} = Parser(<<224, 0>>).
parse_puback(_) ->
Parser = emqttd_parser:new([]),
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
PubAckBin = <<64,2,0,1>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK,
dup = false,
qos = 0,
retain = false}}, <<>>} = Parser(PubAckBin).
parse_subscribe(_) ->
ok.
parse_pingreq(_) ->
ok.
parse_disconnect(_) ->
Parser = emqttd_parser:new([]),
%DISCONNECT(Qos=0, Retain=false, Dup=false)
Bin = <<224, 0>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT,
dup = false,
qos = 0,
retain = false}}, <<>>} = Parser(Bin).
%%--------------------------------------------------------------------
%% Serialize Cases
%%--------------------------------------------------------------------
serialize_connect(_) ->
serialize(?CONNECT_PACKET(#mqtt_packet_connect{})),
serialize(?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"clientId">>,
will_qos = ?QOS1,
will_flag = true,
will_retain = true,
will_topic = <<"will">>,
will_msg = <<"haha">>,
clean_sess = true})).
serialize_connack(_) ->
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
<<32,2,0,0>> = serialize(ConnAck).
serialize_publish(_) ->
serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)),
serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 99, long_payload())).
serialize_puback(_) ->
serialize(?PUBACK_PACKET(?PUBACK, 10384)).
serialize_pubrel(_) ->
serialize(?PUBREL_PACKET(10384)).
serialize_subscribe(_) ->
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
serialize(?SUBSCRIBE_PACKET(10, TopicTable)).
serialize_suback(_) ->
serialize(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
serialize_unsubscribe(_) ->
serialize(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
serialize_unsuback(_) ->
serialize(?UNSUBACK_PACKET(10)).
serialize_pingreq(_) ->
serialize(?PACKET(?PINGREQ)).
serialize_pingresp(_) ->
serialize(?PACKET(?PINGRESP)).
serialize_disconnect(_) ->
serialize(?PACKET(?DISCONNECT)).
long_payload() ->
iolist_to_binary(["payload." || _I <- lists:seq(1, 100)]).
%%--------------------------------------------------------------------
%% Packet Cases
%%--------------------------------------------------------------------
packet_proto_name(_) ->
<<"MQIsdp">> = emqttd_packet:protocol_name(3),
<<"MQTT">> = emqttd_packet:protocol_name(4).
packet_type_name(_) ->
'CONNECT' = emqttd_packet:type_name(?CONNECT),
'UNSUBSCRIBE' = emqttd_packet:type_name(?UNSUBSCRIBE).
packet_connack_name(_) ->
'CONNACK_ACCEPT' = emqttd_packet:connack_name(?CONNACK_ACCEPT),
'CONNACK_PROTO_VER' = emqttd_packet:connack_name(?CONNACK_PROTO_VER),
'CONNACK_INVALID_ID' = emqttd_packet:connack_name(?CONNACK_INVALID_ID),
'CONNACK_SERVER' = emqttd_packet:connack_name(?CONNACK_SERVER),
'CONNACK_CREDENTIALS' = emqttd_packet:connack_name(?CONNACK_CREDENTIALS),
'CONNACK_AUTH' = emqttd_packet:connack_name(?CONNACK_AUTH).
packet_format(_) ->
io:format("~s", [emqttd_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqttd_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqttd_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqttd_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqttd_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqttd_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqttd_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
io:format("~s", [emqttd_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
io:format("~s", [emqttd_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqttd_packet:format(?UNSUBACK_PACKET(90))]).
%%--------------------------------------------------------------------
%% Message Cases
%%--------------------------------------------------------------------
message_make(_) ->
Msg = emqttd_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
0 = Msg#mqtt_message.qos,
undefined = Msg#mqtt_message.msgid,
Msg1 = emqttd_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
true = is_binary(Msg1#mqtt_message.msgid),
2 = Msg1#mqtt_message.qos.
message_from_packet(_) ->
Msg = emqttd_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
1 = Msg#mqtt_message.qos,
10 = Msg#mqtt_message.pktid,
<<"topic">> = Msg#mqtt_message.topic,
WillMsg = emqttd_message:from_packet(#mqtt_packet_connect{will_flag = true,
will_topic = <<"WillTopic">>,
will_msg = <<"WillMsg">>}),
<<"WillTopic">> = WillMsg#mqtt_message.topic,
<<"WillMsg">> = WillMsg#mqtt_message.payload,
Msg2 = emqttd_message:from_packet(<<"username">>, <<"clientid">>,
?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
<<"clientid">> = Msg2#mqtt_message.from,
<<"username">> = Msg2#mqtt_message.sender,
io:format("~s", [emqttd_message:format(Msg2)]).
message_flag(_) ->
Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>),
Msg2 = emqttd_message:from_packet(<<"clientid">>, Pkt),
Msg3 = emqttd_message:set_flag(retain, Msg2),
Msg4 = emqttd_message:set_flag(dup, Msg3),
true = Msg4#mqtt_message.dup,
true = Msg4#mqtt_message.retain,
Msg5 = emqttd_message:set_flag(Msg4),
Msg6 = emqttd_message:unset_flag(dup, Msg5),
Msg7 = emqttd_message:unset_flag(retain, Msg6),
false = Msg7#mqtt_message.dup,
false = Msg7#mqtt_message.retain,
emqttd_message:unset_flag(Msg7),
emqttd_message:to_packet(Msg7).

View File

@ -1,29 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_retainer_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
retain_test() ->
mnesia:start(),
emqttd_retainer:mnesia(boot).
-endif.

View File

@ -1,126 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_router_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(R, emqttd_router).
route_test_() ->
{timeout, 60,
[{setup,
fun setup/0,
fun teardown/1,
[?_test(t_add_del_route()),
?_test(t_add_del_routes()),
?_test(t_route())
]}
]}.
setup() ->
application:start(gproc),
application:start(mnesia),
emqttd_pubsub:create_table(topic, ram_copies),
emqttd_trie:mnesia(boot),
emqttd_pubsub_sup:create_tab(route),
gproc_pool:new(router, hash, [{size, 2}]),
lists:foreach(fun(I) ->
gproc_pool:add_worker(router, {router, I}, I),
{ok, R} = ?R:start_link(router, I, fun(_) -> ok end, [{route_aging, 2}])
end, [1, 2]).
ensure_tab(Tab, Opts) ->
case ets:info(Tab, name) of
undefined -> ets:new(Tab, Opts);
_ -> ok
end.
teardown(_) ->
lists:foreach(fun(I) ->
?R:stop(I), gproc_pool:remove_worker(router, {router, I})
end, [1, 2]),
gproc_pool:delete(router),
ets:delete(route),
application:stop(gproc).
t_add_del_route() ->
Self = self(),
?R:add_route(<<"topic1">>, Self),
?assert(?R:has_route(<<"topic1">>)),
?R:add_route(<<"topic2">>, Self),
?assert(?R:has_route(<<"topic2">>)),
?assertEqual([Self], ?R:lookup_routes(<<"topic1">>)),
?assertEqual([Self], ?R:lookup_routes(<<"topic2">>)),
%% Del topic1
?R:delete_route(<<"topic1">>, Self),
erlang:yield(),
timer:sleep(10),
?assertNot(?R:has_route(<<"topic1">>)),
%% Del topic2
?R:delete_route(<<"topic2">>, Self),
erlang:yield(),
timer:sleep(10),
?assertNot(?R:has_route(<<"topic2">>)).
t_add_del_routes() ->
Self = self(),
?R:add_routes([], Self),
?R:add_routes([<<"t0">>], Self),
?R:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self),
?assert(?R:has_route(<<"t1">>)),
?assertEqual([Self], ?R:lookup_routes(<<"t1">>)),
?assertEqual([Self], ?R:lookup_routes(<<"t2">>)),
?assertEqual([Self], ?R:lookup_routes(<<"t3">>)),
?R:delete_routes([<<"t3">>], Self),
?R:delete_routes([<<"t0">>, <<"t1">>], Self),
erlang:yield(),
timer:sleep(10),
?assertNot(?R:has_route(<<"t0">>)),
?assertNot(?R:has_route(<<"t1">>)),
?assert(?R:has_route(<<"t2">>)),
?assertNot(?R:has_route(<<"t3">>)).
t_route() ->
Self = self(),
Pid = spawn_link(fun() -> timer:sleep(1000) end),
?R:add_routes([<<"$Q/1">>,<<"t/2">>,<<"t/3">>], Self),
?R:add_routes([<<"t/2">>], Pid),
Msg1 = #mqtt_message{topic = <<"$Q/1">>, payload = <<"q">>},
Msg2 = #mqtt_message{topic = <<"t/2">>, payload = <<"t2">>},
Msg3 = #mqtt_message{topic = <<"t/3">>, payload = <<"t3">>},
?R:route(<<"$Q/1">>, Msg1),
?R:route(<<"t/2">>, Msg2),
?R:route(<<"t/3">>, Msg3),
?assertEqual([Msg1, Msg2, Msg3], recv_loop([])),
?R:add_route(<<"$Q/1">>, Pid),
?R:route(<<"$Q/1">>, Msg1).
recv_loop(Msgs) ->
receive
{dispatch, _Topic, Msg} ->
recv_loop([Msg|Msgs])
after
500 -> lists:reverse(Msgs)
end.
-endif.

View File

@ -1,80 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_serializer_tests).
-ifdef(TEST).
-include("emqttd_protocol.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqttd_serializer, [serialize/1]).
serialize_connect_test() ->
serialize(?CONNECT_PACKET(#mqtt_packet_connect{})),
serialize(?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"clientId">>,
will_qos = ?QOS1,
will_flag = true,
will_retain = true,
will_topic = <<"will">>,
will_msg = <<"haha">>,
clean_sess = true})).
serialize_connack_test() ->
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
?assertEqual(<<32,2,0,0>>, serialize(ConnAck)).
serialize_publish_test() ->
serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)),
serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 99, long_payload())).
serialize_puback_test() ->
serialize(?PUBACK_PACKET(?PUBACK, 10384)).
serialize_pubrel_test() ->
serialize(?PUBREL_PACKET(10384)).
serialize_subscribe_test() ->
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
serialize(?SUBSCRIBE_PACKET(10, TopicTable)).
serialize_suback_test() ->
serialize(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
serialize_unsubscribe_test() ->
serialize(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
serialize_unsuback_test() ->
serialize(?UNSUBACK_PACKET(10)).
serialize_pingreq_test() ->
serialize(?PACKET(?PINGREQ)).
serialize_pingresp_test() ->
serialize(?PACKET(?PINGRESP)).
serialize_disconnect_test() ->
serialize(?PACKET(?DISCONNECT)).
long_payload() ->
iolist_to_binary(["payload." || _I <- lists:seq(1, 100)]).
-endif.

View File

@ -1,29 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_time_tests).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
all_test() ->
emqttd_time:seed(),
emqttd_time:now_to_secs(),
emqttd_time:now_to_ms().
-endif.

176
test/emqttd_topic_SUITE.erl Normal file
View File

@ -0,0 +1,176 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_topic_SUITE).
%% CT
-compile(export_all).
-import(emqttd_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
words/1, systop/1, is_queue/1, feed_var/3]).
-define(N, 10000).
all() -> [t_wildcard, t_match, t_match2, t_validate, t_triples, t_join,
t_words, t_systop, t_is_queue, t_feed_var, t_sys_match, 't_#_match',
t_sigle_level_validate, t_sigle_level_match, t_match_perf,
t_triples_perf].
t_wildcard(_) ->
true = wildcard(<<"a/b/#">>),
true = wildcard(<<"a/+/#">>),
false = wildcard(<<"">>),
false = wildcard(<<"a/b/c">>).
t_match(_) ->
true = match(<<"a/b/c">>, <<"a/b/+">>),
true = match(<<"a/b/c">>, <<"a/#">>),
true = match(<<"abcd/ef/g">>, <<"#">>),
true = match(<<"abc/de/f">>, <<"abc/de/f">>),
true = match(<<"abc">>, <<"+">>),
true = match(<<"a/b/c">>, <<"a/b/c">>),
false = match(<<"a/b/c">>, <<"a/c/d">>),
false = match(<<"$shared/x/y">>, <<"+">>),
false = match(<<"$shared/x/y">>, <<"+/x/y">>),
false = match(<<"$shared/x/y">>, <<"#">>),
false = match(<<"$shared/x/y">>, <<"+/+/#">>),
false = match(<<"house/1/sensor/0">>, <<"house/+">>),
false = match(<<"house">>, <<"house/+">>).
t_match2(_) ->
true = match(<<"sport/tennis/player1">>, <<"sport/tennis/player1/#">>),
true = match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/player1/#">>),
true = match(<<"sport/tennis/player1/score/wimbledon">>, <<"sport/tennis/player1/#">>),
true = match(<<"sport">>, <<"sport/#">>),
true = match(<<"sport">>, <<"#">>),
true = match(<<"/sport/football/score/1">>, <<"#">>),
true = match(<<"Topic/C">>, <<"+/+">>),
true = match(<<"TopicA/B">>, <<"+/+">>),
true = match(<<"TopicA/C">>, <<"+/+">>),
true = match(<<"abc">>, <<"+">>),
true = match(<<"a/b/c">>, <<"a/b/c">>),
false = match(<<"a/b/c">>, <<"a/c/d">>),
false = match(<<"$shared/x/y">>, <<"+">>),
false = match(<<"$shared/x/y">>, <<"+/x/y">>),
false = match(<<"$shared/x/y">>, <<"#">>),
false = match(<<"$shared/x/y">>, <<"+/+/#">>),
false = match(<<"house/1/sensor/0">>, <<"house/+">>).
t_sigle_level_match(_) ->
true = match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>),
false = match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/+">>),
false = match(<<"sport">>, <<"sport/+">>),
true = match(<<"sport/">>, <<"sport/+">>),
true = match(<<"/finance">>, <<"+/+">>),
true = match(<<"/finance">>, <<"/+">>),
false = match(<<"/finance">>, <<"+">>).
t_sys_match(_) ->
true = match(<<"$SYS/broker/clients/testclient">>, <<"$SYS/#">>),
true = match(<<"$SYS/broker">>, <<"$SYS/+">>),
false = match(<<"$SYS/broker">>, <<"+/+">>),
false = match(<<"$SYS/broker">>, <<"#">>).
't_#_match'(_) ->
true = match(<<"a/b/c">>, <<"#">>),
true = match(<<"a/b/c">>, <<"+/#">>),
false = match(<<"$SYS/brokers">>, <<"#">>).
t_match_perf(_) ->
true = match(<<"a/b/ccc">>, <<"a/#">>),
Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
Filter = <<"/abkc/19383/+/akakdkkdkak/#">>,
true = match(Name, Filter),
{Time, _} = timer:tc(fun() ->
[match(Name, Filter) || _I <- lists:seq(1, ?N)]
end),
io:format("Time for match: ~p(micro)", [Time/?N]).
t_validate(_) ->
true = validate({name, <<"abc/de/f">>}),
true = validate({filter, <<"abc/+/f">>}),
true = validate({filter, <<"abc/#">>}),
true = validate({filter, <<"x">>}),
true = validate({name, <<"x//y">>}),
true = validate({filter, <<"sport/tennis/#">>}),
false = validate({name, <<>>}),
false = validate({name, long_topic()}),
false = validate({name, <<"abc/#">>}),
false = validate({filter, <<"abc/#/1">>}),
false = validate({filter, <<"abc/#xzy/+">>}),
false = validate({filter, <<"abc/xzy/+9827">>}),
false = validate({filter, <<"sport/tennis#">>}),
false = validate({filter, <<"sport/tennis/#/ranking">>}).
t_sigle_level_validate(_) ->
true = validate({filter, <<"+">>}),
true = validate({filter, <<"+/tennis/#">>}),
true = validate({filter, <<"sport/+/player1">>}),
false = validate({filter, <<"sport+">>}).
t_triples(_) ->
Triples = [{root,<<"a">>,<<"a">>},
{<<"a">>,<<"b">>,<<"a/b">>},
{<<"a/b">>,<<"c">>,<<"a/b/c">>}],
Triples = triples(<<"a/b/c">>).
t_triples_perf(_) ->
Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
{Time, _} = timer:tc(fun() ->
[triples(Topic) || _I <- lists:seq(1, ?N)]
end),
io:format("Time for triples: ~p(micro)", [Time/?N]).
t_words(_) ->
['', <<"a">>, '+', '#'] = words(<<"/a/+/#">>),
['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'] = words(<<"/abkc/19383/+/akakdkkdkak/#">>),
{Time, _} = timer:tc(fun() ->
[words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)]
end),
io:format("Time for words: ~p(micro)", [Time/?N]),
{Time2, _} = timer:tc(fun() ->
[binary:split(<<"/abkc/19383/+/akakdkkdkak/#">>, <<"/">>, [global]) || _I <- lists:seq(1, ?N)]
end),
io:format("Time for binary:split: ~p(micro)", [Time2/?N]).
t_join(_) ->
<<>> = join([]),
<<"x">> = join([<<"x">>]),
<<"#">> = join(['#']),
<<"+//#">> = join(['+', '', '#']),
<<"x/y/z/+">> = join([<<"x">>, <<"y">>, <<"z">>, '+']),
<<"/ab/cd/ef/">> = join(words(<<"/ab/cd/ef/">>)),
<<"ab/+/#">> = join(words(<<"ab/+/#">>)).
t_is_queue(_) ->
true = is_queue(<<"$Q/queue">>),
true = is_queue(<<"$q/queue">>),
false = is_queue(<<"xyz/queue">>).
t_systop(_) ->
SysTop1 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/xyz"]),
SysTop1 = systop('xyz'),
SysTop2 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/abc"]),
SysTop2 = systop(<<"abc">>).
t_feed_var(_) ->
<<"$Q/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>),
<<"username/test/client/x">> = feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>),
<<"username/test/client/clientId">> = feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>).
long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).

View File

@ -1,158 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_topic_tests).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-import(emqttd_topic, [validate/1, wildcard/1, match/2, triples/1, words/1,
join/1, feed_var/3, is_queue/1, systop/1]).
-define(N, 10000).
validate_test() ->
?assert( validate({filter, <<"sport/tennis/#">>}) ),
?assert( validate({filter, <<"a/b/c">>}) ),
?assert( validate({filter, <<"/a/b">>}) ),
?assert( validate({filter, <<"/+/x">>}) ),
?assert( validate({filter, <<"/a/b/c/#">>}) ),
?assert( validate({filter, <<"x">>}) ),
?assertNot( validate({name, <<>>}) ),
?assertNot( validate({filter, <<"a/#/c">>}) ),
?assertNot( validate({filter, <<"sport/tennis#">>}) ),
?assertNot( validate({filter, <<"sport/tennis/#/ranking">>}) ).
sigle_level_validate_test() ->
?assert( validate({filter, <<"+">>}) ),
?assert( validate({filter, <<"+/tennis/#">>}) ),
?assertNot( validate({filter, <<"sport+">>}) ),
?assert( validate({filter, <<"sport/+/player1">>}) ).
match_test() ->
?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/player1/#">>) ),
?assert( match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/player1/#">>) ),
?assert( match(<<"sport/tennis/player1/score/wimbledon">>, <<"sport/tennis/player1/#">>) ),
?assert( match(<<"sport">>, <<"sport/#">>) ),
?assert( match(<<"sport">>, <<"#">>) ),
?assert( match(<<"/sport/football/score/1">>, <<"#">>) ),
%% paho test
?assert( match(<<"Topic/C">>, <<"+/+">>) ),
?assert( match(<<"TopicA/B">>, <<"+/+">>) ),
?assert( match(<<"TopicA/C">>, <<"+/+">>) ),
?assert( match(<<"abc">>, <<"+">>) ),
?assert( match(<<"a/b/c">>, <<"a/b/c">>) ),
?assertNot( match(<<"a/b/c">>, <<"a/c/d">>) ),
?assertNot( match(<<"$shared/x/y">>, <<"+">>) ),
?assertNot( match(<<"$shared/x/y">>, <<"+/x/y">>) ),
?assertNot( match(<<"$shared/x/y">>, <<"#">>) ),
?assertNot( match(<<"$shared/x/y">>, <<"+/+/#">>) ),
?assertNot( match(<<"house/1/sensor/0">>, <<"house/+">>) ).
sigle_level_match_test() ->
?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>) ),
?assertNot( match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/+">>) ),
?assertNot( match(<<"sport">>, <<"sport/+">>) ),
?assert( match(<<"sport/">>, <<"sport/+">>) ),
?assert( match(<<"/finance">>, <<"+/+">>) ),
?assert( match(<<"/finance">>, <<"/+">>) ),
?assertNot( match(<<"/finance">>, <<"+">>) ).
sys_match_test() ->
?assert( match(<<"$SYS/broker/clients/testclient">>, <<"$SYS/#">>) ),
?assert( match(<<"$SYS/broker">>, <<"$SYS/+">>) ),
?assertNot( match(<<"$SYS/broker">>, <<"+/+">>) ),
?assertNot( match(<<"$SYS/broker">>, <<"#">>) ).
'#_match_test'() ->
?assert( match(<<"a/b/c">>, <<"#">>) ),
?assert( match(<<"a/b/c">>, <<"+/#">>) ),
?assertNot( match(<<"$SYS/brokers">>, <<"#">>) ).
match_perf_test() ->
?assert( match(<<"a/b/ccc">>, <<"a/#">>) ),
Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
Filter = <<"/abkc/19383/+/akakdkkdkak/#">>,
?assert( match(Name, Filter) ),
%?debugFmt("Match ~p with ~p", [Name, Filter]),
{Time, _} = timer:tc(fun() ->
[match(Name, Filter) || _I <- lists:seq(1, ?N)]
end),
?debugFmt("Time for match: ~p(micro)", [Time/?N]),
ok.
triples_test() ->
Triples = [{root, <<"a">>, <<"a">>}, {<<"a">>, <<"b">>, <<"a/b">>}],
?assertMatch(Triples, triples(<<"a/b">>) ).
triples_perf_test() ->
Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
{Time, _} = timer:tc(fun() ->
[triples(Topic) || _I <- lists:seq(1, ?N)]
end),
?debugFmt("Time for triples: ~p(micro)", [Time/?N]),
ok.
type_test() ->
?assertEqual(false, wildcard(<<"/a/b/cdkd">>)),
?assertEqual(true, wildcard(<<"/a/+/d">>)),
?assertEqual(true, wildcard(<<"/a/b/#">>)).
words_test() ->
?assertEqual(['', <<"a">>, '+', '#'], words(<<"/a/+/#">>) ),
?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)),
{Time, _} = timer:tc(fun() ->
[words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)]
end),
?debugFmt("Time for words: ~p(micro)", [Time/?N]),
{Time2, _} = timer:tc(fun() ->
[binary:split(<<"/abkc/19383/+/akakdkkdkak/#">>, <<"/">>, [global]) || _I <- lists:seq(1, ?N)]
end),
?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]),
ok.
is_queue_test() ->
?assert( is_queue(<<"$Q/queue">>) ),
?assert( is_queue(<<"$q/queue">>) ),
?assertNot( is_queue(<<"xyz/queue">>) ).
systop_test() ->
?assertEqual( iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/xyz"]), systop('xyz') ),
?assertEqual( iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/abc"]), systop(<<"abc">>) ).
feed_var_test() ->
?assertEqual(<<"$Q/client/clientId">>, feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)),
?assertEqual(<<"username/test/client/x">>,
feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)),
?assertEqual(<<"username/test/client/clientId">>,
feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)).
join_test() ->
?assertEqual(<<"/ab/cd/ef/">>, join(words(<<"/ab/cd/ef/">>))),
?assertEqual(<<"ab/+/#">>, join(words(<<"ab/+/#">>))),
?assertEqual( <<"x/y/z/+">>, join([<<"x">>, <<"y">>, <<"z">>, '+']) ),
?assertEqual( <<>>, join([]) ),
?assertEqual( <<"x">>, join([<<"x">>]) ),
?assertEqual( <<"#">>, join(['#']) ),
?assertEqual( <<"+//#">>, join(['+', '', '#']) ).
long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).
-endif.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>. All Rights Reserved.
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,45 +14,39 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_trie_tests).
-module(emqttd_trie_SUITE).
-ifdef(TEST).
-include("emqttd.hrl").
-compile(export_all).
-include("emqttd_trie.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(TRIE, emqttd_trie).
trie_test_() ->
{setup, fun setup/0, fun teardown/1,
[{foreach, fun() -> ok end, fun(_) -> clear() end,
[?_test(t_insert()),
?_test(t_match()),
?_test(t_match2()),
?_test(t_match3()),
?_test(t_delete()),
?_test(t_delete2()),
?_test(t_delete3())]
}]}.
all() ->
[t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3].
setup() ->
init_per_suite(Config) ->
emqttd_mnesia:ensure_started(),
?TRIE:mnesia(boot),
?TRIE:mnesia(copy).
?TRIE:mnesia(copy),
Config.
teardown(_) ->
end_per_suite(_Config) ->
emqttd_mnesia:ensure_stopped(),
emqttd_mnesia:delete_schema().
t_insert() ->
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
clear_tables().
t_insert(_) ->
TN = #trie_node{node_id = <<"sensor">>,
edge_count = 3,
topic = <<"sensor">>,
flags = undefined},
?assertEqual({atomic, [TN]}, mnesia:transaction(
{atomic, [TN]} = mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/+/#">>),
@ -60,43 +54,43 @@ t_insert() ->
?TRIE:insert(<<"sensor">>),
?TRIE:insert(<<"sensor">>),
?TRIE:lookup(<<"sensor">>)
end)).
end).
t_match() ->
t_match(_) ->
Machted = [<<"sensor/+/#">>, <<"sensor/#">>],
?assertEqual({atomic, Machted}, mnesia:transaction(
{atomic, Machted} = mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/+/#">>),
?TRIE:insert(<<"sensor/#">>),
?TRIE:match(<<"sensor/1">>)
end)).
end).
t_match2() ->
t_match2(_) ->
Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []},
?assertEqual({atomic, Matched}, mnesia:transaction(
{atomic, Matched} = mnesia:transaction(
fun() ->
?TRIE:insert(<<"#">>),
?TRIE:insert(<<"+/#">>),
?TRIE:insert(<<"+/+/#">>),
{?TRIE:match(<<"a/b/c">>),
?TRIE:match(<<"$SYS/broker/zenmq">>)}
end)).
end).
t_match3() ->
t_match3(_) ->
Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
mnesia:transaction(fun() -> [emqttd_trie:insert(Topic) || Topic <- Topics] end),
Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [<<"a/b/c">>]),
?assertEqual(4, length(Matched)),
4 = length(Matched),
SysMatched = mnesia:async_dirty(fun emqttd_trie:match/1, [<<"$SYS/a/b/c">>]),
?assertEqual([<<"$SYS/#">>], SysMatched).
[<<"$SYS/#">>] = SysMatched.
t_delete() ->
t_delete(_) ->
TN = #trie_node{node_id = <<"sensor/1">>,
edge_count = 2,
topic = undefined,
flags = undefined},
?assertEqual({atomic, [TN]}, mnesia:transaction(
{atomic, [TN]} = mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/1/#">>),
?TRIE:insert(<<"sensor/1/metric/2">>),
@ -105,10 +99,10 @@ t_delete() ->
?TRIE:delete(<<"sensor/1/metric">>),
?TRIE:delete(<<"sensor/1/metric">>),
?TRIE:lookup(<<"sensor/1">>)
end)).
end).
t_delete2() ->
?assertEqual({atomic, {[], []}}, mnesia:transaction(
t_delete2(_) ->
{atomic, {[], []}} = mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor">>),
?TRIE:insert(<<"sensor/1/metric/2">>),
@ -118,10 +112,10 @@ t_delete2() ->
?TRIE:delete(<<"sensor/1/metric/3">>),
{?TRIE:lookup(<<"sensor">>),
?TRIE:lookup(<<"sensor/1">>)}
end)).
end).
t_delete3() ->
?assertEqual({atomic, {[], []}}, mnesia:transaction(
t_delete3(_) ->
{atomic, {[], []}} = mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/+">>),
?TRIE:insert(<<"sensor/+/metric/2">>),
@ -132,10 +126,8 @@ t_delete3() ->
?TRIE:delete(<<"sensor/+">>),
?TRIE:delete(<<"sensor/+/unknown">>),
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)}
end)).
end).
clear() ->
mnesia:clear_table(trie),
mnesia:clear_table(trie_node).
clear_tables() ->
lists:foreach(fun mnesia:clear_table/1, [trie, trie_node]).
-endif.

View File

@ -1,72 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(priority_queue_tests).
-include("emqttd.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(PQ, priority_queue).
plen_test() ->
Q = ?PQ:new(),
?assertEqual(0, ?PQ:plen(0, Q)),
Q0 = ?PQ:in(z, Q),
?assertEqual(1, ?PQ:plen(0, Q0)),
Q1 = ?PQ:in(x, 1, Q0),
?assertEqual(1, ?PQ:plen(1, Q1)),
Q2 = ?PQ:in(y, 2, Q1),
?assertEqual(1, ?PQ:plen(2, Q2)),
Q3 = ?PQ:in(z, 2, Q2),
?assertEqual(2, ?PQ:plen(2, Q3)),
{_, Q4} = ?PQ:out(1, Q3),
?assertEqual(0, ?PQ:plen(1, Q4)),
{_, Q5} = ?PQ:out(Q4),
?assertEqual(1, ?PQ:plen(2, Q5)),
{_, Q6} = ?PQ:out(Q5),
?assertEqual(0, ?PQ:plen(2, Q6)),
?assertEqual(1, ?PQ:len(Q6)),
{_, Q7} = ?PQ:out(Q6),
?assertEqual(0, ?PQ:len(Q7)).
out2_test() ->
Els = [a, {b, 1}, {c, 1}, {d, 2}, {e, 2}, {f, 2}],
Q = ?PQ:new(),
Q0 = lists:foldl(
fun({El, P}, Q) ->
?PQ:in(El, P, Q);
(El, Q) ->
?PQ:in(El, Q)
end, Q, Els),
{Val, Q1} = ?PQ:out(Q0),
?assertEqual({value, d}, Val),
{Val1, Q2} = ?PQ:out(2, Q1),
?assertEqual({value, e}, Val1),
{Val2, Q3} = ?PQ:out(1, Q2),
?assertEqual({value, b}, Val2),
{Val3, Q4} = ?PQ:out(Q3),
?assertEqual({value, f}, Val3),
{Val4, Q5} = ?PQ:out(Q4),
?assertEqual({value, c}, Val4),
{Val5, Q6} = ?PQ:out(Q5),
?assertEqual({value, a}, Val5),
{empty, _Q7} = ?PQ:out(Q6).
-endif.