From 2d2321279240e4be2854a6d0e9b47063e9e326f6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Jan 2024 10:24:07 +0100 Subject: [PATCH] refactor(sessds): #ifs -> #srs --- apps/emqx/src/emqx_persistent_session_ds.erl | 40 +++++++++---------- apps/emqx/src/emqx_persistent_session_ds.hrl | 4 +- ...persistent_session_ds_stream_scheduler.erl | 16 ++++---- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 43a0f1bec..9f77c4219 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -138,7 +138,7 @@ ref :: reference() }). --type stream_state() :: #ifs{}. +-type stream_state() :: #srs{}. -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type millisecond() :: non_neg_integer(). @@ -495,15 +495,15 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> {ok, [], pull_now(Session)}. -spec replay_batch(stream_state(), session(), clientinfo()) -> session(). -replay_batch(Ifs0, Session, ClientInfo) -> - #ifs{batch_size = BatchSize} = Ifs0, +replay_batch(Srs0, Session, ClientInfo) -> + #srs{batch_size = BatchSize} = Srs0, %% TODO: retry on errors: - {Ifs, Inflight} = enqueue_batch(true, BatchSize, Ifs0, Session, ClientInfo), + {Srs, Inflight} = enqueue_batch(true, BatchSize, Srs0, Session, ClientInfo), %% Assert: - Ifs =:= Ifs0 orelse + Srs =:= Srs0 orelse ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{ - expected => Ifs0, - got => Ifs + expected => Srs0, + got => Srs }), Session#{inflight => Inflight}. @@ -678,29 +678,29 @@ fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo fetch_new_messages(Streams, Session, ClientInfo) end. -new_batch({StreamKey, Ifs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> +new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0), SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0), - Ifs1 = Ifs0#ifs{ + Srs1 = Srs0#srs{ first_seqno_qos1 = SN1, first_seqno_qos2 = SN2, batch_size = 0, last_seqno_qos1 = SN1, last_seqno_qos2 = SN2 }, - {Ifs, Inflight} = enqueue_batch(false, BatchSize, Ifs1, Session, ClientInfo), - S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Ifs#ifs.last_seqno_qos1, S0), - S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Ifs#ifs.last_seqno_qos2, S1), - S = emqx_persistent_session_ds_state:put_stream(StreamKey, Ifs, S2), + {Srs, Inflight} = enqueue_batch(false, BatchSize, Srs1, Session, ClientInfo), + S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Srs#srs.last_seqno_qos1, S0), + S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Srs#srs.last_seqno_qos2, S1), + S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2), Session#{s => S, inflight => Inflight}. -enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, ClientInfo) -> - #ifs{ +enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> + #srs{ it_begin = ItBegin0, it_end = ItEnd0, first_seqno_qos1 = FirstSeqnoQos1, first_seqno_qos2 = FirstSeqnoQos2 - } = Ifs0, + } = Srs0, ItBegin = case IsReplay of true -> ItBegin0; @@ -711,7 +711,7 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 ), - Ifs = Ifs0#ifs{ + Srs = Srs0#srs{ it_begin = ItBegin, it_end = ItEnd, %% TODO: it should be possible to avoid calling @@ -721,13 +721,13 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli last_seqno_qos1 = LastSeqnoQos1, last_seqno_qos2 = LastSeqnoQos2 }, - {Ifs, Inflight}; + {Srs, Inflight}; {ok, end_of_stream} -> %% No new messages; just update the end iterator: - {Ifs0#ifs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0}; + {Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0}; {error, _} when not IsReplay -> ?SLOG(info, #{msg => "failed_to_fetch_batch", iterator => ItBegin}), - {Ifs0, Inflight0} + {Srs0, Inflight0} end. %% key_of_iter(#{3 := #{3 := #{5 := K}}}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 8286a4e41..f097b2c6e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -48,8 +48,8 @@ %% Last seqno assigned to a message (it may not be sent yet). -define(next(QOS), (30 + QOS)). -%%%%% State of the stream: --record(ifs, { +%%%%% Stream Replay State: +-record(srs, { rank_x :: emqx_ds:rank_x(), rank_y :: emqx_ds:rank_y(), %% Iterators at the beginning and the end of the last batch: diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 621355005..5df56843f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -147,14 +147,14 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> {ok, Iterator} = emqx_ds:make_iterator( ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), - NewStreamState = #ifs{ + NewStreamState = #srs{ rank_x = RankX, rank_y = RankY, it_begin = Iterator, it_end = Iterator }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); - #ifs{} -> + #srs{} -> S end. @@ -199,7 +199,7 @@ remove_fully_replayed_streams(S0) -> CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), %% 1. For each subscription, find the X ranks that were fully replayed: Groups = emqx_persistent_session_ds_state:fold_streams( - fun({SubId, _Stream}, StreamState = #ifs{rank_x = RankX, rank_y = RankY}, Acc) -> + fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) -> Key = {SubId, RankX}, case {maps:get(Key, Acc, undefined), is_fully_replayed(CommQos1, CommQos2, StreamState)} @@ -228,7 +228,7 @@ remove_fully_replayed_streams(S0) -> ), %% 3. Remove the fully replayed streams: emqx_persistent_session_ds_state:fold_streams( - fun(Key = {SubId, _Stream}, #ifs{rank_x = RankX, rank_y = RankY}, Acc) -> + fun(Key = {SubId, _Stream}, #srs{rank_x = RankX, rank_y = RankY}, Acc) -> case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of undefined -> Acc; @@ -249,8 +249,8 @@ remove_fully_replayed_streams(S0) -> ). compare_streams( - {_KeyA, #ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, - {_KeyB, #ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}} + {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, + {_KeyB, #srs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}} ) -> case A1 =:= B1 of true -> @@ -259,10 +259,10 @@ compare_streams( A1 < B1 end. -is_fully_replayed(Comm1, Comm2, S = #ifs{it_end = It}) -> +is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) -> It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S). -is_fully_acked(Comm1, Comm2, #ifs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> +is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2). -spec shuffle([A]) -> [A].