From d0658476657fce46c3c9810fe763a5e61c0dce07 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Jun 2019 10:28:23 +0800 Subject: [PATCH] Remove session record --- ...x_sm_registry.erl => emqx_cm_registry.erl} | 0 src/emqx_cm_sup.erl | 49 ++-- src/emqx_session_sup.erl | 265 ------------------ src/emqx_sm_sup.erl | 64 ----- 4 files changed, 32 insertions(+), 346 deletions(-) rename src/{emqx_sm_registry.erl => emqx_cm_registry.erl} (100%) delete mode 100644 src/emqx_session_sup.erl delete mode 100644 src/emqx_sm_sup.erl diff --git a/src/emqx_sm_registry.erl b/src/emqx_cm_registry.erl similarity index 100% rename from src/emqx_sm_registry.erl rename to src/emqx_cm_registry.erl diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 19940da05..65702b26f 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_cm_sup). +-module(emqx_sm_sup). -behaviour(supervisor). @@ -24,26 +24,41 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Banned = #{id => banned, - start => {emqx_banned, start_link, []}, + %% Session locker + Locker = #{id => locker, + start => {emqx_sm_locker, start_link, []}, restart => permanent, - shutdown => 1000, + shutdown => 5000, type => worker, - modules => [emqx_banned]}, - FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), - Flapping = #{id => flapping, - start => {emqx_flapping, start_link, [FlappingOption]}, + modules => [emqx_sm_locker] + }, + %% Session registry + Registry = #{id => registry, + start => {emqx_sm_registry, start_link, []}, restart => permanent, - shutdown => 1000, + shutdown => 5000, type => worker, - modules => [emqx_flapping]}, + modules => [emqx_sm_registry] + }, + %% Session Manager Manager = #{id => manager, - start => {emqx_cm, start_link, []}, + start => {emqx_sm, start_link, []}, restart => permanent, - shutdown => 2000, + shutdown => 5000, type => worker, - modules => [emqx_cm]}, - SupFlags = #{strategy => one_for_one, - intensity => 100, - period => 10}, - {ok, {SupFlags, [Banned, Manager, Flapping]}}. + modules => [emqx_sm] + }, + %% Session Sup + SessSpec = #{start => {emqx_session, start_link, []}, + shutdown => brutal_kill, + clean_down => fun emqx_sm:clean_down/1 + }, + SessionSup = #{id => session_sup, + start => {emqx_session_sup, start_link, [SessSpec ]}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [emqx_session_sup] + }, + {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager, SessionSup]}}. + diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl deleted file mode 100644 index ad721d5fd..000000000 --- a/src/emqx_session_sup.erl +++ /dev/null @@ -1,265 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_session_sup). - --behaviour(gen_server). - --include("logger.hrl"). --include("types.hrl"). - --export([start_link/1]). - --export([ start_session/1 - , count_sessions/0 - ]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --type(shutdown() :: brutal_kill | infinity | pos_integer()). - --record(state, - { sessions :: #{pid() => emqx_types:client_id()} - , mfargs :: mfa() - , shutdown :: shutdown() - , clean_down :: fun() - }). - --define(SUP, ?MODULE). --define(BATCH_EXIT, 100000). - -%% @doc Start session supervisor. --spec(start_link(map()) -> startlink_ret()). -start_link(SessSpec) when is_map(SessSpec) -> - gen_server:start_link({local, ?SUP}, ?MODULE, [SessSpec], []). - -%%------------------------------------------------------------------------------ -%% API -%%------------------------------------------------------------------------------ - -%% @doc Start a session. --spec(start_session(map()) -> startlink_ret()). -start_session(SessAttrs) -> - gen_server:call(?SUP, {start_session, SessAttrs}, infinity). - -%% @doc Count sessions. --spec(count_sessions() -> non_neg_integer()). -count_sessions() -> - gen_server:call(?SUP, count_sessions, infinity). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Spec]) -> - process_flag(trap_exit, true), - MFA = maps:get(start, Spec), - Shutdown = maps:get(shutdown, Spec, brutal_kill), - CleanDown = maps:get(clean_down, Spec, undefined), - State = #state{sessions = #{}, - mfargs = MFA, - shutdown = Shutdown, - clean_down = CleanDown - }, - {ok, State}. - -handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From, - State = #state{sessions = SessMap, mfargs = {M, F, Args}}) -> - try erlang:apply(M, F, [SessAttrs | Args]) of - {ok, Pid} -> - reply({ok, Pid}, State#state{sessions = maps:put(Pid, ClientId, SessMap)}); - ignore -> - reply(ignore, State); - {error, Reason} -> - reply({error, Reason}, State) - catch - _:Error:Stk -> - ?LOG(error, "[Session Supervisor] Failed to start session ~p: ~p, stacktrace:~n~p", - [ClientId, Error, Stk]), - reply({error, Error}, State) - end; - -handle_call(count_sessions, _From, State = #state{sessions = SessMap}) -> - {reply, maps:size(SessMap), State}; - -handle_call(Req, _From, State) -> - ?LOG(error, "[Session Supervisor] Unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "[Session Supervisor] Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) -> - SessPids = [Pid | drain_exit(?BATCH_EXIT, [])], - {SessItems, SessMap1} = erase_all(SessPids, SessMap), - (CleanDown =:= undefined) - orelse emqx_pool:async_submit( - fun lists:foreach/2, [CleanDown, SessItems]), - {noreply, State#state{sessions = SessMap1}}; - -handle_info(Info, State) -> - ?LOG(notice, "[Session Supervisor] Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, State) -> - terminate_children(State). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -drain_exit(0, Acc) -> - lists:reverse(Acc); -drain_exit(Cnt, Acc) -> - receive - {'EXIT', Pid, _Reason} -> - drain_exit(Cnt - 1, [Pid|Acc]) - after 0 -> - lists:reverse(Acc) - end. - -erase_all(Pids, Map) -> - lists:foldl( - fun(Pid, {Acc, M}) -> - case maps:take(Pid, M) of - {Val, M1} -> - {[{Val, Pid}|Acc], M1}; - error -> - {Acc, M} - end - end, {[], Map}, Pids). - -terminate_children(State = #state{sessions = SessMap, shutdown = Shutdown}) -> - {Pids, EStack0} = monitor_children(SessMap), - Sz = sets:size(Pids), - EStack = - case Shutdown of - brutal_kill -> - sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), - wait_children(Shutdown, Pids, Sz, undefined, EStack0); - infinity -> - sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), - wait_children(Shutdown, Pids, Sz, undefined, EStack0); - Time when is_integer(Time) -> - sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), - TRef = erlang:start_timer(Time, self(), kill), - wait_children(Shutdown, Pids, Sz, TRef, EStack0) - end, - %% Unroll stacked errors and report them - dict:fold(fun(Reason, Pid, _) -> - report_error(connection_shutdown_error, Reason, Pid, State) - end, ok, EStack). - -monitor_children(SessMap) -> - lists:foldl( - fun(Pid, {Pids, EStack}) -> - case monitor_child(Pid) of - ok -> - {sets:add_element(Pid, Pids), EStack}; - {error, normal} -> - {Pids, EStack}; - {error, Reason} -> - {Pids, dict:append(Reason, Pid, EStack)} - end - end, {sets:new(), dict:new()}, maps:keys(SessMap)). - -%% Help function to shutdown/2 switches from link to monitor approach -monitor_child(Pid) -> - %% Do the monitor operation first so that if the child dies - %% before the monitoring is done causing a 'DOWN'-message with - %% reason noproc, we will get the real reason in the 'EXIT'-message - %% unless a naughty child has already done unlink... - erlang:monitor(process, Pid), - unlink(Pid), - - receive - %% If the child dies before the unlik we must empty - %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. - {'EXIT', Pid, Reason} -> - receive - {'DOWN', _, process, Pid, _} -> - {error, Reason} - end - after 0 -> - %% If a naughty child did unlink and the child dies before - %% monitor the result will be that shutdown/2 receives a - %% 'DOWN'-message with reason noproc. - %% If the child should die after the unlink there - %% will be a 'DOWN'-message with a correct reason - %% that will be handled in shutdown/2. - ok - end. - -wait_children(_Shutdown, _Pids, 0, undefined, EStack) -> - EStack; -wait_children(_Shutdown, _Pids, 0, TRef, EStack) -> - %% If the timer has expired before its cancellation, we must empty the - %% mail-box of the 'timeout'-message. - erlang:cancel_timer(TRef), - receive - {timeout, TRef, kill} -> - EStack - after 0 -> - EStack - end; - -%%TODO: Copied from supervisor.erl, rewrite it later. -wait_children(brutal_kill, Pids, Sz, TRef, EStack) -> - receive - {'DOWN', _MRef, process, Pid, killed} -> - wait_children(brutal_kill, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); - - {'DOWN', _MRef, process, Pid, Reason} -> - wait_children(brutal_kill, sets:del_element(Pid, Pids), - Sz-1, TRef, dict:append(Reason, Pid, EStack)) - end; - -wait_children(Shutdown, Pids, Sz, TRef, EStack) -> - receive - {'DOWN', _MRef, process, Pid, shutdown} -> - wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); - {'DOWN', _MRef, process, Pid, normal} -> - wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); - {'DOWN', _MRef, process, Pid, Reason} -> - wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, - TRef, dict:append(Reason, Pid, EStack)); - {timeout, TRef, kill} -> - sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), - wait_children(Shutdown, Pids, Sz-1, undefined, EStack) - end. - -report_error(Error, Reason, Pid, #state{mfargs = MFA}) -> - SupName = list_to_atom("esockd_connection_sup - " ++ pid_to_list(self())), - ErrorMsg = [{supervisor, SupName}, - {errorContext, Error}, - {reason, Reason}, - {offender, [{pid, Pid}, - {name, connection}, - {mfargs, MFA}]}], - error_logger:error_report(supervisor_report, ErrorMsg). - -reply(Repy, State) -> - {reply, Repy, State}. - diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl deleted file mode 100644 index 65702b26f..000000000 --- a/src/emqx_sm_sup.erl +++ /dev/null @@ -1,64 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_sm_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - %% Session locker - Locker = #{id => locker, - start => {emqx_sm_locker, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sm_locker] - }, - %% Session registry - Registry = #{id => registry, - start => {emqx_sm_registry, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sm_registry] - }, - %% Session Manager - Manager = #{id => manager, - start => {emqx_sm, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sm] - }, - %% Session Sup - SessSpec = #{start => {emqx_session, start_link, []}, - shutdown => brutal_kill, - clean_down => fun emqx_sm:clean_down/1 - }, - SessionSup = #{id => session_sup, - start => {emqx_session_sup, start_link, [SessSpec ]}, - restart => transient, - shutdown => infinity, - type => supervisor, - modules => [emqx_session_sup] - }, - {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager, SessionSup]}}. -