fix(emqx_slow_subs): fix test case error
This commit is contained in:
parent
b09683bfcd
commit
2fcc24dea6
|
@ -129,9 +129,8 @@
|
||||||
%% Awaiting PUBREL Timeout (Unit: millisecond)
|
%% Awaiting PUBREL Timeout (Unit: millisecond)
|
||||||
await_rel_timeout :: timeout(),
|
await_rel_timeout :: timeout(),
|
||||||
%% Created at
|
%% Created at
|
||||||
created_at :: pos_integer(),
|
created_at :: pos_integer()
|
||||||
%% Message deliver latency stats
|
%% Message deliver latency stats
|
||||||
latency_stats :: emqx_message_latency_stats:stats()
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
||||||
|
@ -615,7 +614,7 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
||||||
|
|
||||||
-spec(retry(emqx_types:clientinfo(), session()) ->
|
-spec(retry(emqx_types:clientinfo(), session()) ->
|
||||||
{ok, session()} | {ok, replies(), timeout(), session()}).
|
{ok, session()} | {ok, replies(), timeout(), session()}).
|
||||||
retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
|
retry(ClientInfo, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:is_empty(Inflight) of
|
case emqx_inflight:is_empty(Inflight) of
|
||||||
true -> {ok, Session};
|
true -> {ok, Session};
|
||||||
false ->
|
false ->
|
||||||
|
@ -637,12 +636,8 @@ retry_delivery([{PacketId, #inflight_data{timestamp = Ts} = Data} | More],
|
||||||
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
|
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) ->
|
do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data,
|
||||||
Update = Data#inflight_data{timestamp = Now},
|
Now, Acc, Inflight, ClientInfo) ->
|
||||||
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
|
||||||
{[{pubrel, PacketId} | Acc], Inflight1};
|
|
||||||
|
|
||||||
do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight, ClientInfo) ->
|
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
||||||
|
@ -653,7 +648,12 @@ do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Da
|
||||||
Update = Data#inflight_data{message = Msg1, timestamp = Now},
|
Update = Data#inflight_data{message = Msg1, timestamp = Now},
|
||||||
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
||||||
{[{PacketId, Msg1} | Acc], Inflight1}
|
{[{PacketId, Msg1} | Acc], Inflight1}
|
||||||
end.
|
end;
|
||||||
|
|
||||||
|
do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) ->
|
||||||
|
Update = Data#inflight_data{timestamp = Now},
|
||||||
|
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
||||||
|
{[{pubrel, PacketId} | Acc], Inflight1}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Expire Awaiting Rel
|
%% Expire Awaiting Rel
|
||||||
|
@ -770,7 +770,7 @@ mark_begin_deliver(Msg) ->
|
||||||
|
|
||||||
-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
|
-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
|
||||||
|
|
||||||
sort_fun(A, B) ->
|
sort_fun({_, A}, {_, B}) ->
|
||||||
A#inflight_data.timestamp =< B#inflight_data.timestamp.
|
A#inflight_data.timestamp =< B#inflight_data.timestamp.
|
||||||
|
|
||||||
batch_n(Inflight) ->
|
batch_n(Inflight) ->
|
||||||
|
|
|
@ -25,7 +25,12 @@
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
-define(NOW, erlang:system_time(millisecond)).
|
-define(NOW, erlang:system_time(millisecond)).
|
||||||
-record(pubrel_await, {timestamp :: non_neg_integer()}).
|
|
||||||
|
-type inflight_data_phase() :: wait_ack | wait_comp.
|
||||||
|
|
||||||
|
-record(inflight_data, { phase :: inflight_data_phase()
|
||||||
|
, message :: emqx_types:message()
|
||||||
|
, timestamp :: non_neg_integer()}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% CT callbacks
|
%% CT callbacks
|
||||||
|
@ -167,14 +172,14 @@ t_is_awaiting_full_true(_) ->
|
||||||
|
|
||||||
t_puback(_) ->
|
t_puback(_) ->
|
||||||
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
|
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
|
||||||
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg), emqx_inflight:new()),
|
||||||
Session = session(#{inflight => Inflight, mqueue => mqueue()}),
|
Session = session(#{inflight => Inflight, mqueue => mqueue()}),
|
||||||
{ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
|
{ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
||||||
|
|
||||||
t_puback_with_dequeue(_) ->
|
t_puback_with_dequeue(_) ->
|
||||||
Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>),
|
Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>),
|
||||||
Inflight = emqx_inflight:insert(1, {Msg1, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg1), emqx_inflight:new()),
|
||||||
Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
|
Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
|
||||||
{_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
|
{_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
|
||||||
Session = session(#{inflight => Inflight, mqueue => Q}),
|
Session = session(#{inflight => Inflight, mqueue => Q}),
|
||||||
|
@ -184,7 +189,7 @@ t_puback_with_dequeue(_) ->
|
||||||
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
|
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
|
||||||
|
|
||||||
t_puback_error_packet_id_in_use(_) ->
|
t_puback_error_packet_id_in_use(_) ->
|
||||||
Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
|
||||||
emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
|
emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
|
||||||
|
|
||||||
|
@ -193,13 +198,13 @@ t_puback_error_packet_id_not_found(_) ->
|
||||||
|
|
||||||
t_pubrec(_) ->
|
t_pubrec(_) ->
|
||||||
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
|
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
|
||||||
Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(2, with_ts(wait_ack, Msg), emqx_inflight:new()),
|
||||||
Session = session(#{inflight => Inflight}),
|
Session = session(#{inflight => Inflight}),
|
||||||
{ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session),
|
{ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session),
|
||||||
?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
|
?assertMatch([#inflight_data{phase = wait_comp}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
|
||||||
|
|
||||||
t_pubrec_packet_id_in_use_error(_) ->
|
t_pubrec_packet_id_in_use_error(_) ->
|
||||||
Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
|
||||||
emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})).
|
emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})).
|
||||||
|
|
||||||
|
@ -215,7 +220,7 @@ t_pubrel_error_packetid_not_found(_) ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()).
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()).
|
||||||
|
|
||||||
t_pubcomp(_) ->
|
t_pubcomp(_) ->
|
||||||
Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
|
||||||
Session = session(#{inflight => Inflight}),
|
Session = session(#{inflight => Inflight}),
|
||||||
{ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
|
{ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
||||||
|
@ -272,9 +277,11 @@ t_deliver_qos1(_) ->
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
|
||||||
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
|
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
|
||||||
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
|
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
|
||||||
{ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
|
{ok, Msg1T, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
|
||||||
|
?assertEqual(Msg1, remove_deliver_flag(Msg1T)),
|
||||||
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
|
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
|
||||||
{ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
|
{ok, Msg2T, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
|
||||||
|
?assertEqual(Msg2, remove_deliver_flag(Msg2T)),
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
|
||||||
|
|
||||||
t_deliver_qos2(_) ->
|
t_deliver_qos2(_) ->
|
||||||
|
@ -319,8 +326,9 @@ t_retry(_) ->
|
||||||
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
|
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
|
||||||
ElapseMs = 200, %% 0.2s
|
ElapseMs = 200, %% 0.2s
|
||||||
ok = timer:sleep(ElapseMs),
|
ok = timer:sleep(ElapseMs),
|
||||||
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
|
Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs],
|
||||||
{ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
|
{ok, Msgs1T, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
|
||||||
|
?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)),
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -344,7 +352,7 @@ t_replay(_) ->
|
||||||
Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
|
Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
|
||||||
Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
|
Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
|
||||||
{ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
|
{ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
|
||||||
?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
|
?assertEqual(Pubs1 ++ [{3, Msg}], remove_deliver_flag(ReplayPubs)),
|
||||||
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
|
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
|
||||||
|
|
||||||
t_expire_awaiting_rel(_) ->
|
t_expire_awaiting_rel(_) ->
|
||||||
|
@ -404,3 +412,32 @@ ts(second) ->
|
||||||
erlang:system_time(second);
|
erlang:system_time(second);
|
||||||
ts(millisecond) ->
|
ts(millisecond) ->
|
||||||
erlang:system_time(millisecond).
|
erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
with_ts(Phase, Msg) ->
|
||||||
|
with_ts(Phase, Msg, erlang:system_time(millisecond)).
|
||||||
|
|
||||||
|
with_ts(Phase, Msg, Ts) ->
|
||||||
|
#inflight_data{phase = Phase,
|
||||||
|
message = Msg,
|
||||||
|
timestamp = Ts}.
|
||||||
|
|
||||||
|
remove_deliver_flag({Id, Data}) ->
|
||||||
|
{Id, remove_deliver_flag(Data)};
|
||||||
|
|
||||||
|
remove_deliver_flag(#inflight_data{message = Msg} = Data) ->
|
||||||
|
Data#inflight_data{message = remove_deliver_flag(Msg)};
|
||||||
|
|
||||||
|
remove_deliver_flag(List) when is_list(List) ->
|
||||||
|
lists:map(fun remove_deliver_flag/1, List);
|
||||||
|
|
||||||
|
remove_deliver_flag(Msg) ->
|
||||||
|
emqx_message:remove_header(deliver_begin_at, Msg).
|
||||||
|
|
||||||
|
inflight_data_to_msg({Id, Data}) ->
|
||||||
|
{Id, inflight_data_to_msg(Data)};
|
||||||
|
|
||||||
|
inflight_data_to_msg(#inflight_data{message = Msg}) ->
|
||||||
|
Msg;
|
||||||
|
|
||||||
|
inflight_data_to_msg(List) when is_list(List) ->
|
||||||
|
lists:map(fun inflight_data_to_msg/1, List).
|
||||||
|
|
Loading…
Reference in New Issue