diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 6f32b1549..9cc3aea94 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -1037,7 +1037,7 @@ next_seqno_gen() -> ?LET( {Epoch, Offset}, {non_neg_integer(), range(0, ?EPOCH_SIZE)}, - Epoch bsl 15 + Offset + Epoch bsl ?EPOCH_BITS + Offset ). %%%% Property-based tests: diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 32d661354..0f617153b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -85,7 +85,7 @@ #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), - ?expiry_interval => emqx_types:conninfo(), + ?expiry_interval => non_neg_integer(), ?last_id => integer() }. diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index b321b324b..03a6fbf80 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -39,6 +39,8 @@ %% API functions %%================================================================================ +%% @doc Find the streams that have uncommitted (in-flight) messages. +%% Return them in the order they were previously replayed. -spec find_replay_streams(emqx_persistent_session_ds_state:t()) -> [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}]. find_replay_streams(S) -> @@ -59,6 +61,15 @@ find_replay_streams(S) -> ), lists:sort(fun compare_streams/2, Streams). +%% @doc Find streams from which the new messages can be fetched. +%% +%% Currently it amounts to the streams that don't have any inflight +%% messages, since for performance reasons we keep only one record of +%% in-flight messages per stream, and we don't want to overwrite these +%% records prematurely. +%% +%% This function is non-detereministic: it randomizes the order of +%% streams to ensure fair replay of different topics. -spec find_new_streams(emqx_persistent_session_ds_state:t()) -> [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}]. find_new_streams(S) -> @@ -91,6 +102,23 @@ find_new_streams(S) -> ) ). +%% @doc This function makes the session aware of the new streams. +%% +%% It has the following properties: +%% +%% 1. For each RankX, it keeps only the streams with the same RankY. +%% +%% 2. For each RankX, it never advances RankY until _all_ streams with +%% the same RankX are replayed. +%% +%% 3. Once all streams with the given rank are replayed, it advances +%% the RankY to the smallest known RankY that is greater than replayed +%% RankY. +%% +%% 4. If the RankX has never been replayed, it selects the streams +%% with the smallest RankY. +%% +%% This way, messages from the same topic/shard are never reordered. -spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). renew_streams(S0) -> S1 = remove_fully_replayed_streams(S0), @@ -192,6 +220,12 @@ select_streams(SubId, RankX, Streams0, S) -> lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams) end. +%% @doc Advance RankY for each RankX that doesn't have any unreplayed +%% streams. +%% +%% Drop streams with the fully replayed rank. This function relies on +%% the fact that all streams with the same RankX have also the same +%% RankY. -spec remove_fully_replayed_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). remove_fully_replayed_streams(S0) -> @@ -246,6 +280,7 @@ remove_fully_replayed_streams(S0) -> S1 ). +%% @doc Compare the streams by the order in which they were replayed. compare_streams( {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, {_KeyB, #srs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}} diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 730fdd297..72c04ff74 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -928,7 +928,7 @@ t_publish_many_while_client_is_gone(Config) -> {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]), {ok, _} = emqtt:ConnFun(Client3), - %% Check that the messages are retransmitted with DUP=1: + %% Check that we receive the rest of the messages: Msgs3 = receive_messages(NPubs, _Timeout = 2000), ct:pal("Msgs3 = ~p", [Msgs3]), ?assertMatch( diff --git a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl index ebf04eeb3..61e0575a8 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl @@ -27,6 +27,9 @@ %% Type declarations %%================================================================================ +%% Note: here `committed' != `dirty'. It means "has been committed at +%% least once since the creation", and it's used by the iteration +%% test. -record(s, {subs = #{}, metadata = #{}, streams = #{}, seqno = #{}, committed = false}). -type state() :: #{emqx_persistent_session_ds:id() => #s{}}. diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index 226af62f0..6ebfc820d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -213,6 +213,10 @@ trie_next(#trie{trie = Trie}, State, ?EOT) -> [] -> undefined end; trie_next(#trie{trie = Trie}, State, Token) -> + %% NOTE: it's crucial to return the original (non-wildcard) index + %% for the topic, if found. Otherwise messages from the same topic + %% will end up in different streams, once the wildcard is learned, + %% and their replay order will become undefined: case ets:lookup(Trie, {State, Token}) of [#trans{next = Next}] -> {false, Next};