Merge pull request #7881 from lafirest/fix/limiter_update
fix(limiter): add config update handler for limiter
This commit is contained in:
commit
4d3aa16ceb
|
@ -29,7 +29,8 @@
|
||||||
find_bucket/2,
|
find_bucket/2,
|
||||||
insert_bucket/2, insert_bucket/3,
|
insert_bucket/2, insert_bucket/3,
|
||||||
make_path/2,
|
make_path/2,
|
||||||
restart_server/1
|
restart_server/1,
|
||||||
|
post_config_update/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -68,7 +69,7 @@ start_server(Type) ->
|
||||||
|
|
||||||
-spec restart_server(limiter_type()) -> _.
|
-spec restart_server(limiter_type()) -> _.
|
||||||
restart_server(Type) ->
|
restart_server(Type) ->
|
||||||
emqx_limiter_server_sup:restart(Type).
|
emqx_limiter_server:restart(Type).
|
||||||
|
|
||||||
-spec find_bucket(limiter_type(), bucket_name()) ->
|
-spec find_bucket(limiter_type(), bucket_name()) ->
|
||||||
{ok, bucket_ref()} | undefined.
|
{ok, bucket_ref()} | undefined.
|
||||||
|
@ -100,6 +101,10 @@ insert_bucket(Path, Bucket) ->
|
||||||
make_path(Type, BucketName) ->
|
make_path(Type, BucketName) ->
|
||||||
[Type | BucketName].
|
[Type | BucketName].
|
||||||
|
|
||||||
|
post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
|
||||||
|
Config = maps:get(Type, NewConf),
|
||||||
|
emqx_limiter_server:update_config(Type, Config).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Starts the server
|
%% Starts the server
|
||||||
|
@ -130,6 +135,7 @@ start_link() ->
|
||||||
| {stop, Reason :: term()}
|
| {stop, Reason :: term()}
|
||||||
| ignore.
|
| ignore.
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
ok = emqx_config_handler:add_handler([limiter], ?MODULE),
|
||||||
_ = ets:new(?TAB, [
|
_ = ets:new(?TAB, [
|
||||||
set,
|
set,
|
||||||
public,
|
public,
|
||||||
|
@ -204,6 +210,7 @@ handle_info(Info, State) ->
|
||||||
State :: term()
|
State :: term()
|
||||||
) -> any().
|
) -> any().
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
emqx_config_handler:remove_handler([limiter]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -46,7 +46,8 @@
|
||||||
info/1,
|
info/1,
|
||||||
name/1,
|
name/1,
|
||||||
get_initial_val/1,
|
get_initial_val/1,
|
||||||
update_config/1
|
restart/1,
|
||||||
|
update_config/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% number of tokens generated per period
|
%% number of tokens generated per period
|
||||||
|
@ -87,7 +88,9 @@
|
||||||
-type decimal() :: emqx_limiter_decimal:decimal().
|
-type decimal() :: emqx_limiter_decimal:decimal().
|
||||||
-type index() :: pos_integer().
|
-type index() :: pos_integer().
|
||||||
|
|
||||||
-define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
|
-define(CALL(Type, Msg), gen_server:call(name(Type), Msg)).
|
||||||
|
-define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)).
|
||||||
|
|
||||||
%% minimum coefficient for overloaded limiter
|
%% minimum coefficient for overloaded limiter
|
||||||
-define(OVERLOAD_MIN_ALLOC, 0.3).
|
-define(OVERLOAD_MIN_ALLOC, 0.3).
|
||||||
-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
|
-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
|
||||||
|
@ -145,10 +148,14 @@ info(Type) ->
|
||||||
name(Type) ->
|
name(Type) ->
|
||||||
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
|
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
|
||||||
|
|
||||||
-spec update_config(limiter_type()) -> ok.
|
-spec restart(limiter_type()) -> ok.
|
||||||
update_config(Type) ->
|
restart(Type) ->
|
||||||
?CALL(Type).
|
?CALL(Type).
|
||||||
|
|
||||||
|
-spec update_config(limiter_type(), hocons:config()) -> ok.
|
||||||
|
update_config(Type, Config) ->
|
||||||
|
?CALL(Type, {update_config, Type, Config}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Starts the server
|
%% Starts the server
|
||||||
|
@ -197,9 +204,12 @@ init([Type]) ->
|
||||||
| {stop, Reason :: term(), NewState :: term()}.
|
| {stop, Reason :: term(), NewState :: term()}.
|
||||||
handle_call(info, _From, State) ->
|
handle_call(info, _From, State) ->
|
||||||
{reply, State, State};
|
{reply, State, State};
|
||||||
handle_call(update_config, _From, #{type := Type}) ->
|
handle_call(restart, _From, #{type := Type}) ->
|
||||||
NewState = init_tree(Type),
|
NewState = init_tree(Type),
|
||||||
{reply, ok, NewState};
|
{reply, ok, NewState};
|
||||||
|
handle_call({update_config, Type, Config}, _From, #{type := Type}) ->
|
||||||
|
NewState = init_tree(Type, Config),
|
||||||
|
{reply, ok, NewState};
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -442,7 +452,11 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
|
||||||
{Alloced, Buckets}.
|
{Alloced, Buckets}.
|
||||||
|
|
||||||
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
|
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
|
||||||
init_tree(Type) ->
|
init_tree(Type) when is_atom(Type) ->
|
||||||
|
Cfg = emqx:get_config([limiter, Type]),
|
||||||
|
init_tree(Type, Cfg).
|
||||||
|
|
||||||
|
init_tree(Type, #{bucket := Buckets} = Cfg) ->
|
||||||
State = #{
|
State = #{
|
||||||
type => Type,
|
type => Type,
|
||||||
root => undefined,
|
root => undefined,
|
||||||
|
@ -451,7 +465,6 @@ init_tree(Type) ->
|
||||||
buckets => #{}
|
buckets => #{}
|
||||||
},
|
},
|
||||||
|
|
||||||
#{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
|
|
||||||
{Factor, Root} = make_root(Cfg),
|
{Factor, Root} = make_root(Cfg),
|
||||||
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
|
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
-behaviour(supervisor).
|
-behaviour(supervisor).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/0, start/1, restart/1]).
|
-export([start_link/0, start/1]).
|
||||||
|
|
||||||
%% Supervisor callbacks
|
%% Supervisor callbacks
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
@ -47,13 +47,6 @@ start(Type) ->
|
||||||
Spec = make_child(Type),
|
Spec = make_child(Type),
|
||||||
supervisor:start_child(?MODULE, Spec).
|
supervisor:start_child(?MODULE, Spec).
|
||||||
|
|
||||||
%% XXX This is maybe a workaround, not so good
|
|
||||||
-spec restart(emqx_limiter_schema:limiter_type()) -> _.
|
|
||||||
restart(Type) ->
|
|
||||||
Id = emqx_limiter_server:name(Type),
|
|
||||||
_ = supervisor:terminate_child(?MODULE, Id),
|
|
||||||
supervisor:restart_child(?MODULE, Id).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Supervisor callbacks
|
%% Supervisor callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -733,7 +733,7 @@ with_config(Path, Modifier, Case) ->
|
||||||
NewCfg = Modifier(Cfg),
|
NewCfg = Modifier(Cfg),
|
||||||
ct:pal("test with config:~p~n", [NewCfg]),
|
ct:pal("test with config:~p~n", [NewCfg]),
|
||||||
emqx_config:put(Path, NewCfg),
|
emqx_config:put(Path, NewCfg),
|
||||||
emqx_limiter_server:update_config(message_routing),
|
emqx_limiter_server:restart(message_routing),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
DelayReturn = delay_return(Case),
|
DelayReturn = delay_return(Case),
|
||||||
emqx_config:put(Path, Cfg),
|
emqx_config:put(Path, Cfg),
|
||||||
|
|
|
@ -483,7 +483,7 @@ t_ensure_rate_limit(_) ->
|
||||||
PerClient = emqx_config:get(Path),
|
PerClient = emqx_config:get(Path),
|
||||||
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
|
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
|
||||||
emqx_config:put(Path, PerClient#{rate := Rate}),
|
emqx_config:put(Path, PerClient#{rate := Rate}),
|
||||||
emqx_limiter_server:update_config(bytes_in),
|
emqx_limiter_server:restart(bytes_in),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
|
|
||||||
Limiter = init_limiter(),
|
Limiter = init_limiter(),
|
||||||
|
@ -502,7 +502,7 @@ t_ensure_rate_limit(_) ->
|
||||||
?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
|
?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
|
||||||
|
|
||||||
emqx_config:put(Path, PerClient),
|
emqx_config:put(Path, PerClient),
|
||||||
emqx_limiter_server:update_config(bytes_in),
|
emqx_limiter_server:restart(bytes_in),
|
||||||
timer:sleep(100).
|
timer:sleep(100).
|
||||||
|
|
||||||
t_parse_incoming(_) ->
|
t_parse_incoming(_) ->
|
||||||
|
|
|
@ -359,7 +359,7 @@ t_flow_control(_) ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
|
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
|
||||||
emqx_limiter_manager:restart_server(shared),
|
emqx_limiter_manager:restart_server(batch),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
|
|
||||||
emqx_retainer_dispatcher:refresh_limiter(),
|
emqx_retainer_dispatcher:refresh_limiter(),
|
||||||
|
@ -408,7 +408,7 @@ t_flow_control(_) ->
|
||||||
|
|
||||||
%% recover the limiter
|
%% recover the limiter
|
||||||
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
|
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
|
||||||
emqx_limiter_manager:restart_server(shared),
|
emqx_limiter_manager:restart_server(batch),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
|
|
||||||
emqx_retainer_dispatcher:refresh_limiter(),
|
emqx_retainer_dispatcher:refresh_limiter(),
|
||||||
|
|
Loading…
Reference in New Issue