diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index bf4d266a5..59248a0b8 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -982,8 +982,7 @@ check_limiter( _ -> %% if there has a retry timer, %% cache the operation and execute it after the retry is over - %% TODO: maybe we need to set socket to passive if size of queue is very large - %% because we queue up lots of ops that checks with the limiters. + %% the maximum length of the cache queue is equal to the active_n New = #cache{need = Needs, data = Data, next = WhenOk}, {ok, State#state{limiter_cache = queue:in(New, Cache)}} end; diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index e8367de18..34bafb1de 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -468,6 +468,8 @@ t_handle_in_qos1_publish(_) -> t_handle_in_qos2_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end), Channel = channel(#{conn_state => connected, session => session()}), + %% waiting limiter server + timer:sleep(200), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} = emqx_channel:handle_in(Publish1, Channel), @@ -482,6 +484,8 @@ t_handle_in_qos2_publish_with_error_return(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), Channel = channel(#{conn_state => connected, session => Session}), + %% waiting limiter server + timer:sleep(200), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = emqx_channel:handle_in(Publish1, Channel),