fix(ds): tidy up few typespecs

This commit is contained in:
Andrew Mayorov 2024-03-03 22:20:16 +01:00
parent 1cf672e78d
commit b39c710ec2
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 10 additions and 12 deletions

View File

@ -533,7 +533,7 @@ replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
%% mechanisms to replay them: %% mechanisms to replay them:
pull_now(Session). pull_now(Session).
-spec replay_batch(stream_state(), session(), clientinfo()) -> session(). -spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_).
replay_batch(Srs0, Session0, ClientInfo) -> replay_batch(Srs0, Session0, ClientInfo) ->
#srs{batch_size = BatchSize} = Srs0, #srs{batch_size = BatchSize} = Srs0,
case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of

View File

@ -35,6 +35,7 @@
-export_type([ -export_type([
badrpc/0, badrpc/0,
call_result/1,
call_result/0, call_result/0,
cast_result/0, cast_result/0,
multicall_result/1, multicall_result/1,

View File

@ -171,13 +171,12 @@ drop_db(DB) ->
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) -> store_batch(DB, Messages, Opts) ->
case emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of try emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of
ok -> ok ->
ok; ok
Error = {error, _, _} -> catch
Error; error:{Reason, _Call} when Reason == timeout; Reason == noproc ->
RPCError = {badrpc, _} -> {error, recoverable, Reason}
{error, recoverable, RPCError}
end. end.
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->

View File

@ -64,7 +64,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
emqx_ds:topic_filter(), emqx_ds:topic_filter(),
emqx_ds:time() emqx_ds:time()
) -> ) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _}. emqx_ds:make_iterator_result().
make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [ erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [
DB, Shard, Stream, TopicFilter, StartTime DB, Shard, Stream, TopicFilter, StartTime
@ -77,9 +77,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:iterator(), emqx_ds_storage_layer:iterator(),
pos_integer() pos_integer()
) -> ) ->
{ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]} emqx_rpc:call_result(emqx_ds:next_result()).
| {ok, end_of_stream}
| {error, _}.
next(Node, DB, Shard, Iter, BatchSize) -> next(Node, DB, Shard, Iter, BatchSize) ->
emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
@ -103,7 +101,7 @@ store_batch(Node, DB, Shard, Batch, Options) ->
emqx_ds_storage_layer:iterator(), emqx_ds_storage_layer:iterator(),
emqx_ds:message_key() emqx_ds:message_key()
) -> ) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _}. emqx_ds:make_iterator_result().
update_iterator(Node, DB, Shard, OldIter, DSKey) -> update_iterator(Node, DB, Shard, OldIter, DSKey) ->
erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
DB, Shard, OldIter, DSKey DB, Shard, OldIter, DSKey