Refine emqx_mock_client
Before this change, eqmx_mock_client uses a shared ets table to store last received message, this causes troulbe when we want to start / stop two or more clients in one test case the ets table gets owned by the first spanwed client and gets closed when the owner client dies. Now it keeps the last received message in process state and a gen_server call is added to retrieve it for verification Along with this change in emqx_mock_client, it made possible to write test case to verify the actual subscriber pid used in shared subscription strategy, so test cases were added (and modified) to verify different strategies
This commit is contained in:
parent
b35d37c92d
commit
3b9247994d
|
@ -16,15 +16,12 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0,
|
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]).
|
||||||
try_get_last_message/0]).
|
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, {clean_start, client_id, client_pid}).
|
-record(state, {clean_start, client_id, client_pid, last_msg}).
|
||||||
|
|
||||||
-define(TAB, messages).
|
|
||||||
|
|
||||||
start_link(ClientId) ->
|
start_link(ClientId) ->
|
||||||
gen_server:start_link(?MODULE, [ClientId], []).
|
gen_server:start_link(?MODULE, [ClientId], []).
|
||||||
|
@ -38,25 +35,14 @@ close_session(ClientPid, SessPid) ->
|
||||||
stop(CPid) ->
|
stop(CPid) ->
|
||||||
gen_server:call(CPid, stop).
|
gen_server:call(CPid, stop).
|
||||||
|
|
||||||
get_last_message() ->
|
get_last_message(Pid) ->
|
||||||
[{last_message, Msg}] = ets:lookup(?TAB, last_message),
|
gen_server:call(Pid, get_last_message).
|
||||||
Msg.
|
|
||||||
|
|
||||||
try_get_last_message() ->
|
|
||||||
case ets:lookup(?TAB, last_message) of
|
|
||||||
[{last_message, Msg}] -> Msg;
|
|
||||||
[] -> false
|
|
||||||
end.
|
|
||||||
|
|
||||||
init([ClientId]) ->
|
init([ClientId]) ->
|
||||||
Result = lists:member(?TAB, ets:all()),
|
{ok, #state{clean_start = true,
|
||||||
if Result == false ->
|
client_id = ClientId,
|
||||||
ets:new(?TAB, [set, named_table, public]);
|
last_msg = undefined
|
||||||
true -> ok
|
}
|
||||||
end,
|
|
||||||
{ok,
|
|
||||||
#state{clean_start = true,
|
|
||||||
client_id = ClientId}
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
||||||
|
@ -68,28 +54,26 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
||||||
expiry_interval => 0
|
expiry_interval => 0
|
||||||
},
|
},
|
||||||
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
||||||
{reply, {ok, SessPid}, State#state{
|
{reply, {ok, SessPid},
|
||||||
clean_start = true,
|
State#state{clean_start = true,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
client_pid = ClientPid
|
client_pid = ClientPid
|
||||||
}};
|
}};
|
||||||
|
|
||||||
handle_call({stop_session, SessPid}, _From, State) ->
|
handle_call({stop_session, SessPid}, _From, State) ->
|
||||||
emqx_sm:close_session(SessPid),
|
emqx_sm:close_session(SessPid),
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
|
handle_call(get_last_message, _From, #state{last_msg = Msg} = State) ->
|
||||||
|
{reply, Msg, State};
|
||||||
handle_call(stop, _From, State) ->
|
handle_call(stop, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({_, Msg}, State) ->
|
handle_info({deliver, Msg}, State) ->
|
||||||
ets:insert(?TAB, {last_message, Msg}),
|
{noreply, State#state{last_msg = Msg}};
|
||||||
{noreply, State};
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ t_session_all(_) ->
|
||||||
[{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}),
|
[{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}),
|
||||||
emqx_session:publish(SPid, 1, Message1),
|
emqx_session:publish(SPid, 1, Message1),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
{publish, 1, _} = emqx_mock_client:get_last_message(),
|
{publish, 1, _} = emqx_mock_client:get_last_message(ConnPid),
|
||||||
emqx_session:puback(SPid, 2),
|
emqx_session:puback(SPid, 2),
|
||||||
emqx_session:puback(SPid, 3, reasoncode),
|
emqx_session:puback(SPid, 3, reasoncode),
|
||||||
emqx_session:pubrec(SPid, 4),
|
emqx_session:pubrec(SPid, 4),
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]).
|
-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
|
-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
|
||||||
|
@ -32,7 +33,7 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_broker_helpers:run_teardown_steps().
|
emqx_ct_broker_helpers:run_teardown_steps().
|
||||||
|
|
||||||
t_random(_) ->
|
t_random_basic(_) ->
|
||||||
application:set_env(?APPLICATION, shared_subscription_strategy, random),
|
application:set_env(?APPLICATION, shared_subscription_strategy, random),
|
||||||
ClientId = <<"ClientId">>,
|
ClientId = <<"ClientId">>,
|
||||||
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
|
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
|
||||||
|
@ -42,7 +43,7 @@ t_random(_) ->
|
||||||
%% wait for the subscription to show up
|
%% wait for the subscription to show up
|
||||||
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000),
|
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000),
|
||||||
emqx_session:publish(SPid, 1, Message1),
|
emqx_session:publish(SPid, 1, Message1),
|
||||||
?wait(case emqx_mock_client:try_get_last_message() of
|
?wait(case emqx_mock_client:get_last_message(ConnPid) of
|
||||||
{publish, 1, _} -> true;
|
{publish, 1, _} -> true;
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end, 1000),
|
end, 1000),
|
||||||
|
@ -55,6 +56,9 @@ t_random(_) ->
|
||||||
emqx_mock_client:close_session(ConnPid, SPid),
|
emqx_mock_client:close_session(ConnPid, SPid),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_random(_) ->
|
||||||
|
test_two_messages(random).
|
||||||
|
|
||||||
t_round_robin(_) ->
|
t_round_robin(_) ->
|
||||||
test_two_messages(round_robin).
|
test_two_messages(round_robin).
|
||||||
|
|
||||||
|
@ -76,17 +80,19 @@ t_not_so_sticky(_) ->
|
||||||
Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
|
Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
|
||||||
Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
|
Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
|
||||||
emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
|
emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
|
||||||
emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]),
|
|
||||||
%% wait for the subscription to show up
|
%% wait for the subscription to show up
|
||||||
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso
|
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000),
|
||||||
ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
|
|
||||||
emqx_session:publish(SPid1, 1, Message1),
|
emqx_session:publish(SPid1, 1, Message1),
|
||||||
?wait(case emqx_mock_client:try_get_last_message() of
|
?wait(case emqx_mock_client:get_last_message(ConnPid1) of
|
||||||
{publish, _, #message{payload = <<"hello1">>}} -> true;
|
{publish, _, #message{payload = <<"hello1">>}} -> true;
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end, 1000),
|
end, 1000),
|
||||||
emqx_session:publish(SPid1, 2, Message2),
|
emqx_mock_client:close_session(ConnPid1, SPid1),
|
||||||
?wait(case emqx_mock_client:try_get_last_message() of
|
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000),
|
||||||
|
emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]),
|
||||||
|
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
|
||||||
|
emqx_session:publish(SPid2, 2, Message2),
|
||||||
|
?wait(case emqx_mock_client:get_last_message(ConnPid2) of
|
||||||
{publish, _, #message{payload = <<"hello2">>}} -> true;
|
{publish, _, #message{payload = <<"hello2">>}} -> true;
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end, 1000),
|
end, 1000),
|
||||||
|
@ -132,11 +138,17 @@ test_two_messages(Strategy) ->
|
||||||
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
>>>>>>> 38d0d409... Add 'hash' option for shared subscription
|
|
||||||
emqx_mock_client:close_session(ConnPid1, SPid1),
|
emqx_mock_client:close_session(ConnPid1, SPid1),
|
||||||
emqx_mock_client:close_session(ConnPid2, SPid2),
|
emqx_mock_client:close_session(ConnPid2, SPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
last_message(_ExpectedPayload, []) -> <<"not yet?">>;
|
||||||
|
last_message(ExpectedPayload, [Pid | Pids]) ->
|
||||||
|
case emqx_mock_client:get_last_message(Pid) of
|
||||||
|
{publish, _, #message{payload = ExpectedPayload}} -> {true, Pid};
|
||||||
|
_Other -> last_message(ExpectedPayload, Pids)
|
||||||
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% help functions
|
%% help functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue