From 766f74692d7edb38bdad5dbdade0c404bae7a5cd Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 10 Aug 2016 16:36:09 +0800 Subject: [PATCH] mqtt_local_session --- src/emqttd_cm.erl | 44 ++++++++++++++----------------------- src/emqttd_sm.erl | 47 ++++++++++++++-------------------------- src/emqttd_sm_helper.erl | 2 +- src/emqttd_sm_sup.erl | 9 +------- 4 files changed, 34 insertions(+), 68 deletions(-) diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index f78db2ad0..da98dedd6 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -26,7 +26,7 @@ %% API Exports -export([start_link/3]). --export([lookup/1, lookup_proc/1, register/1, unregister/1]). +-export([lookup/1, lookup_proc/1, reg/1, unreg/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -44,23 +44,17 @@ %%-------------------------------------------------------------------- %% @doc Start Client Manager --spec(start_link(Pool, Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - StatsFun :: fun()). +-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, any()}). start_link(Pool, Id, StatsFun) -> gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []). %% @doc Lookup Client by ClientId --spec(lookup(ClientId :: binary()) -> mqtt_client() | undefined). +-spec(lookup(binary()) -> mqtt_client() | undefined). lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(mqtt_client, ClientId) of - [Client] -> Client; - [] -> undefined - end. + case ets:lookup(mqtt_client, ClientId) of [Client] -> Client; [] -> undefined end. %% @doc Lookup client pid by clientId --spec(lookup_proc(ClientId :: binary()) -> pid() | undefined). +-spec(lookup_proc(binary()) -> pid() | undefined). lookup_proc(ClientId) when is_binary(ClientId) -> try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid) catch @@ -68,14 +62,14 @@ lookup_proc(ClientId) when is_binary(ClientId) -> end. %% @doc Register ClientId with Pid. --spec(register(Client :: mqtt_client()) -> ok). -register(Client = #mqtt_client{client_id = ClientId}) -> - gen_server2:call(pick(ClientId), {register, Client}, 120000). +-spec(reg(mqtt_client()) -> ok). +reg(Client = #mqtt_client{client_id = ClientId}) -> + gen_server2:call(pick(ClientId), {reg, Client}, 120000). %% @doc Unregister clientId with pid. --spec(unregister(ClientId :: binary()) -> ok). -unregister(ClientId) when is_binary(ClientId) -> - gen_server2:cast(pick(ClientId), {unregister, ClientId, self()}). +-spec(unreg(binary()) -> ok). +unreg(ClientId) when is_binary(ClientId) -> + gen_server2:cast(pick(ClientId), {unreg, ClientId, self()}). pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId). @@ -88,22 +82,16 @@ init([Pool, Id, StatsFun]) -> {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}. prioritise_call(Req, _From, _Len, _State) -> - case Req of - {register, _Client} -> 2; - _ -> 1 - end. + case Req of {reg, _Client} -> 2; _ -> 1 end. prioritise_cast(Msg, _Len, _State) -> - case Msg of - {unregister, _ClientId, _Pid} -> 9; - _ -> 1 - end. + case Msg of {unreg, _ClientId, _Pid} -> 9; _ -> 1 end. prioritise_info(_Msg, _Len, _State) -> 3. -handle_call({register, Client = #mqtt_client{client_id = ClientId, - client_pid = Pid}}, _From, State) -> +handle_call({reg, Client = #mqtt_client{client_id = ClientId, + client_pid = Pid}}, _From, State) -> case lookup_proc(ClientId) of Pid -> {reply, ok, State}; @@ -115,7 +103,7 @@ handle_call({register, Client = #mqtt_client{client_id = ClientId, handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({unregister, ClientId, Pid}, State) -> +handle_cast({unreg, ClientId, Pid}, State) -> case lookup_proc(ClientId) of Pid -> ets:delete(mqtt_client, ClientId), diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 3ab36186f..de23d3702 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -32,9 +32,7 @@ %% API Function Exports -export([start_link/2]). --export([start_session/2, lookup_session/1]). - --export([register_session/3, unregister_session/2]). +-export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]). -export([dispatch/3]). @@ -60,14 +58,14 @@ mnesia(boot) -> %% Global Session Table - ok = emqttd_mnesia:create_table(session, [ + ok = emqttd_mnesia:create_table(mqtt_session, [ {type, set}, {ram_copies, [node()]}, {record_name, mqtt_session}, {attributes, record_info(fields, mqtt_session)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(session). + ok = emqttd_mnesia:copy_table(mqtt_session). %%-------------------------------------------------------------------- %% API @@ -93,32 +91,22 @@ lookup_session(ClientId) -> end. %% @doc Register a session with info. --spec(register_session(CleanSess, ClientId, Info) -> ok when - CleanSess :: boolean(), - ClientId :: binary(), - Info :: [tuple()]). -register_session(CleanSess, ClientId, Info) -> - ets:insert(sesstab(CleanSess), {{ClientId, self()}, Info}). +-spec(register_session(boolean(), binary(), [tuple()]) -> true). +register_session(CleanSess, ClientId, Properties) -> + ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). %% @doc Unregister a session. --spec(unregister_session(CleanSess, ClientId) -> ok when - CleanSess :: boolean(), - ClientId :: binary()). -unregister_session(CleanSess, ClientId) -> - ets:delete(sesstab(CleanSess), {ClientId, self()}). +-spec(unregister_session(binary()) -> true). +unregister_session(ClientId) -> + ets:delete(mqtt_local_session, ClientId). -%%TODO: FIXME... -dispatch(Id, Topic, Msg) -> - case lookup_session(Id) of - #mqtt_session{sess_pid = Pid} -> - Pid ! {dispatch, Topic, Msg}; - undefined -> - ok +dispatch(ClientId, Topic, Msg) -> + try ets:lookup_element(mqtt_local_session, ClientId, 2) of + Pid -> Pid ! {dispatch, Topic, Msg} + catch + error:badarg -> ok %%TODO: How?? end. -sesstab(true) -> mqtt_transient_session; -sesstab(false) -> mqtt_persistent_session. - call(SM, Req) -> gen_server2:call(SM, Req, ?TIMEOUT). %%infinity). @@ -217,9 +205,7 @@ create_session({CleanSess, ClientId, ClientPid}, State) -> create_session(CleanSess, ClientId, ClientPid) -> case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of {ok, SessPid} -> - Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid, - persistent = not CleanSess}, + Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess}, case insert_session(Session) of {aborted, {conflict, ConflictPid}} -> %% Conflict with othe node? @@ -244,8 +230,7 @@ insert_session(Session = #mqtt_session{client_id = ClientId}) -> end). %% Local node -resume_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid}, ClientPid) +resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) when node(SessPid) =:= node() -> case is_process_alive(SessPid) of diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 1c90acc32..184f2be64 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -83,5 +83,5 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- setstats(State = #state{stats_fun = StatsFun}) -> - StatsFun(ets:info(mqtt_persistent_session, size)), State. + StatsFun(ets:info(mqtt_local_session, size)), State. diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 556d9540f..1935bcfde 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -25,8 +25,6 @@ -define(HELPER, emqttd_sm_helper). --define(TABS, [mqtt_transient_session, mqtt_persistent_session]). - %% API -export([start_link/0]). @@ -38,7 +36,7 @@ start_link() -> init([]) -> %% Create session tables - create_session_tabs(), + ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]), %% Helper StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), @@ -50,9 +48,4 @@ init([]) -> PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. - -create_session_tabs() -> - Opts = [ordered_set, named_table, public, - {write_concurrency, true}], - [ets:new(Tab, Opts) || Tab <- ?TABS].