From 14cffcf7fbb7b3dae227554476152efd4a339b27 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 20 Dec 2018 16:45:25 +0800 Subject: [PATCH] Add the 'emqx_pd' module Add utility functions for erlang process dictionary Add test cases for emqx_pd --- Makefile | 2 +- src/emqx_connection.erl | 31 +++++++++++++------------------ src/emqx_pd.erl | 33 +++++++++++++++++++++++++++++++++ test/emqx_cm_SUITE.erl | 3 ++- test/emqx_pd_SUITE.erl | 31 +++++++++++++++++++++++++++++++ test/emqx_sm_SUITE.erl | 2 ++ 6 files changed, 82 insertions(+), 20 deletions(-) create mode 100644 src/emqx_pd.erl create mode 100644 test/emqx_pd_SUITE.erl diff --git a/Makefile b/Makefile index 68d383940..374b74fb7 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon + emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 79c244a11..2cf647beb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -249,12 +249,13 @@ handle_info({tcp, _Sock, Data}, State) -> ?LOG(debug, "RECV ~p", [Data]), Size = iolist_size(Data), emqx_metrics:trans(inc, 'bytes/received', Size), + emqx_pd:update_counter(incoming_bytes, Size), Incoming = #{bytes => Size, packets => 0}, handle_packet(Data, State#state{incoming = Incoming}); %% Rate limit here, cool:) handle_info({tcp_passive, _Sock}, State) -> - {noreply, ensure_rate_limit(State)}; + {noreply, run_socket(ensure_rate_limit(State))}; handle_info({tcp_error, _Sock, Reason}, State) -> shutdown(Reason, State); @@ -336,10 +337,10 @@ handle_packet(Data, State = #state{proto_state = ProtoState, {noreply, State#state{parser_state = ParserState1}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), + (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - NewState = State#state{proto_state = ProtoState1}, - handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState))); + handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1})); {error, Reason} -> ?LOG(error, "Process packet error - ~p", [Reason]), shutdown(Reason, State); @@ -360,28 +361,21 @@ handle_packet(Data, State = #state{proto_state = ProtoState, reset_parser(State = #state{proto_state = ProtoState}) -> State#state{parser_state = emqx_protocol:parser(ProtoState)}. -inc_publish_cnt(Type, State = #state{incoming = Incoming = #{packets := Cnt}}) - when Type == ?PUBLISH; Type == ?SUBSCRIBE -> - State#state{incoming = Incoming#{packets := Cnt + 1}}; - -inc_publish_cnt(_Type, State) -> - State. - %%------------------------------------------------------------------------------ %% Ensure rate limit %%------------------------------------------------------------------------------ -ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl, - incoming = #{packets := Packets, bytes := Bytes}}) -> - ensure_rate_limit([{Pl, #state.pub_limit, Packets}, - {Rl, #state.rate_limit, Bytes}], State). +ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) -> + Limiters = [{Pl, #state.pub_limit, emqx_pd:reset_counter(incoming_pubs)}, + {Rl, #state.rate_limit, emqx_pd:reset_counter(incoming_bytes)}], + ensure_rate_limit(Limiters, State). ensure_rate_limit([], State) -> - run_socket(State); -ensure_rate_limit([{undefined, _Pos, _Num}|Limiters], State) -> + State; +ensure_rate_limit([{undefined, _Pos, _Cnt}|Limiters], State) -> ensure_rate_limit(Limiters, State); -ensure_rate_limit([{Rl, Pos, Num}|Limiters], State) -> - case esockd_rate_limit:check(Num, Rl) of +ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> + case esockd_rate_limit:check(Cnt, Rl) of {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> @@ -423,3 +417,4 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> maybe_gc(_, _) -> ok. + diff --git a/src/emqx_pd.erl b/src/emqx_pd.erl new file mode 100644 index 000000000..ce1e7723c --- /dev/null +++ b/src/emqx_pd.erl @@ -0,0 +1,33 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc The utility functions for erlang process dictionary. +-module(emqx_pd). + +-export([update_counter/2, get_counter/1, reset_counter/1]). + +-type(key() :: term()). + +-spec(update_counter(key(), number()) -> undefined | number()). +update_counter(Key, Inc) -> + put(Key, get_counter(Key) + Inc). + +-spec(get_counter(key()) -> number()). +get_counter(Key) -> + case get(Key) of undefined -> 0; Cnt -> Cnt end. + +-spec(reset_counter(key()) -> number()). +reset_counter(Key) -> + case put(Key, 0) of undefined -> 0; Cnt -> Cnt end. + diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index b720849f6..208294474 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -18,6 +18,7 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). all() -> [t_register_unregister_connection]. @@ -25,7 +26,7 @@ t_register_unregister_connection(_) -> {ok, _} = emqx_cm_sup:start_link(), Pid = self(), ok = emqx_cm:register_connection(<<"conn1">>), - ok emqx_cm:register_connection(<<"conn2">>, Pid), + ok = emqx_cm:register_connection(<<"conn2">>, Pid), true = emqx_cm:set_conn_attrs(<<"conn1">>, [{port, 8080}, {ip, "192.168.0.1"}]), true = emqx_cm:set_conn_attrs(<<"conn2">>, Pid, [{port, 8080}, {ip, "192.168.0.1"}]), timer:sleep(2000), diff --git a/test/emqx_pd_SUITE.erl b/test/emqx_pd_SUITE.erl new file mode 100644 index 000000000..e53fa7539 --- /dev/null +++ b/test/emqx_pd_SUITE.erl @@ -0,0 +1,31 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_pd_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> [update_counter]. + +update_counter(_) -> + ?assertEqual(undefined, emqx_pd:update_counter(bytes, 1)), + ?assertEqual(1, emqx_pd:update_counter(bytes, 1)), + ?assertEqual(2, emqx_pd:update_counter(bytes, 1)), + ?assertEqual(3, emqx_pd:get_counter(bytes)), + ?assertEqual(3, emqx_pd:reset_counter(bytes)), + ?assertEqual(0, emqx_pd:get_counter(bytes)). + diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 008d4b6e6..6adb7c388 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -19,6 +19,8 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("eunit/include/eunit.hrl"). + all() -> [t_open_close_session]. t_open_close_session(_) ->