fix(retainer): check does the bucket is exists when update the retainer config

This commit is contained in:
firest 2022-06-17 10:25:08 +08:00
parent c47d28cdc3
commit 2444970b0b
3 changed files with 33 additions and 7 deletions

View File

@ -112,10 +112,6 @@
%% If no bucket path is set in config, there will be no limit
connect(_Type, undefined) ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
%% Workaround.
%% After API updated some config, the bucket name maybe become (converted from empty binary)
connect(_Type, '') ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
connect(Type, BucketName) when is_atom(BucketName) ->
case get_bucket_cfg(Type, BucketName) of
undefined ->

View File

@ -151,8 +151,13 @@ config(get, _) ->
{200, emqx:get_raw_config([retainer])};
config(put, #{body := Body}) ->
try
{ok, _} = emqx_retainer:update_config(Body),
{200, emqx:get_raw_config([retainer])}
check_bucket_exists(
Body,
fun(Conf) ->
{ok, _} = emqx_retainer:update_config(Conf),
{200, emqx:get_raw_config([retainer])}
end
)
catch
_:Reason:_ ->
{400, #{
@ -232,3 +237,28 @@ check_backend(Type, Params, Cont) ->
_ ->
{400, 'BAD_REQUEST', <<"This API only support built in database">>}
end.
check_bucket_exists(
#{
<<"flow_control">> :=
#{<<"batch_deliver_limiter">> := Name} = Flow
} = Conf,
Cont
) ->
case erlang:binary_to_atom(Name) of
'' ->
%% workaround, empty string means set the value to undefined,
%% but now, we can't store `undefined` in the config file correct,
%% but, we can delete this field
Cont(Conf#{
<<"flow_control">> := maps:remove(<<"batch_deliver_limiter">>, Flow)
});
Bucket ->
Path = emqx_limiter_schema:get_bucket_cfg_path(batch, Bucket),
case emqx:get_config(Path, undefined) of
undefined ->
{400, 'BAD_REQUEST', <<"The limiter bucket not exists">>};
_ ->
Cont(Conf)
end
end.

View File

@ -115,7 +115,7 @@ start_link(Pool, Id) ->
init([Pool, Id]) ->
erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
BucketName = emqx_conf:get([retainer, flow_control, batch_deliver_limiter], undefined),
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
{ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.