From ebef0ec5542daa986b91cfa85a9219aafd5ca52b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 28 Oct 2019 07:51:09 +0800 Subject: [PATCH 1/8] Add zone_options module --- src/emqx_time.erl | 52 ----------------- src/emqx_zone_options.erl | 120 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 52 deletions(-) delete mode 100644 src/emqx_time.erl create mode 100644 src/emqx_zone_options.erl diff --git a/src/emqx_time.erl b/src/emqx_time.erl deleted file mode 100644 index 073ee7067..000000000 --- a/src/emqx_time.erl +++ /dev/null @@ -1,52 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_time). - --export([ seed/0 - , now_secs/0 - , now_secs/1 - , now_ms/0 - , now_ms/1 - ]). - --compile({inline, - [ seed/0 - , now_secs/0 - , now_secs/1 - , now_ms/0 - , now_ms/1 - ]}). - -seed() -> - rand:seed(exsplus, erlang:timestamp()). - --spec(now_secs() -> pos_integer()). -now_secs() -> - erlang:system_time(second). - --spec(now_secs(erlang:timestamp()) -> pos_integer()). -now_secs({MegaSecs, Secs, _MicroSecs}) -> - MegaSecs * 1000000 + Secs. - --spec(now_ms() -> pos_integer()). -now_ms() -> - erlang:system_time(millisecond). - --spec(now_ms(erlang:timestamp()) -> pos_integer()). -now_ms({MegaSecs, Secs, MicroSecs}) -> - (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). - diff --git a/src/emqx_zone_options.erl b/src/emqx_zone_options.erl new file mode 100644 index 000000000..37844bfbb --- /dev/null +++ b/src/emqx_zone_options.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_zone_options). + +-compile(inline). + +-include("types.hrl"). +-include("emqx_mqtt.hrl"). + +-export([ idle_timeout/1 + , publish_limit/1 + , mqtt_frame_options/1 + , mqtt_strict_mode/1 + , max_packet_size/1 + , mountpoint/1 + , use_username_as_clientid/1 + , enable_stats/1 + , enable_acl/1 + , enable_ban/1 + , enable_flapping_detect/1 + , ignore_loop_deliver/1 + , server_keepalive/1 + , keepalive_backoff/1 + , max_inflight/1 + , session_expiry_interval/1 + , force_gc_policy/1 + , force_shutdown_policy/1 + ]). + +-import(emqx_zone, [get_env/2, get_env/3]). + +-define(DEFAULT_IDLE_TIMEOUT, 30000). + +-spec(idle_timeout(emqx_zone:zone()) -> pos_integer()). +idle_timeout(Zone) -> + get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT). + +-spec(publish_limit(emqx_zone:zone()) -> maybe(esockd_rate_limit:bucket())). +publish_limit(Zone) -> + get_env(Zone, publish_limit). + +-spec(mqtt_frame_options(emqx_zone:zone()) -> emqx_frame:options()). +mqtt_frame_options(Zone) -> + #{strict_mode => mqtt_strict_mode(Zone), + max_size => max_packet_size(Zone) + }. + +-spec(mqtt_strict_mode(emqx_zone:zone()) -> boolean()). +mqtt_strict_mode(Zone) -> + get_env(Zone, mqtt_strict_mode, false). + +-spec(max_packet_size(emqx_zone:zone()) -> integer()). +max_packet_size(Zone) -> + get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE). + +-spec(mountpoint(emqx_zone:zone()) -> maybe(emqx_mountpoint:mountpoint())). +mountpoint(Zone) -> get_env(Zone, mountpoint). + +-spec(use_username_as_clientid(emqx_zone:zone()) -> boolean()). +use_username_as_clientid(Zone) -> + get_env(Zone, use_username_as_clientid, false). + +-spec(enable_stats(emqx_zone:zone()) -> boolean()). +enable_stats(Zone) -> + get_env(Zone, enable_stats, true). + +-spec(enable_acl(emqx_zone:zone()) -> boolean()). +enable_acl(Zone) -> + get_env(Zone, enable_acl, true). + +-spec(enable_ban(emqx_zone:zone()) -> boolean()). +enable_ban(Zone) -> + get_env(Zone, enable_ban, false). + +-spec(enable_flapping_detect(emqx_zone:zone()) -> boolean()). +enable_flapping_detect(Zone) -> + get_env(Zone, enable_flapping_detect, false). + +-spec(ignore_loop_deliver(emqx_zone:zone()) -> boolean()). +ignore_loop_deliver(Zone) -> + get_env(Zone, ignore_loop_deliver, false). + +-spec(server_keepalive(emqx_zone:zone()) -> pos_integer()). +server_keepalive(Zone) -> + get_env(Zone, server_keepalive). + +-spec(keepalive_backoff(emqx_zone:zone()) -> float()). +keepalive_backoff(Zone) -> + get_env(Zone, keepalive_backoff, 0.75). + +-spec(max_inflight(emqx_zone:zone()) -> 0..65535). +max_inflight(Zone) -> + get_env(Zone, max_inflight, 65535). + +-spec(session_expiry_interval(emqx_zone:zone()) -> non_neg_integer()). +session_expiry_interval(Zone) -> + get_env(Zone, session_expiry_interval, 0). + +-spec(force_gc_policy(emqx_zone:zone()) -> maybe(emqx_gc:opts())). +force_gc_policy(Zone) -> + get_env(Zone, force_gc_policy). + +-spec(force_shutdown_policy(emqx_zone:zone()) -> maybe(emqx_oom:opts())). +force_shutdown_policy(Zone) -> + get_env(Zone, force_shutdown_policy). + From 4c9dda105fd332b22a43eb702aaa32c418e680ec Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:00:11 +0800 Subject: [PATCH 2/8] Add 'active_n' option for WebSocket listener --- etc/emqx.conf | 14 ++++- priv/emqx.schema | 19 ++++-- src/emqx_zone_options.erl | 120 -------------------------------------- test/emqx_time_SUITE.erl | 34 ----------- 4 files changed, 27 insertions(+), 160 deletions(-) delete mode 100644 src/emqx_zone_options.erl delete mode 100644 test/emqx_time_SUITE.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index c2d2ad75f..6f28bf59d 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1349,9 +1349,14 @@ listener.ws.external.max_connections = 102400 ## Value: Number listener.ws.external.max_conn_rate = 1000 +## Simulate the {active, N} option for the MQTT/WebSocket connections. +## +## Value: Number +listener.ws.external.active_n = 100 + ## Rate limit for the MQTT/WebSocket connections. ## -## Value: limit,duration +## Value: Limit,Duration ## Default: 100KB incoming per 10 seconds. ## listener.ws.external.rate_limit = 100KB,10s @@ -1557,9 +1562,14 @@ listener.wss.external.max_connections = 16 ## Value: Number listener.wss.external.max_conn_rate = 1000 +## Simulate the {active, N} option for the MQTT/WebSocket/SSL connections. +## +## Value: Number +listener.wss.external.active_n = 100 + ## Rate limit for the MQTT/WebSocket/SSL connections. ## -## Value: limit,duration +## Value: Limit,Duration ## Default: 100KB incoming per 10 seconds. ## listener.wss.external.rate_limit = 100KB,10s diff --git a/priv/emqx.schema b/priv/emqx.schema index d751ecd06..6ac622ea7 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -959,17 +959,18 @@ end}. {force_gc_policy, GcPolicy}; ("force_shutdown_policy", "default") -> {DefaultLen, DefaultSize} = - case erlang:system_info(wordsize) of + case WordSize = erlang:system_info(wordsize) of 8 -> % arch_64 {8000, cuttlefish_bytesize:parse("800MB")}; 4 -> % arch_32 {1000, cuttlefish_bytesize:parse("100MB")} end, {force_shutdown_policy, #{message_queue_len => DefaultLen, - max_heap_size => DefaultSize}}; + max_heap_size => DefaultSize div WordSize + }}; ("force_shutdown_policy", Val) -> [Len, Siz] = string:tokens(Val, "| "), - MaxSiz = case erlang:system_info(wordsize) of + MaxSiz = case WordSize = erlang:system_info(wordsize) of 8 -> % arch_64 (1 bsl 59) - 1; 4 -> % arch_32 @@ -983,7 +984,7 @@ end}. cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); Siz1 -> #{message_queue_len => list_to_integer(Len), - max_heap_size => Siz1} + max_heap_size => Siz1 div WordSize} end, {force_shutdown_policy, ShutdownPolicy}; ("mqueue_priorities", Val) -> @@ -1289,6 +1290,11 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ws.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + {mapping, "listener.ws.$name.zone", "emqx.listeners", [ {datatype, string} ]}. @@ -1442,6 +1448,11 @@ end}. {datatype, integer} ]}. +{mapping, "listener.wss.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + {mapping, "listener.wss.$name.zone", "emqx.listeners", [ {datatype, string} ]}. diff --git a/src/emqx_zone_options.erl b/src/emqx_zone_options.erl deleted file mode 100644 index 37844bfbb..000000000 --- a/src/emqx_zone_options.erl +++ /dev/null @@ -1,120 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_zone_options). - --compile(inline). - --include("types.hrl"). --include("emqx_mqtt.hrl"). - --export([ idle_timeout/1 - , publish_limit/1 - , mqtt_frame_options/1 - , mqtt_strict_mode/1 - , max_packet_size/1 - , mountpoint/1 - , use_username_as_clientid/1 - , enable_stats/1 - , enable_acl/1 - , enable_ban/1 - , enable_flapping_detect/1 - , ignore_loop_deliver/1 - , server_keepalive/1 - , keepalive_backoff/1 - , max_inflight/1 - , session_expiry_interval/1 - , force_gc_policy/1 - , force_shutdown_policy/1 - ]). - --import(emqx_zone, [get_env/2, get_env/3]). - --define(DEFAULT_IDLE_TIMEOUT, 30000). - --spec(idle_timeout(emqx_zone:zone()) -> pos_integer()). -idle_timeout(Zone) -> - get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT). - --spec(publish_limit(emqx_zone:zone()) -> maybe(esockd_rate_limit:bucket())). -publish_limit(Zone) -> - get_env(Zone, publish_limit). - --spec(mqtt_frame_options(emqx_zone:zone()) -> emqx_frame:options()). -mqtt_frame_options(Zone) -> - #{strict_mode => mqtt_strict_mode(Zone), - max_size => max_packet_size(Zone) - }. - --spec(mqtt_strict_mode(emqx_zone:zone()) -> boolean()). -mqtt_strict_mode(Zone) -> - get_env(Zone, mqtt_strict_mode, false). - --spec(max_packet_size(emqx_zone:zone()) -> integer()). -max_packet_size(Zone) -> - get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE). - --spec(mountpoint(emqx_zone:zone()) -> maybe(emqx_mountpoint:mountpoint())). -mountpoint(Zone) -> get_env(Zone, mountpoint). - --spec(use_username_as_clientid(emqx_zone:zone()) -> boolean()). -use_username_as_clientid(Zone) -> - get_env(Zone, use_username_as_clientid, false). - --spec(enable_stats(emqx_zone:zone()) -> boolean()). -enable_stats(Zone) -> - get_env(Zone, enable_stats, true). - --spec(enable_acl(emqx_zone:zone()) -> boolean()). -enable_acl(Zone) -> - get_env(Zone, enable_acl, true). - --spec(enable_ban(emqx_zone:zone()) -> boolean()). -enable_ban(Zone) -> - get_env(Zone, enable_ban, false). - --spec(enable_flapping_detect(emqx_zone:zone()) -> boolean()). -enable_flapping_detect(Zone) -> - get_env(Zone, enable_flapping_detect, false). - --spec(ignore_loop_deliver(emqx_zone:zone()) -> boolean()). -ignore_loop_deliver(Zone) -> - get_env(Zone, ignore_loop_deliver, false). - --spec(server_keepalive(emqx_zone:zone()) -> pos_integer()). -server_keepalive(Zone) -> - get_env(Zone, server_keepalive). - --spec(keepalive_backoff(emqx_zone:zone()) -> float()). -keepalive_backoff(Zone) -> - get_env(Zone, keepalive_backoff, 0.75). - --spec(max_inflight(emqx_zone:zone()) -> 0..65535). -max_inflight(Zone) -> - get_env(Zone, max_inflight, 65535). - --spec(session_expiry_interval(emqx_zone:zone()) -> non_neg_integer()). -session_expiry_interval(Zone) -> - get_env(Zone, session_expiry_interval, 0). - --spec(force_gc_policy(emqx_zone:zone()) -> maybe(emqx_gc:opts())). -force_gc_policy(Zone) -> - get_env(Zone, force_gc_policy). - --spec(force_shutdown_policy(emqx_zone:zone()) -> maybe(emqx_oom:opts())). -force_shutdown_policy(Zone) -> - get_env(Zone, force_shutdown_policy). - diff --git a/test/emqx_time_SUITE.erl b/test/emqx_time_SUITE.erl deleted file mode 100644 index e190c7f78..000000000 --- a/test/emqx_time_SUITE.erl +++ /dev/null @@ -1,34 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_time_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -t_seed(_) -> - ?assert(is_tuple(emqx_time:seed())). - -t_now_secs(_) -> - ?assert(emqx_time:now_secs() =< emqx_time:now_secs(os:timestamp())). - -t_now_ms(_) -> - ?assert(emqx_time:now_ms() =< emqx_time:now_ms(os:timestamp())). - From 223163d5b9366ed429d587a24058163e6832ea50 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:00:35 +0800 Subject: [PATCH 3/8] Remove the 'emqx_oom' module --- src/emqx_oom.erl | 97 ------------------------------------------------ 1 file changed, 97 deletions(-) delete mode 100644 src/emqx_oom.erl diff --git a/src/emqx_oom.erl b/src/emqx_oom.erl deleted file mode 100644 index efc0a4c69..000000000 --- a/src/emqx_oom.erl +++ /dev/null @@ -1,97 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @doc OOM (Out Of Memory) monitor for the channel process. -%% @end -%%-------------------------------------------------------------------- - --module(emqx_oom). - --include("types.hrl"). - --export([ init/1 - , check/1 - , info/1 - ]). - --export_type([opts/0, oom_policy/0]). - --type(opts() :: #{message_queue_len => non_neg_integer(), - max_heap_size => non_neg_integer() - }). - --opaque(oom_policy() :: {oom_policy, opts()}). - --type(reason() :: message_queue_too_long|proc_heap_too_large). - --define(DISABLED, 0). - -%% @doc Init the OOM policy. --spec(init(opts()) -> oom_policy()). -init(#{message_queue_len := MaxQLen, - max_heap_size := MaxHeapSizeInBytes}) -> - MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), - %% If set to zero, the limit is disabled. - _ = erlang:process_flag(max_heap_size, #{size => MaxHeapSize, - kill => false, - error_logger => true - }), - {oom_policy, #{message_queue_len => MaxQLen, - max_heap_size => MaxHeapSize - }}. - -%% @doc Check self() process status against channel process management policy, -%% return `ok | {shutdown, Reason}' accordingly. -%% `ok': There is nothing out of the ordinary. -%% `shutdown': Some numbers (message queue length hit the limit), -%% hence shutdown for greater good (system stability). --spec(check(oom_policy()) -> ok | {shutdown, reason()}). -check({oom_policy, #{message_queue_len := MaxQLen, - max_heap_size := MaxHeapSize}}) -> - Qlength = proc_info(message_queue_len), - HeapSize = proc_info(total_heap_size), - do_check([{fun() -> is_exceeded(Qlength, MaxQLen) end, - {shutdown, message_queue_too_long}}, - {fun() -> is_exceeded(HeapSize, MaxHeapSize) end, - {shutdown, proc_heap_too_large}}]). - -do_check([]) -> ok; -do_check([{Pred, Result} | Rest]) -> - case Pred() of - true -> Result; - false -> do_check(Rest) - end. - --spec(info(oom_policy()) -> opts()). -info({oom_policy, Opts}) -> Opts. - --compile({inline, - [ is_exceeded/2 - , is_enabled/1 - , proc_info/1 - ]}). - -is_exceeded(Val, Max) -> - is_enabled(Max) andalso Val > Max. - -is_enabled(Max) -> - is_integer(Max) andalso Max > ?DISABLED. - -proc_info(Key) -> - {Key, Value} = erlang:process_info(self(), Key), - Value. - From 605a03453efb3a412fb52f772b8f31ac6b9dbaaa Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:07:34 +0800 Subject: [PATCH 4/8] Remove the 'emqx_time' module and use 'erlang:system_time/1' --- src/emqx_alarm_handler.erl | 2 +- src/emqx_flapping.erl | 9 +++++---- src/emqx_mod_presence.erl | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index c01003c0d..2131bd177 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -158,7 +158,7 @@ encode_alarm({AlarmId, #alarm{severity = Severity, {desc, [{severity, Severity}, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, - {timestamp, emqx_time:now_ms(Ts)}]}]); + {timestamp, emqx_misc:now_to_secs(Ts)}]}]); encode_alarm({AlarmId, undefined}) -> emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]); encode_alarm({AlarmId, AlarmDesc}) -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index e058f0cda..93c0a3194 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -99,7 +99,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost}, %% Create a flapping record. Flapping = #flapping{clientid = ClientId, peerhost = PeerHost, - started_at = emqx_time:now_ms(), + started_at = erlang:system_time(millisecond), detect_cnt = 1 }, true = ets:insert(?FLAPPING_TAB, Flapping), @@ -111,7 +111,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost}, get_policy() -> emqx:get_env(flapping_detect_policy, ?DEFAULT_DETECT_POLICY). -now_diff(TS) -> emqx_time:now_ms() - TS. +now_diff(TS) -> erlang:system_time(millisecond) - TS. %%-------------------------------------------------------------------- %% gen_server callbacks @@ -143,7 +143,7 @@ handle_cast({detected, Flapping = #flapping{clientid = ClientId, [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]), %% Banned. BannedFlapping = Flapping#flapping{clientid = {banned, ClientId}, - banned_at = emqx_time:now_ms() + banned_at = erlang:system_time(millisecond) }, alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}), ets:insert(?FLAPPING_TAB, BannedFlapping); @@ -160,7 +160,8 @@ handle_cast(Msg, State) -> handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) -> with_flapping_tab(fun expire_flapping/2, - [emqx_time:now_ms(), get_policy()]), + [erlang:system_time(millisecond), + get_policy()]), {noreply, ensure_timer(State#{tref => undefined}), hibernate}; handle_info(Info, State) -> diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index d9f6d5c75..b1e3ff8b0 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -62,7 +62,7 @@ on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) -> connack => ConnAck, clean_start => CleanStart, expiry_interval => ExpiryInterval, - ts => emqx_time:now_ms() + ts => erlang:system_time(millisecond) }, case emqx_json:safe_encode(Presence) of {ok, Payload} -> @@ -78,7 +78,7 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> Presence = #{clientid => ClientId, username => Username, reason => reason(Reason), - ts => emqx_time:now_ms() + ts => erlang:system_time(millisecond) }, case emqx_json:safe_encode(Presence) of {ok, Payload} -> From 30adfc18e684df3e9f470fc8943aa165e0cfd9da Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:08:38 +0800 Subject: [PATCH 5/8] Remove 'gc_state' and 'pub_stats' from channel's state --- src/emqx_channel.erl | 127 ++++++++++--------------------------------- 1 file changed, 29 insertions(+), 98 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 99a595929..eedeacf06 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -40,15 +40,11 @@ , handle_in/2 , handle_out/2 , handle_call/2 - , handle_info/2 , handle_timeout/3 + , handle_info/2 , terminate/2 ]). --export([ recvd/2 - , sent/2 - ]). - %% export for ct -export([set_field/3]). @@ -75,14 +71,10 @@ topic_aliases :: maybe(map()), %% MQTT Topic Alias Maximum alias_maximum :: maybe(map()), - %% Publish Stats - pub_stats :: emqx_types:stats(), %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% Conn State conn_state :: conn_state(), - %% GC State - gc_state :: maybe(emqx_gc:gc_state()), %% Takeover takeover :: boolean(), %% Resume @@ -103,7 +95,6 @@ -type(output() :: emqx_types:packet() | action() | [action()]). -define(TIMER_TABLE, #{ - stats_timer => emit_stats, alive_timer => keepalive, retry_timer => retry_delivery, await_timer => expire_awaiting_rel, @@ -113,8 +104,7 @@ -define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]). --define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, - alias_maximum, gc_state]). +-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, alias_maximum]). %%-------------------------------------------------------------------- %% Info, Attrs and Caps @@ -146,12 +136,8 @@ info(will_msg, #channel{will_msg = undefined}) -> undefined; info(will_msg, #channel{will_msg = WillMsg}) -> emqx_message:to_map(WillMsg); -info(pub_stats, #channel{pub_stats = PubStats}) -> - PubStats; info(timers, #channel{timers = Timers}) -> - Timers; -info(gc_state, #channel{gc_state = GcState}) -> - maybe_apply(fun emqx_gc:info/1, GcState). + Timers. %% @doc Get attrs of the channel. -spec(attrs(channel()) -> emqx_types:attrs()). @@ -164,10 +150,8 @@ attrs(session, #channel{session = Session}) -> attrs(Key, Channel) -> info(Key, Channel). -spec(stats(channel()) -> emqx_types:stats()). -stats(#channel{pub_stats = PubStats, session = undefined}) -> - maps:to_list(PubStats); -stats(#channel{pub_stats = PubStats, session = Session}) -> - maps:to_list(PubStats) ++ emqx_session:stats(Session). +stats(#channel{session = Session})-> + emqx_session:stats(Session). -spec(caps(channel()) -> emqx_types:caps()). caps(#channel{clientinfo = #{zone := Zone}}) -> @@ -194,7 +178,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> _ -> undefined end, Protocol = maps:get(protocol, ConnInfo, mqtt), - MountPoint = emqx_zone:get_env(Zone, mountpoint), + MountPoint = emqx_zone:mountpoint(Zone), ClientInfo = #{zone => Zone, protocol => Protocol, peerhost => PeerHost, @@ -205,16 +189,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> is_bridge => false, is_superuser => false }, - StatsTimer = case emqx_zone:enable_stats(Zone) of - true -> undefined; - false -> disabled - end, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, - pub_stats = #{}, - timers = #{stats_timer => StatsTimer}, + timers = #{}, conn_state = idle, - gc_state = init_gc_state(Zone), takeover = false, resuming = false, pendings = [] @@ -223,17 +201,10 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> peer_cert_as_username(Options) -> proplists:get_value(peer_cert_as_username, Options). -init_gc_state(Zone) -> - maybe_apply(fun emqx_gc:init/1, emqx_zone:force_gc_policy(Zone)). - %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- --spec(recvd(pos_integer(), channel()) -> channel()). -recvd(Bytes, Channel) -> - ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)). - -spec(handle_in(emqx_types:packet(), channel()) -> {ok, channel()} | {ok, output(), channel()} @@ -258,74 +229,69 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> end; handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> - NChannel = inc_pub_stats(publish_in, Channel), case emqx_packet:check(Packet) of - ok -> handle_publish(Packet, NChannel); + ok -> handle_publish(Packet, Channel); {error, ReasonCode} -> - handle_out(disconnect, ReasonCode, NChannel) + handle_out(disconnect, ReasonCode, Channel) end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - NChannel = inc_pub_stats(puback_in, Channel), case emqx_session:puback(PacketId, Session) of {ok, Msg, Publishes, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - handle_out({publish, Publishes}, NChannel#channel{session = NSession}); + handle_out({publish, Publishes}, Channel#channel{session = NSession}); {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - {ok, NChannel#channel{session = NSession}}; + {ok, Channel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.puback.inuse'), - {ok, NChannel}; + {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), - {ok, NChannel} + {ok, Channel} end; handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - Channel1 = inc_pub_stats(pubrec_in, Channel), case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - NChannel = Channel1#channel{session = NSession}, + NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]), ok = emqx_metrics:inc('packets.pubrec.inuse'), - handle_out(pubrel, {PacketId, RC}, Channel1); + handle_out(pubrel, {PacketId, RC}, Channel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.pubrec.missed'), - handle_out(pubrel, {PacketId, RC}, Channel1) + handle_out(pubrel, {PacketId, RC}, Channel) end; handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - Channel1 = inc_pub_stats(pubrel_in, Channel), case emqx_session:pubrel(PacketId, Session) of {ok, NSession} -> - Channel2 = Channel1#channel{session = NSession}, - handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel2); + NChannel = Channel#channel{session = NSession}, + handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, NotFound} -> ok = emqx_metrics:inc('packets.pubrel.missed'), ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), - handle_out(pubcomp, {PacketId, NotFound}, Channel1) + handle_out(pubcomp, {PacketId, NotFound}, Channel) end; handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - Channel1 = inc_pub_stats(pubcomp_in, Channel), case emqx_session:pubcomp(PacketId, Session) of {ok, Publishes, NSession} -> - handle_out({publish, Publishes}, Channel1#channel{session = NSession}); + handle_out({publish, Publishes}, Channel#channel{session = NSession}); {ok, NSession} -> - {ok, Channel1#channel{session = NSession}}; + {ok, Channel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.pubcomp.missed'), - {ok, Channel1} + {ok, Channel} end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), @@ -422,11 +388,6 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, %% Process Publish %%-------------------------------------------------------------------- -inc_pub_stats(Key, Channel) -> inc_pub_stats(Key, 1, Channel). -inc_pub_stats(Key, I, Channel = #channel{pub_stats = PubStats}) -> - NPubStats = maps:update_with(Key, fun(V) -> V+I end, I, PubStats), - Channel#channel{pub_stats = NPubStats}. - handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) -> case pipeline([fun process_alias/2, @@ -540,10 +501,6 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %% Handle outgoing packet %%-------------------------------------------------------------------- --spec(sent(pos_integer(), channel()) -> channel()). -sent(Bytes, Channel) -> - ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)). - -spec(handle_out(term(), channel()) -> {ok, channel()} | {ok, output(), channel()} @@ -578,8 +535,7 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> {ok, _Ch} -> Acc end end, [], Publishes), - NChannel = inc_pub_stats(publish_out, length(Packets), Channel), - {ok, {outgoing, lists:reverse(Packets)}, NChannel}; + {ok, {outgoing, lists:reverse(Packets)}, Channel}; %% Ignore loop deliver handle_out({publish, _PacketId, #message{from = ClientId, @@ -635,16 +591,16 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel); handle_out(puback, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)}; + {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubrec, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)}; + {ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubrel, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)}; + {ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubcomp, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)}; + {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel}; handle_out(suback, {PacketId, ReasonCodes}, Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> @@ -747,11 +703,6 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; -handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := ClientId}}) -> - ok = emqx_cm:register_channel(ClientId), - emqx_cm:set_chan_attrs(ClientId, Attrs), - emqx_cm:set_chan_stats(ClientId, Stats); - handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; @@ -788,12 +739,6 @@ handle_info(Info, Channel) -> -> {ok, channel()} | {ok, Result :: term(), channel()} | {shutdown, Reason :: term(), channel()}). -handle_timeout(TRef, {emit_stats, Stats}, - Channel = #channel{clientinfo = #{clientid := ClientId}, - timers = #{stats_timer := TRef}}) -> - ok = emqx_cm:set_chan_stats(ClientId, Stats), - {ok, clean_timer(stats_timer, Channel)}; - handle_timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive, timers = #{alive_timer := TRef}}) -> @@ -873,8 +818,6 @@ reset_timer(Name, Time, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. -interval(stats_timer, #channel{clientinfo = #{zone := Zone}}) -> - emqx_zone:get_env(Zone, idle_timeout, 30000); interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> @@ -912,6 +855,7 @@ publish_will_msg(undefined) -> publish_will_msg(Msg) -> emqx_broker:publish(Msg). + %% @doc Enrich MQTT Connect Info. enrich_conninfo(#mqtt_packet_connect{ proto_name = ProtoName, @@ -1171,7 +1115,7 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), + Backoff = emqx_zone:keepalive_backoff(Zone), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). @@ -1197,19 +1141,6 @@ is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). -%%-------------------------------------------------------------------- -%% Maybe GC and Check OOM -%%-------------------------------------------------------------------- - -maybe_gc_and_check_oom(_Oct, Channel = #channel{gc_state = undefined}) -> - Channel; -maybe_gc_and_check_oom(Oct, Channel = #channel{clientinfo = #{zone := Zone}, - gc_state = GCSt}) -> - {IsGC, GCSt1} = emqx_gc:run(1, Oct, GCSt), - IsGC andalso emqx_metrics:inc('channel.gc.cnt'), - IsGC andalso emqx_zone:check_oom(Zone, fun(Shutdown) -> self() ! Shutdown end), - Channel#channel{gc_state = GCSt1}. - %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- From d256387cee336896290140a87553f462c95c8685 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:09:45 +0800 Subject: [PATCH 6/8] Ensure the 'inc_sent/1', 'inc_recv/1' to return 'ok' --- src/emqx_metrics.erl | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index db42cd1e8..14c812703 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -281,7 +281,7 @@ do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) -> ?QOS_0 -> inc('messages.qos0.received'); ?QOS_1 -> inc('messages.qos1.received'); ?QOS_2 -> inc('messages.qos2.received'); - _ -> ignore + _ -> ok end, inc('packets.publish.received'); do_inc_recv(?PACKET(?PUBACK)) -> @@ -302,13 +302,12 @@ do_inc_recv(?PACKET(?DISCONNECT)) -> inc('packets.disconnect.received'); do_inc_recv(?PACKET(?AUTH)) -> inc('packets.auth.received'); -do_inc_recv(_Packet) -> - ignore. +do_inc_recv(_Packet) -> ok. %% @doc Inc packets sent. Will not count $SYS PUBLISH. --spec(inc_sent(emqx_types:packet()) -> ok | ignore). +-spec(inc_sent(emqx_types:packet()) -> ok). inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) -> - ignore; + ok; inc_sent(Packet) -> inc('packets.sent'), do_inc_sent(Packet). @@ -349,8 +348,7 @@ do_inc_sent(?PACKET(?DISCONNECT)) -> inc('packets.disconnect.sent'); do_inc_sent(?PACKET(?AUTH)) -> inc('packets.auth.sent'); -do_inc_sent(_Packet) -> - ignore. +do_inc_sent(_Packet) -> ok. %%-------------------------------------------------------------------- %% gen_server callbacks From 6f30dca4ba2a0c40ecc4ecc28e3b46d0ea90aabe Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:10:26 +0800 Subject: [PATCH 7/8] Add more option APIs --- src/emqx_zone.erl | 104 +++++++++++++++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 28 deletions(-) diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 90d56eb0e..386b71af7 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -25,27 +25,58 @@ -logger_header("[Zone]"). +-compile({inline, + [ idle_timeout/1 + , publish_limit/1 + , mqtt_frame_options/1 + , mqtt_strict_mode/1 + , max_packet_size/1 + , mountpoint/1 + , use_username_as_clientid/1 + , stats_timer/1 + , enable_stats/1 + , enable_acl/1 + , enable_ban/1 + , enable_flapping_detect/1 + , ignore_loop_deliver/1 + , server_keepalive/1 + , keepalive_backoff/1 + , max_inflight/1 + , session_expiry_interval/1 + , force_gc_policy/1 + , force_shutdown_policy/1 + ]}). + %% APIs -export([start_link/0, stop/0]). --export([ frame_options/1 +%% Zone Option API +-export([ idle_timeout/1 + , publish_limit/1 + , mqtt_frame_options/1 , mqtt_strict_mode/1 , max_packet_size/1 + , mountpoint/1 , use_username_as_clientid/1 + , stats_timer/1 , enable_stats/1 , enable_acl/1 , enable_ban/1 , enable_flapping_detect/1 , ignore_loop_deliver/1 , server_keepalive/1 + , keepalive_backoff/1 , max_inflight/1 , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 ]). --export([check_oom/2]). +-export([ init_gc_state/1 + , oom_policy/1 + ]). +%% Zone API -export([ get_env/2 , get_env/3 , set_env/3 @@ -63,27 +94,46 @@ , code_change/3 ]). --export_type([zone/0]). +-import(emqx_misc, [maybe_apply/2]). -%% dummy state --record(state, {}). +-export_type([zone/0]). -type(zone() :: atom()). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). +-define(DEFAULT_IDLE_TIMEOUT, 30000). -define(KEY(Zone, Key), {?MODULE, Zone, Key}). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(frame_options(zone()) -> emqx_frame:options()). -frame_options(Zone) -> +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER). + +-spec(init_gc_state(zone()) -> emqx_gc:gc_state()). +init_gc_state(Zone) -> + maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)). + +-spec(oom_policy(zone()) -> emqx_types:oom_policy()). +oom_policy(Zone) -> force_shutdown_policy(Zone). + +%%-------------------------------------------------------------------- +%% Zone Options API +%%-------------------------------------------------------------------- + +-spec(idle_timeout(zone()) -> pos_integer()). +idle_timeout(Zone) -> + get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT). + +-spec(publish_limit(zone()) -> maybe(esockd_rate_limit:config())). +publish_limit(Zone) -> + get_env(Zone, publish_limit). + +-spec(mqtt_frame_options(zone()) -> emqx_frame:options()). +mqtt_frame_options(Zone) -> #{strict_mode => mqtt_strict_mode(Zone), max_size => max_packet_size(Zone) }. @@ -96,10 +146,17 @@ mqtt_strict_mode(Zone) -> max_packet_size(Zone) -> get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE). +-spec(mountpoint(zone()) -> maybe(emqx_mountpoint:mountpoint())). +mountpoint(Zone) -> get_env(Zone, mountpoint). + -spec(use_username_as_clientid(zone()) -> boolean()). use_username_as_clientid(Zone) -> get_env(Zone, use_username_as_clientid, false). +-spec(stats_timer(zone()) -> undefined | disabled). +stats_timer(Zone) -> + case enable_stats(Zone) of true -> undefined; false -> disabled end. + -spec(enable_stats(zone()) -> boolean()). enable_stats(Zone) -> get_env(Zone, enable_stats, true). @@ -124,6 +181,10 @@ ignore_loop_deliver(Zone) -> server_keepalive(Zone) -> get_env(Zone, server_keepalive). +-spec(keepalive_backoff(zone()) -> float()). +keepalive_backoff(Zone) -> + get_env(Zone, keepalive_backoff, 0.75). + -spec(max_inflight(zone()) -> 0..65535). max_inflight(Zone) -> get_env(Zone, max_inflight, 65535). @@ -140,18 +201,9 @@ force_gc_policy(Zone) -> force_shutdown_policy(Zone) -> get_env(Zone, force_shutdown_policy). --spec(check_oom(zone(), fun()) -> ok | term()). -check_oom(Zone, Action) -> - case emqx_zone:force_shutdown_policy(Zone) of - undefined -> ok; - Policy -> do_check_oom(emqx_oom:init(Policy), Action) - end. - -do_check_oom(OomPolicy, Action) -> - case emqx_oom:check(OomPolicy) of - ok -> ok; - Shutdown -> Action(Shutdown) - end. +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- -spec(get_env(maybe(zone()), atom()) -> maybe(term())). get_env(undefined, Key) -> emqx:get_env(Key); @@ -179,17 +231,13 @@ unset_env(Zone, Key) -> force_reload() -> gen_server:call(?SERVER, force_reload). --spec(stop() -> ok). -stop() -> - gen_server:stop(?SERVER, normal, infinity). - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> _ = do_reload(), - {ok, #state{}}. + {ok, #{}}. handle_call(force_reload, _From, State) -> _ = do_reload(), From 2b1b58fc66f3e31893e809e51fc074f550fb65e4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Nov 2019 08:10:58 +0800 Subject: [PATCH 8/8] Add the new 'emqx_limiter' module --- src/emqx_cm.erl | 3 +- src/emqx_connection.erl | 390 +++++++++++++++++++++---------------- src/emqx_gc.erl | 16 +- src/emqx_limiter.erl | 73 +++++++ src/emqx_misc.erl | 114 +++++++---- src/emqx_pd.erl | 8 +- src/emqx_types.erl | 6 + src/emqx_ws_connection.erl | 286 ++++++++++++++++++++------- 8 files changed, 611 insertions(+), 285 deletions(-) create mode 100644 src/emqx_limiter.erl diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index d61b56b44..aaa79e78b 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------- %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -93,7 +93,6 @@ start_link() -> %%-------------------------------------------------------------------- %% @doc Register a channel. -%% Channel will be unregistered automatically when the channel process dies -spec(register_channel(emqx_types:clientid()) -> ok). register_channel(ClientId) when is_binary(ClientId) -> register_channel(ClientId, self()). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index b557d42b7..f8d984e4b 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT/TCP Connection +%% MQTT/TCP|TLS Connection -module(emqx_connection). -include("emqx.hrl"). @@ -40,7 +40,7 @@ -export([call/2]). -%% callback +%% Callback -export([init/4]). %% Sys callbacks @@ -50,12 +50,15 @@ , system_get_state/1 ]). -%% Internal callbacks --export([wakeup_from_hib/2]). +%% Internal callback +-export([wakeup_from_hib/3]). + +-import(emqx_misc, + [ maybe_apply/2 + , start_timer/2 + ]). -record(state, { - %% Parent - parent :: pid(), %% TCP/TLS Transport transport :: esockd:transport(), %% TCP/TLS Socket @@ -64,34 +67,37 @@ peername :: emqx_types:peername(), %% Sockname of the connection sockname :: emqx_types:peername(), - %% Sock state + %% Sock State sockstate :: emqx_types:sockstate(), %% The {active, N} option active_n :: pos_integer(), - %% Publish Limit - pub_limit :: maybe(esockd_rate_limit:bucket()), - %% Rate Limit - rate_limit :: maybe(esockd_rate_limit:bucket()), + %% Limiter + limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer limit_timer :: maybe(reference()), - %% Parser State + %% Parse State parse_state :: emqx_frame:parse_state(), %% Serialize function serialize :: emqx_frame:serialize_fun(), %% Channel State channel :: emqx_channel:channel(), - %% Idle timer - idle_timer :: reference() + %% GC State + gc_state :: maybe(emqx_gc:gc_state()), + %% Stats Timer + stats_timer :: disabled | maybe(reference()), + %% Idle Timer + idle_timer :: maybe(reference()) }). -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, - pub_limit, rate_limit]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). +-define(ENABLED(X), (X =/= undefined)). + -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> @@ -123,13 +129,8 @@ info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; -info(pub_limit, #state{pub_limit = PubLimit}) -> - limit_info(PubLimit); -info(rate_limit, #state{rate_limit = RateLimit}) -> - limit_info(RateLimit). - -limit_info(Limit) -> - emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). +info(limiter, #state{limiter = Limiter}) -> + maybe_apply(fun emqx_limiter:info/1, Limiter). %% @doc Get stats of the connection/channel. -spec(stats(pid()|state()) -> emqx_types:stats()). @@ -147,6 +148,13 @@ stats(#state{transport = Transport, ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). +attrs(#state{active_n = ActiveN, sockstate = SockSt, channel = Channel}) -> + SockAttrs = #{active_n => ActiveN, + sockstate => SockSt + }, + ChanAttrs = emqx_channel:attrs(Channel), + maps:merge(ChanAttrs, #{sockinfo => SockAttrs}). + call(Pid, Req) -> gen_server:call(Pid, Req, infinity). @@ -169,7 +177,6 @@ init(Parent, Transport, RawSocket, Options) -> do_init(Parent, Transport, Socket, Options) -> {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), - emqx_logger:set_metadata_peername(esockd_net:format(Peername)), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), ConnInfo = #{socktype => Transport:type(Socket), peername => Peername, @@ -179,42 +186,39 @@ do_init(Parent, Transport, Socket, Options) -> }, Zone = proplists:get_value(zone, Options), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), - PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), - RateLimit = init_limiter(proplists:get_value(rate_limit, Options)), - FrameOpts = emqx_zone:frame_options(Zone), + Limiter = emqx_limiter:init(Options), + FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), Channel = emqx_channel:init(ConnInfo, Options), - IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout), - HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2), - State = #state{parent = Parent, - transport = Transport, + GcState = emqx_zone:init_gc_state(Zone), + StatsTimer = emqx_zone:stats_timer(Zone), + IdleTimeout = emqx_zone:idle_timeout(Zone), + IdleTimer = start_timer(IdleTimeout, idle_timeout), + emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)), + emqx_logger:set_metadata_peername(esockd_net:format(Peername)), + State = #state{transport = Transport, socket = Socket, peername = Peername, sockname = Sockname, sockstate = idle, active_n = ActiveN, - pub_limit = PubLimit, - rate_limit = RateLimit, + limiter = Limiter, parse_state = ParseState, serialize = Serialize, channel = Channel, + gc_state = GcState, + stats_timer = StatsTimer, idle_timer = IdleTimer }, case activate_socket(State) of {ok, NState} -> - hibernate(NState, #{hibernate_after => HibAfterTimeout}); + hibernate(Parent, NState, #{idle_timeout => IdleTimeout}); {error, Reason} -> ok = Transport:fast_close(Socket), exit_on_sock_error(Reason) end. --compile({inline, [init_limiter/1]}). -init_limiter(undefined) -> undefined; -init_limiter({Rate, Burst}) -> - esockd_rate_limit:new(Rate, Burst). - exit_on_sock_error(Reason) when Reason =:= einval; Reason =:= enotconn; Reason =:= closed -> @@ -227,8 +231,7 @@ exit_on_sock_error(Reason) -> %%-------------------------------------------------------------------- %% Recv Loop -recvloop(State = #state{parent = Parent}, - Options = #{hibernate_after := HibAfterTimeout}) -> +recvloop(Parent, State, Options = #{idle_timeout := IdleTimeout}) -> receive {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, @@ -236,33 +239,49 @@ recvloop(State = #state{parent = Parent}, {'EXIT', Parent, Reason} -> terminate(Reason, State); Msg -> - process_msg([Msg], State, Options) + NState = ensure_stats_timer(IdleTimeout, State), + process_msg([Msg], Parent, NState, Options) after - HibAfterTimeout -> - hibernate(State, Options) + IdleTimeout -> + NState = cancel_stats_timer(State), + hibernate(Parent, NState, Options) end. -hibernate(State, Options) -> - proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]). +hibernate(Parent, State, Options) -> + proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State, Options]). -wakeup_from_hib(State, Options) -> +wakeup_from_hib(Parent, State, Options) -> %% Maybe do something later here. - recvloop(State, Options). + recvloop(Parent, State, Options). + +%%-------------------------------------------------------------------- +%% Ensure/cancel stats timer + +-compile({inline, [ensure_stats_timer/2]}). +ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> + State#state{stats_timer = start_timer(Timeout, emit_stats)}; +ensure_stats_timer(_Timeout, State) -> State. + +-compile({inline, [cancel_stats_timer/1]}). +cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -> + ok = emqx_misc:cancel_timer(TRef), + State#state{stats_timer = undefined}; +cancel_stats_timer(State) -> State. %%-------------------------------------------------------------------- %% Process next Msg -process_msg([], State, Options) -> - recvloop(State, Options); +process_msg([], Parent, State, Options) -> + recvloop(Parent, State, Options); -process_msg([Msg|More], State, Options) -> +process_msg([Msg|More], Parent, State, Options) -> case catch handle_msg(Msg, State) of ok -> - process_msg(More, State, Options); + process_msg(More, Parent, State, Options); {ok, NState} -> - process_msg(More, NState, Options); - {ok, NextMsgs, NState} -> - process_msg(append_msg(NextMsgs, More), NState, Options); + process_msg(More, Parent, NState, Options); + {ok, Msgs, NState} -> + process_msg(append_msg(Msgs, More), Parent, NState, Options); {stop, Reason} -> terminate(Reason, State); {stop, Reason, NState} -> @@ -284,14 +303,12 @@ handle_msg({'$gen_call', From, Req}, State) -> stop(Reason, NState) end; -handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel}) - when Inet == tcp; Inet == ssl -> +handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), - emqx_pd:update_counter(incoming_bytes, Oct), + emqx_pd:inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), - NChannel = emqx_channel:recvd(Oct, Channel), - parse_incoming(Data, State#state{channel = NChannel}); + parse_incoming(Data, State); handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> @@ -302,6 +319,9 @@ handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, }, handle_incoming(Packet, NState); +handle_msg({incoming, ?PACKET(?PINGREQ)}, State) -> + handle_outgoing(?PACKET(?PINGRESP), State); + handle_msg({incoming, Packet}, State) -> handle_incoming(Packet, State); @@ -315,30 +335,34 @@ handle_msg({Closed, _Sock}, State) handle_msg({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive -> - %% Rate limit here:) - NState = ensure_rate_limit(State), - handle_info(activate_socket, NState); - -%% Rate limit timer expired. -handle_msg(activate_socket, State) -> - NState = State#state{sockstate = idle, - limit_timer = undefined - }, - handle_info(activate_socket, NState); + InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs), + oct => emqx_pd:reset_counter(incoming_bytes) + }, + %% Ensure Rate Limit + NState = ensure_rate_limit(InStats, State), + %% Run GC and Check OOM + NState1 = check_oom(run_gc(InStats, NState)), + handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, State = #state{channel = Channel}) -> - Delivers = emqx_misc:drain_deliver([Deliver]), - Result = emqx_channel:handle_out(Delivers, Channel), - handle_return(Result, State); + Delivers = [Deliver|emqx_misc:drain_deliver()], + Ret = emqx_channel:handle_out(Delivers, Channel), + handle_chan_return(Ret, State); handle_msg({outgoing, Packets}, State) -> - NState = handle_outgoing(Packets, State), - {ok, NState}; + handle_outgoing(Packets, State); -%% something sent -handle_msg({inet_reply, _Sock, ok}, _State) -> - ok; +%% Something sent +handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> + case emqx_pd:get_counter(outgoing_pubs) > ActiveN of + true -> + OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), + oct => emqx_pd:reset_counter(outgoing_bytes) + }, + {ok, check_oom(run_gc(OutStats, State))}; + false -> ok + end; handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({sock_error, Reason}, State); @@ -349,7 +373,8 @@ handle_msg({timeout, TRef, TMsg}, State) -> handle_msg(Shutdown = {shutdown, _Reason}, State) -> stop(Shutdown, State); -handle_msg(Msg, State) -> handle_info(Msg, State). +handle_msg(Msg, State) -> + handle_info(Msg, State). %%-------------------------------------------------------------------- %% Terminate @@ -363,8 +388,8 @@ terminate(Reason, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- %% Sys callbacks -system_continue(_Parent, _Deb, {State, Options}) -> - recvloop(State, Options). +system_continue(Parent, _Deb, {State, Options}) -> + recvloop(Parent, State, Options). system_terminate(Reason, _Parent, _Deb, {State, _}) -> terminate(Reason, State). @@ -392,8 +417,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> shutdown(Reason, Reply, State#state{channel = NChannel}); {shutdown, Reason, Reply, OutPacket, NChannel} -> NState = State#state{channel = NChannel}, - NState1 = handle_outgoing(OutPacket, NState), - shutdown(Reason, Reply, NState1) + ok = handle_outgoing(OutPacket, NState), + shutdown(Reason, Reply, NState) end. %%-------------------------------------------------------------------- @@ -402,8 +427,18 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> shutdown(idle_timeout, State); -handle_timeout(TRef, emit_stats, State) -> - handle_timeout(TRef, {emit_stats, stats(State)}, State); +handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) -> + NState = State#state{sockstate = idle, + limit_timer = undefined + }, + handle_info(activate_socket, NState); + +handle_timeout(TRef, emit_stats, State = #state{stats_timer = TRef, + channel = Channel}) -> + #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel), + (ClientId =/= undefined) andalso + emqx_cm:set_chan_stats(ClientId, stats(State)), + {ok, State#state{stats_timer = undefined}}; handle_timeout(TRef, keepalive, State = #state{transport = Transport, socket = Socket}) -> @@ -415,7 +450,8 @@ handle_timeout(TRef, keepalive, State = #state{transport = Transport, end; handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State). + Ret = emqx_channel:handle_timeout(TRef, Msg, Channel), + handle_chan_return(Ret, State). %%-------------------------------------------------------------------- %% Parse incoming data @@ -450,30 +486,30 @@ next_incoming_msgs(Packets) -> %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> - _ = inc_incoming_stats(Type), - ok = emqx_metrics:inc_recv(Packet), +handle_incoming(Packet, State = #state{channel = Channel}) + when is_record(Packet, mqtt_packet) -> + ok = inc_incoming_stats(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - handle_return(emqx_channel:handle_in(Packet, Channel), State); + handle_chan_return(emqx_channel:handle_in(Packet, Channel), State); handle_incoming(FrameError, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_in(FrameError, Channel), State). + handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State). %%-------------------------------------------------------------------- %% Handle channel return -handle_return(ok, State) -> +handle_chan_return(ok, State) -> {ok, State}; -handle_return({ok, NChannel}, State) -> +handle_chan_return({ok, NChannel}, State) -> {ok, State#state{channel = NChannel}}; -handle_return({ok, Replies, NChannel}, State) -> +handle_chan_return({ok, Replies, NChannel}, State) -> {ok, next_msgs(Replies), State#state{channel = NChannel}}; -handle_return({shutdown, Reason, NChannel}, State) -> +handle_chan_return({shutdown, Reason, NChannel}, State) -> shutdown(Reason, State#state{channel = NChannel}); -handle_return({shutdown, Reason, OutPacket, NChannel}, State) -> +handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) -> NState = State#state{channel = NChannel}, - NState1 = handle_outgoing(OutPacket, NState), - shutdown(Reason, NState1). + ok = handle_outgoing(OutPacket, NState), + shutdown(Reason, NState). %%-------------------------------------------------------------------- %% Handle outgoing packets @@ -485,14 +521,13 @@ handle_outgoing(Packet, State) -> send((serialize_and_inc_stats_fun(State))(Packet), State). serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> - fun(Packet = ?PACKET(Type)) -> + fun(Packet) -> case Serialize(Packet) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), <<>>; - Data -> _ = inc_outgoing_stats(Type), - _ = emqx_metrics:inc_sent(Packet), - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + ok = inc_outgoing_stats(Packet), Data end end. @@ -500,52 +535,52 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %%-------------------------------------------------------------------- %% Send data -send(IoData, State = #state{transport = Transport, - socket = Socket, - channel = Channel}) -> +-spec(send(iodata(), state()) -> ok). +send(IoData, #state{transport = Transport, socket = Socket}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), + emqx_pd:inc_counter(outgoing_bytes, Oct), case Transport:async_send(Socket, IoData) of - ok -> - State#state{channel = emqx_channel:sent(Oct, Channel)}; + ok -> ok; Error = {error, _Reason} -> - %% Simulate an inet_reply to postpone handling the error - self() ! {inet_reply, Socket, Error}, State + %% Send an inet_reply to postpone handling the error + self() ! {inet_reply, Socket, Error}, + ok end. %%-------------------------------------------------------------------- %% Handle Info -handle_info({connack, ConnAck}, State = #state{active_n = ActiveN, - sockstate = SockSt, - channel = Channel}) -> - NState = handle_outgoing(ConnAck, State), - ChanAttrs = emqx_channel:attrs(Channel), - SockAttrs = #{active_n => ActiveN, - sockstate => SockSt - }, - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - handle_info({register, Attrs, stats(State)}, NState); +handle_info({connack, ConnAck}, State = #state{channel = Channel}) -> + #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel), + ok = emqx_cm:register_channel(ClientId), + ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)), + ok = emqx_cm:set_chan_stats(ClientId, stats(State)), + ok = handle_outgoing(ConnAck, State); -handle_info({enter, disconnected}, State = #state{active_n = ActiveN, - sockstate = SockSt, - channel = Channel}) -> - ChanAttrs = emqx_channel:attrs(Channel), - SockAttrs = #{active_n => ActiveN, - sockstate => SockSt - }, - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - handle_info({register, Attrs, stats(State)}, State); +handle_info({enter, disconnected}, State = #state{channel = Channel}) -> + #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel), + emqx_cm:set_chan_attrs(ClientId, attrs(State)), + emqx_cm:set_chan_stats(ClientId, stats(State)); -handle_info(activate_socket, State) -> +handle_info(activate_socket, State = #state{sockstate = OldSst}) -> case activate_socket(State) of - {ok, NState} -> {ok, NState}; + {ok, NState = #state{sockstate = NewSst}} -> + if OldSst =/= NewSst -> + {ok, {event, sockstate_changed}, NState}; + true -> {ok, NState} + end; {error, Reason} -> handle_info({sock_error, Reason}, State) end; +handle_info({event, sockstate_changed}, State = #state{channel = Channel}) -> + #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel), + ClientId =/= undefined andalso emqx_cm:set_chan_attrs(ClientId, attrs(State)); + %%TODO: this is not right -handle_info({sock_error, _Reason}, #state{sockstate = closed}) -> ok; +handle_info({sock_error, _Reason}, #state{sockstate = closed}) -> + ok; handle_info({sock_error, Reason}, State) -> ?LOG(debug, "Socket error: ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); @@ -560,7 +595,45 @@ handle_info({close, Reason}, State) -> handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_info(Info, Channel), State). + handle_chan_return(emqx_channel:handle_info(Info, Channel), State). + +%%-------------------------------------------------------------------- +%% Ensure rate limit + +ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> + case ?ENABLED(limiter) andalso emqx_limiter:check(Stats, Limiter) of + false -> State; + {ok, Limiter1} -> + State#state{limiter = Limiter1}; + {pause, Time, Limiter1} -> + ?LOG(debug, "Pause ~pms due to rate limit", [Time]), + TRef = start_timer(Time, limit_timeout), + State#state{sockstate = blocked, + limiter = Limiter1, + limit_timer = TRef + } + end. + +%%-------------------------------------------------------------------- +%% Run GC and Check OOM + +run_gc(Stats, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; + {IsGC, GcSt1} -> + IsGC andalso emqx_metrics:inc('channel.gc.cnt'), + State#state{gc_state = GcSt1} + end. + +check_oom(State = #state{channel = Channel}) -> + #{zone := Zone} = emqx_channel:info(clientinfo, Channel), + OomPolicy = emqx_zone:oom_policy(Zone), + case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of + Shutdown = {shutdown, _Reason} -> + erlang:send(self(), Shutdown); + _Other -> ok + end, + State. %%-------------------------------------------------------------------- %% Activate Socket @@ -587,48 +660,30 @@ close_socket(State = #state{transport = Transport, socket = Socket}) -> ok = Transport:fast_close(Socket), State#state{sockstate = closed}. -%%-------------------------------------------------------------------- -%% Ensure rate limit - --define(ENABLED(Rl), (Rl =/= undefined)). - -ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) -> - Pubs = emqx_pd:reset_counter(incoming_pubs), - Bytes = emqx_pd:reset_counter(incoming_bytes), - Limiters = [{Pl, #state.pub_limit, Pubs} || ?ENABLED(Pl)] ++ - [{Rl, #state.rate_limit, Bytes} || ?ENABLED(Rl)], - ensure_rate_limit(Limiters, State). - -ensure_rate_limit([], State) -> - State; -ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> - case esockd_rate_limit:check(Cnt, Rl) of - {0, Rl1} -> - ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); - {Pause, Rl1} -> - ?LOG(debug, "Pause ~pms due to rate limit", [Pause]), - TRef = erlang:send_after(Pause, self(), activate_socket), - NState = State#state{sockstate = blocked, limit_timer = TRef}, - setelement(Pos, NState, Rl1) - end. - %%-------------------------------------------------------------------- %% Inc incoming/outgoing stats -compile({inline, [inc_incoming_stats/1]}). -inc_incoming_stats(Type) when is_integer(Type) -> - emqx_pd:update_counter(recv_pkt, 1), +inc_incoming_stats(Packet = ?PACKET(Type)) -> + emqx_pd:inc_counter(recv_pkt, 1), if Type == ?PUBLISH -> - emqx_pd:update_counter(recv_msg, 1), - emqx_pd:update_counter(incoming_pubs, 1); + emqx_pd:inc_counter(recv_msg, 1), + emqx_pd:inc_counter(incoming_pubs, 1); true -> ok - end. + end, + emqx_metrics:inc_recv(Packet). -compile({inline, [inc_outgoing_stats/1]}). -inc_outgoing_stats(Type) -> - emqx_pd:update_counter(send_pkt, 1), - (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1). +inc_outgoing_stats(Packet = ?PACKET(Type)) -> + emqx_pd:inc_counter(send_pkt, 1), + if + Type == ?PUBLISH -> + emqx_pd:inc_counter(send_msg, 1), + emqx_pd:inc_counter(outgoing_pubs, 1); + true -> ok + end, + emqx_metrics:inc_sent(Packet). %%-------------------------------------------------------------------- %% Helper functions @@ -646,13 +701,14 @@ next_msgs(Action) when is_tuple(Action) -> next_msgs(Actions) when is_list(Actions) -> Actions. +-compile({inline, [shutdown/2, shutdown/3]}). shutdown(Reason, State) -> stop({shutdown, Reason}, State). shutdown(Reason, Reply, State) -> stop({shutdown, Reason}, Reply, State). --compile({inline, [stop/2]}). +-compile({inline, [stop/2, stop/3]}). stop(Reason, State) -> {stop, Reason, State}. diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 5f939bfe5..8da13912c 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -29,6 +29,7 @@ -include("types.hrl"). -export([ init/1 + , run/2 , run/3 , info/1 , reset/1 @@ -57,21 +58,26 @@ init(#{count := Count, bytes := Bytes}) -> ?GCS(maps:from_list(Cnt ++ Oct)). %% @doc Try to run GC based on reduntions of count or bytes. +-spec(run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state()) + -> {boolean(), gc_state()}). +run(#{cnt := Cnt, oct := Oct}, GcSt) -> + run(Cnt, Oct, GcSt). + -spec(run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}). run(Cnt, Oct, ?GCS(St)) -> - {Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St), + {Res, St1} = do_run([{cnt, Cnt}, {oct, Oct}], St), {Res, ?GCS(St1)}. -run([], St) -> +do_run([], St) -> {false, St}; -run([{K, N}|T], St) -> +do_run([{K, N}|T], St) -> case dec(K, N, St) of {true, St1} -> - true = erlang:garbage_collect(), + erlang:garbage_collect(), {true, do_reset(St1)}; {false, St1} -> - run(T, St1) + do_run(T, St1) end. %% @doc Info of GC state. diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl new file mode 100644 index 000000000..a063bcad2 --- /dev/null +++ b/src/emqx_limiter.erl @@ -0,0 +1,73 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter). + +-include("types.hrl"). + +-export([init/1, info/1, check/2]). + +-import(emqx_misc, [maybe_apply/2]). + +-record(limiter, { + %% Publish Limit + pub_limit :: maybe(esockd_rate_limit:bucket()), + %% Rate Limit + rate_limit :: maybe(esockd_rate_limit:bucket()) + }). + +-type(limiter() :: #limiter{}). + +-export_type([limiter/0]). + +-define(ENABLED(Rl), (Rl =/= undefined)). + +-spec(init(proplists:proplist()) -> maybe(limiter())). +init(Options) -> + Zone = proplists:get_value(zone, Options), + Pl = emqx_zone:publish_limit(Zone), + Rl = proplists:get_value(rate_limit, Options), + case ?ENABLED(Pl) or ?ENABLED(Rl) of + true -> #limiter{pub_limit = init_limit(Pl), + rate_limit = init_limit(Rl) + }; + false -> undefined + end. + +init_limit(Rl) -> + maybe_apply(fun esockd_rate_limit:new/1, Rl). + +info(#limiter{pub_limit = Pl, rate_limit = Rl}) -> + #{pub_limit => info(Pl), rate_limit => info(Rl)}; + +info(Rl) -> + maybe_apply(fun esockd_rate_limit:info/1, Rl). + +check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{pub_limit = Pl, + rate_limit = Rl}) -> + do_check([{#limiter.pub_limit, Cnt, Pl} || ?ENABLED(Pl)] ++ + [{#limiter.rate_limit, Oct, Rl} || ?ENABLED(Rl)], Limiter). + +do_check([], Limiter) -> + {ok, Limiter}; +do_check([{Pos, Cnt, Rl}|More], Limiter) -> + case esockd_rate_limit:check(Cnt, Rl) of + {0, Rl1} -> + do_check(More, setelement(Pos, Limiter, Rl1)); + {Pause, Rl1} -> + {pause, Pause, setelement(Pos, Limiter, Rl1)} + end. + diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 359bb045f..2debe7ad0 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -16,6 +16,8 @@ -module(emqx_misc). +-compile(inline). + -include("types.hrl"). -include("logger.hrl"). @@ -26,22 +28,20 @@ , start_timer/2 , start_timer/3 , cancel_timer/1 + , drain_deliver/0 + , drain_down/1 + , check_oom/1 + , check_oom/2 + , tune_heap_size/1 , proc_name/2 , proc_stats/0 , proc_stats/1 + , rand_seed/0 + , now_to_secs/1 + , now_to_ms/1 , index_of/2 ]). --export([ drain_deliver/0 - , drain_deliver/1 - , drain_down/1 - ]). - --compile({inline, - [ start_timer/2 - , start_timer/3 - ]}). - %% @doc Merge options -spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()). merge_opts(Defaults, Options) -> @@ -112,6 +112,68 @@ cancel_timer(Timer) when is_reference(Timer) -> end; cancel_timer(_) -> ok. +%% @doc Drain delivers from the channel proc's mailbox. +drain_deliver() -> + drain_deliver([]). + +drain_deliver(Acc) -> + receive + Deliver = {deliver, _Topic, _Msg} -> + drain_deliver([Deliver|Acc]) + after 0 -> + lists:reverse(Acc) + end. + +%% @doc Drain process 'DOWN' events. +-spec(drain_down(pos_integer()) -> list(pid())). +drain_down(Cnt) when Cnt > 0 -> + drain_down(Cnt, []). + +drain_down(0, Acc) -> + lists:reverse(Acc); +drain_down(Cnt, Acc) -> + receive + {'DOWN', _MRef, process, Pid, _Reason} -> + drain_down(Cnt - 1, [Pid|Acc]) + after 0 -> + lists:reverse(Acc) + end. + +%% @doc Check process's mailbox and heapsize against OOM policy, +%% return `ok | {shutdown, Reason}' accordingly. +%% `ok': There is nothing out of the ordinary. +%% `shutdown': Some numbers (message queue length hit the limit), +%% hence shutdown for greater good (system stability). +-spec(check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}). +check_oom(Policy) -> + check_oom(self(), Policy). + +-spec(check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}). +check_oom(Pid, #{message_queue_len := MaxQLen, + max_heap_size := MaxHeapSize}) -> + case process_info(Pid, [message_queue_len, total_heap_size]) of + undefined -> ok; + [{message_queue_len, QLen}, {total_heap_size, HeapSize}] -> + do_check_oom([{QLen, MaxQLen, message_queue_too_long}, + {HeapSize, MaxHeapSize, proc_heap_too_large} + ]) + end. + +do_check_oom([]) -> ok; +do_check_oom([{Val, Max, Reason}|Rest]) -> + case is_integer(Max) andalso (0 < Max) andalso (Max < Val) of + true -> {shutdown, Reason}; + false -> do_check_oom(Rest) + end. + +tune_heap_size(#{max_heap_size := MaxHeapSize}) -> + %% If set to zero, the limit is disabled. + erlang:process_flag(max_heap_size, #{size => MaxHeapSize, + kill => false, + error_logger => true + }); +tune_heap_size(undefined) -> ok. + -spec(proc_name(atom(), pos_integer()) -> atom()). proc_name(Mod, Id) -> list_to_atom(lists:concat([Mod, "_", Id])). @@ -132,32 +194,16 @@ proc_stats(Pid) -> [{mailbox_len, Len}|ProcStats] end. -%% @doc Drain delivers from the channel's mailbox. -drain_deliver() -> - drain_deliver([]). +rand_seed() -> + rand:seed(exsplus, erlang:timestamp()). -drain_deliver(Acc) -> - receive - Deliver = {deliver, _Topic, _Msg} -> - drain_deliver([Deliver|Acc]) - after 0 -> - lists:reverse(Acc) - end. +-spec(now_to_secs(erlang:timestamp()) -> pos_integer()). +now_to_secs({MegaSecs, Secs, _MicroSecs}) -> + MegaSecs * 1000000 + Secs. -%% @doc Drain process down events. --spec(drain_down(pos_integer()) -> list(pid())). -drain_down(Cnt) when Cnt > 0 -> - drain_down(Cnt, []). - -drain_down(0, Acc) -> - lists:reverse(Acc); -drain_down(Cnt, Acc) -> - receive - {'DOWN', _MRef, process, Pid, _Reason} -> - drain_down(Cnt - 1, [Pid|Acc]) - after 0 -> - drain_down(0, Acc) - end. +-spec(now_to_ms(erlang:timestamp()) -> pos_integer()). +now_to_ms({MegaSecs, Secs, MicroSecs}) -> + (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). %% lists:index_of/2 index_of(E, L) -> diff --git a/src/emqx_pd.erl b/src/emqx_pd.erl index db33927d0..1bcbcd283 100644 --- a/src/emqx_pd.erl +++ b/src/emqx_pd.erl @@ -21,14 +21,14 @@ -export([ get_counters/1 , get_counter/1 - , update_counter/2 + , inc_counter/2 , reset_counter/1 ]). -compile({inline, [ get_counters/1 , get_counter/1 - , update_counter/2 + , inc_counter/2 , reset_counter/1 ]}). @@ -42,8 +42,8 @@ get_counters(Keys) when is_list(Keys) -> get_counter(Key) -> case get(Key) of undefined -> 0; Cnt -> Cnt end. --spec(update_counter(key(), number()) -> maybe(number())). -update_counter(Key, Inc) -> +-spec(inc_counter(key(), number()) -> maybe(number())). +inc_counter(Key, Inc) -> put(Key, get_counter(Key) + Inc). -spec(reset_counter(key()) -> number()). diff --git a/src/emqx_types.erl b/src/emqx_types.erl index c895393b9..be9f4a558 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -85,6 +85,8 @@ , stats/0 ]). +-export_type([oom_policy/0]). + -type(ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). @@ -186,3 +188,7 @@ -type(infos() :: #{atom() => term()}). -type(stats() :: #{atom() => non_neg_integer()|stats()}). +-type(oom_policy() :: #{message_queue_len => non_neg_integer(), + max_heap_size => non_neg_integer() + }). + diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 4fb39e16e..c5e849c1e 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -45,6 +45,11 @@ , terminate/3 ]). +-import(emqx_misc, + [ maybe_apply/2 + , start_timer/2 + ]). + -record(state, { %% Peername of the ws connection. peername :: emqx_types:peername(), @@ -52,24 +57,41 @@ sockname :: emqx_types:peername(), %% Sock state sockstate :: emqx_types:sockstate(), - %% Parser State + %% Simulate the active_n opt + active_n :: pos_integer(), + %% Limiter + limiter :: emqx_limiter:limiter(), + %% Limit Timer + limit_timer :: maybe(reference()), + %% Parse State parse_state :: emqx_frame:parse_state(), %% Serialize function serialize :: emqx_frame:serialize_fun(), %% Channel channel :: emqx_channel:channel(), + %% GC State + gc_state :: maybe(emqx_gc:gc_state()), %% Out Pending Packets pendings :: list(emqx_types:packet()), + %% Stats Timer + stats_timer :: disabled | maybe(reference()), + %% Idle Timeout + idle_timeout :: timeout(), + %% Idle Timer + idle_timer :: reference(), %% The stop reason stop_reason :: term() }). -type(state() :: #state{}). --define(INFO_KEYS, [socktype, peername, sockname, sockstate]). +-define(ACTIVE_N, 100). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). +-define(ENABLED(X), (X =/= undefined)). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -92,11 +114,20 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; +info(active_n, #state{active_n = ActiveN}) -> + ActiveN; +info(limiter, #state{limiter = Limiter}) -> + maybe_apply(fun emqx_limiter:info/1, Limiter); info(channel, #state{channel = Channel}) -> emqx_channel:info(Channel); info(stop_reason, #state{stop_reason = Reason}) -> Reason. +attrs(State = #state{channel = Channel}) -> + ChanAttrs = emqx_channel:attrs(Channel), + SockAttrs = maps:from_list(info(?INFO_KEYS, State)), + maps:merge(ChanAttrs, #{sockinfo => SockAttrs}). + -spec(stats(pid()|state()) -> emqx_types:stats()). stats(WsPid) when is_pid(WsPid) -> call(WsPid, stats); @@ -128,6 +159,7 @@ call(WsPid, Req) when is_pid(WsPid) -> %%-------------------------------------------------------------------- init(Req, Opts) -> + %% WS Transport Idle Timeout IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000), DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])), MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of @@ -174,29 +206,41 @@ websocket_init([Req, Opts]) -> conn_mod => ?MODULE }, Zone = proplists:get_value(zone, Opts), - FrameOpts = emqx_zone:frame_options(Zone), + ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N), + Limiter = emqx_limiter:init(Opts), + FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), Channel = emqx_channel:init(ConnInfo, Opts), + GcState = emqx_zone:init_gc_state(Zone), + StatsTimer = emqx_zone:stats_timer(Zone), + %% MQTT Idle Timeout + IdleTimeout = emqx_zone:idle_timeout(Zone), + IdleTimer = start_timer(IdleTimeout, idle_timeout), + emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), - {ok, #state{peername = Peername, - sockname = Sockname, - sockstate = idle, - parse_state = ParseState, - serialize = Serialize, - channel = Channel, - pendings = [] - }}. + {ok, #state{peername = Peername, + sockname = Sockname, + sockstate = running, + active_n = ActiveN, + limiter = Limiter, + parse_state = ParseState, + serialize = Serialize, + channel = Channel, + gc_state = GcState, + pendings = [], + stats_timer = StatsTimer, + idle_timeout = IdleTimeout, + idle_timer = IdleTimer + }, hibernate}. websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); -websocket_handle({binary, Data}, State = #state{channel = Channel}) -> +websocket_handle({binary, Data}, State) -> ?LOG(debug, "RECV ~p", [Data]), - Oct = iolist_size(Data), - ok = inc_recv_stats(1, Oct), - NChannel = emqx_channel:recvd(Oct, Channel), - parse_incoming(Data, State#state{channel = NChannel}); + ok = inc_recv_stats(1, iolist_size(Data)), + parse_incoming(Data, ensure_stats_timer(State)); %% Pings should be replied with pongs, cowboy does it automatically %% Pongs can be safely ignored. Clause here simply prevents crash. @@ -215,30 +259,43 @@ websocket_info({call, From, Req}, State) -> handle_call(From, Req, State); websocket_info({cast, Msg}, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_info(Msg, Channel), State); + handle_chan_return(emqx_channel:handle_info(Msg, Channel), State); -websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> +websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, + State = #state{idle_timer = IdleTimer}) -> + ok = emqx_misc:cancel_timer(IdleTimer), Serialize = emqx_frame:serialize_fun(ConnPkt), - NState = State#state{sockstate = running, - serialize = Serialize + NState = State#state{serialize = Serialize, + idle_timer = undefined }, handle_incoming(Packet, NState); +websocket_info({incoming, ?PACKET(?PINGREQ)}, State) -> + reply(?PACKET(?PINGRESP), State); + websocket_info({incoming, Packet}, State) -> handle_incoming(Packet, State); -websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{channel = Channel}) -> - Delivers = emqx_misc:drain_deliver([Deliver]), - Result = emqx_channel:handle_out(Delivers, Channel), - handle_return(Result, State); +websocket_info(rate_limit, State) -> + InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs), + oct => emqx_pd:reset_counter(incoming_bytes) + }, + erlang:send(self(), {check_gc, InStats}), + ensure_rate_limit(InStats, State); -websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) -> - RecvOct = emqx_pd:get_counter(recv_oct), - handle_timeout(TRef, {keepalive, RecvOct}, State); +websocket_info({check_gc, Stats}, State) -> + {ok, check_oom(run_gc(Stats, State))}; -websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) -> - handle_timeout(TRef, {emit_stats, stats(State)}, State); +websocket_info({deliver, _Topic, _Msg} = Deliver, State = #state{channel = Channel}) -> + Delivers = [Deliver|emqx_misc:drain_deliver()], + Ret = emqx_channel:handle_out(Delivers, Channel), + handle_chan_return(Ret, State); + +websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef}) -> + NState = State#state{sockstate = running, + limit_timer = undefined + }, + {reply, [{active, true}], NState}; websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); @@ -293,27 +350,89 @@ handle_call(From, Req, State = #state{channel = Channel}) -> %% Handle Info handle_info({connack, ConnAck}, State = #state{channel = Channel}) -> - ChanAttrs = emqx_channel:attrs(Channel), - SockAttrs = maps:from_list(info(?INFO_KEYS, State)), - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel), + ClientId = emqx_channel:info(clientid, Channel), + ok = emqx_cm:register_channel(ClientId), + ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)), + ok = emqx_cm:set_chan_stats(ClientId, stats(State)), reply(enqueue(ConnAck, State)); handle_info({enter, disconnected}, State = #state{channel = Channel}) -> - ChanAttrs = emqx_channel:attrs(Channel), - SockAttrs = maps:from_list(info(?INFO_KEYS, State)), - Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), - ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel), + ClientId = emqx_channel:info(clientid, Channel), + emqx_cm:set_chan_attrs(ClientId, attrs(State)), + emqx_cm:set_chan_stats(ClientId, stats(State)), reply(State); handle_info(Info, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_info(Info, Channel), State). + Ret = emqx_channel:handle_info(Info, Channel), + handle_chan_return(Ret, State). %%-------------------------------------------------------------------- %% Handle timeout -handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State). +handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> + shutdown(idle_timeout, State); + +handle_timeout(TRef, keepalive, State) when is_reference(TRef) -> + RecvOct = emqx_pd:get_counter(recv_oct), + handle_timeout(TRef, {keepalive, RecvOct}, State); + +handle_timeout(TRef, emit_stats, State = #state{channel = Channel, + stats_timer = TRef}) -> + ClientId = emqx_channel:info(clientid, Channel), + (ClientId =/= undefined) andalso emqx_cm:set_chan_stats(ClientId, stats(State)), + reply(State#state{stats_timer = undefined}); + +handle_timeout(TRef, TMsg, State = #state{channel = Channel}) -> + Ret = emqx_channel:handle_timeout(TRef, TMsg, Channel), + handle_chan_return(Ret, State). + +%%-------------------------------------------------------------------- +%% Ensure stats timer + +-compile({inline, [ensure_stats_timer/1]}). +ensure_stats_timer(State = #state{idle_timeout = Timeout, + stats_timer = undefined}) -> + State#state{stats_timer = start_timer(Timeout, emit_stats)}; +ensure_stats_timer(State) -> State. + +%%-------------------------------------------------------------------- +%% Ensure rate limit + +ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> + case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of + false -> {ok, State}; + {ok, Limiter1} -> + {ok, State#state{limiter = Limiter1}}; + {pause, Time, Limiter1} -> + ?LOG(debug, "Pause ~pms due to rate limit", [Time]), + TRef = start_timer(Time, limit_timeout), + NState = State#state{sockstate = blocked, + limiter = Limiter1, + limit_timer = TRef + }, + {reply, [{active, false}], NState} + end. + +%%-------------------------------------------------------------------- +%% Run GC and Check OOM + +run_gc(Stats, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; + {IsGC, GcSt1} -> + IsGC andalso emqx_metrics:inc('channel.gc.cnt'), + State#state{gc_state = GcSt1} + end. + +check_oom(State = #state{channel = Channel}) -> + #{zone := Zone} = emqx_channel:info(clientinfo, Channel), + OomPolicy = emqx_zone:oom_policy(Zone), + case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of + Shutdown = {shutdown, _Reason} -> + erlang:send(self(), Shutdown); + _Other -> ok + end, + State. %%-------------------------------------------------------------------- %% Parse incoming data @@ -326,7 +445,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> {more, NParseState} -> {ok, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> - self() ! {incoming, Packet}, + erlang:send(self(), {incoming, Packet}), parse_incoming(Rest, State#state{parse_state = NParseState}) catch error:Reason:Stk -> @@ -337,52 +456,60 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> end. %%-------------------------------------------------------------------- -%% Handle incoming packets +%% Handle incoming packet -handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> - _ = inc_incoming_stats(Type), - _ = emqx_metrics:inc_recv(Packet), +handle_incoming(Packet, State = #state{active_n = ActiveN, channel = Channel}) + when is_record(Packet, mqtt_packet) -> ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - handle_return(emqx_channel:handle_in(Packet, Channel), State); + ok = inc_incoming_stats(Packet), + (emqx_pd:get_counter(incoming_pubs) > ActiveN) + andalso erlang:send(self(), rate_limit), + Ret = emqx_channel:handle_in(Packet, Channel), + handle_chan_return(Ret, State); handle_incoming(FrameError, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_in(FrameError, Channel), State). + handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State). %%-------------------------------------------------------------------- %% Handle channel return -handle_return(ok, State) -> +handle_chan_return(ok, State) -> reply(State); -handle_return({ok, NChannel}, State) -> +handle_chan_return({ok, NChannel}, State) -> reply(State#state{channel= NChannel}); -handle_return({ok, Replies, NChannel}, State) -> +handle_chan_return({ok, Replies, NChannel}, State) -> reply(Replies, State#state{channel= NChannel}); -handle_return({shutdown, Reason, NChannel}, State) -> +handle_chan_return({shutdown, Reason, NChannel}, State) -> stop(Reason, State#state{channel = NChannel}); -handle_return({shutdown, Reason, OutPacket, NChannel}, State) -> +handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) -> NState = State#state{channel = NChannel}, stop(Reason, enqueue(OutPacket, NState)). %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packets, State = #state{channel = Channel}) -> +handle_outgoing(Packets, State = #state{active_n = ActiveN}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), - NChannel = emqx_channel:sent(Oct, Channel), - {{binary, IoData}, State#state{channel = NChannel}}. + case emqx_pd:get_counter(outgoing_pubs) > ActiveN of + true -> + OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), + oct => emqx_pd:reset_counter(outgoing_bytes) + }, + erlang:send(self(), {check_gc, OutStats}); + false -> ok + end, + {{binary, IoData}, ensure_stats_timer(State)}. -%% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1 serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> - fun(Packet = ?PACKET(Type)) -> + fun(Packet) -> case Serialize(Packet) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), <<>>; - Data -> _ = inc_outgoing_stats(Type), - _ = emqx_metrics:inc_sent(Packet), - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + ok = inc_outgoing_stats(Packet), Data end end. @@ -398,23 +525,33 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ]}). inc_recv_stats(Cnt, Oct) -> - emqx_pd:update_counter(recv_cnt, Cnt), - emqx_pd:update_counter(recv_oct, Oct), + emqx_pd:inc_counter(incoming_bytes, Oct), + emqx_pd:inc_counter(recv_cnt, Cnt), + emqx_pd:inc_counter(recv_oct, Oct), emqx_metrics:inc('bytes.received', Oct). -inc_incoming_stats(Type) -> - emqx_pd:update_counter(recv_pkt, 1), - (Type == ?PUBLISH) - andalso emqx_pd:update_counter(recv_msg, 1). +inc_incoming_stats(Packet = ?PACKET(Type)) -> + emqx_pd:inc_counter(recv_pkt, 1), + if Type == ?PUBLISH -> + emqx_pd:inc_counter(recv_msg, 1), + emqx_pd:inc_counter(incoming_pubs, 1); + true -> ok + end, + emqx_metrics:inc_recv(Packet). -inc_outgoing_stats(Type) -> - emqx_pd:update_counter(send_pkt, 1), - (Type == ?PUBLISH) - andalso emqx_pd:update_counter(send_msg, 1). +inc_outgoing_stats(Packet = ?PACKET(Type)) -> + emqx_pd:inc_counter(send_pkt, 1), + if Type == ?PUBLISH -> + emqx_pd:inc_counter(send_msg, 1), + emqx_pd:inc_counter(outgoing_pubs, 1); + true -> ok + end, + emqx_metrics:inc_sent(Packet). inc_sent_stats(Cnt, Oct) -> - emqx_pd:update_counter(send_cnt, Cnt), - emqx_pd:update_counter(send_oct, Oct), + emqx_pd:inc_counter(outgoing_bytes, Oct), + emqx_pd:inc_counter(send_cnt, Cnt), + emqx_pd:inc_counter(send_oct, Oct), emqx_metrics:inc('bytes.sent', Oct). %%-------------------------------------------------------------------- @@ -451,6 +588,9 @@ enqueue(Packet, State) when is_record(Packet, mqtt_packet) -> enqueue(Packets, State = #state{pendings = Pendings}) -> State#state{pendings = lists:append(Pendings, Packets)}. +shutdown(Reason, State) -> + stop({shutdown, Reason}, State). + stop(Reason, State = #state{pendings = []}) -> {stop, State#state{stop_reason = Reason}}; stop(Reason, State = #state{pendings = Pendings}) ->