From fa99b65c91e8ab29d649e503c696bb1e4c3ec8ee Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 15 Jun 2022 11:10:56 +0800 Subject: [PATCH] fix(limiter): refresh dispatcher limiter when retainer config updated --- .../src/emqx_limiter/src/emqx_limiter_server.erl | 4 ++++ apps/emqx_retainer/src/emqx_retainer.erl | 1 + .../emqx_retainer/src/emqx_retainer_dispatcher.erl | 14 +++++++++----- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 519b32eca..d540497fc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -112,6 +112,10 @@ %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; +%% Workaround. +%% After API updated some config, the bucket name maybe become ‘’ (converted from empty binary) +connect(_Type, '') -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; connect(Type, BucketName) when is_atom(BucketName) -> case get_bucket_cfg(Type, BucketName) of undefined -> diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 4eb358ade..41cb9132e 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -201,6 +201,7 @@ init([]) -> handle_call({update_config, NewConf, OldConf}, _, State) -> State2 = update_config(State, NewConf, OldConf), + emqx_retainer_dispatcher:refresh_limiter(NewConf), {reply, ok, State2}; handle_call(clean, _, #{context := Context} = State) -> clean(Context), diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 3cabead3e..02ef6ecc0 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -26,6 +26,7 @@ start_link/2, dispatch/2, refresh_limiter/0, + refresh_limiter/1, wait_dispatch_complete/1, worker/0 ]). @@ -51,13 +52,16 @@ dispatch(Context, Topic) -> cast({?FUNCTION_NAME, Context, self(), Topic}). -%% sometimes it is necessary to reset the client's limiter after updated the limiter's config -%% an limiter update handler maybe added later, now this is a workaround +%% reset the client's limiter after updated the limiter's config refresh_limiter() -> + Conf = emqx:get_config([retainer]), + refresh_limiter(Conf). + +refresh_limiter(Conf) -> Workers = gproc_pool:active_workers(?POOL), lists:foreach( fun({_, Pid}) -> - gen_server:cast(Pid, ?FUNCTION_NAME) + gen_server:cast(Pid, {?FUNCTION_NAME, Conf}) end, Workers ). @@ -150,8 +154,8 @@ handle_call(Req, _From, State) -> handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {noreply, State#{limiter := Limiter2}}; -handle_cast(refresh_limiter, State) -> - BucketName = emqx_conf:get([retainer, flow_control, batch_deliver_limiter]), +handle_cast({refresh_limiter, Conf}, State) -> + BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) ->