feat(sessds): allow stream iteration starting from a specific key

This commit is contained in:
Andrew Mayorov 2024-07-02 15:40:40 +02:00
parent dc73b957b3
commit 9a4f3f88e3
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 25 additions and 1 deletions

View File

@ -39,7 +39,7 @@
-export([get_peername/1, set_peername/2]).
-export([get_protocol/1, set_protocol/2]).
-export([new_id/1]).
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]).
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, iter_streams/2, n_streams/1]).
-export([get_seqno/2, put_seqno/3]).
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
-export([
@ -66,11 +66,14 @@
n_awaiting_rel/1
]).
-export([iterate/1]).
-export([make_session_iterator/0, session_iterator_next/2]).
-export_type([
t/0,
metadata/0,
iter/2,
seqno_type/0,
stream_key/0,
rank_key/0,
@ -89,6 +92,8 @@
-type message() :: emqx_types:message().
-opaque iter(K, V) :: gb_trees:iter(K, V).
-opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
%% Generic key-value wrapper that is used for exporting arbitrary
@ -476,6 +481,11 @@ del_stream(Key, Rec) ->
fold_streams(Fun, Acc, Rec) ->
gen_fold(?streams, Fun, Acc, Rec).
-spec iter_streams(_StartAfter :: stream_key() | beginning, t()) ->
iter(stream_key(), emqx_persistent_session_ds:stream_state()).
iter_streams(After, Rec) ->
gen_iter_after(?streams, After, Rec).
-spec n_streams(t()) -> non_neg_integer().
n_streams(Rec) ->
gen_size(?streams, Rec).
@ -534,6 +544,12 @@ n_awaiting_rel(Rec) ->
%%
-spec iterate(iter(K, V)) -> {K, V, iter(K, V)} | none.
iterate(It0) ->
gen_iter_next(It0).
%%
-spec make_session_iterator() -> session_iterator().
make_session_iterator() ->
mnesia:dirty_first(?session_tab).
@ -601,6 +617,14 @@ gen_size(Field, Rec) ->
check_sequence(Rec),
pmap_size(maps:get(Field, Rec)).
gen_iter_after(Field, After, Rec) ->
check_sequence(Rec),
pmap_iter_after(After, maps:get(Field, Rec)).
gen_iter_next(It) ->
%% NOTE: Currently, gbt iterators is the only type of iterators.
gbt_iter_next(It).
-spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map().
update_pmaps(Fun, Map) ->
lists:foldl(