diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 596a27a8e..d21f83534 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -40,7 +40,7 @@ -export([ dispatch/3 , dispatch_to_non_self/3 - ]). + ]). -export([ maybe_ack/1 , maybe_nack_dropped/1 @@ -268,7 +268,7 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> false -> %% randomly pick one for the first message FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive), - case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of + case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of false -> false; {Type, Sub} -> %% stick to whatever pick result @@ -288,10 +288,14 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> [] -> %% All offline? pick one anyway %% We redispatch only to subs who dropped the message because inflight was full. - Dropped = maps_find_by(FailedSubs, fun({SubPid, FailReason}) -> + Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) -> FailReason == dropped andalso is_alive_sub(SubPid) end), - {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Dropped)}; + case Found of + error -> false; + {ok, Dropped} -> + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, [Dropped])} + end; Subs -> %% More than one available {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)} diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 3d3340858..cee53775b 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -1,4 +1,4 @@ -%fresh, fresh, fresh, %-------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -470,13 +470,13 @@ t_dispatch_when_inflights_are_full(_) -> sys:suspend(ConnPid1), sys:suspend(ConnPid2), - %% Fill in the inflight for first client - ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + %% Fill in the inflight for first client + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), - %% Fill in the inflight for second client - ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + %% Fill in the inflight for second client + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), - %% Now kill any client + %% Now kill any client erlang:exit(ConnPid1, normal), ct:sleep(100),