From a8f4b5bf86f59807e0641178acc4405f8f33dea3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 21 Sep 2023 11:53:40 +0400 Subject: [PATCH] test(session): make testsuite trigger takeover logic consistently --- apps/emqx/include/asserts.hrl | 5 +- apps/emqx/test/emqx_takeover_SUITE.erl | 133 ++++++++++++------------- 2 files changed, 68 insertions(+), 70 deletions(-) diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index 5f27b0332..489c47862 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -30,11 +30,12 @@ ) ). --define(drainMailbox(), +-define(drainMailbox(), ?drainMailbox(0)). +-define(drainMailbox(TIMEOUT), (fun F__Flush_() -> receive X__Msg_ -> [X__Msg_ | F__Flush_()] - after 0 -> [] + after TIMEOUT -> [] end end)() ). diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 3f86cd3f3..97616c947 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -19,14 +19,14 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TOPIC, <<"t">>). -define(CNT, 100). +-define(SLEEP, 10). %%-------------------------------------------------------------------- %% Initial funcs @@ -49,89 +49,86 @@ end_per_suite(Config) -> t_takeover(_) -> process_flag(trap_exit, true), - AllMsgs = messages(?CNT), - Pos = rand:uniform(?CNT), - ClientId = <<"clientid">>, - {ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), - {ok, _} = emqtt:connect(C1), - emqtt:subscribe(C1, <<"t">>, 1), + Middle = ?CNT div 2, + Client1Msgs = messages(0, Middle), + Client2Msgs = messages(Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, - spawn(fun() -> - [ - begin - emqx:publish(lists:nth(I, AllMsgs)), - timer:sleep(rand:uniform(10)) - end - || I <- lists:seq(1, Pos) - ] + meck:new(emqx_cm, [non_strict, passthrough]), + meck:expect(emqx_cm, takeover_session_end, fun(Arg) -> + ok = timer:sleep(?SLEEP * 2), + meck:passthrough([Arg]) end), - emqtt:pause(C1), - timer:sleep(?CNT * 10), - load_meck(ClientId), - spawn(fun() -> - [ - begin - emqx:publish(lists:nth(I, AllMsgs)), - timer:sleep(rand:uniform(10)) - end - || I <- lists:seq(Pos + 1, ?CNT) - ] - end), - {ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), - {ok, _} = emqtt:connect(C2), + Commands = + [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ + [{fun stop_client/1, []}], - Received = all_received_publishs(), - ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + #{client := [CPid2, CPid1]} = FCtx, + ?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}), + ?assertReceive({'EXIT', CPid2, normal}), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("middle: ~p", [Middle]), + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), assert_messages_order(AllMsgs, Received), - emqtt:disconnect(C2), - unload_meck(ClientId). + meck:unload(emqx_cm), + ok. t_takover_in_cluster(_) -> todo. %%-------------------------------------------------------------------- -%% Helpers +%% Commands -load_meck(ClientId) -> - meck:new(fake_conn_mod, [non_strict]), - HookTakeover = fun - (Pid, Msg = {takeover, 'begin'}) -> - emqx_connection:call(Pid, Msg); - (Pid, Msg = {takeover, 'end'}) -> - timer:sleep(?CNT * 10), - emqx_connection:call(Pid, Msg); - (Pid, Msg) -> - emqx_connection:call(Pid, Msg) - end, - meck:expect(fake_conn_mod, call, HookTakeover), - [ChanPid] = emqx_cm:lookup_channels(ClientId), - ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId), - NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}}, - true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}). +start_client(Ctx, ClientId, Topic, Qos) -> + {ok, CPid} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {clean_start, false} + ]), + _ = erlang:spawn_link(fun() -> + {ok, _} = emqtt:connect(CPid), + ct:pal("CLIENT: connected ~p", [CPid]), + {ok, _, [Qos]} = emqtt:subscribe(CPid, Topic, Qos) + end), + Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. -unload_meck(_ClientId) -> - meck:unload(fake_conn_mod). - -all_received_publishs() -> - all_received_publishs([]). - -all_received_publishs(Ls) -> - receive - M = {publish, _Pub} -> all_received_publishs([M | Ls]); - _ -> all_received_publishs(Ls) - after 100 -> - lists:reverse(Ls) +publish_msg(Ctx, Msg) -> + ok = timer:sleep(rand:uniform(?SLEEP)), + case emqx:publish(Msg) of + [] -> publish_msg(Ctx, Msg); + [_ | _] -> Ctx end. +stop_client(Ctx = #{client := [CPid | _]}) -> + ok = timer:sleep(?SLEEP), + ok = emqtt:stop(CPid), + Ctx. + +%%-------------------------------------------------------------------- +%% Helpers + assert_messages_missed(Ls1, Ls2) -> Missed = lists:filtermap( fun(Msg) -> No = emqx_message:payload(Msg), - case lists:any(fun({publish, #{payload := No1}}) -> No1 == No end, Ls2) of + case lists:any(fun(#{payload := No1}) -> No1 == No end, Ls2) of true -> false; false -> {true, No} end @@ -148,7 +145,7 @@ assert_messages_missed(Ls1, Ls2) -> assert_messages_order([], []) -> ok; -assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) -> +assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) -> case emqx_message:payload(Msg) == No of false -> ct:fail("Message order is not correct, expected: ~p, received: ~p", [ @@ -159,8 +156,8 @@ assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) -> assert_messages_order(Ls1, Ls2) end. -messages(Cnt) -> - [emqx_message:make(ct, 1, ?TOPIC, payload(I)) || I <- lists:seq(1, Cnt)]. +messages(Offset, Cnt) -> + [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. payload(I) -> % NOTE