test: add test case to verify QoS 0 message is never redispatched

This commit is contained in:
Zaiming (Stone) Shi 2022-10-05 17:17:51 +02:00
parent d23dfcca39
commit a1032db4e1
2 changed files with 53 additions and 5 deletions

View File

@ -56,7 +56,7 @@ File format:
- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). - Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094).
- When discarding QoS 2 inflight messages, there were excessive logs - When discarding QoS 2 inflight messages, there were excessive logs
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
but not the subscrbing topic), caused messages to be lost when dispatching. but not the subscribing topic), caused messages to be lost when dispatching.
## v4.3.20 ## v4.3.20

View File

@ -599,7 +599,7 @@ t_dispatch_qos2(Config) when is_list(Config) ->
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
ct:sleep(100), ct:sleep(100),
%% no message expected %% no message expected
?assertEqual([], collect_msgs([])), ?assertEqual([], collect_msgs(0)),
%% now kill client 1 %% now kill client 1
kill_process(ConnPid1), kill_process(ConnPid1),
%% client 2 should receive the message %% client 2 should receive the message
@ -609,6 +609,51 @@ t_dispatch_qos2(Config) when is_list(Config) ->
emqtt:stop(ConnPid2), emqtt:stop(ConnPid2),
ok. ok.
t_dispatch_qos0({init, Config}) when is_list(Config) ->
Config;
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
ok;
t_dispatch_qos0(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
%% subscribe with QoS 0
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
%% publish with QoS 2, but should be downgraded to 0 as the subscribers
%% subscribe with QoS 0
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
?assertMatch([_], emqx:publish(Message1)),
?assertMatch([_], emqx:publish(Message2)),
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
kill_process(ConnPid1),
%% expect no redispatch
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% help functions %% help functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -622,12 +667,15 @@ kill_process(Pid) ->
ok ok
end. end.
collect_msgs(Acc) -> collect_msgs(Timeout) ->
collect_msgs([], Timeout).
collect_msgs(Acc, Timeout) ->
receive receive
Msg -> Msg ->
collect_msgs([Msg | Acc]) collect_msgs([Msg | Acc], Timeout)
after after
0 -> Timeout ->
lists:reverse(Acc) lists:reverse(Acc)
end. end.