From 3339df8b249a0acba545292beabad15dd85daf2a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 12:33:15 +0200 Subject: [PATCH] test: Add test case to cover shared sub QoS2 pubrel in inflights --- src/emqx_session.erl | 3 +- src/emqx_shared_sub.erl | 2 +- test/emqx_shared_sub_SUITE.erl | 192 +++++++++++++++++++++++++-------- 3 files changed, 149 insertions(+), 48 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index f87fc1f23..40c8eacc0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -639,6 +639,7 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> + AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), F = fun({_, {Msg, _Ts}}) -> case Msg of #message{} -> @@ -649,7 +650,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> false end end, - InflightList = lists:filtermap(F, emqx_inflight:to_list(sort_fun(), Inflight)), + InflightList = lists:filtermap(F, AllInflights), MqList = mqueue_to_list(Q, []), emqx_shared_sub:redispatch(InflightList ++ MqList). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 8667ae6c4..6130ccc0a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -266,7 +266,7 @@ redispatch(Messages0) -> Messages = lists:filter(fun is_redispatch_needed/1, Messages0), case length(Messages) of L when L > 0 -> - ?LOG(info, "Redispatching ~p shared subscription messages", [L]), + ?LOG(info, "Redispatching ~p shared subscription message(s)", [L]), lists:foreach(fun redispatch_shared_message/1, Messages); _ -> ok diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5e79ee983..d8a7d12b4 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -25,13 +25,24 @@ -define(SUITE, ?MODULE). 
--define(wait(For, Timeout), - emqx_ct_helpers:wait_for( - ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). - -define(ack, shared_sub_ack). -define(no_ack, no_ack). +-define(WAIT(TIMEOUT, PATTERN, Res), + (fun() -> + receive + PATTERN -> + Res; + Other -> + ct:fail(#{expected => ??PATTERN, + got => Other + }) + after + TIMEOUT -> + ct:fail({timeout, ??PATTERN}) + end + end)()). + all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> @@ -135,40 +146,7 @@ t_no_connection_nack(_) -> SendF(1), ct:sleep(200), %% This is the connection which was picked by broker to dispatch (sticky) for 1st message - ?assertMatch([#{packet_id := 1}], recv_msgs(1)), - %% Now kill the connection, expect all following messages to be delivered to the other - %% subscriber. - %emqx_mock_client:stop(ConnPid), - %% sleep then make synced calls to session processes to ensure that - %% the connection pid's 'EXIT' message is propagated to the session process - %% also to be sure sessions are still alive - % timer:sleep(2), - % _ = emqx_session:info(SPid1), - % _ = emqx_session:info(SPid2), - % %% Now we know what is the other still alive connection - % [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid], - % %% Send some more messages - % PacketIdList = lists:seq(2, 10), - % lists:foreach(fun(Id) -> - % SendF(Id), - % ?wait(Received(Id, TheOtherConnPid), 1000) - % end, PacketIdList), - % %% Now close the 2nd (last connection) - % emqx_mock_client:stop(TheOtherConnPid), - % timer:sleep(2), - % %% both sessions should have conn_pid = undefined - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), - % %% send more messages, but all should be queued in session state - % lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), - % {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), - % {_, L2} = lists:keyfind(mqueue_len, 1, 
emqx_session:info(SPid2)), - % ?assertEqual(length(PacketIdList), L1 + L2), - % %% clean up - % emqx_mock_client:close_session(PubConnPid), - % emqx_sm:close_session(SPid1), - % emqx_sm:close_session(SPid2), ok. t_random(_) -> @@ -422,8 +400,14 @@ t_local_fallback(_) -> %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK -t_redispatch(_) -> - ok = ensure_config(sticky, true), +t_redispatch_with_ack(Config) -> + test_redispatch(Config, true). + +t_redispatch_no_ack(Config) -> + test_redispatch(Config, false). + +test_redispatch(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), application:set_env(emqx, shared_dispatch_ack_enabled, true), Group = <<"group1">>, @@ -453,15 +437,55 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. -t_dispatch_when_inflights_are_full(_) -> - ok = ensure_config(round_robin, true), +t_redispatch_wildcard_with_ack(Config) -> + redispatch_wildcard(Config, true). + +t_redispatch_wildcard_no_ack(Config) -> + redispatch_wildcard(Config, false). 
+ +%% This one tests that broker tries to redispatch to another member in the group +%% if the first one disconnected before acking (auto_ack set to false) +redispatch_wildcard(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), + + Group = <<"group1">>, + + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), + ok. 
+ +t_dispatch_when_inflights_are_full_with_ack(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = true), Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - %% Note that max_inflight is 1 - {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), - {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]), + %% make sure broker does not push more than one inflight + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), {ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid2), @@ -484,8 +508,7 @@ t_dispatch_when_inflights_are_full(_) -> ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), %% Now kill any client - erlang:exit(ConnPid1, normal), - ct:sleep(100), + ok = kill_process(ConnPid1), %% And try to send the message ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), @@ -497,13 +520,90 @@ t_dispatch_when_inflights_are_full(_) -> ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])), ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])), + meck:unload(emqx_zone), emqtt:stop(ConnPid2), ok. 
+%% No ack, QoS 2 subscriptions, +%% client1 receives one message, sends pubrec, then is suspended +%% client2 acts normal (auto_ack=true) +%% Expected behaviour: +%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down +t_dispatch_qos2(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + %% One message is inflight + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + sys:resume(ConnPid1), + %% emqtt automatically sends PUBREC, but since auto_ack is set to false + %% it will never send PUBCOMP, hence EMQX should not attempt to send + %% the 4th message yet since max_inflight is 1. 
+ MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), + ct:sleep(100), + %% no message expected + ?assertEqual([], collect_msgs([])), + %% now kill client 1 + kill_process(ConnPid1), + %% client 2 should receive the message + MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec4 > MsgRec3), + emqtt:stop(ConnPid2), + meck:unload(emqx_zone), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- +kill_process(Pid) -> + _ = unlink(Pid), + _ = monitor(process, Pid), + erlang:exit(Pid, kill), + receive + {'DOWN', _, process, Pid, _} -> + ok + end. + +collect_msgs(Acc) -> + receive + Msg -> + collect_msgs([Msg | Acc]) + after + 0 -> + lists:reverse(Acc) + end. + ensure_config(Strategy) -> ensure_config(Strategy, _AckEnabled = true).