fix(limiter): refresh dispatcher limiter when retainer config updated

This commit is contained in:
firest 2022-06-15 11:10:56 +08:00
parent 6ca58e5fbc
commit fa99b65c91
3 changed files with 14 additions and 5 deletions

View File

@ -112,6 +112,10 @@
%% If no bucket path is set in config, there will be no limit %% If no bucket path is set in config, there will be no limit
connect(_Type, undefined) -> connect(_Type, undefined) ->
{ok, emqx_htb_limiter:make_infinity_limiter()}; {ok, emqx_htb_limiter:make_infinity_limiter()};
%% Workaround.
%% After API updated some config, the bucket name maybe become (converted from empty binary)
connect(_Type, '') ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
connect(Type, BucketName) when is_atom(BucketName) -> connect(Type, BucketName) when is_atom(BucketName) ->
case get_bucket_cfg(Type, BucketName) of case get_bucket_cfg(Type, BucketName) of
undefined -> undefined ->

View File

@ -201,6 +201,7 @@ init([]) ->
handle_call({update_config, NewConf, OldConf}, _, State) -> handle_call({update_config, NewConf, OldConf}, _, State) ->
State2 = update_config(State, NewConf, OldConf), State2 = update_config(State, NewConf, OldConf),
emqx_retainer_dispatcher:refresh_limiter(NewConf),
{reply, ok, State2}; {reply, ok, State2};
handle_call(clean, _, #{context := Context} = State) -> handle_call(clean, _, #{context := Context} = State) ->
clean(Context), clean(Context),

View File

@ -26,6 +26,7 @@
start_link/2, start_link/2,
dispatch/2, dispatch/2,
refresh_limiter/0, refresh_limiter/0,
refresh_limiter/1,
wait_dispatch_complete/1, wait_dispatch_complete/1,
worker/0 worker/0
]). ]).
@ -51,13 +52,16 @@
dispatch(Context, Topic) -> dispatch(Context, Topic) ->
cast({?FUNCTION_NAME, Context, self(), Topic}). cast({?FUNCTION_NAME, Context, self(), Topic}).
%% sometimes it is necessary to reset the client's limiter after updated the limiter's config %% reset the client's limiter after updated the limiter's config
%% an limiter update handler maybe added later, now this is a workaround
refresh_limiter() -> refresh_limiter() ->
Conf = emqx:get_config([retainer]),
refresh_limiter(Conf).
refresh_limiter(Conf) ->
Workers = gproc_pool:active_workers(?POOL), Workers = gproc_pool:active_workers(?POOL),
lists:foreach( lists:foreach(
fun({_, Pid}) -> fun({_, Pid}) ->
gen_server:cast(Pid, ?FUNCTION_NAME) gen_server:cast(Pid, {?FUNCTION_NAME, Conf})
end, end,
Workers Workers
). ).
@ -150,8 +154,8 @@ handle_call(Req, _From, State) ->
handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
{noreply, State#{limiter := Limiter2}}; {noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) -> handle_cast({refresh_limiter, Conf}, State) ->
BucketName = emqx_conf:get([retainer, flow_control, batch_deliver_limiter]), BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
{ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}}; {noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->