Merge pull request #6714 from lafirest/fix/quota_limiter
feat(emqx_limiter): add support for update overall limiter
This commit is contained in:
commit
348e6b8f10
|
@ -133,7 +133,7 @@
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_zone, brutal_purge, soft_purge, []}
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
|
@ -269,7 +269,7 @@
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_zone, brutal_purge, soft_purge, []}
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
, init/4 %% XXX: Compatible with before 4.2 version
|
, init/4 %% XXX: Compatible with before 4.2 version
|
||||||
, info/1
|
, info/1
|
||||||
, check/2
|
, check/2
|
||||||
|
, update_overall_limiter/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(limiter, {
|
-record(limiter, {
|
||||||
|
@ -152,3 +153,15 @@ is_message_limiter(conn_messages_in) -> true;
|
||||||
is_message_limiter(conn_messages_routing) -> true;
|
is_message_limiter(conn_messages_routing) -> true;
|
||||||
is_message_limiter(overall_messages_routing) -> true;
|
is_message_limiter(overall_messages_routing) -> true;
|
||||||
is_message_limiter(_) -> false.
|
is_message_limiter(_) -> false.
|
||||||
|
|
||||||
|
update_overall_limiter(Zone, Name, Capacity, Interval) ->
|
||||||
|
case is_overall_limiter(Name) of
|
||||||
|
false -> false;
|
||||||
|
_ ->
|
||||||
|
try
|
||||||
|
esockd_limiter:update({Zone, Name}, Capacity, Interval),
|
||||||
|
true
|
||||||
|
catch _:_:_ ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
|
@ -243,7 +243,6 @@ get_env(Zone, Key, Def) ->
|
||||||
|
|
||||||
-spec(set_env(zone(), atom(), term()) -> ok).
|
-spec(set_env(zone(), atom(), term()) -> ok).
|
||||||
set_env(Zone, Key, Val) ->
|
set_env(Zone, Key, Val) ->
|
||||||
on_conf_update(Key, Val, Zone),
|
|
||||||
persistent_term:put(?KEY(Zone, Key), Val).
|
persistent_term:put(?KEY(Zone, Key), Val).
|
||||||
|
|
||||||
-spec(unset_env(zone(), atom()) -> boolean()).
|
-spec(unset_env(zone(), atom()) -> boolean()).
|
||||||
|
@ -297,8 +296,3 @@ do_reload() ->
|
||||||
[persistent_term:put(?KEY(Zone, Key), Val)
|
[persistent_term:put(?KEY(Zone, Key), Val)
|
||||||
|| {Zone, Opts} <- emqx:get_env(zones, []), {Key, Val} <- Opts].
|
|| {Zone, Opts} <- emqx:get_env(zones, []), {Key, Val} <- Opts].
|
||||||
|
|
||||||
on_conf_update(overall_messages_routing, {Capacity, Interval}, Zone) ->
|
|
||||||
esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval),
|
|
||||||
ok;
|
|
||||||
on_conf_update(_, _, _) ->
|
|
||||||
ok.
|
|
||||||
|
|
Loading…
Reference in New Issue