From 4005d58166f1edabc45715b13229a05d91b4c316 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 8 Aug 2018 19:31:25 +0800 Subject: [PATCH] Move the 'rate_limit' option from zone to listener --- src/emqx_connection.erl | 17 +++++++++-------- src/emqx_frame.erl | 1 - src/emqx_protocol.erl | 1 + 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 7dc70e6ce..89233fb43 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -41,7 +41,7 @@ await_recv, %% Awaiting recv incoming, %% Incoming bytes and packets pub_limit, %% Publish rate limit - rate_limit, %% Throughput rate limit + rate_limit, %% Traffic rate limit limit_timer, %% Rate limit timer proto_state, %% MQTT protocol state parse_state, %% MQTT parse state @@ -56,7 +56,7 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(LOG(Level, Format, Args, State), - emqx_logger:Level("Conn(~s): " ++ Format, + emqx_logger:Level("Client(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). start_link(Transport, Socket, Options) -> @@ -100,11 +100,12 @@ set_pub_limit(CPid, Rl = {_Rate, _Burst}) -> init([Transport, RawSocket, Options]) -> case Transport:wait(RawSocket) of {ok, Socket} -> + io:format("Options: ~p~n", [Options]), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), Zone = proplists:get_value(zone, Options), - RateLimit = init_rate_limit(emqx_zone:get_env(Zone, rate_limit)), + RateLimit = init_rate_limit(proplists:get_value(rate_limit, Options)), PubLimit = init_rate_limit(emqx_zone:get_env(Zone, publish_limit)), EnableStats = emqx_zone:get_env(Zone, enable_stats, false), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), @@ -194,16 +195,16 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected cast: ~p", [Msg], State), {noreply, State}. -handle_info(SubReq = {subscribe, _TopicTable}, State) -> +handle_info(Sub = {subscribe, _TopicTable}, State) -> with_proto( fun(ProtoState) -> - emqx_protocol:process(SubReq, ProtoState) + emqx_protocol:process(Sub, ProtoState) end, State); -handle_info(UnsubReq = {unsubscribe, _Topics}, State) -> +handle_info(Unsub = {unsubscribe, _Topics}, State) -> with_proto( fun(ProtoState) -> - emqx_protocol:process(UnsubReq, ProtoState) + emqx_protocol:process(Unsub, ProtoState) end, State); handle_info({deliver, PubOrAck}, State) -> @@ -300,7 +301,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -%% Receive and parse TCP data +%% Receive and parse data handle_packet(<<>>, State) -> {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7385b7116..10498afcf 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -162,7 +162,6 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, - io:format("Rest1: ~p~n", [Rest1]), {Properties, Payload} = parse_properties(Rest1, Ver), {#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 85fe35e52..9a13c6538 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -19,6 +19,7 @@ -include("emqx_misc.hrl"). -export([init/2, info/1, stats/1, clientid/1, session/1]). +%%-export([capabilities/1]). -export([parser/1]). -export([received/2, process/2, deliver/2, send/2]). -export([shutdown/2]).