From 9a4f3f88e302f34c7c239f2d05ddc783a8ebc64d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 2 Jul 2024 15:40:40 +0200 Subject: [PATCH] feat(sessds): allow stream iteration starting from a specific key --- .../emqx_persistent_session_ds_state.erl | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl index aa520b252..e9d538693 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl @@ -39,7 +39,7 @@ -export([get_peername/1, set_peername/2]). -export([get_protocol/1, set_protocol/2]). -export([new_id/1]). --export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). +-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, iter_streams/2, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([ @@ -66,11 +66,14 @@ n_awaiting_rel/1 ]). +-export([iterate/1]). + -export([make_session_iterator/0, session_iterator_next/2]). -export_type([ t/0, metadata/0, + iter/2, seqno_type/0, stream_key/0, rank_key/0, @@ -89,6 +92,8 @@ -type message() :: emqx_types:message(). +-opaque iter(K, V) :: gb_trees:iter(K, V). + -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. %% Generic key-value wrapper that is used for exporting arbitrary @@ -476,6 +481,11 @@ del_stream(Key, Rec) -> fold_streams(Fun, Acc, Rec) -> gen_fold(?streams, Fun, Acc, Rec). +-spec iter_streams(_StartAfter :: stream_key() | beginning, t()) -> + iter(stream_key(), emqx_persistent_session_ds:stream_state()). +iter_streams(After, Rec) -> + gen_iter_after(?streams, After, Rec). + -spec n_streams(t()) -> non_neg_integer(). n_streams(Rec) -> gen_size(?streams, Rec). @@ -534,6 +544,12 @@ n_awaiting_rel(Rec) -> %% +-spec iterate(iter(K, V)) -> {K, V, iter(K, V)} | none. +iterate(It0) -> + gen_iter_next(It0). + +%% + -spec make_session_iterator() -> session_iterator(). make_session_iterator() -> mnesia:dirty_first(?session_tab). @@ -601,6 +617,14 @@ gen_size(Field, Rec) -> check_sequence(Rec), pmap_size(maps:get(Field, Rec)). +gen_iter_after(Field, After, Rec) -> + check_sequence(Rec), + pmap_iter_after(After, maps:get(Field, Rec)). + +gen_iter_next(It) -> + %% NOTE: Currently, gbt iterators is the only type of iterators. + gbt_iter_next(It). + -spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map(). update_pmaps(Fun, Map) -> lists:foldl(