Merge pull request #8441 from ieQu1/shared-sub-dispatch-v4.3
fix(shared_sub): Fix crash on dispatch with subscribers disconnected
This commit is contained in:
commit
a344487935
|
@ -1,9 +1,12 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.17",[{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
[{"4.3.17",
|
||||||
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.16",
|
{"4.3.16",
|
||||||
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
|
@ -652,9 +655,12 @@
|
||||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.17",[{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
[{"4.3.17",
|
||||||
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.16",
|
{"4.3.16",
|
||||||
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -242,7 +242,7 @@ maybe_nack_dropped(Msg) ->
|
||||||
|
|
||||||
%% For fresh Ref we send a nack and return true, to note that the inflight is full
|
%% For fresh Ref we send a nack and return true, to note that the inflight is full
|
||||||
{Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop;
|
{Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop;
|
||||||
|
|
||||||
%% For retry Ref we can't reject a message if inflight is full, so we mark it as
|
%% For retry Ref we can't reject a message if inflight is full, so we mark it as
|
||||||
%% acknowledged and put it into mqueue
|
%% acknowledged and put it into mqueue
|
||||||
{_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store;
|
{_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store;
|
||||||
|
@ -311,7 +311,7 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||||
false;
|
false;
|
||||||
[] ->
|
[] ->
|
||||||
%% We try redispatch to subs who dropped the message because inflight was full.
|
%% We try redispatch to subs who dropped the message because inflight was full.
|
||||||
Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) ->
|
Found = maps_find_by(FailedSubs, fun(SubPid, FailReason) ->
|
||||||
FailReason == dropped andalso is_alive_sub(SubPid)
|
FailReason == dropped andalso is_alive_sub(SubPid)
|
||||||
end),
|
end),
|
||||||
case Found of
|
case Found of
|
||||||
|
|
Loading…
Reference in New Issue