From 892d9439b9144ce88b2187f7ab02c4e8cb7cf0c4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 19 Dec 2018 16:49:35 +0800 Subject: [PATCH] Implement a new session supervisor. (#2077) --- src/emqx_session_sup.erl | 256 +++++++++++++++++++++++++++++++++++++++ src/emqx_sm.erl | 39 ++---- src/emqx_sm_sup.erl | 47 ++++--- 3 files changed, 297 insertions(+), 45 deletions(-) create mode 100644 src/emqx_session_sup.erl diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl new file mode 100644 index 000000000..efa64815b --- /dev/null +++ b/src/emqx_session_sup.erl @@ -0,0 +1,256 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_session_sup). + +-behaviour(gen_server). + +-export([start_link/1]). +-export([start_session/1, count_sessions/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-type(shutdown() :: brutal_kill | infinity | pos_integer()). + +-record(state, { + sessions :: #{pid() => emqx_types:client_id()}, + mfargs :: mfa(), + shutdown :: shutdown(), + clean_down :: fun() + }). + +-define(SUP, ?MODULE). +-define(BATCH_EXIT, 100000). +-define(ERROR_MSG(Format, Args), + error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])). + +%% @doc Start session supervisor. +-spec(start_link(map()) -> emqx_types:startlink_ret()). +start_link(SessSpec) when is_map(SessSpec) -> + gen_server:start_link({local, ?SUP}, ?MODULE, [SessSpec], []). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +%% @doc Start a session. +-spec(start_session(map()) -> emqx_types:startlink_ret()). +start_session(SessAttrs) -> + gen_server:call(?SUP, {start_session, SessAttrs}, infinity). + +%% @doc Count sessions. +-spec(count_sessions() -> non_neg_integer()). +count_sessions() -> + gen_server:call(?SUP, count_sessions, infinity). + +%%------------------------------------------------------------------------------ +%% gen_server callbacks +%%------------------------------------------------------------------------------ + +init([Spec]) -> + process_flag(trap_exit, true), + MFA = maps:get(start, Spec), + Shutdown = maps:get(shutdown, Spec, brutal_kill), + CleanDown = maps:get(clean_down, Spec, undefined), + State = #state{sessions = #{}, + mfargs = MFA, + shutdown = Shutdown, + clean_down = CleanDown + }, + {ok, State}. + +handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From, + State = #state{sessions = SessMap, mfargs = {M, F, Args}}) -> + try erlang:apply(M, F, [SessAttrs | Args]) of + {ok, Pid} -> + reply({ok, Pid}, State#state{sessions = maps:put(Pid, ClientId, SessMap)}); + ignore -> + reply(ignore, State); + {error, Reason} -> + reply({error, Reason}, State) + catch + _:Error:Stk -> + ?ERROR_MSG("Failed to start session ~p: ~p, stacktrace:~n~p", + [ClientId, Error, Stk]), + reply({error, Error}, State) + end; + +handle_call(count_sessions, _From, State = #state{sessions = SessMap}) -> + {reply, maps:size(SessMap), State}; + +handle_call(Req, _From, State) -> + ?ERROR_MSG("unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?ERROR_MSG("unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) -> + SessPids = [Pid | drain_exit(?BATCH_EXIT, [])], + {SessItems, SessMap1} = erase_all(SessPids, SessMap), + (CleanDown =:= undefined) + orelse emqx_pool:async_submit( + fun lists:foreach/2, [CleanDown, SessItems]), + {noreply, State#state{sessions = SessMap1}}; + +handle_info(Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, State) -> + terminate_children(State). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +drain_exit(0, Acc) -> + lists:reverse(Acc); +drain_exit(Cnt, Acc) -> + receive + {'EXIT', Pid, _Reason} -> + drain_exit(Cnt - 1, [Pid|Acc]) + after 0 -> + lists:reverse(Acc) + end. + +erase_all(Pids, Map) -> + lists:foldl( + fun(Pid, {Acc, M}) -> + case maps:take(Pid, M) of + {Val, M1} -> + {[{Val, Pid}|Acc], M1}; + error -> + {Acc, M} + end + end, {[], Map}, Pids). + +terminate_children(State = #state{sessions = SessMap, shutdown = Shutdown}) -> + {Pids, EStack0} = monitor_children(SessMap), + Sz = sets:size(Pids), + EStack = + case Shutdown of + brutal_kill -> + sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_children(Shutdown, Pids, Sz, undefined, EStack0); + infinity -> + sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + wait_children(Shutdown, Pids, Sz, undefined, EStack0); + Time when is_integer(Time) -> + sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + TRef = erlang:start_timer(Time, self(), kill), + wait_children(Shutdown, Pids, Sz, TRef, EStack0) + end, + %% Unroll stacked errors and report them + dict:fold(fun(Reason, Pid, _) -> + report_error(connection_shutdown_error, Reason, Pid, State) + end, ok, EStack). + +monitor_children(SessMap) -> + lists:foldl( + fun(Pid, {Pids, EStack}) -> + case monitor_child(Pid) of + ok -> + {sets:add_element(Pid, Pids), EStack}; + {error, normal} -> + {Pids, EStack}; + {error, Reason} -> + {Pids, dict:append(Reason, Pid, EStack)} + end + end, {sets:new(), dict:new()}, maps:keys(SessMap)). + +%% Help function to shutdown/2 switches from link to monitor approach +monitor_child(Pid) -> + %% Do the monitor operation first so that if the child dies + %% before the monitoring is done causing a 'DOWN'-message with + %% reason noproc, we will get the real reason in the 'EXIT'-message + %% unless a naughty child has already done unlink... + erlang:monitor(process, Pid), + unlink(Pid), + + receive + %% If the child dies before the unlik we must empty + %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. + {'EXIT', Pid, Reason} -> + receive + {'DOWN', _, process, Pid, _} -> + {error, Reason} + end + after 0 -> + %% If a naughty child did unlink and the child dies before + %% monitor the result will be that shutdown/2 receives a + %% 'DOWN'-message with reason noproc. + %% If the child should die after the unlink there + %% will be a 'DOWN'-message with a correct reason + %% that will be handled in shutdown/2. + ok + end. + +wait_children(_Shutdown, _Pids, 0, undefined, EStack) -> + EStack; +wait_children(_Shutdown, _Pids, 0, TRef, EStack) -> + %% If the timer has expired before its cancellation, we must empty the + %% mail-box of the 'timeout'-message. + erlang:cancel_timer(TRef), + receive + {timeout, TRef, kill} -> + EStack + after 0 -> + EStack + end; + +%%TODO: Copied from supervisor.erl, rewrite it later. +wait_children(brutal_kill, Pids, Sz, TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, killed} -> + wait_children(brutal_kill, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_children(brutal_kill, sets:del_element(Pid, Pids), + Sz-1, TRef, dict:append(Reason, Pid, EStack)) + end; + +wait_children(Shutdown, Pids, Sz, TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, shutdown} -> + wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); + {'DOWN', _MRef, process, Pid, normal} -> + wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); + {'DOWN', _MRef, process, Pid, Reason} -> + wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, + TRef, dict:append(Reason, Pid, EStack)); + {timeout, TRef, kill} -> + sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_children(Shutdown, Pids, Sz-1, undefined, EStack) + end. + +report_error(Error, Reason, Pid, #state{mfargs = MFA}) -> + SupName = list_to_atom("esockd_connection_sup - " ++ pid_to_list(self())), + ErrorMsg = [{supervisor, SupName}, + {errorContext, Error}, + {reason, Reason}, + {offender, [{pid, Pid}, + {name, connection}, + {mfargs, MFA}]}], + error_logger:error_report(supervisor_report, ErrorMsg). + +reply(Repy, State) -> + {reply, Repy, State}. + diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 97a1b689c..ab270f1af 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -37,6 +37,9 @@ %% Internal function for stats -export([stats_fun/0]). +%% Internal function for emqx_session_sup +-export([clean_down/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -60,7 +63,7 @@ start_link() -> open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> CleanStart = fun(_) -> ok = discard_session(ClientId, ConnPid), - emqx_session:start_link(SessAttrs) + emqx_session_sup:start_session(SessAttrs) end, emqx_sm_locker:trans(ClientId, CleanStart); @@ -70,7 +73,7 @@ open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> {ok, SessPid} -> {ok, SessPid, true}; {error, not_found} -> - emqx_session:start_link(SessAttrs) + emqx_session_sup:start_session(SessAttrs) end end, emqx_sm_locker:trans(ClientId, ResumeStart). @@ -130,8 +133,7 @@ register_session(ClientId) when is_binary(ClientId) -> register_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) -> Session = {ClientId, SessPid}, true = ets:insert(?SESSION_TAB, Session), - ok = emqx_sm_registry:register_session(Session), - notify({registered, ClientId, SessPid}). + emqx_sm_registry:register_session(Session). %% @doc Unregister a session -spec(unregister_session(emqx_types:client_id()) -> ok). @@ -140,11 +142,7 @@ unregister_session(ClientId) when is_binary(ClientId) -> -spec(unregister_session(emqx_types:client_id(), pid()) -> ok). unregister_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) -> - ok = do_unregister_session({ClientId, SessPid}), - notify({unregistered, SessPid}). - -%% @private -do_unregister_session(Session) -> + Session = {ClientId, SessPid}, true = ets:delete(?SESSION_STATS_TAB, Session), true = ets:delete(?SESSION_ATTRS_TAB, Session), true = ets:delete_object(?SESSION_P_TAB, Session), @@ -214,9 +212,6 @@ dispatch(ClientId, Topic, Msg) -> emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) end. -notify(Event) -> - gen_server:cast(?SM, {notify, Event}). - %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -228,29 +223,16 @@ init([]) -> ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), ok = emqx_stats:update_interval(sess_stats, fun ?MODULE:stats_fun/0), - {ok, #{sess_pmon => emqx_pmon:new()}}. + {ok, #{}}. handle_call(Req, _From, State) -> emqx_logger:error("[SM] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, SessPid}}, State = #{sess_pmon := PMon}) -> - {noreply, State#{sess_pmon := emqx_pmon:monitor(SessPid, ClientId, PMon)}}; - -handle_cast({notify, {unregistered, SessPid}}, State = #{sess_pmon := PMon}) -> - {noreply, State#{sess_pmon := emqx_pmon:demonitor(SessPid, PMon)}}; - handle_cast(Msg, State) -> emqx_logger:error("[SM] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{sess_pmon := PMon}) -> - SessPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], - {Items, PMon1} = emqx_pmon:erase_all(SessPids, PMon), - ok = emqx_pool:async_submit( - fun lists:foreach/2, [fun clean_down/1, Items]), - {noreply, State#{sess_pmon := PMon1}}; - handle_info(Info, State) -> emqx_logger:error("[SM] unexpected info: ~p", [Info]), {noreply, State}. @@ -265,12 +247,11 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -clean_down({SessPid, ClientId}) -> - Session = {ClientId, SessPid}, +clean_down(Session = {ClientId, SessPid}) -> case ets:member(?SESSION_TAB, ClientId) orelse ets:member(?SESSION_ATTRS_TAB, Session) of true -> - do_unregister_session(Session); + unregister_session(ClientId, SessPid); false -> ok end. diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index 0be9facb0..317cd11db 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -26,25 +26,40 @@ start_link() -> init([]) -> %% Session locker - Locker = #{id => locker, - start => {emqx_sm_locker, start_link, []}, - restart => permanent, + Locker = #{id => locker, + start => {emqx_sm_locker, start_link, []}, + restart => permanent, shutdown => 5000, - type => worker, - modules => [emqx_sm_locker]}, + type => worker, + modules => [emqx_sm_locker] + }, %% Session registry - Registry = #{id => registry, - start => {emqx_sm_registry, start_link, []}, - restart => permanent, + Registry = #{id => registry, + start => {emqx_sm_registry, start_link, []}, + restart => permanent, shutdown => 5000, - type => worker, - modules => [emqx_sm_registry]}, + type => worker, + modules => [emqx_sm_registry] + }, %% Session Manager - Manager = #{id => manager, - start => {emqx_sm, start_link, []}, - restart => permanent, + Manager = #{id => manager, + start => {emqx_sm, start_link, []}, + restart => permanent, shutdown => 5000, - type => worker, - modules => [emqx_sm]}, - {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager]}}. + type => worker, + modules => [emqx_sm] + }, + %% Session Sup + SessSpec = #{start => {emqx_session, start_link, []}, + shutdown => brutal_kill, + clean_down => fun emqx_sm:clean_down/1 + }, + SessionSup = #{id => session_sup, + start => {emqx_session_sup, start_link, [SessSpec ]}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [emqx_session_sup] + }, + {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager, SessionSup]}}.