fix(shared): retab and dropped handling
This commit is contained in:
parent
5cc778ff30
commit
fae054bf0e
|
@ -40,7 +40,7 @@
|
||||||
|
|
||||||
-export([ dispatch/3
|
-export([ dispatch/3
|
||||||
, dispatch_to_non_self/3
|
, dispatch_to_non_self/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ maybe_ack/1
|
-export([ maybe_ack/1
|
||||||
, maybe_nack_dropped/1
|
, maybe_nack_dropped/1
|
||||||
|
@ -268,7 +268,7 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||||
false ->
|
false ->
|
||||||
%% randomly pick one for the first message
|
%% randomly pick one for the first message
|
||||||
FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive),
|
FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive),
|
||||||
case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of
|
case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of
|
||||||
false -> false;
|
false -> false;
|
||||||
{Type, Sub} ->
|
{Type, Sub} ->
|
||||||
%% stick to whatever pick result
|
%% stick to whatever pick result
|
||||||
|
@ -288,10 +288,14 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||||
[] ->
|
[] ->
|
||||||
%% All offline? pick one anyway
|
%% All offline? pick one anyway
|
||||||
%% We redispatch only to subs who dropped the message because inflight was full.
|
%% We redispatch only to subs who dropped the message because inflight was full.
|
||||||
Dropped = maps_find_by(FailedSubs, fun({SubPid, FailReason}) ->
|
Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) ->
|
||||||
FailReason == dropped andalso is_alive_sub(SubPid)
|
FailReason == dropped andalso is_alive_sub(SubPid)
|
||||||
end),
|
end),
|
||||||
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Dropped)};
|
case Found of
|
||||||
|
error -> false;
|
||||||
|
{ok, Dropped} ->
|
||||||
|
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, [Dropped])}
|
||||||
|
end;
|
||||||
Subs ->
|
Subs ->
|
||||||
%% More than one available
|
%% More than one available
|
||||||
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)}
|
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
%fresh, fresh, fresh, %--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
@ -470,13 +470,13 @@ t_dispatch_when_inflights_are_full(_) ->
|
||||||
sys:suspend(ConnPid1),
|
sys:suspend(ConnPid1),
|
||||||
sys:suspend(ConnPid2),
|
sys:suspend(ConnPid2),
|
||||||
|
|
||||||
%% Fill in the inflight for first client
|
%% Fill in the inflight for first client
|
||||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
|
||||||
|
|
||||||
%% Fill in the inflight for second client
|
%% Fill in the inflight for second client
|
||||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
||||||
|
|
||||||
%% Now kill any client
|
%% Now kill any client
|
||||||
erlang:exit(ConnPid1, normal),
|
erlang:exit(ConnPid1, normal),
|
||||||
ct:sleep(100),
|
ct:sleep(100),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue