diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index d95856332..efd89236b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -29,7 +29,8 @@ find_bucket/2, insert_bucket/2, insert_bucket/3, make_path/2, - restart_server/1 + restart_server/1, + post_config_update/5 ]). %% gen_server callbacks @@ -68,7 +69,7 @@ start_server(Type) -> -spec restart_server(limiter_type()) -> _. restart_server(Type) -> - emqx_limiter_server_sup:restart(Type). + emqx_limiter_server:restart(Type). -spec find_bucket(limiter_type(), bucket_name()) -> {ok, bucket_ref()} | undefined. @@ -100,6 +101,10 @@ insert_bucket(Path, Bucket) -> make_path(Type, BucketName) -> [Type | BucketName]. +post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) -> + Config = maps:get(Type, NewConf), + emqx_limiter_server:update_config(Type, Config). + %%-------------------------------------------------------------------- %% @doc %% Starts the server @@ -130,6 +135,7 @@ start_link() -> | {stop, Reason :: term()} | ignore. init([]) -> + ok = emqx_config_handler:add_handler([limiter], ?MODULE), _ = ets:new(?TAB, [ set, public, @@ -204,6 +210,7 @@ handle_info(Info, State) -> State :: term() ) -> any(). terminate(_Reason, _State) -> + emqx_config_handler:remove_handler([limiter]), ok. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index eb450bcfb..52dab82dd 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -46,7 +46,8 @@ info/1, name/1, get_initial_val/1, - update_config/1 + restart/1, + update_config/2 ]). %% number of tokens generated per period @@ -87,7 +88,9 @@ -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). --define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). +-define(CALL(Type, Msg), gen_server:call(name(Type), Msg)). +-define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)). + %% minimum coefficient for overloaded limiter -define(OVERLOAD_MIN_ALLOC, 0.3). -define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end). @@ -145,10 +148,14 @@ info(Type) -> name(Type) -> erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). --spec update_config(limiter_type()) -> ok. -update_config(Type) -> +-spec restart(limiter_type()) -> ok. +restart(Type) -> ?CALL(Type). +-spec update_config(limiter_type(), hocons:config()) -> ok. +update_config(Type, Config) -> + ?CALL(Type, {update_config, Type, Config}). + %%-------------------------------------------------------------------- %% @doc %% Starts the server @@ -197,9 +204,12 @@ init([Type]) -> | {stop, Reason :: term(), NewState :: term()}. handle_call(info, _From, State) -> {reply, State, State}; -handle_call(update_config, _From, #{type := Type}) -> +handle_call(restart, _From, #{type := Type}) -> NewState = init_tree(Type), {reply, ok, NewState}; +handle_call({update_config, Type, Config}, _From, #{type := Type}) -> + NewState = init_tree(Type, Config), + {reply, ok, NewState}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -442,7 +452,11 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) -> {Alloced, Buckets}. -spec init_tree(emqx_limiter_schema:limiter_type()) -> state(). -init_tree(Type) -> +init_tree(Type) when is_atom(Type) -> + Cfg = emqx:get_config([limiter, Type]), + init_tree(Type, Cfg). + +init_tree(Type, #{bucket := Buckets} = Cfg) -> State = #{ type => Type, root => undefined, @@ -451,7 +465,6 @@ init_tree(Type) -> buckets => #{} }, - #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]), {Factor, Root} = make_root(Cfg), {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 0727d3235..71bc26eb2 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -19,7 +19,7 @@ -behaviour(supervisor). %% API --export([start_link/0, start/1, restart/1]). +-export([start_link/0, start/1]). %% Supervisor callbacks -export([init/1]). @@ -47,13 +47,6 @@ start(Type) -> Spec = make_child(Type), supervisor:start_child(?MODULE, Spec). -%% XXX This is maybe a workaround, not so good --spec restart(emqx_limiter_schema:limiter_type()) -> _. -restart(Type) -> - Id = emqx_limiter_server:name(Type), - _ = supervisor:terminate_child(?MODULE, Id), - supervisor:restart_child(?MODULE, Id). - %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index bb751f9ee..665ca8ba1 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -733,7 +733,7 @@ with_config(Path, Modifier, Case) -> NewCfg = Modifier(Cfg), ct:pal("test with config:~p~n", [NewCfg]), emqx_config:put(Path, NewCfg), - emqx_limiter_server:update_config(message_routing), + emqx_limiter_server:restart(message_routing), timer:sleep(500), DelayReturn = delay_return(Case), emqx_config:put(Path, Cfg), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 3ddac7d42..0d1b3c5bf 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -483,7 +483,7 @@ t_ensure_rate_limit(_) -> PerClient = emqx_config:get(Path), {ok, Rate} = emqx_limiter_schema:to_rate("50MB"), emqx_config:put(Path, PerClient#{rate := Rate}), - emqx_limiter_server:update_config(bytes_in), + emqx_limiter_server:restart(bytes_in), timer:sleep(100), Limiter = init_limiter(), @@ -502,7 +502,7 @@ t_ensure_rate_limit(_) -> ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)), emqx_config:put(Path, PerClient), - emqx_limiter_server:update_config(bytes_in), + emqx_limiter_server:restart(bytes_in), timer:sleep(100). t_parse_incoming(_) -> diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index f207bdd04..953752788 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -359,7 +359,7 @@ t_flow_control(_) -> } }, emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2), - emqx_limiter_manager:restart_server(shared), + emqx_limiter_manager:restart_server(batch), timer:sleep(500), emqx_retainer_dispatcher:refresh_limiter(), @@ -408,7 +408,7 @@ t_flow_control(_) -> %% recover the limiter emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg), - emqx_limiter_manager:restart_server(shared), + emqx_limiter_manager:restart_server(batch), timer:sleep(500), emqx_retainer_dispatcher:refresh_limiter(),