diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index db3b4e5c3..d6250254d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -290,7 +290,9 @@ emqx_ds:time(), _IsCurrentGeneration :: boolean() ) -> - {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()} | emqx_ds:error(_). + {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()} + | {ok, end_of_stream} + | emqx_ds:error(_). -callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. @@ -313,6 +315,8 @@ drop_shard(Shard) -> ok = rocksdb:destroy(db_dir(Shard), []). +%% @doc This is a convenicence wrapper that combines `prepare' and +%% `commit' operations. -spec store_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], @@ -329,6 +333,15 @@ store_batch(Shard, Messages, Options) -> Error end. +%% @doc Transform a batch of messages into a "cooked batch" that can +%% be stored in the transaction log or transfered over the network. +%% +%% Important: the caller MUST ensure that timestamps within the shard +%% form a strictly increasing monotonic sequence through out the whole +%% lifetime of the shard. +%% +%% The underlying storage layout MAY use timestamp as a unique message +%% ID. -spec prepare_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], @@ -361,6 +374,10 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> prepare_batch(_Shard, [], _Options) -> ignore. +%% @doc Commit cooked batch to the storage. +%% +%% The underlying storage layout must guarantee that this operation is +%% idempotent. -spec commit_batch( shard_id(), cooked_batch(),