Move the 'rate_limit' option from zone to listener

This commit is contained in:
Feng Lee 2018-08-08 19:31:25 +08:00
parent 4cf1815030
commit 4005d58166
3 changed files with 10 additions and 9 deletions

View File

@ -41,7 +41,7 @@
await_recv, %% Awaiting recv
incoming, %% Incoming bytes and packets
pub_limit, %% Publish rate limit
rate_limit, %% Throughput rate limit
rate_limit, %% Traffic rate limit
limit_timer, %% Rate limit timer
proto_state, %% MQTT protocol state
parse_state, %% MQTT parse state
@ -56,7 +56,7 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level("Conn(~s): " ++ Format,
emqx_logger:Level("Client(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])).
start_link(Transport, Socket, Options) ->
@ -100,11 +100,12 @@ set_pub_limit(CPid, Rl = {_Rate, _Burst}) ->
init([Transport, RawSocket, Options]) ->
case Transport:wait(RawSocket) of
{ok, Socket} ->
io:format("Options: ~p~n", [Options]),
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
Zone = proplists:get_value(zone, Options),
RateLimit = init_rate_limit(emqx_zone:get_env(Zone, rate_limit)),
RateLimit = init_rate_limit(proplists:get_value(rate_limit, Options)),
PubLimit = init_rate_limit(emqx_zone:get_env(Zone, publish_limit)),
EnableStats = emqx_zone:get_env(Zone, enable_stats, false),
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
@ -194,16 +195,16 @@ handle_cast(Msg, State) ->
?LOG(error, "unexpected cast: ~p", [Msg], State),
{noreply, State}.
handle_info(SubReq = {subscribe, _TopicTable}, State) ->
handle_info(Sub = {subscribe, _TopicTable}, State) ->
with_proto(
fun(ProtoState) ->
emqx_protocol:process(SubReq, ProtoState)
emqx_protocol:process(Sub, ProtoState)
end, State);
handle_info(UnsubReq = {unsubscribe, _Topics}, State) ->
handle_info(Unsub = {unsubscribe, _Topics}, State) ->
with_proto(
fun(ProtoState) ->
emqx_protocol:process(UnsubReq, ProtoState)
emqx_protocol:process(Unsub, ProtoState)
end, State);
handle_info({deliver, PubOrAck}, State) ->
@ -300,7 +301,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%------------------------------------------------------------------------------
%% Receive and parse TCP data
%% Receive and parse data
handle_packet(<<>>, State) ->
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};

View File

@ -162,7 +162,6 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
?QOS_0 -> {undefined, Rest};
_ -> parse_packet_id(Rest)
end,
io:format("Rest1: ~p~n", [Rest1]),
{Properties, Payload} = parse_properties(Rest1, Ver),
{#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId,

View File

@ -19,6 +19,7 @@
-include("emqx_misc.hrl").
-export([init/2, info/1, stats/1, clientid/1, session/1]).
%%-export([capabilities/1]).
-export([parser/1]).
-export([received/2, process/2, deliver/2, send/2]).
-export([shutdown/2]).