Merge pull request #5104 from terry-xiaoyu/clean_emqx_shared_subscription_4.2.13
Clean emqx shared subscription 4.2.13
This commit is contained in:
commit
200e8ce764
|
@ -8,6 +8,9 @@
|
||||||
end,
|
end,
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.2.12", [
|
||||||
|
{load_module, emqx_shared_sub, soft_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.2.11", [
|
{"4.2.11", [
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
@ -77,6 +80,9 @@
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.2.12", [
|
||||||
|
{load_module, emqx_shared_sub, soft_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.2.11", [
|
{"4.2.11", [
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
|
|
@ -325,9 +325,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
|
||||||
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
||||||
#emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
%% it `unsubscribed` the last topic.
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
|
||||||
|
%% be disconnected.
|
||||||
|
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
||||||
|
% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
||||||
|
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, _Event}, State) ->
|
handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
@ -337,8 +341,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMo
|
||||||
cleanup_down(SubPid),
|
cleanup_down(SubPid),
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
|
Loading…
Reference in New Issue