From e145fdbef39014ed6f1ceca25639642276648ae2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 7 Jul 2022 10:46:03 +0200 Subject: [PATCH] fix(shared_sub): Fix crash on dispatch with subscribers disconnected --- src/emqx.appup.src | 14 ++++++++++---- src/emqx_shared_sub.erl | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 2534585eb..db9818c07 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,9 +1,12 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.17",[{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]}, + [{"4.3.17", + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}]}, {"4.3.16", - [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -652,9 +655,12 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.17",[{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]}, + [{"4.3.17", + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}]}, {"4.3.16", - [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index c7fb1a3cb..1e77b6014 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -242,7 +242,7 @@ maybe_nack_dropped(Msg) -> %% For fresh Ref we send a nack and return true, to note that the inflight is full {Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop; - + %% For retry Ref we can't reject a message if inflight is full, so we mark it as %% acknowledged and put it into mqueue {_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store; @@ -311,7 +311,7 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> false; [] -> %% We try redispatch to subs who dropped the message because inflight was full. - Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) -> + Found = maps_find_by(FailedSubs, fun(SubPid, FailReason) -> FailReason == dropped andalso is_alive_sub(SubPid) end), case Found of