chore(dsstore): refine module and callback typespecs

This commit is contained in:
Andrew Mayorov 2024-06-17 17:50:19 +02:00
parent 4f7b13e634
commit 82588fbc35
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 27 additions and 12 deletions

View File

@ -168,11 +168,14 @@
until := emqx_ds:time() | undefined until := emqx_ds:time() | undefined
}. }.
%% Module-specific runtime data, as instantiated by `Mod:open/5` callback function.
-type generation_data() :: term().
%% Schema for a generation. Persistent term. %% Schema for a generation. Persistent term.
-type generation_schema() :: generation(term()). -type generation_schema() :: generation(term()).
%% Runtime view of generation: %% Runtime view of generation:
-type generation() :: generation(term()). -type generation() :: generation(generation_data()).
%%%% Shard: %%%% Shard:
@ -204,21 +207,21 @@
rocksdb:db_handle(), rocksdb:db_handle(),
gen_id(), gen_id(),
Options :: map(), Options :: map(),
PrevRuntimeData :: term() generation_data() | undefined
) -> ) ->
{_Schema, cf_refs()}. {_Schema, cf_refs()}.
%% Open the existing schema %% Open the existing schema
-callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
_Data. generation_data().
%% Delete the schema and data %% Delete the schema and data
-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), generation_data()) ->
ok | {error, _Reason}. ok | {error, _Reason}.
-callback prepare_batch( -callback prepare_batch(
shard_id(), shard_id(),
_Data, generation_data(),
[{emqx_ds:time(), emqx_types:message()}, ...], [{emqx_ds:time(), emqx_types:message()}, ...],
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
@ -226,30 +229,42 @@
-callback commit_batch( -callback commit_batch(
shard_id(), shard_id(),
_Data, generation_data(),
_CookedBatch _CookedBatch
) -> ok | emqx_ds:error(_). ) -> ok | emqx_ds:error(_).
-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> -callback get_streams(
shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time()
) ->
[_Stream]. [_Stream].
-callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> -callback make_iterator(
shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
) ->
emqx_ds:make_iterator_result(_Iterator). emqx_ds:make_iterator_result(_Iterator).
-callback make_delete_iterator( -callback make_delete_iterator(
shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() shard_id(), generation_data(), _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time()
) -> ) ->
emqx_ds:make_delete_iterator_result(_Iterator). emqx_ds:make_delete_iterator_result(_Iterator).
-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) -> -callback next(
shard_id(), generation_data(), Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()
) ->
{ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
-callback delete_next( -callback delete_next(
shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() shard_id(),
generation_data(),
DeleteIterator,
emqx_ds:delete_selector(),
pos_integer(),
emqx_ds:time()
) -> ) ->
{ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. -callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
[CustomEvent].
-optional_callbacks([handle_event/4]). -optional_callbacks([handle_event/4]).