diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index cf519fd5d..f4661a85e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -191,7 +191,9 @@ info(topic_aliases, #channel{topic_aliases = Aliases}) -> info(alias_maximum, #channel{alias_maximum = Limits}) -> Limits; info(timers, #channel{timers = Timers}) -> - Timers. + Timers; +info(session_state, #channel{session = Session}) -> + Session. set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 2e6714e7f..10cd3d6cc 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -1,5 +1,5 @@ %%------------------------------------------------------------------- -%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1ab256d32..ee7fb3eb9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -27,6 +27,11 @@ -include("emqx_persistent_session_ds.hrl"). +-ifdef(TEST). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-endif. + %% Session API -export([ create/3, @@ -216,12 +221,12 @@ info(retry_interval, #{props := Conf}) -> maps:get(retry_interval, Conf); % info(mqueue, #sessmem{mqueue = MQueue}) -> % MQueue; -% info(mqueue_len, #sessmem{mqueue = MQueue}) -> -% emqx_mqueue:len(MQueue); +info(mqueue_len, #{inflight := Inflight}) -> + emqx_persistent_session_ds_inflight:n_buffered(Inflight); % info(mqueue_max, #sessmem{mqueue = MQueue}) -> % emqx_mqueue:max_len(MQueue); -% info(mqueue_dropped, #sessmem{mqueue = MQueue}) -> -% emqx_mqueue:dropped(MQueue); +info(mqueue_dropped, _Session) -> + 0; %% info(next_pkt_id, #{s := S}) -> %% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S), %% PacketId; @@ -750,6 +755,11 @@ drain_buffer(Session = #{inflight := Inflight0}) -> ((STREAM#ifs.last_seqno_qos1 =< COMMITTEDQOS1 orelse STREAM#ifs.last_seqno_qos1 =:= undefined) andalso (STREAM#ifs.last_seqno_qos2 =< COMMITTEDQOS2 orelse STREAM#ifs.last_seqno_qos2 =:= undefined))). +%% erlfmt-ignore +-define(last_replayed(STREAM, NEXTQOS1, NEXTQOS2), + ((STREAM#ifs.last_seqno_qos1 == NEXTQOS1 orelse STREAM#ifs.last_seqno_qos1 =:= undefined) andalso + (STREAM#ifs.last_seqno_qos2 == NEXTQOS2 orelse STREAM#ifs.last_seqno_qos2 =:= undefined))). + -spec find_replay_streams(session()) -> [{emqx_persistent_session_ds_state:stream_key(), stream_state()}]. find_replay_streams(#{s := S}) -> @@ -1002,9 +1012,6 @@ commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> seqno(). packet_id_to_seqno(PacketId, S) -> NextSeqNo = emqx_persistent_session_ds_state:get_seqno(?next(packet_id_to_qos(PacketId)), S), - packet_id_to_seqno_(PacketId, NextSeqNo). - -packet_id_to_seqno_(PacketId, NextSeqNo) -> Epoch = NextSeqNo bsr 15, SeqNo = (Epoch bsl 15) + (PacketId bsr 1), case SeqNo =< NextSeqNo of @@ -1059,6 +1066,74 @@ list_all_sessions() -> %%%% Proper generators: -%%%% Unit tests: +%% Generate a sequence number that smaller than the given `NextSeqNo' +%% number by at most `?EPOCH_SIZE': +seqno_gen(NextSeqNo) -> + WindowSize = ?EPOCH_SIZE - 1, + Min = max(0, NextSeqNo - WindowSize), + Max = max(0, NextSeqNo - 1), + range(Min, Max). + +%% Generate a sequence number: +next_seqno_gen() -> + ?LET( + {Epoch, Offset}, + {non_neg_integer(), non_neg_integer()}, + Epoch bsl 15 + Offset + ). + +%%%% Property-based tests: + +%% erlfmt-ignore +packet_id_to_seqno_prop() -> + ?FORALL( + {Qos, NextSeqNo}, {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()}, + ?FORALL( + ExpectedSeqNo, seqno_gen(NextSeqNo), + begin + PacketId = seqno_to_packet_id(Qos, ExpectedSeqNo), + SeqNo = packet_id_to_seqno(PacketId, NextSeqNo), + ?WHENFAIL( + begin + io:format(user, " *** PacketID = ~p~n", [PacketId]), + io:format(user, " *** SeqNo = ~p -> ~p~n", [ExpectedSeqNo, SeqNo]), + io:format(user, " *** NextSeqNo = ~p~n", [NextSeqNo]) + end, + PacketId < 16#10000 andalso SeqNo =:= ExpectedSeqNo + ) + end)). + +inc_seqno_prop() -> + ?FORALL( + {Qos, SeqNo}, + {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()}, + begin + NewSeqNo = inc_seqno(Qos, SeqNo), + PacketId = seqno_to_packet_id(Qos, NewSeqNo), + ?WHENFAIL( + begin + io:format(user, " *** SeqNo = ~p -> ~p~n", [SeqNo, NewSeqNo]), + io:format(user, " *** PacketId = ~p~n", [PacketId]) + end, + PacketId > 0 andalso PacketId < 16#10000 + ) + end + ). + +seqno_proper_test_() -> + Props = [packet_id_to_seqno_prop(), inc_seqno_prop()], + Opts = [{numtests, 10000}, {to_file, user}], + {timeout, 30, + {setup, + fun() -> + meck:new(emqx_persistent_session_ds_state, [no_history]), + ok = meck:expect(emqx_persistent_session_ds_state, get_seqno, fun(_Track, Seqno) -> + Seqno + end) + end, + fun(_) -> + meck:unload(emqx_persistent_session_ds_state) + end, + [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}}. -endif. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 64cd9c6a8..6c9da71e0 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -36,7 +36,7 @@ all() -> % NOTE % Tests are disabled while existing session persistence impl is being % phased out. - %{group, persistence_disabled}, + %%{group, persistence_disabled}, {group, persistence_enabled} ]. @@ -56,7 +56,7 @@ groups() -> TCsNonGeneric = [t_choose_impl], TCGroups = [{group, tcp}, {group, quic}, {group, ws}], [ - {persistence_disabled, TCGroups}, + %% {persistence_disabled, TCGroups}, {persistence_enabled, TCGroups}, {tcp, [], TCs}, {quic, [], TCs -- TCsNonGeneric}, @@ -782,8 +782,9 @@ t_publish_many_while_client_is_gone(Config) -> ClientOpts = [ {proto_ver, v5}, {clientid, ClientId}, - {properties, #{'Session-Expiry-Interval' => 30}}, - {auto_ack, never} + %, + {properties, #{'Session-Expiry-Interval' => 30}} + %{auto_ack, never} | Config ], @@ -810,7 +811,7 @@ t_publish_many_while_client_is_gone(Config) -> Msgs1 = receive_messages(NPubs1), ct:pal("Msgs1 = ~p", [Msgs1]), NMsgs1 = length(Msgs1), - ?assertEqual(NPubs1, NMsgs1), + ?assertEqual(NPubs1, NMsgs1, debug_info(ClientId)), ?assertEqual( get_topicwise_order(Pubs1), @@ -1084,3 +1085,12 @@ skip_ds_tc(Config) -> _ -> Config end. + +fail_with_debug_info(Exception, ClientId) -> + case emqx_cm:lookup_channels(ClientId) of + [Chan] -> + sys:get_state(Chan, 1000); + [] -> + no_channel + end, + exit(Exception).