fix(limiter): fix test case errors
This commit is contained in:
parent
4e05d751c1
commit
28d9939713
|
@ -982,8 +982,7 @@ check_limiter(
|
||||||
_ ->
|
_ ->
|
||||||
%% if there has a retry timer,
|
%% if there has a retry timer,
|
||||||
%% cache the operation and execute it after the retry is over
|
%% cache the operation and execute it after the retry is over
|
||||||
%% TODO: maybe we need to set socket to passive if size of queue is very large
|
%% the maximum length of the cache queue is equal to the active_n
|
||||||
%% because we queue up lots of ops that checks with the limiters.
|
|
||||||
New = #cache{need = Needs, data = Data, next = WhenOk},
|
New = #cache{need = Needs, data = Data, next = WhenOk},
|
||||||
{ok, State#state{limiter_cache = queue:in(New, Cache)}}
|
{ok, State#state{limiter_cache = queue:in(New, Cache)}}
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -468,6 +468,8 @@ t_handle_in_qos1_publish(_) ->
|
||||||
t_handle_in_qos2_publish(_) ->
|
t_handle_in_qos2_publish(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end),
|
||||||
Channel = channel(#{conn_state => connected, session => session()}),
|
Channel = channel(#{conn_state => connected, session => session()}),
|
||||||
|
%% waiting limiter server
|
||||||
|
timer:sleep(200),
|
||||||
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||||
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
|
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
|
||||||
emqx_channel:handle_in(Publish1, Channel),
|
emqx_channel:handle_in(Publish1, Channel),
|
||||||
|
@ -482,6 +484,8 @@ t_handle_in_qos2_publish_with_error_return(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||||
Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
|
Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
|
||||||
Channel = channel(#{conn_state => connected, session => Session}),
|
Channel = channel(#{conn_state => connected, session => Session}),
|
||||||
|
%% waiting limiter server
|
||||||
|
timer:sleep(200),
|
||||||
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||||
{ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
|
{ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
|
||||||
emqx_channel:handle_in(Publish1, Channel),
|
emqx_channel:handle_in(Publish1, Channel),
|
||||||
|
|
Loading…
Reference in New Issue