Merge pull request #11650 from keynslug/fix/simplify-takeover-suite

test(session): make testsuite trigger takeover logic consistently
This commit is contained in:
Andrew Mayorov 2023-09-21 18:02:38 +04:00 committed by GitHub
commit b563e30615
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 70 deletions

View File

@ -30,11 +30,12 @@
) )
). ).
-define(drainMailbox(), -define(drainMailbox(), ?drainMailbox(0)).
-define(drainMailbox(TIMEOUT),
(fun F__Flush_() -> (fun F__Flush_() ->
receive receive
X__Msg_ -> [X__Msg_ | F__Flush_()] X__Msg_ -> [X__Msg_ | F__Flush_()]
after 0 -> [] after TIMEOUT -> []
end end
end)() end)()
). ).

View File

@ -19,14 +19,14 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TOPIC, <<"t">>). -define(TOPIC, <<"t">>).
-define(CNT, 100). -define(CNT, 100).
-define(SLEEP, 10).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Initial funcs %% Initial funcs
@ -49,89 +49,86 @@ end_per_suite(Config) ->
t_takeover(_) -> t_takeover(_) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
AllMsgs = messages(?CNT),
Pos = rand:uniform(?CNT),
ClientId = <<"clientid">>, ClientId = <<"clientid">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), Middle = ?CNT div 2,
{ok, _} = emqtt:connect(C1), Client1Msgs = messages(0, Middle),
emqtt:subscribe(C1, <<"t">>, 1), Client2Msgs = messages(Middle, ?CNT div 2),
AllMsgs = Client1Msgs ++ Client2Msgs,
spawn(fun() -> meck:new(emqx_cm, [non_strict, passthrough]),
[ meck:expect(emqx_cm, takeover_session_end, fun(Arg) ->
begin ok = timer:sleep(?SLEEP * 2),
emqx:publish(lists:nth(I, AllMsgs)), meck:passthrough([Arg])
timer:sleep(rand:uniform(10))
end
|| I <- lists:seq(1, Pos)
]
end), end),
emqtt:pause(C1),
timer:sleep(?CNT * 10),
load_meck(ClientId), Commands =
spawn(fun() -> [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
[ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
begin [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
emqx:publish(lists:nth(I, AllMsgs)), [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
timer:sleep(rand:uniform(10)) [{fun stop_client/1, []}],
end
|| I <- lists:seq(Pos + 1, ?CNT)
]
end),
{ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
{ok, _} = emqtt:connect(C2),
Received = all_received_publishs(), FCtx = lists:foldl(
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPid1]} = FCtx,
?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}),
?assertReceive({'EXIT', CPid2, normal}),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("middle: ~p", [Middle]),
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(AllMsgs, Received), assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received), assert_messages_order(AllMsgs, Received),
emqtt:disconnect(C2), meck:unload(emqx_cm),
unload_meck(ClientId). ok.
t_takover_in_cluster(_) -> t_takover_in_cluster(_) ->
todo. todo.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Commands
load_meck(ClientId) -> start_client(Ctx, ClientId, Topic, Qos) ->
meck:new(fake_conn_mod, [non_strict]), {ok, CPid} = emqtt:start_link([
HookTakeover = fun {clientid, ClientId},
(Pid, Msg = {takeover, 'begin'}) -> {proto_ver, v5},
emqx_connection:call(Pid, Msg); {clean_start, false}
(Pid, Msg = {takeover, 'end'}) -> ]),
timer:sleep(?CNT * 10), _ = erlang:spawn_link(fun() ->
emqx_connection:call(Pid, Msg); {ok, _} = emqtt:connect(CPid),
(Pid, Msg) -> ct:pal("CLIENT: connected ~p", [CPid]),
emqx_connection:call(Pid, Msg) {ok, _, [Qos]} = emqtt:subscribe(CPid, Topic, Qos)
end, end),
meck:expect(fake_conn_mod, call, HookTakeover), Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
[ChanPid] = emqx_cm:lookup_channels(ClientId),
ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId),
NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}},
true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}).
unload_meck(_ClientId) -> publish_msg(Ctx, Msg) ->
meck:unload(fake_conn_mod). ok = timer:sleep(rand:uniform(?SLEEP)),
case emqx:publish(Msg) of
all_received_publishs() -> [] -> publish_msg(Ctx, Msg);
all_received_publishs([]). [_ | _] -> Ctx
all_received_publishs(Ls) ->
receive
M = {publish, _Pub} -> all_received_publishs([M | Ls]);
_ -> all_received_publishs(Ls)
after 100 ->
lists:reverse(Ls)
end. end.
stop_client(Ctx = #{client := [CPid | _]}) ->
ok = timer:sleep(?SLEEP),
ok = emqtt:stop(CPid),
Ctx.
%%--------------------------------------------------------------------
%% Helpers
assert_messages_missed(Ls1, Ls2) -> assert_messages_missed(Ls1, Ls2) ->
Missed = lists:filtermap( Missed = lists:filtermap(
fun(Msg) -> fun(Msg) ->
No = emqx_message:payload(Msg), No = emqx_message:payload(Msg),
case lists:any(fun({publish, #{payload := No1}}) -> No1 == No end, Ls2) of case lists:any(fun(#{payload := No1}) -> No1 == No end, Ls2) of
true -> false; true -> false;
false -> {true, No} false -> {true, No}
end end
@ -148,7 +145,7 @@ assert_messages_missed(Ls1, Ls2) ->
assert_messages_order([], []) -> assert_messages_order([], []) ->
ok; ok;
assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) -> assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) ->
case emqx_message:payload(Msg) == No of case emqx_message:payload(Msg) == No of
false -> false ->
ct:fail("Message order is not correct, expected: ~p, received: ~p", [ ct:fail("Message order is not correct, expected: ~p, received: ~p", [
@ -159,8 +156,8 @@ assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) ->
assert_messages_order(Ls1, Ls2) assert_messages_order(Ls1, Ls2)
end. end.
messages(Cnt) -> messages(Offset, Cnt) ->
[emqx_message:make(ct, 1, ?TOPIC, payload(I)) || I <- lists:seq(1, Cnt)]. [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)].
payload(I) -> payload(I) ->
% NOTE % NOTE