From 2fcc24dea600699348f9a89d243da16b0afdfa34 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 10 Feb 2022 14:30:49 +0800 Subject: [PATCH] fix(emqx_slow_subs): fix test case error --- apps/emqx/src/emqx_session.erl | 22 +++++----- apps/emqx/test/emqx_session_SUITE.erl | 63 +++++++++++++++++++++------ 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 7851ae56f..003a4cd27 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -129,9 +129,8 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer(), + created_at :: pos_integer() %% Message deliver latency stats - latency_stats :: emqx_message_latency_stats:stats() }). @@ -615,7 +614,7 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> -spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> +retry(ClientInfo, Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> @@ -637,12 +636,8 @@ retry_delivery([{PacketId, #inflight_data{timestamp = Ts} = Data} | More], {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. -do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) -> - Update = Data#inflight_data{timestamp = Now}, - Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), - {[{pubrel, PacketId} | Acc], Inflight1}; - -do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight, ClientInfo) -> +do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, + Now, Acc, Inflight, ClientInfo) -> case emqx_message:is_expired(Msg) of true -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), @@ -653,7 +648,12 @@ do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Da Update = Data#inflight_data{message = Msg1, timestamp = Now}, Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {[{PacketId, Msg1} | Acc], Inflight1} - end. + end; + +do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) -> + Update = Data#inflight_data{timestamp = Now}, + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), + {[{pubrel, PacketId} | Acc], Inflight1}. %%-------------------------------------------------------------------- %% Expire Awaiting Rel @@ -770,7 +770,7 @@ mark_begin_deliver(Msg) -> -compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}). -sort_fun(A, B) -> +sort_fun({_, A}, {_, B}) -> A#inflight_data.timestamp =< B#inflight_data.timestamp. batch_n(Inflight) -> diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 9932424d7..fe8b78c24 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -25,7 +25,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). -define(NOW, erlang:system_time(millisecond)). --record(pubrel_await, {timestamp :: non_neg_integer()}). + +-type inflight_data_phase() :: wait_ack | wait_comp. + +-record(inflight_data, { phase :: inflight_data_phase() + , message :: emqx_types:message() + , timestamp :: non_neg_integer()}). %%-------------------------------------------------------------------- %% CT callbacks @@ -167,14 +172,14 @@ t_is_awaiting_full_true(_) -> t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), - Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg), emqx_inflight:new()), Session = session(#{inflight => Inflight, mqueue => mqueue()}), {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_puback_with_dequeue(_) -> Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>), - Inflight = emqx_inflight:insert(1, {Msg1, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg1), emqx_inflight:new()), Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), @@ -184,7 +189,7 @@ t_puback_with_dequeue(_) -> ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). t_puback_error_packet_id_in_use(_) -> - Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})). @@ -193,13 +198,13 @@ t_puback_error_packet_id_not_found(_) -> t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), - Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(2, with_ts(wait_ack, Msg), emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session), - ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). + ?assertMatch([#inflight_data{phase = wait_comp}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> - Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})). @@ -215,7 +220,7 @@ t_pubrel_error_packetid_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()). t_pubcomp(_) -> - Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). @@ -272,9 +277,11 @@ t_deliver_qos1(_) -> ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), + {ok, Msg1T, Session2} = emqx_session:puback(clientinfo(), 1, Session1), + ?assertEqual(Msg1, remove_deliver_flag(Msg1T)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), + {ok, Msg2T, Session3} = emqx_session:puback(clientinfo(), 2, Session2), + ?assertEqual(Msg2, remove_deliver_flag(Msg2T)), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> @@ -319,8 +326,9 @@ t_retry(_) -> {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ElapseMs = 200, %% 0.2s ok = timer:sleep(ElapseMs), - Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), + Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs], + {ok, Msgs1T, 100, Session2} = emqx_session:retry(clientinfo(), Session1), + ?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -344,7 +352,7 @@ t_replay(_) -> Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), - ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), + ?assertEqual(Pubs1 ++ [{3, Msg}], remove_deliver_flag(ReplayPubs)), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). t_expire_awaiting_rel(_) -> @@ -404,3 +412,32 @@ ts(second) -> erlang:system_time(second); ts(millisecond) -> erlang:system_time(millisecond). + +with_ts(Phase, Msg) -> + with_ts(Phase, Msg, erlang:system_time(millisecond)). + +with_ts(Phase, Msg, Ts) -> + #inflight_data{phase = Phase, + message = Msg, + timestamp = Ts}. + +remove_deliver_flag({Id, Data}) -> + {Id, remove_deliver_flag(Data)}; + +remove_deliver_flag(#inflight_data{message = Msg} = Data) -> + Data#inflight_data{message = remove_deliver_flag(Msg)}; + +remove_deliver_flag(List) when is_list(List) -> + lists:map(fun remove_deliver_flag/1, List); + +remove_deliver_flag(Msg) -> + emqx_message:remove_header(deliver_begin_at, Msg). + +inflight_data_to_msg({Id, Data}) -> + {Id, inflight_data_to_msg(Data)}; + +inflight_data_to_msg(#inflight_data{message = Msg}) -> + Msg; + +inflight_data_to_msg(List) when is_list(List) -> + lists:map(fun inflight_data_to_msg/1, List).