fix(limiter): add config update handler for limiter

This commit is contained in:
firest 2022-05-06 14:46:46 +08:00
parent b059eeda0a
commit a4a9650e66
5 changed files with 33 additions and 20 deletions

View File

@ -29,7 +29,8 @@
find_bucket/2, find_bucket/2,
insert_bucket/2, insert_bucket/3, insert_bucket/2, insert_bucket/3,
make_path/2, make_path/2,
restart_server/1 restart_server/1,
post_config_update/5
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -68,7 +69,7 @@ start_server(Type) ->
-spec restart_server(limiter_type()) -> _. -spec restart_server(limiter_type()) -> _.
restart_server(Type) -> restart_server(Type) ->
emqx_limiter_server_sup:restart(Type). emqx_limiter_server:restart(Type).
-spec find_bucket(limiter_type(), bucket_name()) -> -spec find_bucket(limiter_type(), bucket_name()) ->
{ok, bucket_ref()} | undefined. {ok, bucket_ref()} | undefined.
@ -100,6 +101,10 @@ insert_bucket(Path, Bucket) ->
make_path(Type, BucketName) -> make_path(Type, BucketName) ->
[Type | BucketName]. [Type | BucketName].
post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
Config = maps:get(Type, NewConf),
emqx_limiter_server:update_config(Type, Config).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Starts the server %% Starts the server
@ -130,6 +135,7 @@ start_link() ->
| {stop, Reason :: term()} | {stop, Reason :: term()}
| ignore. | ignore.
init([]) -> init([]) ->
emqx_conf:add_handler([limiter], ?MODULE),
_ = ets:new(?TAB, [ _ = ets:new(?TAB, [
set, set,
public, public,
@ -204,6 +210,7 @@ handle_info(Info, State) ->
State :: term() State :: term()
) -> any(). ) -> any().
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
emqx_conf:remove_handler([limiter]),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -46,7 +46,8 @@
info/1, info/1,
name/1, name/1,
get_initial_val/1, get_initial_val/1,
update_config/1 restart/1,
update_config/2
]). ]).
%% number of tokens generated per period %% number of tokens generated per period
@ -87,7 +88,9 @@
-type decimal() :: emqx_limiter_decimal:decimal(). -type decimal() :: emqx_limiter_decimal:decimal().
-type index() :: pos_integer(). -type index() :: pos_integer().
-define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(CALL(Type, Msg), gen_server:call(name(Type), Msg)).
-define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)).
%% minimum coefficient for overloaded limiter %% minimum coefficient for overloaded limiter
-define(OVERLOAD_MIN_ALLOC, 0.3). -define(OVERLOAD_MIN_ALLOC, 0.3).
-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end). -define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
@ -145,10 +148,14 @@ info(Type) ->
name(Type) -> name(Type) ->
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
-spec update_config(limiter_type()) -> ok. -spec restart(limiter_type()) -> ok.
update_config(Type) -> restart(Type) ->
?CALL(Type). ?CALL(Type).
-spec update_config(limiter_type(), hocons:config()) -> ok.
update_config(Type, Config) ->
?CALL(Type, {update_config, Type, Config}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Starts the server %% Starts the server
@ -197,9 +204,12 @@ init([Type]) ->
| {stop, Reason :: term(), NewState :: term()}. | {stop, Reason :: term(), NewState :: term()}.
handle_call(info, _From, State) -> handle_call(info, _From, State) ->
{reply, State, State}; {reply, State, State};
handle_call(update_config, _From, #{type := Type}) -> handle_call(restart, _From, #{type := Type}) ->
NewState = init_tree(Type), NewState = init_tree(Type),
{reply, ok, NewState}; {reply, ok, NewState};
handle_call({update_config, Type, Config}, _From, #{type := Type}) ->
NewState = init_tree(Type, Config),
{reply, ok, NewState};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -442,7 +452,11 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
{Alloced, Buckets}. {Alloced, Buckets}.
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state(). -spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) -> init_tree(Type) when is_atom(Type) ->
Cfg = emqx:get_config([limiter, Type]),
init_tree(Type, Cfg).
init_tree(Type, #{bucket := Buckets} = Cfg) ->
State = #{ State = #{
type => Type, type => Type,
root => undefined, root => undefined,
@ -451,7 +465,6 @@ init_tree(Type) ->
buckets => #{} buckets => #{}
}, },
#{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
{Factor, Root} = make_root(Cfg), {Factor, Root} = make_root(Cfg),
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),

View File

@ -19,7 +19,7 @@
-behaviour(supervisor). -behaviour(supervisor).
%% API %% API
-export([start_link/0, start/1, restart/1]). -export([start_link/0, start/1]).
%% Supervisor callbacks %% Supervisor callbacks
-export([init/1]). -export([init/1]).
@ -47,13 +47,6 @@ start(Type) ->
Spec = make_child(Type), Spec = make_child(Type),
supervisor:start_child(?MODULE, Spec). supervisor:start_child(?MODULE, Spec).
%% XXX This is maybe a workaround, not so good
-spec restart(emqx_limiter_schema:limiter_type()) -> _.
restart(Type) ->
Id = emqx_limiter_server:name(Type),
_ = supervisor:terminate_child(?MODULE, Id),
supervisor:restart_child(?MODULE, Id).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Supervisor callbacks %% Supervisor callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -733,7 +733,7 @@ with_config(Path, Modifier, Case) ->
NewCfg = Modifier(Cfg), NewCfg = Modifier(Cfg),
ct:pal("test with config:~p~n", [NewCfg]), ct:pal("test with config:~p~n", [NewCfg]),
emqx_config:put(Path, NewCfg), emqx_config:put(Path, NewCfg),
emqx_limiter_server:update_config(message_routing), emqx_limiter_server:restart(message_routing),
timer:sleep(500), timer:sleep(500),
DelayReturn = delay_return(Case), DelayReturn = delay_return(Case),
emqx_config:put(Path, Cfg), emqx_config:put(Path, Cfg),

View File

@ -483,7 +483,7 @@ t_ensure_rate_limit(_) ->
PerClient = emqx_config:get(Path), PerClient = emqx_config:get(Path),
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"), {ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
emqx_config:put(Path, PerClient#{rate := Rate}), emqx_config:put(Path, PerClient#{rate := Rate}),
emqx_limiter_server:update_config(bytes_in), emqx_limiter_server:restart(bytes_in),
timer:sleep(100), timer:sleep(100),
Limiter = init_limiter(), Limiter = init_limiter(),
@ -502,7 +502,7 @@ t_ensure_rate_limit(_) ->
?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)), ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
emqx_config:put(Path, PerClient), emqx_config:put(Path, PerClient),
emqx_limiter_server:update_config(bytes_in), emqx_limiter_server:restart(bytes_in),
timer:sleep(100). timer:sleep(100).
t_parse_incoming(_) -> t_parse_incoming(_) ->