gen_server2

This commit is contained in:
Feng Lee 2015-08-16 11:06:47 +08:00
parent 2f1c03a469
commit 24512a1c1d
3 changed files with 45 additions and 20 deletions

View File

@ -30,7 +30,7 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-behaviour(gen_server). -behaviour(gen_server2).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
@ -59,7 +59,7 @@
Id :: pos_integer(), Id :: pos_integer(),
StatsFun :: fun(). StatsFun :: fun().
start_link(Id, StatsFun) -> start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []). gen_server2:start_link(?MODULE, [Id, StatsFun], []).
pool() -> ?CM_POOL. pool() -> ?CM_POOL.
@ -81,7 +81,7 @@ lookup(ClientId) when is_binary(ClientId) ->
-spec register(Client :: mqtt_client()) -> ok. -spec register(Client :: mqtt_client()) -> ok.
register(Client = #mqtt_client{client_id = ClientId}) -> register(Client = #mqtt_client{client_id = ClientId}) ->
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {register, Client}). gen_server2:cast(CmPid, {register, Client}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unregister clientId with pid. %% @doc Unregister clientId with pid.
@ -90,7 +90,7 @@ register(Client = #mqtt_client{client_id = ClientId}) ->
-spec unregister(ClientId :: binary()) -> ok. -spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) -> unregister(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {unregister, ClientId, self()}). gen_server2:cast(CmPid, {unregister, ClientId, self()}).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -105,7 +105,6 @@ handle_call(Req, _From, State) ->
{reply, {error, badreq}, State}. {reply, {error, badreq}, State}.
handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) ->
lager:info("CM register ~s with ~p", [ClientId, Pid]),
case ets:lookup(mqtt_client, ClientId) of case ets:lookup(mqtt_client, ClientId) of
[#mqtt_client{client_pid = Pid}] -> [#mqtt_client{client_pid = Pid}] ->
lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]), lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]),
@ -119,21 +118,22 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid =
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_cast({unregister, ClientId, Pid}, State) -> handle_cast({unregister, ClientId, Pid}, State) ->
lager:info("CM unregister ~s with ~p", [ClientId, Pid]),
case ets:lookup(mqtt_client, ClientId) of case ets:lookup(mqtt_client, ClientId) of
[#mqtt_client{client_pid = Pid}] -> [#mqtt_client{client_pid = Pid}] ->
ets:delete(mqtt_client, ClientId); ets:delete(mqtt_client, ClientId);
[_] -> [_] ->
ignore; ignore;
[] -> [] ->
lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid])
end, end,
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info(_Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Msg: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{id = Id}) -> terminate(_Reason, #state{id = Id}) ->

View File

@ -51,12 +51,15 @@
-export([dispatch/2, -export([dispatch/2,
match/1]). match/1]).
-behaviour(gen_server). -behaviour(gen_server2).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-define(POOL, pubsub). -define(POOL, pubsub).
-record(state, {id, submap :: map()}). -record(state, {id, submap :: map()}).
@ -104,7 +107,7 @@ mnesia(copy) ->
Id :: pos_integer(), Id :: pos_integer(),
Opts :: list(). Opts :: list().
start_link(Id, Opts) -> start_link(Id, Opts) ->
gen_server:start_link(?MODULE, [Id, Opts], []). gen_server2:start_link(?MODULE, [Id, Opts], []).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Create topic. Notice That this transaction is not protected by pubsub pool %% @doc Create topic. Notice That this transaction is not protected by pubsub pool
@ -157,11 +160,11 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
call(Req) -> call(Req) ->
Pid = gproc_pool:pick_worker(?POOL, self()), Pid = gproc_pool:pick_worker(?POOL, self()),
gen_server:call(Pid, Req, infinity). gen_server2:call(Pid, Req, infinity).
cast(Msg) -> cast(Msg) ->
Pid = gproc_pool:pick_worker(?POOL, self()), Pid = gproc_pool:pick_worker(?POOL, self()),
gen_server:cast(Pid, Msg). gen_server2:cast(Pid, Msg).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish to cluster nodes %% @doc Publish to cluster nodes
@ -232,6 +235,15 @@ init([Id, _Opts]) ->
gproc_pool:connect_worker(pubsub, {?MODULE, Id}), gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
{ok, #state{id = Id, submap = maps:new()}}. {ok, #state{id = Id, submap = maps:new()}}.
prioritise_call(_Msg, _From, _Len, _State) ->
1.
prioritise_cast(_Msg, _Len, _State) ->
0.
prioritise_info(_Msg, _Len, _State) ->
1.
handle_call({subscribe, SubPid, Topics}, _From, State) -> handle_call({subscribe, SubPid, Topics}, _From, State) ->
TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) ->
#mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}; #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos};
@ -363,7 +375,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
end; end;
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View File

@ -44,12 +44,15 @@
-export([register_session/3, unregister_session/2]). -export([register_session/3, unregister_session/2]).
-behaviour(gen_server). -behaviour(gen_server2).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {id, statsfun}). -record(state, {id, statsfun}).
-define(SM_POOL, sm_pool). -define(SM_POOL, sm_pool).
@ -82,7 +85,7 @@ mnesia(copy) ->
Id :: pos_integer(), Id :: pos_integer(),
StatsFun :: fun(). StatsFun :: fun().
start_link(Id, StatsFun) -> start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []). gen_server2:start_link(?MODULE, [Id, StatsFun], []).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Pool name. %% @doc Pool name.
@ -123,7 +126,7 @@ register_session(true, ClientId, Info) ->
register_session(false, ClientId, Info) -> register_session(false, ClientId, Info) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId), SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:cast(SM, {register, ClientId, Info}). gen_server2:cast(SM, {register, ClientId, Info}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unregister a session. %% @doc Unregister a session.
@ -136,9 +139,9 @@ unregister_session(true, ClientId) ->
ets:delete(mqtt_transient_session, ClientId); ets:delete(mqtt_transient_session, ClientId);
unregister_session(false, ClientId) -> unregister_session(false, ClientId) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId), SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:cast(SM, {unregister, ClientId}). gen_server2:cast(SM, {unregister, ClientId}).
call(SM, Req) -> gen_server:call(SM, Req, infinity). call(SM, Req) -> gen_server2:call(SM, Req, infinity).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -148,6 +151,15 @@ init([Id, StatsFun]) ->
gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}. {ok, #state{id = Id, statsfun = StatsFun}}.
prioritise_call(_Msg, _From, _Len, _State) ->
1.
prioritise_cast(_Msg, _Len, _State) ->
0.
prioritise_info(_Msg, _Len, _State) ->
1.
%% persistent session %% persistent session
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
@ -194,7 +206,8 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
end), end),
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_info(_Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{id = Id}) -> terminate(_Reason, #state{id = Id}) ->