diff --git a/Makefile b/Makefile index dbd503864..23a7da11c 100644 --- a/Makefile +++ b/Makefile @@ -35,12 +35,12 @@ EUNIT_OPTS = verbose # CT_SUITES = emqx_frame ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session \ +CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 8f1c3156f..175271306 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -102,8 +102,13 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +-ifdef(TEST). +ensure_expiry_timer(State) -> + State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}. +-else. ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. +-endif. expire_banned_items(Now) -> mnesia:foldl(fun diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index f0946e92d..857090c25 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -260,9 +260,19 @@ subscription(Topic, Subscriber) -> -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1; + case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of + {Match, _} -> + length(Match) >= 1; + '$end_of_table' -> + false + end; subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1; + case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of + {Match, _} -> + length(Match) >= 1; + '$end_of_table' -> + false + end; subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 29dbb660c..25faef166 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -75,4 +75,3 @@ compile(Rules) -> {ok, MP} = re:compile(Re), {rewrite, Topic, MP, Dest} end, Rules). - diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index c91aeae45..9d4c85134 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -29,13 +29,17 @@ t_banned_all(_) -> emqx_ct_broker_helpers:run_setup_steps(), emqx_banned:start_link(), TimeNow = erlang:system_time(second), - ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>}, - reason = <<"test">>, - by = <<"banned suite">>, - desc = <<"test">>, - until = TimeNow + 10}), + Banned = #banned{who = {client_id, <<"TestClient">>}, + reason = <<"test">>, + by = <<"banned suite">>, + desc = <<"test">>, + until = TimeNow + 1}, + ok = emqx_banned:add(Banned), % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed - timer:sleep(100), + ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + timer:sleep(2500), + ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + ok = emqx_banned:add(Banned), ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), emqx_banned:del({client_id, <<"TestClient">>}), ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl new file mode 100644 index 000000000..f337e3b4e --- /dev/null +++ b/test/emqx_bridge_SUITE.erl @@ -0,0 +1,57 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [bridge_test]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +bridge_test(_) -> + {ok, _Pid} = emqx_bridge:start_link(emqx, []), + #{msg := <<"start bridge successfully">>} + = emqx_bridge:start_bridge(emqx), + test_forwards(), + test_subscriptions(0), + test_subscriptions(1), + test_subscriptions(2), + #{msg := <<"stop bridge successfully">>} + = emqx_bridge:stop_bridge(emqx), + ok. + +test_forwards() -> + emqx_bridge:add_forward(emqx, <<"test_forwards">>), + [<<"test_forwards">>] = emqx_bridge:show_forwards(emqx), + emqx_bridge:del_forward(emqx, <<"test_forwards">>), + [] = emqx_bridge:show_forwards(emqx), + ok. + +test_subscriptions(QoS) -> + emqx_bridge:add_subscription(emqx, <<"test_subscriptions">>, QoS), + [{<<"test_subscriptions">>, QoS}] = emqx_bridge:show_subscriptions(emqx), + emqx_bridge:del_subscription(emqx, <<"test_subscriptions">>), + [] = emqx_bridge:show_subscriptions(emqx), + ok. diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 7baa248f3..7fcc2a598 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -60,6 +60,11 @@ subscribe_unsubscribe(_) -> ok = emqx:subscribe(<<"topic">>, <<"clientId">>), ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }), ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }), + true = emqx:subscribed(<<"topic">>, <<"clientId">>), + Topics = emqx:topics(), + lists:foreach(fun(Topic) -> + ?assert(lists:member(Topic, Topics)) + end, Topics), ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). @@ -72,12 +77,16 @@ publish(_) -> ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). pubsub(_) -> + true = emqx:is_running(node()), Self = self(), Subscriber = {Self, <<"clientId">>}, ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }), - #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + #{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + #{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber), + true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}), + #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber), ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }), - #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + #{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), timer:sleep(10), [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl deleted file mode 100644 index 716e771b5..000000000 --- a/test/emqx_connection_SUITE.erl +++ /dev/null @@ -1,47 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_connection_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("common_test/include/ct.hrl"). - -all() -> - [{group, connection}]. - -groups() -> - [{connection, [sequence], [t_attrs]}]. - -init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), - Config. - -end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). - - -t_attrs(_) -> - {ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]), - [{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>), - Attrs = emqx_connection:attrs(ConnPid), - <<"simpleClient">> = proplists:get_value(client_id, Attrs), - <<"plain">> = proplists:get_value(username, Attrs), - emqx_client:disconnect(C). - -%% t_stats() -> -%% {ok, C, _ } = emqx_client; -%% t_stats() -> - diff --git a/test/emqx_mod_rewrite_tests.erl b/test/emqx_mod_rewrite_tests.erl new file mode 100644 index 000000000..6fea7ee71 --- /dev/null +++ b/test/emqx_mod_rewrite_tests.erl @@ -0,0 +1,63 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_mod_rewrite_tests). + +-include_lib("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +rules() -> + Rawrules1 = "x/# ^x/y/(.+)$ z/y/$1", + Rawrules2 = "y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2", + Rawrules = [Rawrules1, Rawrules2], + Rules = lists:map(fun(Rule) -> + [Topic, Re, Dest] = string:tokens(Rule, " "), + {rewrite, + list_to_binary(Topic), + list_to_binary(Re), + list_to_binary(Dest)} + end, Rawrules), + lists:map(fun({rewrite, Topic, Re, Dest}) -> + {ok, MP} = re:compile(Re), + {rewrite, Topic, MP, Dest} + end, Rules). + +rewrite_subscribe_test() -> + Rules = rules(), + io:format("Rules: ~p",[Rules]), + ?assertEqual({ok, [{<<"test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"z/y/test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)). + +rewrite_unsubscribe_test() -> + Rules = rules(), + ?assertEqual({ok, [{<<"test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"z/y/test">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)), + ?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]}, + emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)). + +rewrite_publish_test() -> + Rules = rules(), + ?assertMatch({ok, #message{topic = <<"test">>}}, + emqx_mod_rewrite:rewrite_publish(#message{topic = <<"test">>}, Rules)), + ?assertMatch({ok, #message{topic = <<"z/y/test">>}}, + emqx_mod_rewrite:rewrite_publish(#message{topic = <<"x/y/test">>}, Rules)), + ?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}}, + emqx_mod_rewrite:rewrite_publish(#message{topic = <<"y/test/z/test_topic">>}, Rules)). diff --git a/test/emqx_pqueue_SUITE.erl b/test/emqx_pqueue_SUITE.erl index e610a7639..e7672cb0b 100644 --- a/test/emqx_pqueue_SUITE.erl +++ b/test/emqx_pqueue_SUITE.erl @@ -22,7 +22,7 @@ -define(PQ, emqx_pqueue). -all() -> [t_priority_queue_plen, t_priority_queue_out2]. +all() -> [t_priority_queue_plen, t_priority_queue_out2, t_priority_queues]. t_priority_queue_plen(_) -> Q = ?PQ:new(), @@ -67,3 +67,57 @@ t_priority_queue_out2(_) -> {Val5, Q6} = ?PQ:out(Q5), {value, a} = Val5, {empty, _Q7} = ?PQ:out(Q6). + +t_priority_queues(_) -> + Q0 = ?PQ:new(), + Q1 = ?PQ:new(), + PQueue = {pqueue, [{0, Q0}, {1, Q1}]}, + ?assert(?PQ:is_queue(PQueue)), + [] = ?PQ:to_list(PQueue), + + PQueue1 = ?PQ:in(a, 0, ?PQ:new()), + PQueue2 = ?PQ:in(b, 0, PQueue1), + + PQueue3 = ?PQ:in(c, 1, PQueue2), + PQueue4 = ?PQ:in(d, 1, PQueue3), + + 4 = ?PQ:len(PQueue4), + + [{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4), + PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]), + + empty = ?PQ:highest(?PQ:new()), + 0 = ?PQ:highest(PQueue1), + 1 = ?PQ:highest(PQueue4), + + PQueue5 = ?PQ:in(e, infinity, PQueue4), + PQueue6 = ?PQ:in(f, 1, PQueue5), + + {{value, e}, PQueue7} = ?PQ:out(PQueue6), + {empty, _} = ?PQ:out(0, ?PQ:new()), + + {empty, Q0} = ?PQ:out_p(Q0), + + Q2 = ?PQ:in(a, Q0), + Q3 = ?PQ:in(b, Q2), + Q4 = ?PQ:in(c, Q3), + + {{value, a, 0}, _Q5} = ?PQ:out_p(Q4), + + {{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7), + + Q4 = ?PQ:join(Q4, ?PQ:new()), + Q4 = ?PQ:join(?PQ:new(), Q4), + + {queue, [a], [a], 2} = ?PQ:join(Q2, Q2), + + {pqueue,[{-1,{queue,[f],[d],2}}, + {0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2), + + {pqueue,[{-1,{queue,[f],[d],2}}, + {0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8), + + {pqueue,[{-1,{queue,[f],[d,f,d],4}}, + {0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8). + + diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index 196b1678e..a35da9c5d 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -29,7 +29,9 @@ all() -> groups() -> [{route, [sequence], [add_del_route, - match_routes]}]. + match_routes, + has_routes, + router_add_del]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -81,6 +83,7 @@ match_routes(_) -> has_routes(_) -> From = {self(), make_ref()}, ?R:add_route(From, <<"devices/+/messages">>, node()), + timer:sleep(200), ?assert(?R:has_routes(<<"devices/+/messages">>)). clear_tables() -> @@ -88,28 +91,33 @@ clear_tables() -> router_add_del(_) -> ?R:add_route(<<"#">>), - ?R:add_route(<<"a/b/c">>), + ?R:add_route(<<"a/b/c">>, node()), ?R:add_route(<<"+/#">>), Routes = [R1, R2 | _] = [ #route{topic = <<"#">>, dest = node()}, #route{topic = <<"+/#">>, dest = node()}, #route{topic = <<"a/b/c">>, dest = node()}], + timer:sleep(500), ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), + ?R:print_routes(<<"a/b/c">>), + %% Batch Add lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), %% Del - ?R:del_route(<<"a/b/c">>), - [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)), + ?R:del_route(<<"a/b/c">>, node()), + timer:sleep(500), + [R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)), {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), %% Batch Del R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'}, ?R:add_route(R3), - ?R:del_route(R1), + ?R:del_route(<<"#">>), ?R:del_route(R2), ?R:del_route(R3), - [] = lists:sort(?R:match(<<"a/b/c">>)). + timer:sleep(500), + [] = lists:sort(?R:match_routes(<<"a/b/c">>)).