refactor(sessds): Simplify data structure of ds_state pmap datatype

This commit is contained in:
ieQu1 2024-01-05 14:02:53 +01:00
parent 4f4831fe7f
commit 1b4f69b44d
4 changed files with 58 additions and 75 deletions

View File

@@ -477,20 +477,13 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
-spec replay_batch(stream_state(), session(), clientinfo()) -> session(). -spec replay_batch(stream_state(), session(), clientinfo()) -> session().
replay_batch(Ifs0, Session, ClientInfo) -> replay_batch(Ifs0, Session, ClientInfo) ->
#ifs{ #ifs{batch_size = BatchSize} = Ifs0,
batch_begin_key = BatchBeginMsgKey, %% TODO: retry on errors:
batch_size = BatchSize, {Ifs, Inflight} = enqueue_batch(true, BatchSize, Ifs0, Session, ClientInfo),
it_end = ItEnd
} = Ifs0,
%% TODO: retry
{ok, ItBegin} = emqx_ds:update_iterator(?PERSISTENT_MESSAGE_DB, ItEnd, BatchBeginMsgKey),
Ifs1 = Ifs0#ifs{it_end = ItBegin},
{Ifs, Inflight} = enqueue_batch(true, BatchSize, Ifs1, Session, ClientInfo),
%% Assert: %% Assert:
Ifs =:= Ifs1 orelse Ifs =:= Ifs0 orelse
?SLOG(warning, #{ ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
msg => "replay_inconsistency", expected => Ifs0,
expected => Ifs1,
got => Ifs got => Ifs
}), }),
Session#{inflight => Inflight}. Session#{inflight => Inflight}.
@@ -645,7 +638,6 @@ new_batch({StreamKey, Ifs0}, BatchSize, Session = #{s := S0}, ClientInfo) ->
first_seqno_qos1 = SN1, first_seqno_qos1 = SN1,
first_seqno_qos2 = SN2, first_seqno_qos2 = SN2,
batch_size = 0, batch_size = 0,
batch_begin_key = undefined,
last_seqno_qos1 = SN1, last_seqno_qos1 = SN1,
last_seqno_qos2 = SN2 last_seqno_qos2 = SN2
}, },
@@ -657,10 +649,16 @@ new_batch({StreamKey, Ifs0}, BatchSize, Session = #{s := S0}, ClientInfo) ->
enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, ClientInfo) -> enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, ClientInfo) ->
#ifs{ #ifs{
it_end = It0, it_begin = ItBegin,
it_end = ItEnd,
first_seqno_qos1 = FirstSeqnoQos1, first_seqno_qos1 = FirstSeqnoQos1,
first_seqno_qos2 = FirstSeqnoQos2 first_seqno_qos2 = FirstSeqnoQos2
} = Ifs0, } = Ifs0,
It0 =
case IsReplay of
true -> ItBegin;
false -> ItEnd
end,
case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of
{ok, It, []} -> {ok, It, []} ->
%% No new messages; just update the end iterator: %% No new messages; just update the end iterator:
@@ -668,13 +666,13 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli
{ok, end_of_stream} -> {ok, end_of_stream} ->
%% No new messages; just update the end iterator: %% No new messages; just update the end iterator:
{Ifs0#ifs{it_end = end_of_stream}, Inflight0}; {Ifs0#ifs{it_end = end_of_stream}, Inflight0};
{ok, It, [{BatchBeginMsgKey, _} | _] = Messages} -> {ok, It, Messages} ->
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
), ),
Ifs = Ifs0#ifs{ Ifs = Ifs0#ifs{
it_begin = It0,
it_end = It, it_end = It,
batch_begin_key = BatchBeginMsgKey,
%% TODO: it should be possible to avoid calling %% TODO: it should be possible to avoid calling
%% length here by diffing size of inflight before %% length here by diffing size of inflight before
%% and after inserting messages: %% and after inserting messages:
@@ -852,30 +850,30 @@ commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
SeqNo = packet_id_to_seqno(PacketId, S), SeqNo = packet_id_to_seqno(PacketId, S),
case Track of case Track of
puback -> puback ->
Old = ?committed(?QOS_1), MinTrack = ?committed(?QOS_1),
Next = ?next(?QOS_1); MaxTrack = ?next(?QOS_1);
pubrec -> pubrec ->
Old = ?dup(?QOS_2), MinTrack = ?dup(?QOS_2),
Next = ?next(?QOS_2); MaxTrack = ?next(?QOS_2);
pubcomp -> pubcomp ->
Old = ?committed(?QOS_2), MinTrack = ?committed(?QOS_2),
Next = ?next(?QOS_2) MaxTrack = ?next(?QOS_2)
end, end,
NextSeqNo = emqx_persistent_session_ds_state:get_seqno(Next, S), Min = emqx_persistent_session_ds_state:get_seqno(MinTrack, S),
PrevSeqNo = emqx_persistent_session_ds_state:get_seqno(Old, S), Max = emqx_persistent_session_ds_state:get_seqno(MaxTrack, S),
case PrevSeqNo =< SeqNo andalso SeqNo =< NextSeqNo of case Min =< SeqNo andalso SeqNo =< Max of
true -> true ->
%% TODO: we pass a bogus message into the hook: %% TODO: we pass a bogus message into the hook:
Msg = emqx_message:make(SessionId, <<>>, <<>>), Msg = emqx_message:make(SessionId, <<>>, <<>>),
{ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(Old, SeqNo, S)}}; {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(MinTrack, SeqNo, S)}};
false -> false ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "out-of-order_commit", msg => "out-of-order_commit",
track => Track, track => Track,
packet_id => PacketId, packet_id => PacketId,
commit_seqno => SeqNo, seqno => SeqNo,
prev => PrevSeqNo, min => Min,
next => NextSeqNo max => Max
}), }),
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end. end.

View File

@@ -42,16 +42,16 @@
-define(dup(QOS), {1, QOS}). -define(dup(QOS), {1, QOS}).
%% Last seqno assigned to some message (that may reside in the %% Last seqno assigned to some message (that may reside in the
%% mqueue): %% mqueue):
-define(next(QOS), {0, QOS}). -define(next(QOS), {2, QOS}).
%%%%% State of the stream: %%%%% State of the stream:
-record(ifs, { -record(ifs, {
rank_x :: emqx_ds:rank_x(), rank_x :: emqx_ds:rank_x(),
rank_y :: emqx_ds:rank_y(), rank_y :: emqx_ds:rank_y(),
%% Iterator at the end of the last batch: %% Iterator at the beginning and end of the last batch:
it_begin :: emqx_ds:iterator() | undefined,
it_end :: emqx_ds:iterator() | end_of_stream, it_end :: emqx_ds:iterator() | end_of_stream,
%% Key that points at the beginning of the batch: %% Key that points at the beginning of the batch:
batch_begin_key :: binary() | undefined,
batch_size = 0 :: non_neg_integer(), batch_size = 0 :: non_neg_integer(),
%% Session sequence number at the time when the batch was fetched: %% Session sequence number at the time when the batch was fetched:
first_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(), first_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),

View File

@@ -66,14 +66,13 @@
%% It's implemented as three maps: `clean', `dirty' and `tombstones'. %% It's implemented as three maps: `clean', `dirty' and `tombstones'.
%% Updates are made to the `dirty' area. `pmap_commit' function saves %% Updates are made to the `dirty' area. `pmap_commit' function saves
%% the updated entries to Mnesia and moves them to the `clean' area. %% the updated entries to Mnesia and moves them to the `clean' area.
-record(pmap, {table, clean, dirty, tombstones}). -record(pmap, {table, cache, dirty}).
-type pmap(K, V) :: -type pmap(K, V) ::
#pmap{ #pmap{
table :: atom(), table :: atom(),
clean :: #{K => V}, cache :: #{K => V},
dirty :: #{K => V}, dirty :: #{K => dirty | del}
tombstones :: #{K => _}
}. }.
%% Session metadata: %% Session metadata:
@@ -409,70 +408,56 @@ pmap_open(Table, SessionId) ->
Clean = maps:from_list(kv_bag_restore(Table, SessionId)), Clean = maps:from_list(kv_bag_restore(Table, SessionId)),
#pmap{ #pmap{
table = Table, table = Table,
clean = Clean, cache = Clean,
dirty = #{}, dirty = #{}
tombstones = #{}
}. }.
-spec pmap_get(K, pmap(K, V)) -> V | undefined. -spec pmap_get(K, pmap(K, V)) -> V | undefined.
pmap_get(K, #pmap{dirty = Dirty, clean = Clean}) -> pmap_get(K, #pmap{cache = Cache}) ->
case Dirty of maps:get(K, Cache, undefined).
#{K := V} ->
V;
_ ->
case Clean of
#{K := V} -> V;
_ -> undefined
end
end.
-spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V). -spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V).
pmap_put(K, V, Pmap = #pmap{dirty = Dirty, clean = Clean, tombstones = Tombstones}) -> pmap_put(K, V, Pmap = #pmap{dirty = Dirty, cache = Cache}) ->
Pmap#pmap{ Pmap#pmap{
dirty = maps:put(K, V, Dirty), cache = maps:put(K, V, Cache),
clean = maps:remove(K, Clean), dirty = Dirty#{K => dirty}
tombstones = maps:remove(K, Tombstones)
}. }.
-spec pmap_del(K, pmap(K, V)) -> pmap(K, V). -spec pmap_del(K, pmap(K, V)) -> pmap(K, V).
pmap_del( pmap_del(
Key, Key,
Pmap = #pmap{dirty = Dirty, clean = Clean, tombstones = Tombstones} Pmap = #pmap{dirty = Dirty, cache = Cache}
) -> ) ->
%% Update the caches:
Pmap#pmap{ Pmap#pmap{
dirty = maps:remove(Key, Dirty), cache = maps:remove(Key, Cache),
clean = maps:remove(Key, Clean), dirty = Dirty#{Key => del}
tombstones = Tombstones#{Key => del}
}. }.
-spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A. -spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A.
pmap_fold(Fun, Acc0, #pmap{clean = Clean, dirty = Dirty}) -> pmap_fold(Fun, Acc, #pmap{cache = Cache}) ->
Acc1 = maps:fold(Fun, Acc0, Dirty), maps:fold(Fun, Acc, Cache).
maps:fold(Fun, Acc1, Clean).
-spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V). -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
pmap_commit( pmap_commit(
SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, clean = Clean, tombstones = Tombstones} SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, cache = Cache}
) -> ) ->
%% Commit deletions:
maps:foreach(fun(K, _) -> kv_bag_delete(Tab, SessionId, K) end, Tombstones),
%% Replace all records in the bag with the entries from the dirty area:
maps:foreach( maps:foreach(
fun(K, V) -> fun
(K, del) ->
kv_bag_delete(Tab, SessionId, K);
(K, dirty) ->
V = maps:get(K, Cache),
kv_bag_persist(Tab, SessionId, K, V) kv_bag_persist(Tab, SessionId, K, V)
end, end,
Dirty Dirty
), ),
Pmap#pmap{ Pmap#pmap{
dirty = #{}, dirty = #{}
tombstones = #{},
clean = maps:merge(Clean, Dirty)
}. }.
-spec pmap_format(pmap(_K, _V)) -> map(). -spec pmap_format(pmap(_K, _V)) -> map().
pmap_format(#pmap{clean = Clean, dirty = Dirty}) -> pmap_format(#pmap{cache = Cache}) ->
maps:merge(Clean, Dirty). Cache.
%% Functions dealing with set tables: %% Functions dealing with set tables:

View File

@@ -217,8 +217,8 @@ remove_fully_replayed_streams(S0) ->
). ).
compare_streams( compare_streams(
#ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}, {_KeyA, #ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}},
#ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2} {_KeyB, #ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}}
) -> ) ->
case A1 =:= B1 of case A1 =:= B1 of
true -> true ->