mqtt_local_session

This commit is contained in:
Feng Lee 2016-08-10 16:36:09 +08:00
parent 162b7ec229
commit 766f74692d
4 changed files with 34 additions and 68 deletions

View File

@ -26,7 +26,7 @@
%% API Exports
-export([start_link/3]).
-export([lookup/1, lookup_proc/1, register/1, unregister/1]).
-export([lookup/1, lookup_proc/1, reg/1, unreg/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -44,23 +44,17 @@
%%--------------------------------------------------------------------
%% @doc Start Client Manager
-spec(start_link(Pool, Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Pool :: atom(),
Id :: pos_integer(),
StatsFun :: fun()).
-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, any()}).
start_link(Pool, Id, StatsFun) ->
gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []).
%% @doc Lookup Client by ClientId
-spec(lookup(ClientId :: binary()) -> mqtt_client() | undefined).
-spec(lookup(binary()) -> mqtt_client() | undefined).
lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(mqtt_client, ClientId) of
[Client] -> Client;
[] -> undefined
end.
case ets:lookup(mqtt_client, ClientId) of [Client] -> Client; [] -> undefined end.
%% @doc Lookup client pid by clientId
-spec(lookup_proc(ClientId :: binary()) -> pid() | undefined).
-spec(lookup_proc(binary()) -> pid() | undefined).
lookup_proc(ClientId) when is_binary(ClientId) ->
try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid)
catch
@ -68,14 +62,14 @@ lookup_proc(ClientId) when is_binary(ClientId) ->
end.
%% @doc Register ClientId with Pid.
-spec(register(Client :: mqtt_client()) -> ok).
register(Client = #mqtt_client{client_id = ClientId}) ->
gen_server2:call(pick(ClientId), {register, Client}, 120000).
-spec(reg(mqtt_client()) -> ok).
reg(Client = #mqtt_client{client_id = ClientId}) ->
gen_server2:call(pick(ClientId), {reg, Client}, 120000).
%% @doc Unregister clientId with pid.
-spec(unregister(ClientId :: binary()) -> ok).
unregister(ClientId) when is_binary(ClientId) ->
gen_server2:cast(pick(ClientId), {unregister, ClientId, self()}).
-spec(unreg(binary()) -> ok).
unreg(ClientId) when is_binary(ClientId) ->
gen_server2:cast(pick(ClientId), {unreg, ClientId, self()}).
pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId).
@ -88,22 +82,16 @@ init([Pool, Id, StatsFun]) ->
{ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}.
prioritise_call(Req, _From, _Len, _State) ->
case Req of
{register, _Client} -> 2;
_ -> 1
end.
case Req of {reg, _Client} -> 2; _ -> 1 end.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{unregister, _ClientId, _Pid} -> 9;
_ -> 1
end.
case Msg of {unreg, _ClientId, _Pid} -> 9; _ -> 1 end.
prioritise_info(_Msg, _Len, _State) ->
3.
handle_call({register, Client = #mqtt_client{client_id = ClientId,
client_pid = Pid}}, _From, State) ->
handle_call({reg, Client = #mqtt_client{client_id = ClientId,
client_pid = Pid}}, _From, State) ->
case lookup_proc(ClientId) of
Pid ->
{reply, ok, State};
@ -115,7 +103,7 @@ handle_call({register, Client = #mqtt_client{client_id = ClientId,
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
handle_cast({unregister, ClientId, Pid}, State) ->
handle_cast({unreg, ClientId, Pid}, State) ->
case lookup_proc(ClientId) of
Pid ->
ets:delete(mqtt_client, ClientId),

View File

@ -32,9 +32,7 @@
%% API Function Exports
-export([start_link/2]).
-export([start_session/2, lookup_session/1]).
-export([register_session/3, unregister_session/2]).
-export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]).
-export([dispatch/3]).
@ -60,14 +58,14 @@
mnesia(boot) ->
%% Global Session Table
ok = emqttd_mnesia:create_table(session, [
ok = emqttd_mnesia:create_table(mqtt_session, [
{type, set},
{ram_copies, [node()]},
{record_name, mqtt_session},
{attributes, record_info(fields, mqtt_session)}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(session).
ok = emqttd_mnesia:copy_table(mqtt_session).
%%--------------------------------------------------------------------
%% API
@ -93,32 +91,22 @@ lookup_session(ClientId) ->
end.
%% @doc Register a session with info.
-spec(register_session(CleanSess, ClientId, Info) -> ok when
CleanSess :: boolean(),
ClientId :: binary(),
Info :: [tuple()]).
register_session(CleanSess, ClientId, Info) ->
ets:insert(sesstab(CleanSess), {{ClientId, self()}, Info}).
-spec(register_session(boolean(), binary(), [tuple()]) -> true).
register_session(CleanSess, ClientId, Properties) ->
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
%% @doc Unregister a session.
-spec(unregister_session(CleanSess, ClientId) -> ok when
CleanSess :: boolean(),
ClientId :: binary()).
unregister_session(CleanSess, ClientId) ->
ets:delete(sesstab(CleanSess), {ClientId, self()}).
-spec(unregister_session(binary()) -> true).
unregister_session(ClientId) ->
ets:delete(mqtt_local_session, ClientId).
%%TODO: FIXME...
dispatch(Id, Topic, Msg) ->
case lookup_session(Id) of
#mqtt_session{sess_pid = Pid} ->
Pid ! {dispatch, Topic, Msg};
undefined ->
ok
dispatch(ClientId, Topic, Msg) ->
try ets:lookup_element(mqtt_local_session, ClientId, 2) of
Pid -> Pid ! {dispatch, Topic, Msg}
catch
error:badarg -> ok %%TODO: How??
end.
sesstab(true) -> mqtt_transient_session;
sesstab(false) -> mqtt_persistent_session.
call(SM, Req) ->
gen_server2:call(SM, Req, ?TIMEOUT). %%infinity).
@ -217,9 +205,7 @@ create_session({CleanSess, ClientId, ClientPid}, State) ->
create_session(CleanSess, ClientId, ClientPid) ->
case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
{ok, SessPid} ->
Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid,
persistent = not CleanSess},
Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess},
case insert_session(Session) of
{aborted, {conflict, ConflictPid}} ->
%% Conflict with othe node?
@ -244,8 +230,7 @@ insert_session(Session = #mqtt_session{client_id = ClientId}) ->
end).
%% Local node
resume_session(Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid}, ClientPid)
resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid)
when node(SessPid) =:= node() ->
case is_process_alive(SessPid) of

View File

@ -83,5 +83,5 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
setstats(State = #state{stats_fun = StatsFun}) ->
StatsFun(ets:info(mqtt_persistent_session, size)), State.
StatsFun(ets:info(mqtt_local_session, size)), State.

View File

@ -25,8 +25,6 @@
-define(HELPER, emqttd_sm_helper).
-define(TABS, [mqtt_transient_session, mqtt_persistent_session]).
%% API
-export([start_link/0]).
@ -38,7 +36,7 @@ start_link() ->
init([]) ->
%% Create session tables
create_session_tabs(),
ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]),
%% Helper
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
@ -50,9 +48,4 @@ init([]) ->
PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]),
{ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.
create_session_tabs() ->
Opts = [ordered_set, named_table, public,
{write_concurrency, true}],
[ets:new(Tab, Opts) || Tab <- ?TABS].