From aff1ea5af0048e638945be77e8e12a07ce5d3c75 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 28 Jun 2021 09:31:10 +0800 Subject: [PATCH 1/2] fix(shared_sub): failed to clean the emqx_shared_subscription tab --- src/emqx_shared_sub.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 71372cd90..4466d0c7f 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -325,9 +325,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P #emqx_shared_subscription{subpid = SubPid} = NewRecord, {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; -handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> - #emqx_shared_subscription{subpid = SubPid} = OldRecord, - {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; +%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until +%% it `unsubscribed` the last topic. +%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually +%% be disconnected. +% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> +% #emqx_shared_subscription{subpid = SubPid} = OldRecord, +% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; @@ -337,8 +341,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMo cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), +handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> From 285fdaaa9a930e943586981b71eecc1bea555def Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 28 Jun 2021 09:32:17 +0800 Subject: [PATCH 2/2] chore(appup): update the appup for 4.2.13 --- src/emqx.appup.src | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index f87d3f5c2..3f854286f 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -8,6 +8,9 @@ end, {VSN, [ + {"4.2.12", [ + {load_module, emqx_shared_sub, soft_purge, soft_purge, []} + ]}, {"4.2.11", [ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, @@ -77,6 +80,9 @@ {<<".*">>, []} ], [ + {"4.2.12", [ + {load_module, emqx_shared_sub, soft_purge, soft_purge, []} + ]}, {"4.2.11", [ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []},