diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index e224413c1..053219945 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -42,7 +42,7 @@ -export([get_delete_streams/3, make_delete_iterator/4, delete_next/4]). %% Misc. API: --export([]). +-export([count/1]). -export_type([ create_db_opts/0, @@ -52,6 +52,7 @@ topic/0, stream/0, delete_stream/0, + delete_selector/0, rank_x/0, rank_y/0, stream_rank/0, @@ -105,6 +106,8 @@ -opaque delete_stream() :: ds_specific_delete_stream(). +-type delete_selector() :: fun((emqx_types:message()) -> boolean()). + -type ds_specific_iterator() :: term(). -type ds_specific_stream() :: term(). @@ -128,9 +131,6 @@ -type ds_specific_delete_stream() :: term(). --type ds_delete_selector() :: - fun((message_key()) -> boolean()) | fun((message_key(), emqx_types:message()) -> boolean()). - -type make_delete_iterator_result(DeleteIterator) :: {ok, DeleteIterator} | {error, term()}. -type make_delete_iterator_result() :: make_delete_iterator_result(delete_iterator()). @@ -212,16 +212,20 @@ -callback make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) -> make_delete_iterator_result(ds_specific_delete_iterator()). --callback delete_next(db(), DeleteIterator, ds_delete_selector(), pos_integer()) -> +-callback delete_next(db(), DeleteIterator, delete_selector(), pos_integer()) -> delete_next_result(DeleteIterator). +-callback count(db()) -> non_neg_integer(). + -optional_callbacks([ list_generations_with_lifetimes/1, drop_generation/2, get_delete_streams/3, make_delete_iterator/4, - delete_next/4 + delete_next/4, + + count/1 ]). %%================================================================================ @@ -358,7 +362,7 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> Mod, make_delete_iterator, [DB, Stream, TopicFilter, StartTime], {error, not_implemented} ). --spec delete_next(db(), delete_iterator(), ds_delete_selector(), pos_integer()) -> +-spec delete_next(db(), delete_iterator(), delete_selector(), pos_integer()) -> delete_next_result(). delete_next(DB, Iter, Selector, BatchSize) -> Mod = ?module(DB), @@ -366,6 +370,11 @@ delete_next(DB, Iter, Selector, BatchSize) -> Mod, delete_next, [DB, Iter, Selector, BatchSize], {error, not_implemented} ). +-spec count(db()) -> non_neg_integer() | {error, not_implemented}. +count(DB) -> + Mod = ?module(DB), + call_if_implemented(Mod, count, [DB], {error, not_implemented}). + %%================================================================================ %% Internal exports %%================================================================================