From c7f0e336743b67a582776357528c8fd53e317b99 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 1 Nov 2015 16:03:32 +0800 Subject: [PATCH] rate limit --- src/emqttd.erl | 11 +---------- src/emqttd_client.erl | 12 ++++++------ 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/emqttd.erl b/src/emqttd.erl index 09661d6c8..eeb5bab4d 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -91,8 +91,7 @@ open_listener({https, Port, Options}) -> mochiweb:start_http(Port, Options, MFArgs). open_listener(Protocol, Port, Options) -> - Rl = rate_limiter(emqttd_opts:g(rate_limit, Options)), - MFArgs = {emqttd_client, start_link, [[{rate_limiter, Rl} | env(mqtt)]]}, + MFArgs = {emqttd_client, start_link, [env(mqtt)]}, esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs). merge_sockopts(Options) -> @@ -100,14 +99,6 @@ merge_sockopts(Options) -> proplists:get_value(sockopts, Options, [])), emqttd_opts:merge(Options, [{sockopts, SockOpts}]). -%% TODO: will refactor in 0.14.0 release. -rate_limiter(undefined) -> - undefined; -rate_limiter(Config) -> - Bps = fun(S) -> list_to_integer(string:strip(S)) * 1024 end, - [Burst, Rate] = [Bps(S) || S <- string:tokens(Config, ",")], - esockd_rate_limiter:new(Burst, Rate). - %%------------------------------------------------------------------------------ %% @doc Close Listeners %% @end diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index f471d311b..23c53d44b 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -281,14 +281,14 @@ received(Bytes, State = #client_state{parser_fun = ParserFun, rate_limit(_Size, State = #client_state{rate_limit = undefined}) -> run_socket(State); -rate_limit(Size, State = #client_state{rate_limit = Limiter}) -> - case esockd_ratelimit:check(Limiter, Size) of - {0, Limiter1} -> - run_socket(State#client_state{conn_state = running, rate_limit = Limiter1}); - {Pause, Limiter1} -> +rate_limit(Size, State = #client_state{rate_limit = Rl}) -> + case Rl:check(Size) of + {0, Rl1} -> + run_socket(State#client_state{conn_state = running, rate_limit = Rl1}); + {Pause, Rl1} -> ?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State), erlang:send_after(Pause, self(), activate_sock), - State#client_state{conn_state = blocked, rate_limit = Limiter1} + State#client_state{conn_state = blocked, rate_limit = Rl1} end. run_socket(State = #client_state{conn_state = blocked}) ->