From 90b33b044d340b2b0ad420283d0fb12c0d76683d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 2 Sep 2020 17:45:53 +0800 Subject: [PATCH] feature(ratelimit): support to set ratelimit & quota policy --- src/emqx_channel.erl | 9 +++++++-- src/emqx_connection.erl | 5 +++++ src/emqx_limiter.erl | 8 +++----- src/emqx_ws_connection.erl | 5 +++++ test/emqx_channel_SUITE.erl | 9 ++++++--- test/emqx_connection_SUITE.erl | 4 +++- 6 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 6259a0f55..2a3e35ccd 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -106,7 +106,7 @@ await_timer => expire_awaiting_rel, expire_timer => expire_session, will_timer => will_message, - quota_timer => reset_quota_flag + quota_timer => expire_quota_limit }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -855,6 +855,11 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session, handle_call(list_acl_cache, Channel) -> {reply, emqx_acl_cache:list_acl_cache(), Channel}; +handle_call({quota, Policy}, Channel) -> + Zone = info(zone, Channel), + Quota = emqx_limiter:init(Zone, Policy), + reply(ok, Channel#channel{quota = Quota}); + handle_call(Req, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), reply(ignored, Channel). @@ -962,7 +967,7 @@ handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; -handle_timeout(_TRef, reset_quota_flag, Channel) -> +handle_timeout(_TRef, expire_quota_limit, Channel) -> {ok, clean_timer(quota_timer, Channel)}; handle_timeout(_TRef, Msg, Channel) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ef25c36e0..2ab7f6c1e 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -455,6 +455,11 @@ handle_call(_From, info, State) -> handle_call(_From, stats, State) -> {reply, stats(State), State}; +handle_call(_From, {ratelimit, Policy}, State = #state{channel = Channel}) -> + Zone = emqx_channel:info(zone, Channel), + Limiter = emqx_limiter:init(Zone, Policy), + {reply, ok, State#state{limiter = Limiter}}; + handle_call(_From, Req, State = #state{channel = Channel}) -> case emqx_channel:handle_call(Req, Channel) of {reply, Reply, NChannel} -> diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index 5d0d4fe52..e3ff7512f 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -44,9 +44,7 @@ | overall_messages_routing ). --type(spec() :: {name(), esockd_rate_limit:config()}). - --type(specs() :: [spec()]). +-type(policy() :: [{name(), esockd_rate_limit:config()}]). -type(info() :: #{name() := #{tokens := non_neg_integer(), @@ -61,7 +59,7 @@ -spec(init(emqx_zone:zone(), maybe(esockd_rate_limit:config()), - maybe(esockd_rate_limit:config()), specs()) + maybe(esockd_rate_limit:config()), policy()) -> maybe(limiter())). init(Zone, PubLimit, BytesIn, Specs) -> Merged = maps:merge(#{conn_messages_in => PubLimit, @@ -69,7 +67,7 @@ init(Zone, PubLimit, BytesIn, Specs) -> Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged), init(Zone, maps:to_list(Filtered)). --spec(init(emqx_zone:zone(), specs()) -> maybe(limiter())). +-spec(init(emqx_zone:zone(), policy()) -> maybe(limiter())). init(_Zone, []) -> undefined; init(Zone, Specs) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 47239efe9..33b882768 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -354,6 +354,11 @@ handle_call(From, stats, State) -> gen_server:reply(From, stats(State)), return(State); +handle_call(_From, {ratelimit, Policy}, State = #state{channel = Channel}) -> + Zone = emqx_channel:info(zone, Channel), + Limiter = emqx_limiter:init(Zone, Policy), + {reply, ok, State#state{limiter = Limiter}}; + handle_call(From, Req, State = #state{channel = Channel}) -> case emqx_channel:handle_call(Req, Channel) of {reply, Reply, NChannel} -> diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index fe269c04d..3f282ba6a 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -367,7 +367,7 @@ t_quota_qos0(_) -> {ok, Chann1} = emqx_channel:handle_in(Pub, Chann), {ok, Chann2} = emqx_channel:handle_in(Pub, Chann1), M1 = emqx_metrics:val('packets.publish.dropped') - 1, - {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2), + {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2), {ok, _} = emqx_channel:handle_in(Pub, Chann3), M1 = emqx_metrics:val('packets.publish.dropped') - 1, @@ -383,7 +383,7 @@ t_quota_qos1(_) -> %% Quota per connections {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub, Chann), {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub, Chann1), - {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2), + {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2), {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3), %% Quota in overall {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4), @@ -400,7 +400,7 @@ t_quota_qos2(_) -> %% Quota per connections {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub1, Chann), {ok, ?PUBREC_PACKET(2, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub2, Chann1), - {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2), + {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2), {ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3), %% Quota in overall {ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4), @@ -528,6 +528,9 @@ t_handle_call_takeover_end(_) -> {shutdown, takeovered, [], _, _Chan} = emqx_channel:handle_call({takeover, 'end'}, channel()). +t_handle_call_quota(_) -> + {reply, ok, _Chan} = emqx_channel:handle_call({quota, [{conn_messages_routing, {100,1}}]}, channel()). + t_handle_call_unexpected(_) -> {reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()). diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index a2d300e9b..b908fe4ab 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -220,7 +220,9 @@ t_handle_call(_) -> St = st(), ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)), ?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)), - ?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)), + ?assertMatch({reply, _Stats, _NSt}, emqx_connection:handle_call(self(), stats, St)), + ?assertMatch({reply, ok, _NSt}, emqx_connection:handle_call(self(), {ratelimit, []}, St)), + ?assertMatch({reply, ok, _NSt}, emqx_connection:handle_call(self(), {ratelimit, [{conn_messages_in, {100, 1}}]}, St)), ?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)), ?assertMatch({stop, {shutdown,kicked}, ok, _NSt}, emqx_connection:handle_call(self(), kick, St)).