test(session): make testsuite trigger takeover logic consistently

This commit is contained in:
Andrew Mayorov 2023-09-21 11:53:40 +04:00
parent d92a93d4b3
commit a8f4b5bf86
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 68 additions and 70 deletions

View File

@ -30,11 +30,12 @@
)
).
-define(drainMailbox(),
-define(drainMailbox(), ?drainMailbox(0)).
-define(drainMailbox(TIMEOUT),
(fun F__Flush_() ->
receive
X__Msg_ -> [X__Msg_ | F__Flush_()]
after 0 -> []
after TIMEOUT -> []
end
end)()
).

View File

@ -19,14 +19,14 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TOPIC, <<"t">>).
-define(CNT, 100).
-define(SLEEP, 10).
%%--------------------------------------------------------------------
%% Initial funcs
@ -49,89 +49,86 @@ end_per_suite(Config) ->
t_takeover(_) ->
process_flag(trap_exit, true),
AllMsgs = messages(?CNT),
Pos = rand:uniform(?CNT),
ClientId = <<"clientid">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
{ok, _} = emqtt:connect(C1),
emqtt:subscribe(C1, <<"t">>, 1),
Middle = ?CNT div 2,
Client1Msgs = messages(0, Middle),
Client2Msgs = messages(Middle, ?CNT div 2),
AllMsgs = Client1Msgs ++ Client2Msgs,
spawn(fun() ->
[
begin
emqx:publish(lists:nth(I, AllMsgs)),
timer:sleep(rand:uniform(10))
end
|| I <- lists:seq(1, Pos)
]
meck:new(emqx_cm, [non_strict, passthrough]),
meck:expect(emqx_cm, takeover_session_end, fun(Arg) ->
ok = timer:sleep(?SLEEP * 2),
meck:passthrough([Arg])
end),
emqtt:pause(C1),
timer:sleep(?CNT * 10),
load_meck(ClientId),
spawn(fun() ->
[
begin
emqx:publish(lists:nth(I, AllMsgs)),
timer:sleep(rand:uniform(10))
end
|| I <- lists:seq(Pos + 1, ?CNT)
]
end),
{ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
{ok, _} = emqtt:connect(C2),
Commands =
[{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
[{fun stop_client/1, []}],
Received = all_received_publishs(),
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPid1]} = FCtx,
?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}),
?assertReceive({'EXIT', CPid2, normal}),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("middle: ~p", [Middle]),
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received),
emqtt:disconnect(C2),
unload_meck(ClientId).
meck:unload(emqx_cm),
ok.
t_takover_in_cluster(_) ->
todo.
%%--------------------------------------------------------------------
%% Helpers
%% Commands
load_meck(ClientId) ->
meck:new(fake_conn_mod, [non_strict]),
HookTakeover = fun
(Pid, Msg = {takeover, 'begin'}) ->
emqx_connection:call(Pid, Msg);
(Pid, Msg = {takeover, 'end'}) ->
timer:sleep(?CNT * 10),
emqx_connection:call(Pid, Msg);
(Pid, Msg) ->
emqx_connection:call(Pid, Msg)
end,
meck:expect(fake_conn_mod, call, HookTakeover),
[ChanPid] = emqx_cm:lookup_channels(ClientId),
ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId),
NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}},
true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}).
start_client(Ctx, ClientId, Topic, Qos) ->
{ok, CPid} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{clean_start, false}
]),
_ = erlang:spawn_link(fun() ->
{ok, _} = emqtt:connect(CPid),
ct:pal("CLIENT: connected ~p", [CPid]),
{ok, _, [Qos]} = emqtt:subscribe(CPid, Topic, Qos)
end),
Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
unload_meck(_ClientId) ->
meck:unload(fake_conn_mod).
all_received_publishs() ->
all_received_publishs([]).
all_received_publishs(Ls) ->
receive
M = {publish, _Pub} -> all_received_publishs([M | Ls]);
_ -> all_received_publishs(Ls)
after 100 ->
lists:reverse(Ls)
publish_msg(Ctx, Msg) ->
ok = timer:sleep(rand:uniform(?SLEEP)),
case emqx:publish(Msg) of
[] -> publish_msg(Ctx, Msg);
[_ | _] -> Ctx
end.
stop_client(Ctx = #{client := [CPid | _]}) ->
ok = timer:sleep(?SLEEP),
ok = emqtt:stop(CPid),
Ctx.
%%--------------------------------------------------------------------
%% Helpers
assert_messages_missed(Ls1, Ls2) ->
Missed = lists:filtermap(
fun(Msg) ->
No = emqx_message:payload(Msg),
case lists:any(fun({publish, #{payload := No1}}) -> No1 == No end, Ls2) of
case lists:any(fun(#{payload := No1}) -> No1 == No end, Ls2) of
true -> false;
false -> {true, No}
end
@ -148,7 +145,7 @@ assert_messages_missed(Ls1, Ls2) ->
assert_messages_order([], []) ->
ok;
assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) ->
assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) ->
case emqx_message:payload(Msg) == No of
false ->
ct:fail("Message order is not correct, expected: ~p, received: ~p", [
@ -159,8 +156,8 @@ assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) ->
assert_messages_order(Ls1, Ls2)
end.
messages(Cnt) ->
[emqx_message:make(ct, 1, ?TOPIC, payload(I)) || I <- lists:seq(1, Cnt)].
messages(Offset, Cnt) ->
[emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)].
payload(I) ->
% NOTE