diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index a7e7f6356..9e3edf0b7 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -133,26 +133,22 @@ handle_call(Req, _From, State) -> emqx_logger:error("[CM] Unexpected request: ~p", [Req]), {reply, ignore, State}. -handle_cast({notify, {registered, ClientId, Pid}}, - State = #state{client_pmon = PMon}) -> - {noreply, State#state{client_pmon = PMon:monitor(Pid, ClientId)}}; +handle_cast({notify, {registered, ClientId, Pid}}, State = #state{client_pmon = PMon}) -> + {noreply, State#state{client_pmon = emqx_pmon:monitor(Pid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, Pid}}, - State = #state{client_pmon = PMon}) -> - {noreply, State#state{client_pmon = PMon:demonitor(Pid)}}; +handle_cast({notify, {unregistered, _ClientId, Pid}}, State = #state{client_pmon = PMon}) -> + {noreply, State#state{client_pmon = emqx_pmon:demonitor(Pid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[CM] Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, - State = #state{client_pmon = PMon}) -> - case PMon:find(DownPid) of - undefined -> - {noreply, State}; - ClientId -> +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{client_pmon = PMon}) -> + case emqx_pmon:find(DownPid, PMon) of + undefined -> {noreply, State}; + ClientId -> unregister_client({ClientId, DownPid}), - {noreply, State#state{client_pmon = PMon:erase(DownPid)}} + {noreply, State#state{client_pmon = emqx_pmon:erase(DownPid, PMon)}} end; handle_info(Info, State) -> diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index bcb4ae537..6af403a2d 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -35,10 +35,9 @@ monitor(Pid, PM) -> monitor(Pid, Val, PM = {?MODULE, [M]}) -> case maps:is_key(Pid, M) of - true -> PM; - false -> - Ref = erlang:monitor(process, Pid), - {?MODULE, [maps:put(Pid, {Ref, Val}, M)]} + true -> PM; + false -> Ref = erlang:monitor(process, Pid), + {?MODULE, [maps:put(Pid, {Ref, Val}, M)]} end. -spec(demonitor(pid(), pmon()) -> pmon()). @@ -48,8 +47,7 @@ demonitor(Pid, PM = {?MODULE, [M]}) -> %% Don't flush _ = erlang:demonitor(Ref), {?MODULE, [maps:remove(Pid, M)]}; - error -> - PM + error -> PM end. -spec(find(pid(), pmon()) -> undefined | term()). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index f4f0ecf8e..6825ca8d0 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -116,7 +116,7 @@ init([]) -> init_monitors() -> mnesia:foldl( fun(#shared_subscription{subpid = SubPid}, Mon) -> - Mon:monitor(SubPid) + emqx_pmon:monitor(SubPid, Mon) end, emqx_pmon:new(), ?TAB). handle_call(Req, _From, State) -> @@ -124,7 +124,7 @@ handle_call(Req, _From, State) -> {reply, ignore, State}. handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> - {noreply, update_stats(State#state{pmon = PMon:monitor(SubPid)})}; + {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_cast(Msg, State) -> emqx_logger:error("[Shared] Unexpected msg: ~p", [Msg]), @@ -132,11 +132,11 @@ handle_cast(Msg, State) -> handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> #shared_subscription{subpid = SubPid} = NewRecord, - {noreply, update_stats(State#state{pmon = PMon:monitor(SubPid)})}; + {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> #shared_subscription{subpid = SubPid} = OldRecord, - {noreply, update_stats(State#state{pmon = PMon:demonitor(SubPid)})}; + {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; @@ -144,7 +144,7 @@ handle_info({mnesia_table_event, _Event}, State) -> handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> emqx_logger:info("Shared subscription down: ~p", [SubPid]), mnesia:async_dirty(fun cleanup_down/1, [SubPid]), - {noreply, update_stats(State#state{pmon = PMon:erase(SubPid)})}; + {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; handle_info(Info, State) -> emqx_logger:error("[Shared] Unexpected info: ~p", [Info]), @@ -162,12 +162,8 @@ code_change(_OldVsn, State, _Extra) -> cleanup_down(SubPid) -> Pat = #shared_subscription{_ = '_', subpid = SubPid}, - lists:foreach(fun(Record) -> - mnesia:delete_object(?TAB, Record) - end, mnesia:match_object(Pat)). + lists:foreach(fun(Record) -> mnesia:delete_object(?TAB, Record) end, mnesia:match_object(Pat)). update_stats(State) -> - emqx_stats:setstat('subscriptions/shared/count', - 'subscriptions/shared/max', - ets:info(?TAB, size)), State. + emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index e2e5ca20c..50ce9cb75 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -216,26 +216,22 @@ handle_call(Req, _From, State) -> emqx_logger:error("[SM] Unexpected request: ~p", [Req]), {reply, ignore, State}. -handle_cast({notify, {registered, ClientId, SessionPid}}, - State = #state{session_pmon = PMon}) -> - {noreply, State#state{session_pmon = PMon:monitor(SessionPid, ClientId)}}; +handle_cast({notify, {registered, ClientId, SessionPid}}, State = #state{session_pmon = PMon}) -> + {noreply, State#state{session_pmon = emqx_pmon:monitor(SessionPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, SessionPid}}, - State = #state{session_pmon = PMon}) -> - {noreply, State#state{session_pmon = PMon:demonitor(SessionPid)}}; +handle_cast({notify, {unregistered, _ClientId, SessionPid}}, State = #state{session_pmon = PMon}) -> + {noreply, State#state{session_pmon = emqx_pmon:demonitor(SessionPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[SM] Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, - State = #state{session_pmon = PMon}) -> - case PMon:find(DownPid) of - undefined -> - {noreply, State}; - ClientId -> +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{session_pmon = PMon}) -> + case emqx_pmon:find(DownPid, PMon) of + undefined -> {noreply, State}; + ClientId -> unregister_session({ClientId, DownPid}), - {noreply, State#state{session_pmon = PMon:erase(DownPid)}} + {noreply, State#state{session_pmon = emqx_pmon:erase(DownPid, PMon)}} end; handle_info(Info, State) ->