diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d6d36bf97..47fe047fc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -168,11 +168,14 @@ until := emqx_ds:time() | undefined }. +%% Module-specific runtime data, as instantiated by `Mod:open/5` callback function. +-type generation_data() :: term(). + %% Schema for a generation. Persistent term. -type generation_schema() :: generation(term()). %% Runtime view of generation: --type generation() :: generation(term()). +-type generation() :: generation(generation_data()). %%%% Shard: @@ -204,21 +207,21 @@ rocksdb:db_handle(), gen_id(), Options :: map(), - PrevRuntimeData :: term() + generation_data() | undefined ) -> {_Schema, cf_refs()}. %% Open the existing schema -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> - _Data. + generation_data(). %% Delete the schema and data --callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> +-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), generation_data()) -> ok | {error, _Reason}. -callback prepare_batch( shard_id(), - _Data, + generation_data(), [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> @@ -226,30 +229,42 @@ -callback commit_batch( shard_id(), - _Data, + generation_data(), _CookedBatch ) -> ok | emqx_ds:error(_). --callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> +-callback get_streams( + shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time() +) -> [_Stream]. --callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> +-callback make_iterator( + shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time() +) -> emqx_ds:make_iterator_result(_Iterator). -callback make_delete_iterator( - shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() + shard_id(), generation_data(), _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) -> +-callback next( + shard_id(), generation_data(), Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean() +) -> {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. -callback delete_next( - shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() + shard_id(), + generation_data(), + DeleteIterator, + emqx_ds:delete_selector(), + pos_integer(), + emqx_ds:time() ) -> {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. --callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> + [CustomEvent]. -optional_callbacks([handle_event/4]).