test: Add test case to cover shared sub QoS2 pubrel in inflights

This commit is contained in:
Zaiming (Stone) Shi 2022-10-05 12:33:15 +02:00
parent 6769bd4edc
commit 3339df8b24
3 changed files with 149 additions and 48 deletions

View File

@ -639,6 +639,7 @@ run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
F = fun({_, {Msg, _Ts}}) -> F = fun({_, {Msg, _Ts}}) ->
case Msg of case Msg of
#message{} -> #message{} ->
@ -649,7 +650,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
false false
end end
end, end,
InflightList = lists:filtermap(F, emqx_inflight:to_list(sort_fun(), Inflight)), InflightList = lists:filtermap(F, AllInflights),
MqList = mqueue_to_list(Q, []), MqList = mqueue_to_list(Q, []),
emqx_shared_sub:redispatch(InflightList ++ MqList). emqx_shared_sub:redispatch(InflightList ++ MqList).

View File

@ -266,7 +266,7 @@ redispatch(Messages0) ->
Messages = lists:filter(fun is_redispatch_needed/1, Messages0), Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
case length(Messages) of case length(Messages) of
L when L > 0 -> L when L > 0 ->
?LOG(info, "Redispatching ~p shared subscription messages", [L]), ?LOG(info, "Redispatching ~p shared subscription message(s)", [L]),
lists:foreach(fun redispatch_shared_message/1, Messages); lists:foreach(fun redispatch_shared_message/1, Messages);
_ -> _ ->
ok ok

View File

@ -25,13 +25,24 @@
-define(SUITE, ?MODULE). -define(SUITE, ?MODULE).
-define(wait(For, Timeout),
emqx_ct_helpers:wait_for(
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
-define(ack, shared_sub_ack). -define(ack, shared_sub_ack).
-define(no_ack, no_ack). -define(no_ack, no_ack).
%% Wait up to TIMEOUT milliseconds for the next message in the calling
%% process' mailbox and require it to match PATTERN.
%%   - on a match, the macro evaluates to Res (Res may use variables bound
%%     by PATTERN);
%%   - if the next message does not match, the test case is failed through
%%     ct:fail/1 with the stringified pattern and the message received;
%%   - if nothing arrives within TIMEOUT, the test case is failed with
%%     {timeout, StringifiedPattern}.
%% The receive is wrapped in an immediately-invoked fun, so variables bound
%% by PATTERN stay local to each use of the macro.
-define(WAIT(TIMEOUT, PATTERN, Res),
(fun() ->
receive
PATTERN ->
Res;
Other ->
ct:fail(#{expected => ??PATTERN,
got => Other
})
after
TIMEOUT ->
ct:fail({timeout, ??PATTERN})
end
end)()).
all() -> emqx_ct:all(?SUITE). all() -> emqx_ct:all(?SUITE).
init_per_suite(Config) -> init_per_suite(Config) ->
@ -135,40 +146,7 @@ t_no_connection_nack(_) ->
SendF(1), SendF(1),
ct:sleep(200), ct:sleep(200),
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
?assertMatch([#{packet_id := 1}], recv_msgs(1)), ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
%% Now kill the connection, expect all following messages to be delivered to the other
%% subscriber.
%emqx_mock_client:stop(ConnPid),
%% sleep then make synced calls to session processes to ensure that
%% the connection pid's 'EXIT' message is propagated to the session process
%% also to be sure sessions are still alive
% timer:sleep(2),
% _ = emqx_session:info(SPid1),
% _ = emqx_session:info(SPid2),
% %% Now we know what is the other still alive connection
% [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
% %% Send some more messages
% PacketIdList = lists:seq(2, 10),
% lists:foreach(fun(Id) ->
% SendF(Id),
% ?wait(Received(Id, TheOtherConnPid), 1000)
% end, PacketIdList),
% %% Now close the 2nd (last connection)
% emqx_mock_client:stop(TheOtherConnPid),
% timer:sleep(2),
% %% both sessions should have conn_pid = undefined
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
% %% send more messages, but all should be queued in session state
% lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
% {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
% {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
% ?assertEqual(length(PacketIdList), L1 + L2),
% %% clean up
% emqx_mock_client:close_session(PubConnPid),
% emqx_sm:close_session(SPid1),
% emqx_sm:close_session(SPid2),
ok. ok.
t_random(_) -> t_random(_) ->
@ -422,8 +400,14 @@ t_local_fallback(_) ->
%% This one tests that broker tries to select another shared subscriber %% This one tests that broker tries to select another shared subscriber
%% If the first one doesn't return an ACK %% If the first one doesn't return an ACK
t_redispatch(_) -> t_redispatch_with_ack(Config) ->
ok = ensure_config(sticky, true), test_redispatch(Config, true).
%% Runs the shared redispatch scenario, passing `false' as the
%% AckEnabled argument of test_redispatch/2.
t_redispatch_no_ack(Config) ->
    AckEnabled = false,
    test_redispatch(Config, AckEnabled).
test_redispatch(_Config, AckEnabled) ->
ok = ensure_config(sticky, AckEnabled),
application:set_env(emqx, shared_dispatch_ack_enabled, true), application:set_env(emqx, shared_dispatch_ack_enabled, true),
Group = <<"group1">>, Group = <<"group1">>,
@ -453,15 +437,55 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2), emqtt:stop(UsedSubPid2),
ok. ok.
t_dispatch_when_inflights_are_full(_) -> t_redispatch_wildcard_with_ack(Config) ->
ok = ensure_config(round_robin, true), redispatch_wildcard(Config, true).
%% Runs the wildcard redispatch scenario, passing `false' as the
%% AckEnabled argument of redispatch_wildcard/2.
t_redispatch_wildcard_no_ack(Config) ->
    AckEnabled = false,
    redispatch_wildcard(Config, AckEnabled).
%% Checks that a message delivered through a wildcard shared subscription
%% is handed to the other group member once the member that first received
%% it is stopped. Both clients are started with auto_ack set to false.
redispatch_wildcard(_Config, AckEnabled) ->
    ok = ensure_config(sticky, AckEnabled),
    Group = <<"group1">>,
    SharedFilter = <<"$share/", Group/binary, "/foo/bar/#">>,
    Payload = <<"hello1">>,
    Publisher = <<"ClientId1">>,
    %% Start both clients first, then connect both, then subscribe both.
    Clients = [
        begin
            {ok, Client} = emqtt:start_link([{clientid, Id}, {auto_ack, false}]),
            Client
        end
     || Id <- [Publisher, <<"ClientId2">>]
    ],
    lists:foreach(fun(Client) -> {ok, _} = emqtt:connect(Client) end, Clients),
    lists:foreach(fun(Client) -> emqtt:subscribe(Client, {SharedFilter, 1}) end, Clients),

    emqx:publish(emqx_message:make(Publisher, 1, <<"foo/bar/1">>, Payload)),

    %% Whichever client got the message first is stopped ...
    {true, FirstReceiver} = last_message(Payload, Clients),
    ok = emqtt:stop(FirstReceiver),

    %% ... and the same payload must then show up on the other client.
    Redelivered = last_message(Payload, Clients, 6000),
    ?assertMatch({true, Pid} when Pid =/= FirstReceiver, Redelivered),
    {true, SecondReceiver} = Redelivered,
    emqtt:stop(SecondReceiver),
    ok.
t_dispatch_when_inflights_are_full_with_ack(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = true),
Topic = <<"foo/bar">>, Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
%% Note that max_inflight is 1 %% make sure broker does not push more than one inflight
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), meck:new(emqx_zone, [passthrough, no_history]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]), meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
{ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2), {ok, _} = emqtt:connect(ConnPid2),
@ -484,8 +508,7 @@ t_dispatch_when_inflights_are_full(_) ->
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
%% Now kill any client %% Now kill any client
erlang:exit(ConnPid1, normal), ok = kill_process(ConnPid1),
ct:sleep(100),
%% And try to send the message %% And try to send the message
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
@ -497,13 +520,90 @@ t_dispatch_when_inflights_are_full(_) ->
?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])), ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])),
?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])), ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])),
meck:unload(emqx_zone),
emqtt:stop(ConnPid2), emqtt:stop(ConnPid2),
ok. ok.
%% No ack, QoS 2 subscriptions.
%% client1 (auto_ack=false) is suspended while four messages are published,
%% then resumed: it receives one message and no second one is expected
%% while that one is unacknowledged.
%% client2 acts normally (auto_ack=true).
%% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after
%% client1 is down.
%%
%% emqx_zone:max_inflight/1 is mocked to return 1 for the duration of the
%% case. The mock is unloaded in an `after' clause so that a failed
%% assertion does not leave it loaded for later test cases.
t_dispatch_qos2(Config) when is_list(Config) ->
    ok = ensure_config(round_robin, _AckEnabled = false),
    Topic = <<"foo/bar/1">>,
    ClientId1 = <<"ClientId1">>,
    ClientId2 = <<"ClientId2">>,

    meck:new(emqx_zone, [passthrough, no_history]),
    try
        meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),

        {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
        {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
        {ok, _} = emqtt:connect(ConnPid1),
        {ok, _} = emqtt:connect(ConnPid2),

        emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
        emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),

        Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
        Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
        Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
        Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
        ct:sleep(100),

        %% Freeze client1 so that it does not process anything sent to it
        %% until it is resumed below.
        ok = sys:suspend(ConnPid1),

        %% One message is inflight
        ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
        ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
        ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
        ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),

        MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
        MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
        %% assert hello2 > hello1 or hello4 > hello3
        ?assert(MsgRec2 > MsgRec1),

        sys:resume(ConnPid1),
        %% emqtt automatically sends PUBREC, but since auto_ack is set to false
        %% it will never send PUBCOMP, hence EMQX should not attempt to send
        %% the 4th message yet since max_inflight is 1.
        MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
        ct:sleep(100),
        %% no message expected
        ?assertEqual([], collect_msgs([])),

        %% now kill client 1
        kill_process(ConnPid1),
        %% client 2 should receive the message
        MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
        %% assert hello2 > hello1 or hello4 > hello3
        ?assert(MsgRec4 > MsgRec3),

        emqtt:stop(ConnPid2),
        ok
    after
        meck:unload(emqx_zone)
    end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% help functions %% help functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Kill Pid with an untrappable `kill' signal and block until it is gone.
%%
%% The caller is unlinked from Pid first, so the exit does not propagate
%% back to the calling (test) process. The 'DOWN' message is matched on the
%% monitor reference created here, so only the notification for this
%% monitor is consumed. Returns `ok' once the process is down. If no 'DOWN'
%% arrives within 5 seconds the monitor is removed and the call raises
%% {kill_process_timeout, Pid} instead of blocking forever.
kill_process(Pid) ->
    _ = unlink(Pid),
    MRef = monitor(process, Pid),
    erlang:exit(Pid, kill),
    receive
        {'DOWN', MRef, process, Pid, _Reason} ->
            ok
    after 5000 ->
        %% flush: drop a 'DOWN' that may have raced with the timeout
        true = erlang:demonitor(MRef, [flush]),
        erlang:error({kill_process_timeout, Pid})
    end.
%% Drain the calling process' mailbox without waiting.
%% Every message already queued is taken in arrival order and appended
%% after the (reversed) messages passed in; the result is the complete
%% list in arrival order. Returns as soon as the mailbox is empty.
collect_msgs(Seen) ->
    Pending =
        receive
            Msg -> {msg, Msg}
        after 0 ->
            empty
        end,
    case Pending of
        {msg, Next} -> collect_msgs([Next | Seen]);
        empty -> lists:reverse(Seen)
    end.
ensure_config(Strategy) -> ensure_config(Strategy) ->
ensure_config(Strategy, _AckEnabled = true). ensure_config(Strategy, _AckEnabled = true).