From 3b9247994d018e4c665b74a44b09152fa02ddfe2 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sat, 15 Sep 2018 15:02:16 +0200 Subject: [PATCH] Refine emqx_mock_client Before this change, eqmx_mock_client uses a shared ets table to store last received message, this causes troulbe when we want to start / stop two or more clients in one test case the ets table gets owned by the first spanwed client and gets closed when the owner client dies. Now it keeps the last received message in process state and a gen_server call is added to retrieve it for verification Along with this change in emqx_mock_client, it made possible to write test case to verify the actual subscriber pid used in shared subscription strategy, so test cases were added (and modified) to verify different strategies --- test/emqx_mock_client.erl | 50 ++++++++++++---------------------- test/emqx_session_SUITE.erl | 2 +- test/emqx_shared_sub_SUITE.erl | 30 ++++++++++++++------ 3 files changed, 39 insertions(+), 43 deletions(-) diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index ad1e2d22d..4528239e6 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,15 +16,12 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0, - try_get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {clean_start, client_id, client_pid}). - --define(TAB, messages). +-record(state, {clean_start, client_id, client_pid, last_msg}). start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). @@ -38,25 +35,14 @@ close_session(ClientPid, SessPid) -> stop(CPid) -> gen_server:call(CPid, stop). -get_last_message() -> - [{last_message, Msg}] = ets:lookup(?TAB, last_message), - Msg. - -try_get_last_message() -> - case ets:lookup(?TAB, last_message) of - [{last_message, Msg}] -> Msg; - [] -> false - end. +get_last_message(Pid) -> + gen_server:call(Pid, get_last_message). init([ClientId]) -> - Result = lists:member(?TAB, ets:all()), - if Result == false -> - ets:new(?TAB, [set, named_table, public]); - true -> ok - end, - {ok, - #state{clean_start = true, - client_id = ClientId} + {ok, #state{clean_start = true, + client_id = ClientId, + last_msg = undefined + } }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> @@ -68,28 +54,26 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> expiry_interval => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), - {reply, {ok, SessPid}, State#state{ - clean_start = true, - client_id = ClientId, - client_pid = ClientPid - }}; - + {reply, {ok, SessPid}, + State#state{clean_start = true, + client_id = ClientId, + client_pid = ClientPid + }}; handle_call({stop_session, SessPid}, _From, State) -> emqx_sm:close_session(SessPid), {stop, normal, ok, State}; - +handle_call(get_last_message, _From, #state{last_msg = Msg} = State) -> + {reply, Msg, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; - handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({_, Msg}, State) -> - ets:insert(?TAB, {last_message, Msg}), - {noreply, State}; +handle_info({deliver, Msg}, State) -> + {noreply, State#state{last_msg = Msg}}; handle_info(_Info, State) -> {noreply, State}. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 2b60b747d..f79b84557 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -40,7 +40,7 @@ t_session_all(_) -> [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), - {publish, 1, _} = emqx_mock_client:get_last_message(), + {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), emqx_session:puback(SPid, 2), emqx_session:puback(SPid, 3, reasoncode), emqx_session:pubrec(SPid, 4), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index b44a00680..8eb309001 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -19,6 +19,7 @@ -export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). -include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). @@ -32,7 +33,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -t_random(_) -> +t_random_basic(_) -> application:set_env(?APPLICATION, shared_subscription_strategy, random), ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), @@ -42,7 +43,7 @@ t_random(_) -> %% wait for the subscription to show up ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), emqx_session:publish(SPid, 1, Message1), - ?wait(case emqx_mock_client:try_get_last_message() of + ?wait(case emqx_mock_client:get_last_message(ConnPid) of {publish, 1, _} -> true; Other -> Other end, 1000), @@ -55,6 +56,9 @@ t_random(_) -> emqx_mock_client:close_session(ConnPid, SPid), ok. +t_random(_) -> + test_two_messages(random). + t_round_robin(_) -> test_two_messages(round_robin). @@ -76,17 +80,19 @@ t_not_so_sticky(_) -> Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), - emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), %% wait for the subscription to show up - ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso - ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000), emqx_session:publish(SPid1, 1, Message1), - ?wait(case emqx_mock_client:try_get_last_message() of + ?wait(case emqx_mock_client:get_last_message(ConnPid1) of {publish, _, #message{payload = <<"hello1">>}} -> true; Other -> Other end, 1000), - emqx_session:publish(SPid1, 2, Message2), - ?wait(case emqx_mock_client:try_get_last_message() of + emqx_mock_client:close_session(ConnPid1, SPid1), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid2, 2, Message2), + ?wait(case emqx_mock_client:get_last_message(ConnPid2) of {publish, _, #message{payload = <<"hello2">>}} -> true; Other -> Other end, 1000), @@ -132,11 +138,17 @@ test_two_messages(Strategy) -> hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); _ -> ok end, ->>>>>>> 38d0d409... Add 'hash' option for shared subscription emqx_mock_client:close_session(ConnPid1, SPid1), emqx_mock_client:close_session(ConnPid2, SPid2), ok. +last_message(_ExpectedPayload, []) -> <<"not yet?">>; +last_message(ExpectedPayload, [Pid | Pids]) -> + case emqx_mock_client:get_last_message(Pid) of + {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid}; + _Other -> last_message(ExpectedPayload, Pids) + end. + %%------------------------------------------------------------------------------ %% help functions %%------------------------------------------------------------------------------