Add connection/session shutdown policy

The hibernation behaviour is also changed (implicitly) in this commit:
Prior to this change, connection/session always hibernates after
the stats timer expires regardless of messages in mailbox.
After this commit, connection/session process only goes to hibernate
when the timer expires AND there is nothing left in the mailbox to
process
This commit is contained in:
spring2maz 2018-09-18 22:53:37 +02:00
parent f9f09f66dd
commit 6fca651a84
6 changed files with 154 additions and 7 deletions

View File

@ -1767,6 +1767,7 @@ end}.
%% @doc Max message queue length for connection/session process. %% @doc Max message queue length for connection/session process.
%% NOTE: Message queue here is the Erlang process mailbox, but not %% NOTE: Message queue here is the Erlang process mailbox, but not
%% the number of MQTT queued messages. %% the number of MQTT queued messages.
%% Set to 0 or negative to have it disabled
{mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ {mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [
{default, 100000}, {default, 100000},
{datatype, integer} {datatype, integer}

View File

@ -214,9 +214,17 @@ handle_info({timeout, Timer, emit_stats},
proto_state = ProtoState proto_state = ProtoState
}) -> }) ->
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
NewState = State#state{stats_timer = undefined},
case meqx_misc:conn_proc_mng_policy() of
continue ->
{noreply, NewState};
hibernate ->
ok = emqx_gc:reset(), ok = emqx_gc:reset(),
{noreply, State#state{stats_timer = undefined}, hibernate}; {noreply, NewState, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
shutdown(Reason, NewState)
end;
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);

View File

@ -15,7 +15,7 @@
-module(emqx_misc). -module(emqx_misc).
-export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
proc_name/2, proc_stats/0, proc_stats/1]). proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]).
%% @doc Merge options %% @doc Merge options
-spec(merge_opts(list(), list()) -> list()). -spec(merge_opts(list(), list()) -> list()).
@ -59,3 +59,42 @@ proc_stats(Pid) ->
{value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
[{mailbox_len, V} | Stats1]. [{mailbox_len, V} | Stats1].
-define(DISABLED, 0).
%% @doc Check self() process status against connection/session process management policy,
%% return `continue | hibernate | {shutdown, Reason}' accordingly.
%% `continue': There is nothing out of the ordinary
%% `hibernate': Nothing to process in my mailbox (and since this check is triggered
%% by a timer, we assume it is a fat chance to continue idel, hence hibernate.
%% `shutdown': Some numbers (message queue length or heap size have hit the limit,
%% hence shutdown for greater good (system stability).
-spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}).
conn_proc_mng_policy() ->
MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED),
Qlength = proc_info(message_queue_len),
Checks =
[{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end,
{shutdown, message_queue_too_long}},
{fun() -> is_heap_size_too_large() end,
{shutdown, total_heap_size_too_large}},
{fun() -> Qlength > 0 end, continue},
{fun() -> true end, hibernate}
],
check(Checks).
check([{Pred, Result} | Rest]) ->
case Pred() of
true -> Result;
false -> check(Rest)
end.
is_heap_size_too_large() ->
MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED),
is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize.
is_enabled(Max) -> Max > ?DISABLED.
proc_info(Key) ->
{Key, Value} = erlang:process_info(self(), Key),
Value.

View File

@ -574,9 +574,17 @@ handle_info({timeout, Timer, emit_stats},
State = #state{client_id = ClientId, State = #state{client_id = ClientId,
stats_timer = Timer}) -> stats_timer = Timer}) ->
_ = emqx_sm:set_session_stats(ClientId, stats(State)), _ = emqx_sm:set_session_stats(ClientId, stats(State)),
NewState = State#state{stats_timer = undefined},
case emqx_misc:conn_proc_mng_policy() of
continue ->
{noreply, NewState};
hibernate ->
ok = emqx_gc:reset(), %% going to hibernate, reset gc stats ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
{noreply, State#state{stats_timer = undefined}, hibernate}; {noreply, NewState, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
shutdown(Reason, NewState)
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "expired, shutdown now:(", [], State), ?LOG(info, "expired, shutdown now:(", [], State),
shutdown(expired, State); shutdown(expired, State);

54
test/emqx_misc_tests.erl Normal file
View File

@ -0,0 +1,54 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_misc_tests).
-include_lib("eunit/include/eunit.hrl").
shutdown_disabled_test() ->
with_env(
[{conn_max_msg_queue_len, 0},
{conn_max_total_heap_size, 0}],
fun() ->
self() ! foo,
?assertEqual(continue, conn_proc_mng_policy()),
receive foo -> ok end,
?assertEqual(hibernate, conn_proc_mng_policy())
end).
message_queue_too_long_test() ->
with_env(
[{conn_max_msg_queue_len, 1},
{conn_max_total_heap_size, 0}],
fun() ->
self() ! foo,
self() ! bar,
?assertEqual({shutdown, message_queue_too_long},
conn_proc_mng_policy()),
receive foo -> ok end,
?assertEqual(continue, conn_proc_mng_policy()),
receive bar -> ok end
end).
total_heap_size_too_large_test() ->
with_env(
[{conn_max_msg_queue_len, 0},
{conn_max_total_heap_size, 1}],
fun() ->
?assertEqual({shutdown, total_heap_size_too_large},
conn_proc_mng_policy())
end).
with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F).
conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy().

37
test/emqx_test_lib.erl Normal file
View File

@ -0,0 +1,37 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_test_lib).
-export([ with_env/2
]).
with_env([], F) -> F();
with_env([{Key, Val} | Rest], F) ->
Origin = get_env(Key),
try
ok = set_env(Key, Val),
with_env(Rest, F)
after
case Origin of
undefined -> ok = unset_env(Key);
_ -> ok = set_env(Key, Origin)
end
end.
get_env(Key) -> application:get_env(?APPLICATION, Key).
set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val).
unset_env(Key) -> application:unset_env(?APPLICATION, Key).