From 7ab57824dcfaf07ce9f7de32b26d1941e64701cc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 2 Oct 2023 16:20:16 -0300 Subject: [PATCH] chore(ds): change return type of `storage_layer:next/{1,2}` Part of https://emqx.atlassian.net/browse/EMQX-10942 The goal is to help make it clear to the caller of `next` what to do next: if the iterator should still be used or if no new messages will ever come out of it. From: ```erlang -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. ``` To: ```erlang -spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream. -spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream. ``` --- .../test/emqx_persistent_messages_SUITE.erl | 4 +- .../src/emqx_ds_message_storage_bitmask.erl | 27 ++++++--- .../src/emqx_ds_storage_layer.erl | 57 ++++++++++++++----- .../test/emqx_ds_storage_layer_SUITE.erl | 21 ++++--- .../props/prop_replay_message_storage.erl | 7 +-- 5 files changed, 76 insertions(+), 40 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 751b7e4b8..2d8768e65 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -272,9 +272,9 @@ consume(Shard, IteratorId) when is_binary(IteratorId) -> consume(It) -> case emqx_ds_storage_layer:next(It) of - {value, Msg, NIt} -> + {ok, NIt, [Msg]} -> [emqx_persistent_message:deserialize(Msg) | consume(NIt)]; - none -> + end_of_stream -> [] end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 7b141b202..be8a207bb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -90,7 +90,7 @@ -export([next/1]). -export([preserve_iterator/1]). --export([restore_iterator/3]). +-export([restore_iterator/2]). -export([refresh_iterator/1]). %% Debug/troubleshooting: @@ -217,6 +217,7 @@ -opaque db() :: #db{}. -opaque iterator() :: #it{}. +-type serialized_iterator() :: binary(). -type keymapper() :: #keymapper{}. -type keyspace_filter() :: #filter{}. @@ -340,22 +341,30 @@ next(It0 = #it{filter = #filter{keymapper = Keymapper}}) -> {error, closed} end. --spec preserve_iterator(iterator()) -> binary(). -preserve_iterator(#it{cursor = Cursor}) -> +-spec preserve_iterator(iterator()) -> serialized_iterator(). +preserve_iterator(#it{ + cursor = Cursor, + filter = #filter{ + topic_filter = TopicFilter, + start_time = StartTime + } +}) -> State = #{ v => 1, - cursor => Cursor + cursor => Cursor, + replay => {TopicFilter, StartTime} }, term_to_binary(State). --spec restore_iterator(db(), emqx_ds:replay(), binary()) -> +-spec restore_iterator(db(), serialized_iterator()) -> {ok, iterator()} | {error, _TODO}. -restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> +restore_iterator(DB, Serial) when is_binary(Serial) -> State = binary_to_term(Serial), - restore_iterator(DB, Replay, State); -restore_iterator(DB, Replay, #{ + restore_iterator(DB, State); +restore_iterator(DB, #{ v := 1, - cursor := Cursor + cursor := Cursor, + replay := Replay = {_TopicFilter, _StartTime} }) -> case make_iterator(DB, Replay) of {ok, It} when Cursor == undefined -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 6137a1ed7..25a58950d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -12,7 +12,7 @@ -export([store/5]). -export([delete/4]). --export([make_iterator/2, next/1]). +-export([make_iterator/2, next/1, next/2]). -export([ preserve_iterator/2, @@ -131,7 +131,7 @@ -callback make_iterator(_Schema, emqx_ds:replay()) -> {ok, _It} | {error, _}. --callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. +-callback restore_iterator(_Schema, _Serialized :: binary()) -> {ok, _It} | {error, _}. -callback preserve_iterator(_It) -> term(). @@ -175,21 +175,52 @@ make_iterator(Shard, Replay = {_, StartTime}) -> replay = Replay }). --spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(It = #it{module = Mod, data = ItData}) -> +-spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream. +next(It = #it{}) -> + next(It, _BatchSize = 1). + +-spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream. +next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) -> + end_of_stream; +next( + It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize +) -> + #{data := DBData} = meta_get_gen(Shard, Gen), + {ok, ItData} = Mod:restore_iterator(DBData, Serialized), + next(It#it{data = ItData}, BatchSize); +next(It = #it{}, BatchSize) -> + do_next(It, BatchSize, _Acc = []). + +-spec do_next(iterator(), non_neg_integer(), [binary()]) -> + {ok, iterator(), [binary()]} | end_of_stream. +do_next(It, N, Acc) when N =< 0 -> + {ok, It, lists:reverse(Acc)}; +do_next(It = #it{module = Mod, data = ItData}, N, Acc) -> case Mod:next(ItData) of {value, Val, ItDataNext} -> - {value, Val, It#it{data = ItDataNext}}; - {error, _} = Error -> - Error; + do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]); + {error, _} = _Error -> + %% todo: log? + %% iterator might be invalid now; will need to re-open it. + Serialized = Mod:preserve_iterator(ItData), + {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; none -> case open_next_iterator(It) of {ok, ItNext} -> - next(ItNext); - {error, _} = Error -> - Error; + do_next(ItNext, N, Acc); + {error, _} = _Error -> + %% todo: log? + %% fixme: only bad options may lead to this? + %% return an "empty" iterator to be re-opened when retrying? + Serialized = Mod:preserve_iterator(ItData), + {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; none -> - none + case Acc of + [] -> + end_of_stream; + _ -> + {ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)} + end end end. @@ -407,8 +438,8 @@ open_iterator(#{module := Mod, data := Data}, It = #it{}) -> -spec open_restore_iterator(generation(), iterator(), binary()) -> {ok, iterator()} | {error, _Reason}. -open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) -> - case Mod:restore_iterator(Data, Replay, Serial) of +open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) -> + case Mod:restore_iterator(Data, Serial) of {ok, ItData} -> {ok, It#it{module = Mod, data = ItData}}; Err -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index 3a872934f..10596e216 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -201,7 +201,7 @@ t_iterate_multigen_preserve_restore(_Config) -> ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), {It5, Res200} = iterate(It4, 1000), - ?assertEqual(none, It5), + ?assertEqual({end_of_stream, []}, iterate(It5, 1)), ?assertEqual( lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) @@ -224,21 +224,20 @@ iterate(DB, TopicFilter, StartTime) -> iterate(It) -> case emqx_ds_storage_layer:next(It) of - {value, Payload, ItNext} -> + {ok, ItNext, [Payload]} -> [Payload | iterate(ItNext)]; - none -> + end_of_stream -> [] end. -iterate(It, 0) -> - {It, []}; +iterate(end_of_stream, _N) -> + {end_of_stream, []}; iterate(It, N) -> - case emqx_ds_storage_layer:next(It) of - {value, Payload, ItNext} -> - {ItFinal, Ps} = iterate(ItNext, N - 1), - {ItFinal, [Payload | Ps]}; - none -> - {none, []} + case emqx_ds_storage_layer:next(It, N) of + {ok, ItFinal, Payloads} -> + {ItFinal, Payloads}; + end_of_stream -> + {end_of_stream, []} end. iterator(DB, TopicFilter, StartTime) -> diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index f9964bebe..d96996534 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -225,12 +225,9 @@ run_iterator_commands([iterate | Rest], It, Ctx) -> [] end; run_iterator_commands([{preserve, restore} | Rest], It, Ctx) -> - #{ - db := DB, - replay := Replay - } = Ctx, + #{db := DB} = Ctx, Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It), - {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Replay, Serial), + {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Serial), run_iterator_commands(Rest, ItNext, Ctx); run_iterator_commands([], It, _Ctx) -> iterate_db(It).