diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index ad1e2d22d..4528239e6 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,15 +16,12 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0, - try_get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {clean_start, client_id, client_pid}). - --define(TAB, messages). +-record(state, {clean_start, client_id, client_pid, last_msg}). start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). @@ -38,25 +35,14 @@ close_session(ClientPid, SessPid) -> stop(CPid) -> gen_server:call(CPid, stop). -get_last_message() -> - [{last_message, Msg}] = ets:lookup(?TAB, last_message), - Msg. - -try_get_last_message() -> - case ets:lookup(?TAB, last_message) of - [{last_message, Msg}] -> Msg; - [] -> false - end. +get_last_message(Pid) -> + gen_server:call(Pid, get_last_message). init([ClientId]) -> - Result = lists:member(?TAB, ets:all()), - if Result == false -> - ets:new(?TAB, [set, named_table, public]); - true -> ok - end, - {ok, - #state{clean_start = true, - client_id = ClientId} + {ok, #state{clean_start = true, + client_id = ClientId, + last_msg = undefined + } }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> @@ -68,28 +54,26 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> expiry_interval => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), - {reply, {ok, SessPid}, State#state{ - clean_start = true, - client_id = ClientId, - client_pid = ClientPid - }}; - + {reply, {ok, SessPid}, + State#state{clean_start = true, + client_id = ClientId, + client_pid = ClientPid + }}; handle_call({stop_session, SessPid}, _From, State) -> emqx_sm:close_session(SessPid), {stop, normal, ok, State}; - +handle_call(get_last_message, _From, #state{last_msg = Msg} = State) -> + {reply, Msg, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; - handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({_, Msg}, State) -> - ets:insert(?TAB, {last_message, Msg}), - {noreply, State}; +handle_info({deliver, Msg}, State) -> + {noreply, State#state{last_msg = Msg}}; handle_info(_Info, State) -> {noreply, State}. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 2b60b747d..f79b84557 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -40,7 +40,7 @@ t_session_all(_) -> [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), - {publish, 1, _} = emqx_mock_client:get_last_message(), + {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), emqx_session:puback(SPid, 2), emqx_session:puback(SPid, 3, reasoncode), emqx_session:pubrec(SPid, 4), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index b44a00680..8eb309001 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -19,6 +19,7 @@ -export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). -include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). @@ -32,7 +33,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -t_random(_) -> +t_random_basic(_) -> application:set_env(?APPLICATION, shared_subscription_strategy, random), ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), @@ -42,7 +43,7 @@ t_random(_) -> %% wait for the subscription to show up ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), emqx_session:publish(SPid, 1, Message1), - ?wait(case emqx_mock_client:try_get_last_message() of + ?wait(case emqx_mock_client:get_last_message(ConnPid) of {publish, 1, _} -> true; Other -> Other end, 1000), @@ -55,6 +56,9 @@ t_random(_) -> emqx_mock_client:close_session(ConnPid, SPid), ok. +t_random(_) -> + test_two_messages(random). + t_round_robin(_) -> test_two_messages(round_robin). @@ -76,17 +80,19 @@ t_not_so_sticky(_) -> Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), - emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), %% wait for the subscription to show up - ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso - ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000), emqx_session:publish(SPid1, 1, Message1), - ?wait(case emqx_mock_client:try_get_last_message() of + ?wait(case emqx_mock_client:get_last_message(ConnPid1) of {publish, _, #message{payload = <<"hello1">>}} -> true; Other -> Other end, 1000), - emqx_session:publish(SPid1, 2, Message2), - ?wait(case emqx_mock_client:try_get_last_message() of + emqx_mock_client:close_session(ConnPid1, SPid1), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid2, 2, Message2), + ?wait(case emqx_mock_client:get_last_message(ConnPid2) of {publish, _, #message{payload = <<"hello2">>}} -> true; Other -> Other end, 1000), @@ -132,11 +138,17 @@ test_two_messages(Strategy) -> hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); _ -> ok end, ->>>>>>> 38d0d409... Add 'hash' option for shared subscription emqx_mock_client:close_session(ConnPid1, SPid1), emqx_mock_client:close_session(ConnPid2, SPid2), ok. +last_message(_ExpectedPayload, []) -> <<"not yet?">>; +last_message(ExpectedPayload, [Pid | Pids]) -> + case emqx_mock_client:get_last_message(Pid) of + {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid}; + _Other -> last_message(ExpectedPayload, Pids) + end. + %%------------------------------------------------------------------------------ %% help functions %%------------------------------------------------------------------------------