From 8d50c62a94f0bfdf2aefef97a468614824f40dca Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 12 Dec 2018 16:10:16 +0800 Subject: [PATCH] Optimize connection and session management --- src/emqx_cm.erl | 24 +++++++---- src/emqx_protocol.erl | 13 +++--- src/emqx_session.erl | 15 ++++--- src/emqx_session_sup.erl | 2 +- src/emqx_sm.erl | 88 ++++++++++++++++++++-------------------- src/emqx_sm_locker.erl | 2 +- src/emqx_sm_registry.erl | 5 +-- 7 files changed, 78 insertions(+), 71 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 6756cf02b..0d2ecf5eb 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -36,7 +36,7 @@ -define(CM, ?MODULE). %% ETS Tables. --define(CONN_TAB, emqx_conn). +-define(CONN_TAB, emqx_conn). -define(CONN_ATTRS_TAB, emqx_conn_attrs). -define(CONN_STATS_TAB, emqx_conn_stats). @@ -56,7 +56,7 @@ register_connection(ClientId) when is_binary(ClientId) -> register_connection({ClientId, self()}); register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> - _ = ets:insert(?CONN_TAB, Conn), + true = ets:insert(?CONN_TAB, Conn), notify({registered, ClientId, ConnPid}). -spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok). @@ -87,10 +87,13 @@ unregister_connection(ClientId) when is_binary(ClientId) -> unregister_connection({ClientId, self()}); unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> - _ = ets:delete(?CONN_STATS_TAB, Conn), - _ = ets:delete(?CONN_ATTRS_TAB, Conn), - _ = ets:delete_object(?CONN_TAB, Conn), - notify({unregistered, ClientId, ConnPid}). + do_unregister_connection(Conn), + notify({unregistered, ConnPid}). + +do_unregister_connection(Conn) -> + true = ets:delete(?CONN_STATS_TAB, Conn), + true = ets:delete(?CONN_ATTRS_TAB, Conn), + true = ets:delete_object(?CONN_TAB, Conn). %% @doc Lookup connection pid -spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined). @@ -138,7 +141,7 @@ handle_call(Req, _From, State) -> handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> +handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) -> {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}}; handle_cast(Msg, State) -> @@ -150,7 +153,12 @@ handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := P undefined -> {noreply, State}; ClientId -> - unregister_connection({ClientId, ConnPid}), + Conn = {ClientId, ConnPid}, + case ets:member(?CONN_ATTRS_TAB, Conn) of + true -> + ok = emqx_pool:async_submit(fun do_unregister_connection/1, [Conn]); + false -> ok + end, {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}} end; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 573b913f7..3052191a1 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -849,14 +849,13 @@ shutdown(_Reason, #pstate{client_id = undefined}) -> ok; shutdown(_Reason, #pstate{connected = false}) -> ok; -shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; - Reason =:= discard -> - emqx_cm:unregister_connection(ClientId); -shutdown(Reason, PState = #pstate{connected = true, - client_id = ClientId}) -> +shutdown(conflict, _PState) -> + ok; +shutdown(discard, _PState) -> + ok; +shutdown(Reason, PState) -> ?LOG(info, "Shutdown for ~p", [Reason]), - emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), - emqx_cm:unregister_connection(ClientId). + emqx_hooks:run('client.disconnected', [credentials(PState), Reason]). start_keepalive(0, _PState) -> ignore; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 262a9a7a8..b92baa567 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -645,16 +645,14 @@ handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) -> - emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), +terminate(Reason, #state{will_msg = WillMsg, conn_pid = ConnPid}) -> + %% Should not run hooks here. + %% emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), send_willmsg(WillMsg), %% Ensure to shutdown the connection - if - ConnPid =/= undefined -> - ConnPid ! {shutdown, Reason}; - true -> ok - end, - emqx_sm:unregister_session(ClientId). + (ConnPid =:= undefined) orelse ConnPid ! {shutdown, Reason}. + %% Let it crash. + %% emqx_sm:unregister_session(ClientId). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -1011,3 +1009,4 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. + diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index 644e33f37..8efc4afc8 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -38,7 +38,7 @@ init([]) -> [#{id => session, start => {emqx_session, start_link, []}, restart => temporary, - shutdown => 5000, + shutdown => brutal_kill, type => worker, modules => [emqx_session]}]}}. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 637e44b0c..df2e4b862 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -40,9 +40,9 @@ -define(SM, ?MODULE). -%% ETS Tables --define(SESSION_TAB, emqx_session). --define(SESSION_P_TAB, emqx_persistent_session). +%% ETS Tables for session management. +-define(SESSION_TAB, emqx_session). +-define(SESSION_P_TAB, emqx_session_p). -define(SESSION_ATTRS_TAB, emqx_session_attrs). -define(SESSION_STATS_TAB, emqx_session_stats). @@ -59,8 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, - client_id := ClientId}) -> +open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> ResumeStart = fun(_) -> case resume_session(ClientId, SessAttrs) of {ok, SPid} -> @@ -77,13 +76,14 @@ discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, self()). discard_session(ClientId, ConnPid) when is_binary(ClientId) -> - lists:foreach(fun({_ClientId, SPid}) -> - case catch emqx_session:discard(SPid, ConnPid) of - {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> - emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); - ok -> ok - end - end, lookup_session(ClientId)). + lists:foreach( + fun({_ClientId, SPid}) -> + case catch emqx_session:discard(SPid, ConnPid) of + {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> + emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); + ok -> ok + end + end, lookup_session(ClientId)). %% @doc Try to resume a session. -spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}). @@ -116,19 +116,18 @@ close_session(SPid) when is_pid(SPid) -> register_session(ClientId, SessAttrs) when is_binary(ClientId) -> register_session({ClientId, self()}, SessAttrs); -register_session(Session = {ClientId, SPid}, SessAttrs) - when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION_TAB, Session), - ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}), - proplists:get_value(clean_start, SessAttrs, true) - andalso ets:insert(?SESSION_P_TAB, Session), - emqx_sm_registry:register_session(Session), +register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> + true = ets:insert(?SESSION_TAB, Session), + true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}), + true = proplists:get_value(clean_start, SessAttrs, true) + orelse ets:insert(?SESSION_P_TAB, Session), + ok = emqx_sm_registry:register_session(Session), notify({registered, ClientId, SPid}). %% @doc Get session attrs -spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())). get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - safe_lookup_element(?SESSION_ATTRS_TAB, Session, []). + emqx_tables:lookup_value(?SESSION_ATTRS_TAB, Session, []). %% @doc Set session attrs -spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()}, @@ -144,17 +143,21 @@ unregister_session(ClientId) when is_binary(ClientId) -> unregister_session({ClientId, self()}); unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - emqx_sm_registry:unregister_session(Session), - ets:delete(?SESSION_STATS_TAB, Session), - ets:delete(?SESSION_ATTRS_TAB, Session), - ets:delete_object(?SESSION_P_TAB, Session), - ets:delete_object(?SESSION_TAB, Session), + ok = do_unregister_session(Session), notify({unregistered, ClientId, SPid}). +%% @private +do_unregister_session(Session) -> + true = ets:delete(?SESSION_STATS_TAB, Session), + true = ets:delete(?SESSION_ATTRS_TAB, Session), + true = ets:delete_object(?SESSION_P_TAB, Session), + true = ets:delete_object(?SESSION_TAB, Session), + emqx_sm_registry:unregister_session(Session). + %% @doc Get session stats -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - safe_lookup_element(?SESSION_STATS_TAB, Session, []). + emqx_tables:lookup_value(?SESSION_STATS_TAB, Session, []). %% @doc Set session stats -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()}, @@ -168,7 +171,7 @@ set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), i -spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})). lookup_session(ClientId) -> case emqx_sm_registry:is_enabled() of - true -> emqx_sm_registry:lookup_session(ClientId); + true -> emqx_sm_registry:lookup_session(ClientId); false -> ets:lookup(?SESSION_TAB, ClientId) end. @@ -185,13 +188,7 @@ dispatch(ClientId, Topic, Msg) -> %% @doc Lookup session pid. -spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined). lookup_session_pid(ClientId) -> - safe_lookup_element(?SESSION_TAB, ClientId, undefined). - -safe_lookup_element(Tab, Key, Default) -> - try ets:lookup_element(Tab, Key, 2) - catch - error:badarg -> Default - end. + emqx_tables:lookup_value(?SESSION_TAB, ClientId). notify(Event) -> gen_server:cast(?SM, {notify, Event}). @@ -207,29 +204,34 @@ init([]) -> ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), - {ok, #{session_pmon => emqx_pmon:new()}}. + {ok, #{sess_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[SM] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) -> - {noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}}; +handle_cast({notify, {registered, ClientId, SPid}}, State = #{sess_pmon := PMon}) -> + {noreply, State#{sess_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) -> - {noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}}; +handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{sess_pmon := PMon}) -> + {noreply, State#{sess_pmon := emqx_pmon:demonitor(SPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[SM] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) -> +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{sess_pmon := PMon}) -> case emqx_pmon:find(DownPid, PMon) of undefined -> {noreply, State}; - ClientId -> - unregister_session({ClientId, DownPid}), - {noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}} + ClientId -> + Session = {ClientId, DownPid}, + case ets:member(?SESSION_ATTRS_TAB, Session) of + true -> + ok = emqx_pool:async_submit(fun do_unregister_session/1, [Session]); + false -> ok + end, + {noreply, State#{sess_pmon := emqx_pmon:erase(DownPid, PMon)}} end; handle_info(Info, State) -> diff --git a/src/emqx_sm_locker.erl b/src/emqx_sm_locker.erl index 29adf3342..409331b88 100644 --- a/src/emqx_sm_locker.erl +++ b/src/emqx_sm_locker.erl @@ -21,7 +21,7 @@ -export([trans/2, trans/3]). -export([lock/1, lock/2, unlock/1]). --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> ekka_locker:start_link(?MODULE). diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index b503d71c8..3b472e2c8 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -41,8 +41,7 @@ start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). -spec(is_enabled() -> boolean()). -is_enabled() -> - ets:info(?TAB, name) =/= undefined. +is_enabled() -> ets:info(?TAB, name) =/= undefined. -spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), session_pid()})). @@ -73,7 +72,7 @@ init([]) -> {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), ok = ekka_mnesia:copy_table(?TAB), - _ = ekka:monitor(membership), + ok = ekka:monitor(membership), {ok, #{}}. handle_call(Req, _From, State) ->