diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 75fb444bd..e224413c1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -38,6 +38,9 @@ %% Message replay API: -export([get_streams/3, make_iterator/4, update_iterator/3, next/3]). +%% Message delete API: +-export([get_delete_streams/3, make_delete_iterator/4, delete_next/4]). + %% Misc. API: -export([]). @@ -48,10 +51,12 @@ topic_filter/0, topic/0, stream/0, + delete_stream/0, rank_x/0, rank_y/0, stream_rank/0, iterator/0, + delete_iterator/0, iterator_id/0, message_id/0, message_key/0, @@ -59,11 +64,12 @@ next_result/1, next_result/0, store_batch_result/0, make_iterator_result/1, make_iterator_result/0, - get_iterator_result/1, ds_specific_stream/0, ds_specific_iterator/0, ds_specific_generation_rank/0, + ds_specific_delete_stream/0, + ds_specific_delete_iterator/0, generation_rank/0, generation_info/0 ]). @@ -93,8 +99,12 @@ -opaque iterator() :: ds_specific_iterator(). +-opaque delete_iterator() :: ds_specific_delete_iterator(). + -opaque stream() :: ds_specific_stream(). +-opaque delete_stream() :: ds_specific_delete_stream(). + -type ds_specific_iterator() :: term(). -type ds_specific_stream() :: term(). @@ -114,6 +124,22 @@ -type next_result() :: next_result(iterator()). +-type ds_specific_delete_iterator() :: term(). + +-type ds_specific_delete_stream() :: term(). + +-type ds_delete_selector() :: + fun((message_key()) -> boolean()) | fun((message_key(), emqx_types:message()) -> boolean()). + +-type make_delete_iterator_result(DeleteIterator) :: {ok, DeleteIterator} | {error, term()}. + +-type make_delete_iterator_result() :: make_delete_iterator_result(delete_iterator()). + +-type delete_next_result(DeleteIterator) :: + {ok, DeleteIterator, non_neg_integer()} | {ok, end_of_stream} | {error, term()}. + +-type delete_next_result() :: delete_next_result(delete_iterator()). + %% Timestamp %% Earliest possible timestamp is 0. %% TODO granularity? Currently, we should always use milliseconds, as that's the unit we @@ -137,8 +163,6 @@ -type message_id() :: emqx_ds_replication_layer:message_id(). --type get_iterator_result(Iterator) :: {ok, Iterator} | undefined. - %% An opaque term identifying a generation. Each implementation will possibly add %% information to this term to match its inner structure (e.g.: by embedding the shard id, %% in the case of `emqx_ds_replication_layer'). @@ -183,9 +207,21 @@ -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator). +-callback get_delete_streams(db(), topic_filter(), time()) -> [ds_specific_delete_stream()]. + +-callback make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) -> + make_delete_iterator_result(ds_specific_delete_iterator()). + +-callback delete_next(db(), DeleteIterator, ds_delete_selector(), pos_integer()) -> + delete_next_result(DeleteIterator). + -optional_callbacks([ list_generations_with_lifetimes/1, - drop_generation/2 + drop_generation/2, + + get_delete_streams/3, + make_delete_iterator/4, + delete_next/4 ]). %%================================================================================ @@ -219,12 +255,7 @@ update_db_config(DB, Opts) -> -spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}. list_generations_with_lifetimes(DB) -> Mod = ?module(DB), - case erlang:function_exported(Mod, list_generations_with_lifetimes, 1) of - true -> - Mod:list_generations_with_lifetimes(DB); - false -> - #{} - end. + call_if_implemented(Mod, list_generations_with_lifetimes, [DB], #{}). -spec drop_generation(db(), generation_rank()) -> ok | {error, _}. drop_generation(DB, GenId) -> @@ -314,6 +345,27 @@ update_iterator(DB, OldIter, DSKey) -> next(DB, Iter, BatchSize) -> ?module(DB):next(DB, Iter, BatchSize). +-spec get_delete_streams(db(), topic_filter(), time()) -> [delete_stream()]. +get_delete_streams(DB, TopicFilter, StartTime) -> + Mod = ?module(DB), + call_if_implemented(Mod, get_delete_streams, [DB, TopicFilter, StartTime], []). + +-spec make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) -> + make_delete_iterator_result(). +make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> + Mod = ?module(DB), + call_if_implemented( + Mod, make_delete_iterator, [DB, Stream, TopicFilter, StartTime], {error, not_implemented} + ). + +-spec delete_next(db(), delete_iterator(), ds_delete_selector(), pos_integer()) -> + delete_next_result(). +delete_next(DB, Iter, Selector, BatchSize) -> + Mod = ?module(DB), + call_if_implemented( + Mod, delete_next, [DB, Iter, Selector, BatchSize], {error, not_implemented} + ). + %%================================================================================ %% Internal exports %%================================================================================ @@ -321,3 +373,11 @@ next(DB, Iter, BatchSize) -> %%================================================================================ %% Internal functions %%================================================================================ + +call_if_implemented(Mod, Fun, Args, Default) -> + case erlang:function_exported(Mod, Fun, length(Args)) of + true -> + apply(Mod, Fun, Args); + false -> + Default + end.