From 56b88dd7f7a3a93a4045bfad226d8fa8e6e310d4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 23 Mar 2018 16:39:23 +0800 Subject: [PATCH] Improve the design of session management --- include/emqx.hrl | 1 + src/emqx.erl | 2 +- src/emqx_protocol.erl | 7 +- src/emqx_session.erl | 51 ++- src/emqx_session_sup.erl | 8 +- src/emqx_sm.erl | 358 +++++------------- src/emqx_sm_helper.erl | 87 ----- src/emqx_sm_locker.erl | 16 +- src/emqx_sm_sup.erl | 29 +- src/emqx_sup.erl | 1 + src/emqx_tables.erl | 27 ++ src/{emqx_trace.erl => emqx_tracer.erl} | 0 ...emqx_trace_sup.erl => emqx_tracer_sup.erl} | 10 +- 13 files changed, 178 insertions(+), 419 deletions(-) delete mode 100644 src/emqx_sm_helper.erl create mode 100644 src/emqx_tables.erl rename src/{emqx_trace.erl => emqx_tracer.erl} (100%) rename src/{emqx_trace_sup.erl => emqx_tracer_sup.erl} (81%) diff --git a/include/emqx.hrl b/include/emqx.hrl index c1b366b86..5c67236d0 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -63,6 +63,7 @@ clean_start := boolean(), expiry_interval := non_neg_integer()}). +-record(session, {client_id, sess_pid}). %%-------------------------------------------------------------------- %% Message and Delivery diff --git a/src/emqx.erl b/src/emqx.erl index 386fc222c..967478a8c 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (C) 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 103fbf4ff..7f58dee51 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -219,8 +219,11 @@ process(?CONNECT_PACKET(Var), State0) -> State2 = maybe_set_clientid(State1), %% Start session - case emqx_sm:start_session(CleanSess, {clientid(State2), Username}) of - {ok, Session, SP} -> + case emqx_sm:open_session(#{clean_start => CleanSess, + client_id => clientid(State2), + username => Username}) of + {ok, Session} -> %% TODO:... + SP = true, %% TODO:... %% Register the client emqx_cm:reg(client(State2)), %% Start keepalive diff --git a/src/emqx_session.erl b/src/emqx_session.erl index c8e069cd7..611e967be 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -14,35 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% -%% @doc MQTT Session -%% -%% A stateful interaction between a Client and a Server. Some Sessions -%% last only as long as the Network Connection, others can span multiple -%% consecutive Network Connections between a Client and a Server. -%% -%% The Session state in the Server consists of: -%% -%% The existence of a Session, even if the rest of the Session state is empty. -%% -%% The Client’s subscriptions. -%% -%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not -%% been completely acknowledged. -%% -%% QoS 1 and QoS 2 messages pending transmission to the Client. -%% -%% QoS 2 messages which have been received from the Client, but have not -%% been completely acknowledged. -%% -%% Optionally, QoS 0 messages pending transmission to the Client. -%% -%% If the session is currently disconnected, the time at which the Session state -%% will be deleted. -%% -%% @end -%% - -module(emqx_session). -behaviour(gen_server). @@ -76,6 +47,28 @@ -define(MQueue, emqx_mqueue). +%% A stateful interaction between a Client and a Server. Some Sessions +%% last only as long as the Network Connection, others can span multiple +%% consecutive Network Connections between a Client and a Server. +%% +%% The Session state in the Server consists of: +%% +%% The existence of a Session, even if the rest of the Session state is empty. +%% +%% The Client’s subscriptions. +%% +%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not +%% been completely acknowledged. +%% +%% QoS 1 and QoS 2 messages pending transmission to the Client. +%% +%% QoS 2 messages which have been received from the Client, but have not +%% been completely acknowledged. +%% +%% Optionally, QoS 0 messages pending transmission to the Client. +%% +%% If the session is currently disconnected, the time at which the Session state +%% will be deleted. -record(state, { %% Clean Session Flag diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index c7e18e1cd..3e61f09d1 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -20,18 +20,16 @@ -include("emqx.hrl"). --export([start_link/0, start_session_process/1]). +-export([start_link/0, start_session/1]). -export([init/1]). -%% @doc Start session supervisor -spec(start_link() -> {ok, pid()}). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%% @doc Start a session process --spec(start_session_process(session()) -> {ok, pid()}). -start_session_process(Session) -> +-spec(start_session(session()) -> {ok, pid()}). +start_session(Session) -> supervisor:start_child(?MODULE, [Session]). %%-------------------------------------------------------------------- diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index b90dd693b..a3f9e88cb 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -20,220 +20,152 @@ -include("emqx.hrl"). -%% Mnesia Callbacks --export([mnesia/1]). +-export([start_link/1]). --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). +-export([open_session/1, lookup_session/1, close_session/1]). +-export([resume_session/1, discard_session/1]). +-export([register_session/1, unregister_session/2]). -%% API Function Exports --export([start_link/2]). - --export([open_session/1, start_session/2, lookup_session/1, register_session/3, - unregister_session/1, unregister_session/2]). +%% lock_session/1, create_session/1, unlock_session/1, -export([dispatch/3]). --export([local_sessions/0]). - -%% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id, monitors}). +-record(state, {stats_fun, stats_timer, monitors = #{}}). --define(POOL, ?MODULE). +-spec(start_link(StatsFun :: fun()) -> {ok, pid()} | ignore | {error, term()}). +start_link(StatsFun) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). --define(TIMEOUT, 120000). - --define(LOG(Level, Format, Args, Session), - lager:Level("SM(~s): " ++ Format, [Session#session.client_id | Args])). - -%%-------------------------------------------------------------------- -%% Mnesia callbacks -%%-------------------------------------------------------------------- - -mnesia(boot) -> - %% Global Session Table - ok = ekka_mnesia:create_table(session, [ - {type, set}, - {ram_copies, [node()]}, - {record_name, mqtt_session}, - {attributes, record_info(fields, mqtt_session)}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(mqtt_session). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% Open a clean start session. -open_session(Session = #{client_id := ClientId, clean_start := true, expiry_interval := Interval}) -> +open_session(Session = #{client_id := ClientId, clean_start := true}) -> with_lock(ClientId, fun() -> - {ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]), - io:format("ResL: ~p, BadNodes: ~p~n", [ResL, BadNodes]), - case Interval > 0 of - true -> - {ok, emqx_session_sup:start_session_process(Session)}; - false -> - {ok, emqx_session:init_state(Session)} + case rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]) of + {_Res, []} -> ok; + {_Res, BadNodes} -> emqx_log:error("[SM] Bad nodes found when lock a session: ~p", [BadNodes]) + end, + {ok, emqx_session_sup:start_session(Session)} + end); + +open_session(Session = #{client_id := ClientId, clean_start := false}) -> + with_lock(ClientId, + fun() -> + {ResL, _BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]), + case lists:flatten([Pid || Pid <- ResL, Pid =/= undefined]) of + [] -> + {ok, emqx_session_sup:start_session(Session)}; + [SessPid|_] -> + case resume_session(SessPid) of + ok -> {ok, SessPid}; + {error, Reason} -> + emqx_log:error("[SM] Failed to resume session: ~p, ~p", [Session, Reason]), + {ok, emqx_session_sup:start_session(Session)} + end end end). -open_session(Session = #{client_id := ClientId, clean_start := false, expiry_interval := Interval}) -> - with_lock(ClientId, - fun() -> - {ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]), - [SessionPid | _] = lists:flatten(ResL), +resume_session(SessPid) when node(SessPid) == node() -> + case is_process_alive(SessPid) of + true -> + emqx_session:resume(SessPid, self()); + false -> + emqx_log:error("Cannot resume ~p which seems already dead!", [SessPid]), + {error, session_died} + end; + +resume_session(SessPid) -> + case rpc:call(node(SessPid), emqx_session, resume, [SessPid]) of + ok -> {ok, SessPid}; + {badrpc, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason} + end. - - - end). +discard_session(ClientId) -> + case lookup_session(ClientId) of + undefined -> ok; + Pid -> emqx_session:discard(Pid) + end. lookup_session(ClientId) -> - ets:lookup(session, ClientId). - - -lookup_session(ClientId) -> - ets:lookup(session, ClientId). - -with_lock(undefined, Fun) -> - Fun(); + try ets:lookup_element(session, ClientId, 2) catch error:badarg -> undefined end. +close_session(SessPid) -> + emqx_session:close(SessPid). with_lock(ClientId, Fun) -> case emqx_sm_locker:lock(ClientId) of - true -> Result = Fun(), - ok = emqx_sm_locker:unlock(ClientId), - Result; - false -> {error, client_id_unavailable} + true -> Result = Fun(), + emqx_sm_locker:unlock(ClientId), + Result; + false -> {error, client_id_unavailable}; + {error, Reason} -> {error, Reason} end. -%% @doc Start a session manager --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id) -> - gen_server:start_link(?MODULE, [Pool, Id], []). - -%% @doc Start a session --spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}). -start_session(CleanSess, {ClientId, Username}) -> - SM = gproc_pool:pick_worker(?POOL, ClientId), - call(SM, {start_session, CleanSess, {ClientId, Username}, self()}). - -%% @doc Lookup a Session --spec(lookup_session(binary()) -> mqtt_session() | undefined). -lookup_session(ClientId) -> - case mnesia:dirty_read(mqtt_session, ClientId) of - [Session] -> Session; - [] -> undefined - end. - -%% @doc Register a session with info. --spec(register_session(binary(), boolean(), [tuple()]) -> true). -register_session(ClientId, CleanSess, Properties) -> - ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). - -%% @doc Unregister a Session. --spec(unregister_session(binary()) -> boolean()). -unregister_session(ClientId) -> - unregister_session(ClientId, self()). +-spec(register_session(client_id()) -> true). +register_session(ClientId) -> + ets:insert(session, {ClientId, self()}). unregister_session(ClientId, Pid) -> - case ets:lookup(mqtt_local_session, ClientId) of - [LocalSess = {_, Pid, _, _}] -> - emqx_stats:del_session_stats(ClientId), - ets:delete_object(mqtt_local_session, LocalSess); + case ets:lookup(session, ClientId) of + [{_, Pid}] -> + ets:delete_object(session, {ClientId, Pid}); _ -> false end. dispatch(ClientId, Topic, Msg) -> - try ets:lookup_element(mqtt_local_session, ClientId, 2) of - Pid -> Pid ! {dispatch, Topic, Msg} - catch - error:badarg -> - emqx_hooks:run('message.dropped', [ClientId, Msg]), - ok %%TODO: How?? + case lookup_session(ClientId) of + Pid when is_pid(Pid) -> + Pid ! {dispatch, Topic, Msg}; + undefined -> + emqx_hooks:run('message.dropped', [ClientId, Msg]) end. -call(SM, Req) -> - gen_server:call(SM, Req, ?TIMEOUT). %%infinity). - -%% @doc for debug. -local_sessions() -> - ets:tab2list(mqtt_local_session). - %%-------------------------------------------------------------------- -%% gen_server Callbacks +%% gen_server callbacks %%-------------------------------------------------------------------- -init([Pool, Id]) -> - gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id, monitors = dict:new()}}. - -%% Persistent Session -handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) -> - case lookup_session(ClientId) of - undefined -> - %% Create session locally - create_session({false, {ClientId, Username}, ClientPid}, State); - Session -> - case resume_session(Session, ClientPid) of - {ok, SessPid} -> - {reply, {ok, SessPid, true}, State}; - {error, Erorr} -> - {reply, {error, Erorr}, State} - end - end; - -%% Transient Session -handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State) -> - Client = {true, {ClientId, Username}, ClientPid}, - case lookup_session(ClientId) of - undefined -> - create_session(Client, State); - Session -> - case destroy_session(Session) of - ok -> - create_session(Client, State); - {error, Error} -> - {reply, {error, Error}, State} - end - end; +init([StatsFun]) -> + {ok, TRef} = timer:send_interval(timer:seconds(1), stats), + {ok, #state{stats_fun = StatsFun, stats_timer = TRef}}. handle_call(Req, _From, State) -> - lager:error("[MQTT-SM] Unexpected Request: ~p", [Req]), + emqx_log:error("[SM] Unexpected request: ~p", [Req]), {reply, ignore, State}. +handle_cast({monitor_session, SessionPid, ClientId}, + State = #state{monitors = Monitors}) -> + MRef = erlang:monitor(process, SessionPid), + {noreply, State#state{monitors = maps:put(MRef, ClientId, Monitors)}}; + handle_cast(Msg, State) -> - lager:error("[MQTT-SM] Unexpected Message: ~p", [Msg]), + emqx_log:error("[SM] Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - case dict:find(MRef, State#state.monitors) of - {ok, ClientId} -> - case mnesia:dirty_read({mqtt_session, ClientId}) of - [] -> - ok; - [Sess = #mqtt_session{sess_pid = DownPid}] -> - mnesia:dirty_delete_object(Sess); - [_Sess] -> - ok - end, - {noreply, erase_monitor(MRef, State), hibernate}; +handle_info(stats, State) -> + {noreply, setstats(State), hibernate}; + +handle_info({'DOWN', MRef, process, DownPid, _Reason}, + State = #state{monitors = Monitors}) -> + case maps:find(MRef, Monitors) of + {ok, {ClientId, Pid}} -> + ets:delete_object(session, {ClientId, Pid}), + {noreply, setstats(State#state{monitors = maps:remove(MRef, Monitors)})}; error -> - lager:error("MRef of session ~p not found", [DownPid]), + emqx_log:error("session ~p not found", [DownPid]), {noreply, State} end; handle_info(Info, State) -> - lager:error("[MQTT-SM] Unexpected Info: ~p", [Info]), + emqx_log:error("[SM] Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). +terminate(_Reason, _State = #state{stats_timer = TRef}) -> + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -242,100 +174,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -%% Create Session Locally -create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> - case create_session(CleanSess, {ClientId, Username}, ClientPid) of - {ok, SessPid} -> - {reply, {ok, SessPid, false}, - monitor_session(ClientId, SessPid, State)}; - {error, Error} -> - {reply, {error, Error}, State} - end. - -create_session(CleanSess, {ClientId, Username}, ClientPid) -> - case emqx_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of - {ok, SessPid} -> - Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, clean_sess = CleanSess}, - case insert_session(Session) of - {aborted, {conflict, ConflictPid}} -> - %% Conflict with othe node? - lager:error("SM(~s): Conflict with ~p", [ClientId, ConflictPid]), - {error, mnesia_conflict}; - {atomic, ok} -> - {ok, SessPid} - end; - {error, Error} -> - {error, Error} - end. - -insert_session(Session = #mqtt_session{client_id = ClientId}) -> - mnesia:transaction( - fun() -> - case mnesia:wread({mqtt_session, ClientId}) of - [] -> - mnesia:write(mqtt_session, Session, write); - [#mqtt_session{sess_pid = SessPid}] -> - mnesia:abort({conflict, SessPid}) - end - end). - -%% Local node -resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) - when node(SessPid) =:= node() -> - - case is_process_alive(SessPid) of - true -> - emqx_session:resume(SessPid, ClientId, ClientPid), - {ok, SessPid}; - false -> - ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session), - {error, session_died} - end; - -%% Remote node -resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) -> - Node = node(SessPid), - case rpc:call(Node, emqx_session, resume, [SessPid, ClientId, ClientPid]) of - ok -> - {ok, SessPid}; - {badrpc, nodedown} -> - ?LOG(error, "Session died for node '~s' down", [Node], Session), - remove_session(Session), - {error, session_nodedown}; - {badrpc, Reason} -> - ?LOG(error, "Failed to resume from node ~s for ~p", [Node, Reason], Session), - {error, Reason} - end. - -%% Local node -destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) - when node(SessPid) =:= node() -> - emqx_session:destroy(SessPid, ClientId), - remove_session(Session); - -%% Remote node -destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) -> - Node = node(SessPid), - case rpc:call(Node, emqx_session, destroy, [SessPid, ClientId]) of - ok -> - remove_session(Session); - {badrpc, nodedown} -> - ?LOG(error, "Node '~s' down", [Node], Session), - remove_session(Session); - {badrpc, Reason} -> - ?LOG(error, "Failed to destory ~p on remote node ~p for ~s", - [SessPid, Node, Reason], Session), - {error, Reason} - end. - -remove_session(Session) -> - mnesia:dirty_delete_object(Session). - -monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> - MRef = erlang:monitor(process, SessPid), - State#state{monitors = dict:store(MRef, ClientId, Monitors)}. - -erase_monitor(MRef, State = #state{monitors = Monitors}) -> - erlang:demonitor(MRef, [flush]), - State#state{monitors = dict:erase(MRef, Monitors)}. +setstats(State = #state{stats_fun = StatsFun}) -> + StatsFun(ets:info(session, size)), State. diff --git a/src/emqx_sm_helper.erl b/src/emqx_sm_helper.erl deleted file mode 100644 index 068156b94..000000000 --- a/src/emqx_sm_helper.erl +++ /dev/null @@ -1,87 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_sm_helper). - --behaviour(gen_server). - --include("emqx.hrl"). - --include_lib("stdlib/include/ms_transform.hrl"). - -%% API Function Exports --export([start_link/1]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {stats_fun, ticker}). - --define(LOCK, {?MODULE, clean_sessions}). - -%% @doc Start a session helper --spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}). -start_link(StatsFun) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). - -init([StatsFun]) -> - ekka:monitor(membership), - {ok, TRef} = timer:send_interval(timer:seconds(1), tick), - {ok, #state{stats_fun = StatsFun, ticker = TRef}}. - -handle_call(Req, _From, State) -> - lager:error("[SM-HELPER] Unexpected Call: ~p", [Req]), - {reply, ignore, State}. - -handle_cast(Msg, State) -> - lager:error("[SM-HELPER] Unexpected Cast: ~p", [Msg]), - {noreply, State}. - -handle_info({membership, {mnesia, down, Node}}, State) -> - Fun = fun() -> - ClientIds = - mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, - [{'==', {node, '$2'}, Node}], ['$1']}]), - lists:foreach(fun(ClientId) -> mnesia:delete({mqtt_session, ClientId}) end, ClientIds) - end, - global:trans({?LOCK, self()}, fun() -> mnesia:async_dirty(Fun) end), - {noreply, State, hibernate}; - -handle_info({membership, _Event}, State) -> - {noreply, State}; - -handle_info(tick, State) -> - {noreply, setstats(State), hibernate}; - -handle_info(Info, State) -> - lager:error("[SM-HELPER] Unexpected Info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State = #state{ticker = TRef}) -> - timer:cancel(TRef), - ekka:unmonitor(membership). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -setstats(State = #state{stats_fun = StatsFun}) -> - StatsFun(ets:info(mqtt_local_session, size)), State. - diff --git a/src/emqx_sm_locker.erl b/src/emqx_sm_locker.erl index 53af8a678..fc61e6b06 100644 --- a/src/emqx_sm_locker.erl +++ b/src/emqx_sm_locker.erl @@ -18,26 +18,16 @@ -include("emqx.hrl"). --export([start_link/0]). - %% Lock/Unlock API based on canal-lock. -export([lock/1, unlock/1]). -%% @doc Starts the lock server --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - canal_lock:start_link(?MODULE, 1). - %% @doc Lock a clientid --spec(lock(client_id()) -> boolean()). +-spec(lock(client_id()) -> boolean() | {error, term()}). lock(ClientId) -> - case canal_lock:acquire(?MODULE, ClientId, 1, 1) of - {acquired, 1} -> true; - full -> false - end. + emqx_rpc:call(ekka:leader(), emqx_sm_locker, lock, [ClientId]). %% @doc Unlock a clientid -spec(unlock(client_id()) -> ok). unlock(ClientId) -> - canal_lock:release(?MODULE, ClientId, 1, 1). + emqx_rpc:call(ekka:leader(), emqx_locker, unlock, [ClientId]). diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index 1b032a5ae..52138f62a 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -18,31 +18,28 @@ -behaviour(supervisor). - --define(SM, emqx_sm). - --define(HELPER, emqx_sm_helper). - -%% API -export([start_link/0]). -%% Supervisor callbacks -export([init/1]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - %% Create session tables - _ = ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]), + %% Create tables + create_tabs(), - %% Helper StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), - Helper = {?HELPER, {?HELPER, start_link, [StatsFun]}, - permanent, 5000, worker, [?HELPER]}, - %% SM Pool Sup - MFA = {?SM, start_link, []}, - PoolSup = emqx_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), - {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. + SM = {emqx_sm, {emqx_sm, start_link, [StatsFun]}, + permanent, 5000, worker, [emqx_sm]}, + + {ok, {{one_for_all, 0, 3600}, [SM]}}. + +create_tabs() -> + lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]). + +create_tab(Tab) -> + emqx_tables:create(Tab, [public, ordered_set, named_table, + {write_concurrency, true}]). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index b1e229ed6..ff632d4e7 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -57,6 +57,7 @@ init([]) -> {ok, {{one_for_all, 0, 1}, [?CHILD(emqx_ctl, worker), ?CHILD(emqx_hooks, worker), + ?CHILD(emqx_locker, worker), ?CHILD(emqx_stats, worker), ?CHILD(emqx_metrics, worker), ?CHILD(emqx_router_sup, supervisor), diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl new file mode 100644 index 000000000..d971b99cf --- /dev/null +++ b/src/emqx_tables.erl @@ -0,0 +1,27 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_tables). + +-export([create/2]). + +create(Tab, Opts) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, lists:usort([named_table|Opts])); + Tab -> Tab + end. + diff --git a/src/emqx_trace.erl b/src/emqx_tracer.erl similarity index 100% rename from src/emqx_trace.erl rename to src/emqx_tracer.erl diff --git a/src/emqx_trace_sup.erl b/src/emqx_tracer_sup.erl similarity index 81% rename from src/emqx_trace_sup.erl rename to src/emqx_tracer_sup.erl index e5f7bc2f2..35c0b1d5b 100644 --- a/src/emqx_trace_sup.erl +++ b/src/emqx_tracer_sup.erl @@ -14,21 +14,19 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_trace_sup). +-module(emqx_tracer_sup). -behaviour(supervisor). -%% API -export([start_link/0]). -%% Supervisor callbacks -export([init/1]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Trace = {trace, {emqx_trace, start_link, []}, - permanent, 5000, worker, [emqx_trace]}, - {ok, {{one_for_one, 10, 3600}, [Trace]}}. + Tracer = {tracer, {emqx_tracer, start_link, []}, + permanent, 5000, worker, [emqx_tracer]}, + {ok, {{one_for_one, 10, 3600}, [Tracer]}}.