From ce3f2e4d9e1a879a0c2db31ee1521b9943281034 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Mon, 27 Aug 2018 21:18:00 +0800 Subject: [PATCH] fix emqx_broker test suite --- src/emqx_alarm_mgr.erl | 2 +- src/emqx_time.erl | 7 ++- test/emqx_broker_SUITE.erl | 90 ++++++++++++++++++++------------------ test/emqx_mock_client.erl | 10 ++++- 4 files changed, 63 insertions(+), 46 deletions(-) diff --git a/src/emqx_alarm_mgr.erl b/src/emqx_alarm_mgr.erl index 41da4e705..9839ba44e 100644 --- a/src/emqx_alarm_mgr.erl +++ b/src/emqx_alarm_mgr.erl @@ -81,7 +81,7 @@ handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)-> handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) -> case encode_alarm(Alarm) of {ok, Json} -> - ok = emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); + emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); {error, Reason} -> emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason]) end, diff --git a/src/emqx_time.erl b/src/emqx_time.erl index 0d74168c4..95bfc9409 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -14,7 +14,7 @@ -module(emqx_time). --export([seed/0, now_secs/0, now_ms/0, now_ms/1]). +-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1]). seed() -> rand:seed(exsplus, erlang:timestamp()). @@ -22,8 +22,11 @@ seed() -> now_secs() -> erlang:system_time(second). +now_secs({MegaSecs, Secs, _MicroSecs}) -> + MegaSecs * 1000000 + Secs. + now_ms() -> erlang:system_time(millisecond). now_ms({MegaSecs, Secs, MicroSecs}) -> - (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). \ No newline at end of file + (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 917143c3a..a71a96539 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -64,12 +64,12 @@ end_per_suite(_Config) -> %%-------------------------------------------------------------------- subscribe_unsubscribe(_) -> - ok = emqx:subscribe(<<"topic">>, <<"clientId">>), - ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]), - ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]), - ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), - ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), - ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). + ok = emqx:subscribe(<<"topic">>, "clientId"), + ok = emqx:subscribe(<<"topic/1">>, "clientId", #{ qos => 1 }), + ok = emqx:subscribe(<<"topic/2">>, "clientId", #{ qos => 2 }), + ok = emqx:unsubscribe(<<"topic">>, "clientId"), + ok = emqx:unsubscribe(<<"topic/1">>, "clientId"), + ok = emqx:unsubscribe(<<"topic/2">>, "clientId"). publish(_) -> Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), @@ -80,13 +80,17 @@ publish(_) -> pubsub(_) -> Self = self(), - ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]), - ?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])), + Subscriber = {Self, <<"clientId">>}, + ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }), + #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }), + #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), + %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), timer:sleep(10), - [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self), - [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>), + [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber), + [{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>), emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), + ?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end), spawn(fun() -> emqx:subscribe(<<"a/b/c">>), emqx:subscribe(<<"c/d/e">>), @@ -97,32 +101,33 @@ pubsub(_) -> emqx:unsubscribe(<<"a/b/c">>). t_local_subscribe(_) -> - ok = emqx:subscribe("$local/topic0"), - ok = emqx:subscribe("$local/topic1", <<"x">>), - ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]), + ok = emqx:subscribe(<<"$local/topic0">>), + ok = emqx:subscribe(<<"$local/topic1">>, "clientId"), + ok = emqx:subscribe(<<"$local/topic2">>, "clientId", #{ qos => 2 }), timer:sleep(10), - ?assertEqual([self()], emqx:subscribers("$local/topic0")), - ?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")), - ?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []}, - {{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}], - emqx:subscriptions(<<"x">>)), + ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")), + ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")), + ?assertEqual([{<<"$local/topic1">>, #{}}, + {<<"$local/topic2">>, #{ qos => 2 }}], + emqx:subscriptions({self(), <<"clientId">>})), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), - ?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")), - ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)), - ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)), + ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), + ?assertEqual(ok, emqx:unsubscribe("$local/topic1", "clientId")), + ?assertEqual(ok, emqx:unsubscribe("$local/topic2", "clientId")), ?assertEqual([], emqx:subscribers("topic1")), - ?assertEqual([], emqx:subscriptions(<<"x">>)). + ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})). t_shared_subscribe(_) -> emqx:subscribe("$local/$share/group1/topic1"), emqx:subscribe("$share/group2/topic2"), emqx:subscribe("$queue/topic3"), timer:sleep(10), - ?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)), - ?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []}, - {self(), <<"$queue/topic3">>, []}, - {self(), <<"$share/group2/topic2">>, []}], - lists:sort(emqx:subscriptions(self()))), + ct:log("share subscriptions: ~p~n", [emqx:subscriptions({self(), undefined})]), + ?assertEqual([{self(), undefined}], emqx:subscribers(<<"$local/$share/group1/topic1">>)), + ?assertEqual([{<<"$local/$share/group1/topic1">>, #{}}, + {<<"$queue/topic3">>, #{}}, + {<<"$share/group2/topic2">>, #{}}], + lists:sort(emqx:subscriptions({self(), undefined}))), emqx:unsubscribe("$local/$share/group1/topic1"), emqx:unsubscribe("$share/group2/topic2"), emqx:unsubscribe("$queue/topic3"), @@ -146,17 +151,18 @@ t_shared_subscribe(_) -> %% Session Group %%-------------------------------------------------------------------- start_session(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), - {ok, SessPid} = emqx_mock_client:start_session(ClientPid), - Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), - Message1 = Message#message{id = 1}, - emqx_session:publish(SessPid, Message1), - emqx_session:pubrel(SessPid, 1), - emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), + ClientId = <<"clientId">>, + {ok, ClientPid} = emqx_mock_client:start_link(ClientId), + {ok, SessPid} = emqx_mock_client:open_session(ClientPid, ClientId, internal), + Message1 = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), + emqx_session:publish(SessPid, 1, Message1), + emqx_session:pubrel(SessPid, 2, reasoncode), + emqx_session:subscribe(SessPid, [{<<"topic/session">>, #{qos => 2}}]), Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>), - emqx_session:publish(SessPid, Message2), + emqx_session:publish(SessPid, 3, Message2), emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]), - emqx_mock_client:stop(ClientPid). + %% emqx_mock_client:stop(ClientPid). + emqx_mock_client:close_session(ClientPid, SessPid). %%-------------------------------------------------------------------- %% Broker Group @@ -231,10 +237,10 @@ hook_fun8(arg, initArg) -> stop. set_alarms(_) -> AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, - emqx_alarm:set_alarm(AlarmTest), - Alarms = emqx_alarm:get_alarms(), + emqx_alarm_mgr:set_alarm(AlarmTest), + Alarms = emqx_alarm_mgr:get_alarms(), + ct:log("Alarms Length: ~p ~n", [length(Alarms)]), ?assertEqual(1, length(Alarms)), - emqx_alarm:clear_alarm(<<"1">>), - [] = emqx_alarm:get_alarms(). - + emqx_alarm_mgr:clear_alarm(<<"1">>), + [] = emqx_alarm_mgr:get_alarms(). diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index e76e5551c..8afbeeb17 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, stop/1]). +-export([start_link/1, open_session/3, close_session/2, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,6 +31,9 @@ start_link(ClientId) -> open_session(ClientPid, ClientId, Zone) -> gen_server:call(ClientPid, {start_session, ClientPid, ClientId, Zone}). +close_session(ClientPid, SessPid) -> + gen_server:call(ClientPid, {stop_session, SessPid}). + stop(CPid) -> gen_server:call(CPid, stop). @@ -55,6 +58,11 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> client_pid = ClientPid }}; +handle_call({stop_session, SessPid}, _From, State) -> + unlink(SessPid), + emqx_sm:close_session(SessPid), + {stop, normal, ok, State}; + handle_call(stop, _From, State) -> {stop, normal, ok, State};