From e5d223000ed618e0ffee19d2e312108c4bb2174d Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 14 Jun 2022 17:46:53 +0800 Subject: [PATCH 1/6] fix(limiter): set maximum value for `infinity` rate and capacity There are now two types of limiters, `infinity` and `limited`. When `infinity` is updated to `limited` by config, the changes only take effect for new users. When `limited` is updated to `infinity`, old users will never get tokens, because the `countes` they hold are no longer updated. Setting the maximum value for `infinity` rate and capacity can unify these two limiters and slove this problem --- .../src/emqx_limiter_bucket_ref.erl | 8 +--- .../emqx_limiter/src/emqx_limiter_schema.erl | 19 ++++++-- .../emqx_limiter/src/emqx_limiter_server.erl | 48 +++++++------------ 3 files changed, 34 insertions(+), 41 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl index 34110b161..ad9f6a7cc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl @@ -50,13 +50,7 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new( - undefined | counters:countres_ref(), - undefined | index(), - rate() -) -> bucket_ref(). -new(undefined, _, _) -> - infinity; +-spec new(counters:countres_ref(), index(), rate()) -> bucket_ref(). new(Counter, Index, Rate) -> #{ counter => Counter, diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index c050956ec..835661654 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -30,7 +30,8 @@ namespace/0, get_bucket_cfg_path/2, desc/1, - types/0 + types/0, + infinity_value/0, ]). -define(KILOBYTE, 1024). @@ -46,7 +47,7 @@ -type rate() :: infinity | float(). -type burst_rate() :: 0 | float(). %% the capacity of the token bucket --type capacity() :: infinity | number(). +-type capacity() :: non_neg_integer(). %% initial capacity of the token bucket -type initial() :: non_neg_integer(). -type bucket_path() :: list(atom()). @@ -207,6 +208,18 @@ types() -> %% Internal functions %%-------------------------------------------------------------------- +%% `infinity` to `infinity_value` rules: +%% 1. all infinity capacity will change to infinity_value +%% 2. if the rate of global and bucket both are `infinity`, +%% use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2` +infinity_value() -> + %% 1 TB + 1099511627776. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + to_burst_rate(Str) -> to_rate(Str, false, true). @@ -294,7 +307,7 @@ to_quota(Str, Regex) -> {match, [Quota, ""]} -> {ok, erlang:list_to_integer(Quota)}; {match, ""} -> - {ok, infinity}; + {ok, infinity_value()}; _ -> {error, Str} end diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 9c344c752..c1956f780 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -118,18 +118,16 @@ connect(Type, BucketName) when is_atom(BucketName) -> ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}), {error, config_not_found}; #{ - rate := AggrRate, - capacity := AggrSize, + rate := BucketRate, + capacity := BucketSize, per_client := #{rate := CliRate, capacity := CliSize} = Cfg } -> case emqx_limiter_manager:find_bucket(Type, BucketName) of {ok, Bucket} -> {ok, if - CliRate < AggrRate orelse CliSize < AggrSize -> + CliRate < BucketRate orelse CliSize < BucketSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); - Bucket =:= infinity -> - emqx_htb_limiter:make_infinity_limiter(); true -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end}; @@ -372,9 +370,6 @@ longitudinal( case lists:min([ShouldAlloc, Flow, Capacity]) of Available when Available > 0 -> - %% XXX if capacity is infinity, and flow always > 0, the value in - %% counter will be overflow at some point in the future, do we need - %% to deal with this situation??? {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), counters:add(Counter, Index, Inc), @@ -491,26 +486,14 @@ make_root(#{rate := Rate, burst := Burst}) -> make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) -> Path = emqx_limiter_manager:make_path(Type, Name), - case get_counter_rate(Conf, GlobalCfg) of - infinity -> - Rate = infinity, - Capacity = infinity, - Initial = 0, - Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate), - emqx_limiter_manager:insert_bucket(Path, Ref), - CounterNum2 = CounterNum, - InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - State#{buckets := Buckets#{BucketName => Bucket}} - end; - Rate -> - #{capacity := Capacity} = Conf, - Initial = get_initial_val(Conf), - CounterNum2 = CounterNum + 1, - InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State), - Bucket2 = Bucket#{counter := Counter, index := Idx}, - State2#{buckets := Buckets#{BucketName => Bucket2}} - end + Rate = get_counter_rate(Conf, GlobalCfg), + #{capacity := Capacity} = Conf, + Initial = get_initial_val(Conf), + CounterNum2 = CounterNum + 1, + InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> + {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State), + Bucket2 = Bucket#{counter := Counter, index := Idx}, + State2#{buckets := Buckets#{BucketName => Bucket2}} end, Bucket = #{ @@ -569,8 +552,10 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> %% @doc find first limited node get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity -> Rate; -get_counter_rate(_Cfg, #{rate := Rate}) -> - Rate. +get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity -> + Rate; +get_counter_rate(_Cfg, _GlobalCfg) -> + emqx_limiter_schema:infinity_value(). -spec get_initial_val(hocons:config()) -> decimal(). get_initial_val(#{ @@ -579,12 +564,13 @@ get_initial_val(#{ capacity := Capacity }) -> %% initial will nevner be infinity(see the emqx_limiter_schema) + InfVal = emqx_limiter_schema:infinity_value(), if Initial > 0 -> Initial; Rate =/= infinity -> erlang:min(Rate, Capacity); - Capacity =/= infinity -> + Capacity =/= InfVal -> Capacity; true -> 0 From 6ca58e5fbcb0f44be7e87fee87e7ca665bc9cf87 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 15 Jun 2022 09:39:21 +0800 Subject: [PATCH 2/6] fix(limiter): fix test case error --- .../src/emqx_limiter/src/emqx_limiter_server.erl | 14 ++++++++------ apps/emqx/test/emqx_ratelimiter_SUITE.erl | 5 +++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index c1956f780..519b32eca 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -558,11 +558,13 @@ get_counter_rate(_Cfg, _GlobalCfg) -> emqx_limiter_schema:infinity_value(). -spec get_initial_val(hocons:config()) -> decimal(). -get_initial_val(#{ - initial := Initial, - rate := Rate, - capacity := Capacity -}) -> +get_initial_val( + #{ + initial := Initial, + rate := Rate, + capacity := Capacity + } +) -> %% initial will nevner be infinity(see the emqx_limiter_schema) InfVal = emqx_limiter_schema:infinity_value(), if @@ -570,7 +572,7 @@ get_initial_val(#{ Initial; Rate =/= infinity -> erlang:min(Rate, Capacity); - Capacity =/= InfVal -> + Capacity =/= infinity andalso Capacity =/= InfVal -> Capacity; true -> 0 diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index d6ad9f1a0..1251278f2 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -246,7 +246,8 @@ t_infinity_client(_) -> end, Case = fun() -> Client = connect(default), - ?assertEqual(infinity, Client), + InfVal = emqx_limiter_schema:infinity_value(), + ?assertMatch(#{bucket := #{rate := InfVal}}, Client), Result = emqx_htb_limiter:check(100000, Client), ?assertEqual({ok, Client}, Result) end, @@ -596,7 +597,7 @@ t_schema_unit(_) -> ?assertMatch({error, _}, M:to_rate("100MB/1")), ?assertMatch({error, _}, M:to_rate("100/10x")), - ?assertEqual({ok, infinity}, M:to_capacity("infinity")), + ?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")), ?assertEqual({ok, 100}, M:to_capacity("100")), ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")), ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")), From fa99b65c91e8ab29d649e503c696bb1e4c3ec8ee Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 15 Jun 2022 11:10:56 +0800 Subject: [PATCH 3/6] fix(limiter): refresh dispatcher limiter when retainer config updated --- .../src/emqx_limiter/src/emqx_limiter_server.erl | 4 ++++ apps/emqx_retainer/src/emqx_retainer.erl | 1 + .../emqx_retainer/src/emqx_retainer_dispatcher.erl | 14 +++++++++----- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 519b32eca..d540497fc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -112,6 +112,10 @@ %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; +%% Workaround. +%% After API updated some config, the bucket name maybe become ‘’ (converted from empty binary) +connect(_Type, '') -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; connect(Type, BucketName) when is_atom(BucketName) -> case get_bucket_cfg(Type, BucketName) of undefined -> diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 4eb358ade..41cb9132e 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -201,6 +201,7 @@ init([]) -> handle_call({update_config, NewConf, OldConf}, _, State) -> State2 = update_config(State, NewConf, OldConf), + emqx_retainer_dispatcher:refresh_limiter(NewConf), {reply, ok, State2}; handle_call(clean, _, #{context := Context} = State) -> clean(Context), diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 3cabead3e..02ef6ecc0 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -26,6 +26,7 @@ start_link/2, dispatch/2, refresh_limiter/0, + refresh_limiter/1, wait_dispatch_complete/1, worker/0 ]). @@ -51,13 +52,16 @@ dispatch(Context, Topic) -> cast({?FUNCTION_NAME, Context, self(), Topic}). -%% sometimes it is necessary to reset the client's limiter after updated the limiter's config -%% an limiter update handler maybe added later, now this is a workaround +%% reset the client's limiter after updated the limiter's config refresh_limiter() -> + Conf = emqx:get_config([retainer]), + refresh_limiter(Conf). + +refresh_limiter(Conf) -> Workers = gproc_pool:active_workers(?POOL), lists:foreach( fun({_, Pid}) -> - gen_server:cast(Pid, ?FUNCTION_NAME) + gen_server:cast(Pid, {?FUNCTION_NAME, Conf}) end, Workers ). @@ -150,8 +154,8 @@ handle_call(Req, _From, State) -> handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {noreply, State#{limiter := Limiter2}}; -handle_cast(refresh_limiter, State) -> - BucketName = emqx_conf:get([retainer, flow_control, batch_deliver_limiter]), +handle_cast({refresh_limiter, Conf}, State) -> + BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> From c147743895c13d84f687a3e10a16293f2153300c Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 15 Jun 2022 14:57:49 +0800 Subject: [PATCH 4/6] fix(limiter): move default connection setting into schema --- apps/emqx/etc/emqx.conf | 5 ----- .../src/emqx_limiter/etc/emqx_limiter.conf | 11 ----------- .../emqx_limiter/src/emqx_limiter_schema.erl | 18 ++++++++++++++++-- apps/emqx/src/emqx_schema.erl | 2 +- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index f3dee0bcf..bd3ae4af0 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1,13 +1,11 @@ listeners.tcp.default { bind = "0.0.0.0:1883" max_connections = 1024000 - limiter.connection = default } listeners.ssl.default { bind = "0.0.0.0:8883" max_connections = 512000 - limiter.connection = default ssl_options { keyfile = "{{ platform_etc_dir }}/certs/key.pem" certfile = "{{ platform_etc_dir }}/certs/cert.pem" @@ -18,14 +16,12 @@ listeners.ssl.default { listeners.ws.default { bind = "0.0.0.0:8083" max_connections = 1024000 - limiter.connection = default websocket.mqtt_path = "/mqtt" } listeners.wss.default { bind = "0.0.0.0:8084" max_connections = 512000 - limiter.connection = default websocket.mqtt_path = "/mqtt" ssl_options { keyfile = "{{ platform_etc_dir }}/certs/key.pem" @@ -38,7 +34,6 @@ listeners.wss.default { # enabled = false # bind = "0.0.0.0:14567" # max_connections = 1024000 -# limiter.connection = default # keyfile = "{{ platform_etc_dir }}/certs/key.pem" # certfile = "{{ platform_etc_dir }}/certs/cert.pem" #} diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index 36c48dde3..e69de29bb 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -1,11 +0,0 @@ -limiter { - connection { - rate = "1000/s" - bucket { - default { - rate = "1000/s" - capacity = 1000 - } - } - } -} diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 835661654..1e4679ee3 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -31,7 +31,7 @@ get_bucket_cfg_path/2, desc/1, types/0, - infinity_value/0, + infinity_value/0 ]). -define(KILOBYTE, 1024). @@ -89,7 +89,7 @@ fields(limiter) -> {Type, ?HOCON(?R_REF(limiter_opts), #{ desc => ?DESC(Type), - default => #{} + default => make_limiter_default(Type) })} || Type <- types() ]; @@ -321,3 +321,17 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE; apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). + +make_limiter_default(connection) -> + #{ + <<"rate">> => <<"1000/s">>, + <<"bucket">> => #{ + <<"default">> => + #{ + <<"rate">> => <<"1000/s">>, + <<"capacity">> => 1000 + } + } + }; +make_limiter_default(_) -> + #{}. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8bd1da1f9..90b696fcd 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1613,7 +1613,7 @@ base_listener(Bind) -> map("ratelimit_name", emqx_limiter_schema:bucket_name()), #{ desc => ?DESC(base_listener_limiter), - default => #{} + default => #{<<"connection">> => <<"default">>} } )} ]. From 4e05d751c1277f9ee5313eb8239fd8fbfa1ed945 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 15 Jun 2022 16:01:16 +0800 Subject: [PATCH 5/6] fix(limiter): change limiter log level to debug --- apps/emqx/src/emqx_connection.erl | 4 ++-- apps/emqx/src/emqx_ws_connection.erl | 4 ++-- apps/emqx_retainer/src/emqx_retainer_dispatcher.erl | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index e71361364..bf4d266a5 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -956,7 +956,7 @@ check_limiter( {ok, Limiter2} -> WhenOk(Data, Msgs, State#state{limiter = Limiter2}); {pause, Time, Limiter2} -> - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "pause_time_dueto_rate_limit", needs => Needs, time_in_ms => Time @@ -1006,7 +1006,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> } ); {pause, Time, Limiter2} -> - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "pause_time_dueto_rate_limit", types => Types, time_in_ms => Time diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index c76bbd393..2e4587b46 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -586,7 +586,7 @@ check_limiter( {ok, Limiter2} -> WhenOk(Data, Msgs, State#state{limiter = Limiter2}); {pause, Time, Limiter2} -> - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "pause_time_due_to_rate_limit", needs => Needs, time_in_ms => Time @@ -634,7 +634,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> } ); {pause, Time, Limiter2} -> - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "pause_time_due_to_rate_limit", types => Types, time_in_ms => Time diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 02ef6ecc0..a0121f721 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -277,7 +277,7 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> do_deliver(ToDelivers, Pid, Topic), do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); {drop, _} = Drop -> - ?SLOG(error, #{ + ?SLOG(debug, #{ msg => "retained_message_dropped", reason => "reached_ratelimit", dropped_count => length(ToDelivers) From 28d99397130e33265e2ff2472576622ad2ce3eee Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 15 Jun 2022 17:35:13 +0800 Subject: [PATCH 6/6] fix(limiter): fix test case errors --- apps/emqx/src/emqx_connection.erl | 3 +-- apps/emqx/test/emqx_channel_SUITE.erl | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index bf4d266a5..59248a0b8 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -982,8 +982,7 @@ check_limiter( _ -> %% if there has a retry timer, %% cache the operation and execute it after the retry is over - %% TODO: maybe we need to set socket to passive if size of queue is very large - %% because we queue up lots of ops that checks with the limiters. + %% the maximum length of the cache queue is equal to the active_n New = #cache{need = Needs, data = Data, next = WhenOk}, {ok, State#state{limiter_cache = queue:in(New, Cache)}} end; diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index e8367de18..34bafb1de 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -468,6 +468,8 @@ t_handle_in_qos1_publish(_) -> t_handle_in_qos2_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end), Channel = channel(#{conn_state => connected, session => session()}), + %% waiting limiter server + timer:sleep(200), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} = emqx_channel:handle_in(Publish1, Channel), @@ -482,6 +484,8 @@ t_handle_in_qos2_publish_with_error_return(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), Channel = channel(#{conn_state => connected, session => Session}), + %% waiting limiter server + timer:sleep(200), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = emqx_channel:handle_in(Publish1, Channel),