diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index ef96ce745..26dd529a2 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -56,7 +56,7 @@ File format: - Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). - When discarding QoS 2 inflight messages, there were excessive logs - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, - but not the subscrbing topic), caused messages to be lost when dispatching. + but not the subscribing topic), caused messages to be lost when dispatching. ## v4.3.20 diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index a16f948ff..2c4ecf265 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -599,7 +599,7 @@ t_dispatch_qos2(Config) when is_list(Config) -> MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), ct:sleep(100), %% no message expected - ?assertEqual([], collect_msgs([])), + ?assertEqual([], collect_msgs(0)), %% now kill client 1 kill_process(ConnPid1), %% client 2 should receive the message @@ -609,6 +609,51 @@ t_dispatch_qos2(Config) when is_list(Config) -> emqtt:stop(ConnPid2), ok. +t_dispatch_qos0({init, Config}) when is_list(Config) -> + Config; +t_dispatch_qos0({'end', Config}) when is_list(Config) -> + ok; +t_dispatch_qos0(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + %% subscribe with QoS 0 + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}), + + %% publish with QoS 2, but should be downgraded to 0 as the subscribers + %% subscribe with QoS 0 + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + ?assertMatch([_], emqx:publish(Message1)), + ?assertMatch([_], emqx:publish(Message2)), + ?assertMatch([_], emqx:publish(Message3)), + ?assertMatch([_], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + kill_process(ConnPid1), + %% expect no redispatch + ?assertEqual([], collect_msgs(timer:seconds(2))), + emqtt:stop(ConnPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -622,12 +667,15 @@ kill_process(Pid) -> ok end. -collect_msgs(Acc) -> +collect_msgs(Timeout) -> + collect_msgs([], Timeout). + +collect_msgs(Acc, Timeout) -> receive Msg -> - collect_msgs([Msg | Acc]) + collect_msgs([Msg | Acc], Timeout) after - 0 -> + Timeout -> lists:reverse(Acc) end.