From d8691f1d6426c638eb42d15f5a24b33d87b5cf90 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 29 Nov 2023 13:01:07 +0300 Subject: [PATCH] =?UTF-8?q?refactor(sessds):=20rename=20marker=20=E2=86=92?= =?UTF-8?q?=20committed=20offset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For better clarity. --- .../emqx_persistent_message_ds_replayer.erl | 49 ++++++++++--------- apps/emqx/src/emqx_persistent_session_ds.erl | 38 +++++++------- apps/emqx/src/emqx_persistent_session_ds.hrl | 6 +-- 3 files changed, 47 insertions(+), 46 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index f1c861e5d..fb8170904 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -21,7 +21,7 @@ %% API: -export([new/0, open/1, next_packet_id/1, n_inflight/1]). --export([poll/4, replay/2, commit_offset/4, commit_marker/4]). +-export([poll/4, replay/2, commit_offset/4]). -export([seqno_to_packet_id/1, packet_id_to_seqno/2]). @@ -55,11 +55,11 @@ -type seqno() :: non_neg_integer(). -type track() :: ack | comp. --type marker() :: rec. +-type commit_type() :: rec. -record(inflight, { next_seqno = 1 :: seqno(), - commits = #{ack => 1, comp => 1, rec => 1} :: #{track() | marker() => seqno()}, + commits = #{ack => 1, comp => 1, rec => 1} :: #{track() | commit_type() => seqno()}, %% Ranges are sorted in ascending order of their sequence numbers. offset_ranges = [] :: [ds_pubrange()] }). @@ -82,7 +82,7 @@ new() -> -spec open(emqx_persistent_session_ds:id()) -> inflight(). open(SessionId) -> {Ranges, RecUntil} = ro_transaction( - fun() -> {get_ranges(SessionId), get_marker(SessionId, rec)} end + fun() -> {get_ranges(SessionId), get_committed_offset(SessionId, rec)} end ), {Commits, NextSeqno} = compute_inflight_range(Ranges), #inflight{ @@ -128,14 +128,16 @@ replay(ReplyFun, Inflight0 = #inflight{offset_ranges = Ranges0}) -> Inflight = Inflight0#inflight{offset_ranges = Ranges}, {Replies, Inflight}. --spec commit_offset(emqx_persistent_session_ds:id(), track(), emqx_types:packet_id(), inflight()) -> - {_IsValidOffset :: boolean(), inflight()}. +-spec commit_offset(emqx_persistent_session_ds:id(), Offset, emqx_types:packet_id(), inflight()) -> + {_IsValidOffset :: boolean(), inflight()} +when + Offset :: track() | commit_type(). commit_offset( SessionId, Track, PacketId, Inflight0 = #inflight{commits = Commits} -) -> +) when Track == ack orelse Track == comp -> case validate_commit(Track, PacketId, Inflight0) of CommitUntil when is_integer(CommitUntil) -> %% TODO @@ -148,20 +150,17 @@ commit_offset( {true, Inflight}; false -> {false, Inflight0} - end. - --spec commit_marker(emqx_persistent_session_ds:id(), marker(), emqx_types:packet_id(), inflight()) -> - {_IsValidMarker :: boolean(), inflight()}. -commit_marker( + end; +commit_offset( SessionId, - Marker = rec, + CommitType = rec, PacketId, Inflight0 = #inflight{commits = Commits} ) -> - case validate_commit(Marker, PacketId, Inflight0) of + case validate_commit(CommitType, PacketId, Inflight0) of CommitUntil when is_integer(CommitUntil) -> - update_marker(SessionId, Marker, CommitUntil), - Inflight = Inflight0#inflight{commits = Commits#{Marker := CommitUntil}}, + update_committed_offset(SessionId, CommitType, CommitUntil), + Inflight = Inflight0#inflight{commits = Commits#{CommitType := CommitUntil}}, {true, Inflight}; false -> {false, Inflight0} @@ -187,7 +186,7 @@ poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize %% Which seqno this track is committed until. %% "Until" means this is first seqno that is _not yet committed_ for this track. --spec committed_until(track() | marker(), inflight()) -> seqno(). +-spec committed_until(track() | commit_type(), inflight()) -> seqno(). committed_until(Track, #inflight{commits = Commits}) -> maps:get(Track, Commits). @@ -491,18 +490,20 @@ get_last_iterator(DSStream = #ds_stream{ref = StreamRef}, Ranges) -> get_streams(SessionId) -> mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId). --spec get_marker(emqx_persistent_session_ds:id(), _Name) -> seqno(). -get_marker(SessionId, Name) -> - case mnesia:read(?SESSION_MARKER_TAB, {SessionId, Name}) of +-spec get_committed_offset(emqx_persistent_session_ds:id(), _Name) -> seqno(). +get_committed_offset(SessionId, Name) -> + case mnesia:read(?SESSION_COMMITTED_OFFSET_TAB, {SessionId, Name}) of [] -> 1; - [#ds_marker{until = Seqno}] -> + [#ds_committed_offset{until = Seqno}] -> Seqno end. --spec update_marker(emqx_persistent_session_ds:id(), _Name, seqno()) -> ok. -update_marker(SessionId, Name, Until) -> - mria:dirty_write(?SESSION_MARKER_TAB, #ds_marker{id = {SessionId, Name}, until = Until}). +-spec update_committed_offset(emqx_persistent_session_ds:id(), _Name, seqno()) -> ok. +update_committed_offset(SessionId, Name, Until) -> + mria:dirty_write(?SESSION_COMMITTED_OFFSET_TAB, #ds_committed_offset{ + id = {SessionId, Name}, until = Until + }). next_seqno(Seqno) -> NextSeqno = Seqno + 1, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 32f7418f5..d989c41c8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -239,7 +239,7 @@ print_session(ClientId) -> session => Session, streams => mnesia:read(?SESSION_STREAM_TAB, ClientId), pubranges => session_read_pubranges(ClientId), - markers => session_read_markers(ClientId), + offsets => session_read_offsets(ClientId), subscriptions => session_read_subscriptions(ClientId) }; [] -> @@ -338,7 +338,7 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}. pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) -> - case emqx_persistent_message_ds_replayer:commit_marker(Id, rec, PacketId, Inflight0) of + case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of {true, Inflight} -> %% TODO Msg = emqx_message:make(Id, <<>>, <<>>), @@ -552,13 +552,13 @@ create_tables() -> ] ), ok = mria:create_table( - ?SESSION_MARKER_TAB, + ?SESSION_COMMITTED_OFFSET_TAB, [ {rlog_shard, ?DS_MRIA_SHARD}, {type, set}, {storage, storage()}, - {record_name, ds_marker}, - {attributes, record_info(fields, ds_marker)} + {record_name, ds_committed_offset}, + {attributes, record_info(fields, ds_committed_offset)} ] ), ok = mria:wait_for_tables([ @@ -566,7 +566,7 @@ create_tables() -> ?SESSION_SUBSCRIPTIONS_TAB, ?SESSION_STREAM_TAB, ?SESSION_PUBRANGE_TAB, - ?SESSION_MARKER_TAB + ?SESSION_COMMITTED_OFFSET_TAB ]), ok. @@ -633,7 +633,7 @@ session_drop(DSSessionId) -> transaction(fun() -> ok = session_drop_subscriptions(DSSessionId), ok = session_drop_pubranges(DSSessionId), - ok = session_drop_markers(DSSessionId), + ok = session_drop_offsets(DSSessionId), ok = session_drop_streams(DSSessionId), ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) end). @@ -725,16 +725,16 @@ session_read_pubranges(DSSessionId, LockKind) -> ), mnesia:select(?SESSION_PUBRANGE_TAB, MS, LockKind). -session_read_markers(DSSessionID) -> - session_read_markers(DSSessionID, read). +session_read_offsets(DSSessionID) -> + session_read_offsets(DSSessionID, read). -session_read_markers(DSSessionId, LockKind) -> +session_read_offsets(DSSessionId, LockKind) -> MS = ets:fun2ms( - fun(#ds_marker{id = {Sess, Name}}) when Sess =:= DSSessionId -> - {DSSessionId, Name} + fun(#ds_committed_offset{id = {Sess, Type}}) when Sess =:= DSSessionId -> + {DSSessionId, Type} end ), - mnesia:select(?SESSION_MARKER_TAB, MS, LockKind). + mnesia:select(?SESSION_COMMITTED_OFFSET_TAB, MS, LockKind). -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. new_subscription_id(DSSessionId, TopicFilter) -> @@ -846,14 +846,14 @@ session_drop_pubranges(DSSessionId) -> ). %% must be called inside a transaction --spec session_drop_markers(id()) -> ok. -session_drop_markers(DSSessionId) -> - MarkerIds = session_read_markers(DSSessionId, write), +-spec session_drop_offsets(id()) -> ok. +session_drop_offsets(DSSessionId) -> + OffsetIds = session_read_offsets(DSSessionId, write), lists:foreach( - fun(MarkerId) -> - mnesia:delete(?SESSION_MARKER_TAB, MarkerId, write) + fun(OffsetId) -> + mnesia:delete(?SESSION_COMMITTED_OFFSET_TAB, OffsetId, write) end, - MarkerIds + OffsetIds ). %%-------------------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 73ff609b5..7b2b27764 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -22,7 +22,7 @@ -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). -define(SESSION_STREAM_TAB, emqx_ds_stream_tab). -define(SESSION_PUBRANGE_TAB, emqx_ds_pubrange_tab). --define(SESSION_MARKER_TAB, emqx_ds_marker_tab). +-define(SESSION_COMMITTED_OFFSET_TAB, emqx_ds_committed_offset_tab). -define(DS_MRIA_SHARD, emqx_ds_session_shard). -define(T_INFLIGHT, 1). @@ -76,12 +76,12 @@ }). -type ds_pubrange() :: #ds_pubrange{}. --record(ds_marker, { +-record(ds_committed_offset, { id :: { %% What session this marker belongs to. _Session :: emqx_persistent_session_ds:id(), %% Marker name. - _MarkerName + _CommitType }, %% Where this marker is pointing to: the first seqno that is not marked. until :: emqx_persistent_message_ds_replayer:seqno()