Remove 'tuple call' and be compatible with Erlang/OTP R21
This commit is contained in:
parent
1de94b6858
commit
385c7cd3e6
|
@ -133,26 +133,22 @@ handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[CM] Unexpected request: ~p", [Req]),
|
emqx_logger:error("[CM] Unexpected request: ~p", [Req]),
|
||||||
{reply, ignore, State}.
|
{reply, ignore, State}.
|
||||||
|
|
||||||
handle_cast({notify, {registered, ClientId, Pid}},
|
handle_cast({notify, {registered, ClientId, Pid}}, State = #state{client_pmon = PMon}) ->
|
||||||
State = #state{client_pmon = PMon}) ->
|
{noreply, State#state{client_pmon = emqx_pmon:monitor(Pid, ClientId, PMon)}};
|
||||||
{noreply, State#state{client_pmon = PMon:monitor(Pid, ClientId)}};
|
|
||||||
|
|
||||||
handle_cast({notify, {unregistered, _ClientId, Pid}},
|
handle_cast({notify, {unregistered, _ClientId, Pid}}, State = #state{client_pmon = PMon}) ->
|
||||||
State = #state{client_pmon = PMon}) ->
|
{noreply, State#state{client_pmon = emqx_pmon:demonitor(Pid, PMon)}};
|
||||||
{noreply, State#state{client_pmon = PMon:demonitor(Pid)}};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[CM] Unexpected msg: ~p", [Msg]),
|
emqx_logger:error("[CM] Unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason},
|
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{client_pmon = PMon}) ->
|
||||||
State = #state{client_pmon = PMon}) ->
|
case emqx_pmon:find(DownPid, PMon) of
|
||||||
case PMon:find(DownPid) of
|
undefined -> {noreply, State};
|
||||||
undefined ->
|
ClientId ->
|
||||||
{noreply, State};
|
|
||||||
ClientId ->
|
|
||||||
unregister_client({ClientId, DownPid}),
|
unregister_client({ClientId, DownPid}),
|
||||||
{noreply, State#state{client_pmon = PMon:erase(DownPid)}}
|
{noreply, State#state{client_pmon = emqx_pmon:erase(DownPid, PMon)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -35,10 +35,9 @@ monitor(Pid, PM) ->
|
||||||
|
|
||||||
monitor(Pid, Val, PM = {?MODULE, [M]}) ->
|
monitor(Pid, Val, PM = {?MODULE, [M]}) ->
|
||||||
case maps:is_key(Pid, M) of
|
case maps:is_key(Pid, M) of
|
||||||
true -> PM;
|
true -> PM;
|
||||||
false ->
|
false -> Ref = erlang:monitor(process, Pid),
|
||||||
Ref = erlang:monitor(process, Pid),
|
{?MODULE, [maps:put(Pid, {Ref, Val}, M)]}
|
||||||
{?MODULE, [maps:put(Pid, {Ref, Val}, M)]}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(demonitor(pid(), pmon()) -> pmon()).
|
-spec(demonitor(pid(), pmon()) -> pmon()).
|
||||||
|
@ -48,8 +47,7 @@ demonitor(Pid, PM = {?MODULE, [M]}) ->
|
||||||
%% Don't flush
|
%% Don't flush
|
||||||
_ = erlang:demonitor(Ref),
|
_ = erlang:demonitor(Ref),
|
||||||
{?MODULE, [maps:remove(Pid, M)]};
|
{?MODULE, [maps:remove(Pid, M)]};
|
||||||
error ->
|
error -> PM
|
||||||
PM
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(find(pid(), pmon()) -> undefined | term()).
|
-spec(find(pid(), pmon()) -> undefined | term()).
|
||||||
|
|
|
@ -116,7 +116,7 @@ init([]) ->
|
||||||
init_monitors() ->
|
init_monitors() ->
|
||||||
mnesia:foldl(
|
mnesia:foldl(
|
||||||
fun(#shared_subscription{subpid = SubPid}, Mon) ->
|
fun(#shared_subscription{subpid = SubPid}, Mon) ->
|
||||||
Mon:monitor(SubPid)
|
emqx_pmon:monitor(SubPid, Mon)
|
||||||
end, emqx_pmon:new(), ?TAB).
|
end, emqx_pmon:new(), ?TAB).
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -124,7 +124,7 @@ handle_call(Req, _From, State) ->
|
||||||
{reply, ignore, State}.
|
{reply, ignore, State}.
|
||||||
|
|
||||||
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
|
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
|
||||||
{noreply, update_stats(State#state{pmon = PMon:monitor(SubPid)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Shared] Unexpected msg: ~p", [Msg]),
|
emqx_logger:error("[Shared] Unexpected msg: ~p", [Msg]),
|
||||||
|
@ -132,11 +132,11 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
|
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
|
||||||
#shared_subscription{subpid = SubPid} = NewRecord,
|
#shared_subscription{subpid = SubPid} = NewRecord,
|
||||||
{noreply, update_stats(State#state{pmon = PMon:monitor(SubPid)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
||||||
#shared_subscription{subpid = SubPid} = OldRecord,
|
#shared_subscription{subpid = SubPid} = OldRecord,
|
||||||
{noreply, update_stats(State#state{pmon = PMon:demonitor(SubPid)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, _Event}, State) ->
|
handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
@ -144,7 +144,7 @@ handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
|
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
|
||||||
emqx_logger:info("Shared subscription down: ~p", [SubPid]),
|
emqx_logger:info("Shared subscription down: ~p", [SubPid]),
|
||||||
mnesia:async_dirty(fun cleanup_down/1, [SubPid]),
|
mnesia:async_dirty(fun cleanup_down/1, [SubPid]),
|
||||||
{noreply, update_stats(State#state{pmon = PMon:erase(SubPid)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Shared] Unexpected info: ~p", [Info]),
|
emqx_logger:error("[Shared] Unexpected info: ~p", [Info]),
|
||||||
|
@ -162,12 +162,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
cleanup_down(SubPid) ->
|
cleanup_down(SubPid) ->
|
||||||
Pat = #shared_subscription{_ = '_', subpid = SubPid},
|
Pat = #shared_subscription{_ = '_', subpid = SubPid},
|
||||||
lists:foreach(fun(Record) ->
|
lists:foreach(fun(Record) -> mnesia:delete_object(?TAB, Record) end, mnesia:match_object(Pat)).
|
||||||
mnesia:delete_object(?TAB, Record)
|
|
||||||
end, mnesia:match_object(Pat)).
|
|
||||||
|
|
||||||
update_stats(State) ->
|
update_stats(State) ->
|
||||||
emqx_stats:setstat('subscriptions/shared/count',
|
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.
|
||||||
'subscriptions/shared/max',
|
|
||||||
ets:info(?TAB, size)), State.
|
|
||||||
|
|
||||||
|
|
|
@ -216,26 +216,22 @@ handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[SM] Unexpected request: ~p", [Req]),
|
emqx_logger:error("[SM] Unexpected request: ~p", [Req]),
|
||||||
{reply, ignore, State}.
|
{reply, ignore, State}.
|
||||||
|
|
||||||
handle_cast({notify, {registered, ClientId, SessionPid}},
|
handle_cast({notify, {registered, ClientId, SessionPid}}, State = #state{session_pmon = PMon}) ->
|
||||||
State = #state{session_pmon = PMon}) ->
|
{noreply, State#state{session_pmon = emqx_pmon:monitor(SessionPid, ClientId, PMon)}};
|
||||||
{noreply, State#state{session_pmon = PMon:monitor(SessionPid, ClientId)}};
|
|
||||||
|
|
||||||
handle_cast({notify, {unregistered, _ClientId, SessionPid}},
|
handle_cast({notify, {unregistered, _ClientId, SessionPid}}, State = #state{session_pmon = PMon}) ->
|
||||||
State = #state{session_pmon = PMon}) ->
|
{noreply, State#state{session_pmon = emqx_pmon:demonitor(SessionPid, PMon)}};
|
||||||
{noreply, State#state{session_pmon = PMon:demonitor(SessionPid)}};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[SM] Unexpected msg: ~p", [Msg]),
|
emqx_logger:error("[SM] Unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason},
|
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{session_pmon = PMon}) ->
|
||||||
State = #state{session_pmon = PMon}) ->
|
case emqx_pmon:find(DownPid, PMon) of
|
||||||
case PMon:find(DownPid) of
|
undefined -> {noreply, State};
|
||||||
undefined ->
|
ClientId ->
|
||||||
{noreply, State};
|
|
||||||
ClientId ->
|
|
||||||
unregister_session({ClientId, DownPid}),
|
unregister_session({ClientId, DownPid}),
|
||||||
{noreply, State#state{session_pmon = PMon:erase(DownPid)}}
|
{noreply, State#state{session_pmon = emqx_pmon:erase(DownPid, PMon)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
Loading…
Reference in New Issue