diff --git a/Makefile b/Makefile index 68d383940..374b74fb7 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon + emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 79c244a11..2cf647beb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -249,12 +249,13 @@ handle_info({tcp, _Sock, Data}, State) -> ?LOG(debug, "RECV ~p", [Data]), Size = iolist_size(Data), emqx_metrics:trans(inc, 'bytes/received', Size), + emqx_pd:update_counter(incoming_bytes, Size), Incoming = #{bytes => Size, packets => 0}, handle_packet(Data, State#state{incoming = Incoming}); %% Rate limit here, cool:) handle_info({tcp_passive, _Sock}, State) -> - {noreply, ensure_rate_limit(State)}; + {noreply, run_socket(ensure_rate_limit(State))}; handle_info({tcp_error, _Sock, Reason}, State) -> shutdown(Reason, State); @@ -336,10 +337,10 @@ handle_packet(Data, State = #state{proto_state = ProtoState, {noreply, State#state{parser_state = ParserState1}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), + (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - NewState = State#state{proto_state = ProtoState1}, - handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState))); + handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1})); {error, Reason} -> ?LOG(error, "Process packet error - ~p", [Reason]), shutdown(Reason, State); @@ -360,28 +361,21 @@ handle_packet(Data, State = #state{proto_state = ProtoState, reset_parser(State = #state{proto_state = ProtoState}) -> State#state{parser_state = emqx_protocol:parser(ProtoState)}. -inc_publish_cnt(Type, State = #state{incoming = Incoming = #{packets := Cnt}}) - when Type == ?PUBLISH; Type == ?SUBSCRIBE -> - State#state{incoming = Incoming#{packets := Cnt + 1}}; - -inc_publish_cnt(_Type, State) -> - State. - %%------------------------------------------------------------------------------ %% Ensure rate limit %%------------------------------------------------------------------------------ -ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl, - incoming = #{packets := Packets, bytes := Bytes}}) -> - ensure_rate_limit([{Pl, #state.pub_limit, Packets}, - {Rl, #state.rate_limit, Bytes}], State). +ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) -> + Limiters = [{Pl, #state.pub_limit, emqx_pd:reset_counter(incoming_pubs)}, + {Rl, #state.rate_limit, emqx_pd:reset_counter(incoming_bytes)}], + ensure_rate_limit(Limiters, State). ensure_rate_limit([], State) -> - run_socket(State); -ensure_rate_limit([{undefined, _Pos, _Num}|Limiters], State) -> + State; +ensure_rate_limit([{undefined, _Pos, _Cnt}|Limiters], State) -> ensure_rate_limit(Limiters, State); -ensure_rate_limit([{Rl, Pos, Num}|Limiters], State) -> - case esockd_rate_limit:check(Num, Rl) of +ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> + case esockd_rate_limit:check(Cnt, Rl) of {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> @@ -423,3 +417,4 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> maybe_gc(_, _) -> ok. + diff --git a/src/emqx_pd.erl b/src/emqx_pd.erl new file mode 100644 index 000000000..ce1e7723c --- /dev/null +++ b/src/emqx_pd.erl @@ -0,0 +1,33 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc The utility functions for erlang process dictionary. +-module(emqx_pd). + +-export([update_counter/2, get_counter/1, reset_counter/1]). + +-type(key() :: term()). + +-spec(update_counter(key(), number()) -> undefined | number()). +update_counter(Key, Inc) -> + put(Key, get_counter(Key) + Inc). + +-spec(get_counter(key()) -> number()). +get_counter(Key) -> + case get(Key) of undefined -> 0; Cnt -> Cnt end. + +-spec(reset_counter(key()) -> number()). +reset_counter(Key) -> + case put(Key, 0) of undefined -> 0; Cnt -> Cnt end. + diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 08a773d40..3d879a476 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -19,9 +19,8 @@ -compile(nowarn_export_all). -include("emqx.hrl"). --include_lib("eunit/include/eunit.hrl"). - -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). all() -> [{group, cm}]. diff --git a/test/emqx_pd_SUITE.erl b/test/emqx_pd_SUITE.erl new file mode 100644 index 000000000..e53fa7539 --- /dev/null +++ b/test/emqx_pd_SUITE.erl @@ -0,0 +1,31 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_pd_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> [update_counter]. + +update_counter(_) -> + ?assertEqual(undefined, emqx_pd:update_counter(bytes, 1)), + ?assertEqual(1, emqx_pd:update_counter(bytes, 1)), + ?assertEqual(2, emqx_pd:update_counter(bytes, 1)), + ?assertEqual(3, emqx_pd:get_counter(bytes)), + ?assertEqual(3, emqx_pd:reset_counter(bytes)), + ?assertEqual(0, emqx_pd:get_counter(bytes)). + diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 407e2c92b..b3ce70c82 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -17,8 +17,6 @@ -include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). --include("emqx.hrl"). - -compile(export_all). -compile(nowarn_export_all).