rate limit

This commit is contained in:
Feng 2015-11-01 16:03:32 +08:00
parent e538d91efc
commit c7f0e33674
2 changed files with 7 additions and 16 deletions

View File

@ -91,8 +91,7 @@ open_listener({https, Port, Options}) ->
mochiweb:start_http(Port, Options, MFArgs). mochiweb:start_http(Port, Options, MFArgs).
open_listener(Protocol, Port, Options) -> open_listener(Protocol, Port, Options) ->
Rl = rate_limiter(emqttd_opts:g(rate_limit, Options)), MFArgs = {emqttd_client, start_link, [env(mqtt)]},
MFArgs = {emqttd_client, start_link, [[{rate_limiter, Rl} | env(mqtt)]]},
esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs). esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
merge_sockopts(Options) -> merge_sockopts(Options) ->
@ -100,14 +99,6 @@ merge_sockopts(Options) ->
proplists:get_value(sockopts, Options, [])), proplists:get_value(sockopts, Options, [])),
emqttd_opts:merge(Options, [{sockopts, SockOpts}]). emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
%% TODO: will refactor in 0.14.0 release.
rate_limiter(undefined) ->
undefined;
rate_limiter(Config) ->
Bps = fun(S) -> list_to_integer(string:strip(S)) * 1024 end,
[Burst, Rate] = [Bps(S) || S <- string:tokens(Config, ",")],
esockd_rate_limiter:new(Burst, Rate).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Close Listeners %% @doc Close Listeners
%% @end %% @end

View File

@ -281,14 +281,14 @@ received(Bytes, State = #client_state{parser_fun = ParserFun,
rate_limit(_Size, State = #client_state{rate_limit = undefined}) -> rate_limit(_Size, State = #client_state{rate_limit = undefined}) ->
run_socket(State); run_socket(State);
rate_limit(Size, State = #client_state{rate_limit = Limiter}) -> rate_limit(Size, State = #client_state{rate_limit = Rl}) ->
case esockd_ratelimit:check(Limiter, Size) of case Rl:check(Size) of
{0, Limiter1} -> {0, Rl1} ->
run_socket(State#client_state{conn_state = running, rate_limit = Limiter1}); run_socket(State#client_state{conn_state = running, rate_limit = Rl1});
{Pause, Limiter1} -> {Pause, Rl1} ->
?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State), ?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State),
erlang:send_after(Pause, self(), activate_sock), erlang:send_after(Pause, self(), activate_sock),
State#client_state{conn_state = blocked, rate_limit = Limiter1} State#client_state{conn_state = blocked, rate_limit = Rl1}
end. end.
run_socket(State = #client_state{conn_state = blocked}) -> run_socket(State = #client_state{conn_state = blocked}) ->