diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 55359ec6b..9c28654eb 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -30,7 +30,7 @@ -include("emqttd.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(SERVER, ?MODULE). @@ -59,7 +59,7 @@ Id :: pos_integer(), StatsFun :: fun(). start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + gen_server2:start_link(?MODULE, [Id, StatsFun], []). pool() -> ?CM_POOL. @@ -81,7 +81,7 @@ lookup(ClientId) when is_binary(ClientId) -> -spec register(Client :: mqtt_client()) -> ok. register(Client = #mqtt_client{client_id = ClientId}) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {register, Client}). + gen_server2:cast(CmPid, {register, Client}). %%------------------------------------------------------------------------------ %% @doc Unregister clientId with pid. @@ -90,7 +90,7 @@ register(Client = #mqtt_client{client_id = ClientId}) -> -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {unregister, ClientId, self()}). + gen_server2:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks @@ -105,7 +105,6 @@ handle_call(Req, _From, State) -> {reply, {error, badreq}, State}. handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> - lager:info("CM register ~s with ~p", [ClientId, Pid]), case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]), @@ -119,21 +118,22 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> - lager:info("CM unregister ~s with ~p", [ClientId, Pid]), case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> ets:delete(mqtt_client, ClientId); [_] -> ignore; [] -> - lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) + lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid]) end, {noreply, setstats(State)}; -handle_cast(_Msg, State) -> +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. -handle_info(_Info, State) -> +handle_info(Info, State) -> + lager:critical("Unexpected Msg: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1420fb2df..4c89cd63a 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -51,12 +51,15 @@ -export([dispatch/2, match/1]). --behaviour(gen_server). +-behaviour(gen_server2). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -define(POOL, pubsub). -record(state, {id, submap :: map()}). @@ -104,7 +107,7 @@ mnesia(copy) -> Id :: pos_integer(), Opts :: list(). start_link(Id, Opts) -> - gen_server:start_link(?MODULE, [Id, Opts], []). + gen_server2:start_link(?MODULE, [Id, Opts], []). %%------------------------------------------------------------------------------ %% @doc Create topic. Notice That this transaction is not protected by pubsub pool @@ -157,11 +160,11 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> call(Req) -> Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server:call(Pid, Req, infinity). + gen_server2:call(Pid, Req, infinity). cast(Msg) -> Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server:cast(Pid, Msg). + gen_server2:cast(Pid, Msg). %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes @@ -232,6 +235,15 @@ init([Id, _Opts]) -> gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. +prioritise_call(_Msg, _From, _Len, _State) -> + 1. + +prioritise_cast(_Msg, _Len, _State) -> + 0. + +prioritise_info(_Msg, _Len, _State) -> + 1. + handle_call({subscribe, SubPid, Topics}, _From, State) -> TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}; @@ -363,7 +375,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa end; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), + lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index f1e3ae567..6d52c40ab 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -44,12 +44,15 @@ -export([register_session/3, unregister_session/2]). --behaviour(gen_server). +-behaviour(gen_server2). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -record(state, {id, statsfun}). -define(SM_POOL, sm_pool). @@ -82,7 +85,7 @@ mnesia(copy) -> Id :: pos_integer(), StatsFun :: fun(). start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + gen_server2:start_link(?MODULE, [Id, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Pool name. @@ -123,7 +126,7 @@ register_session(true, ClientId, Info) -> register_session(false, ClientId, Info) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:cast(SM, {register, ClientId, Info}). + gen_server2:cast(SM, {register, ClientId, Info}). %%------------------------------------------------------------------------------ %% @doc Unregister a session. @@ -136,9 +139,9 @@ unregister_session(true, ClientId) -> ets:delete(mqtt_transient_session, ClientId); unregister_session(false, ClientId) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:cast(SM, {unregister, ClientId}). + gen_server2:cast(SM, {unregister, ClientId}). -call(SM, Req) -> gen_server:call(SM, Req, infinity). +call(SM, Req) -> gen_server2:call(SM, Req, infinity). %%%============================================================================= %%% gen_server callbacks @@ -148,6 +151,15 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. +prioritise_call(_Msg, _From, _Len, _State) -> + 1. + +prioritise_cast(_Msg, _Len, _State) -> + 0. + +prioritise_info(_Msg, _Len, _State) -> + 1. + %% persistent session handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> case lookup_session(ClientId) of @@ -194,7 +206,8 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> end), {noreply, setstats(State)}; -handle_info(_Info, State) -> +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) ->