refactor(ds): Add a wrapper to the store batch API
This commit is contained in:
parent
2a1f7d946a
commit
f5c71e8068
|
@ -39,7 +39,7 @@
|
||||||
do_next_v1/4
|
do_next_v1/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]).
|
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
|
||||||
|
|
||||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||||
|
|
||||||
|
@ -91,6 +91,13 @@
|
||||||
|
|
||||||
-type message_id() :: emqx_ds_storage_layer:message_id().
|
-type message_id() :: emqx_ds_storage_layer:message_id().
|
||||||
|
|
||||||
|
-record(batch, {
|
||||||
|
messages :: [emqx_types:message()],
|
||||||
|
misc = #{} :: map()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type batch() :: #batch{}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API functions
|
%% API functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -123,7 +130,8 @@ drop_db(DB) ->
|
||||||
store_batch(DB, Messages, Opts) ->
|
store_batch(DB, Messages, Opts) ->
|
||||||
Shard = shard_of_messages(DB, Messages),
|
Shard = shard_of_messages(DB, Messages),
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
emqx_ds_proto_v1:store_batch(Node, DB, Shard, Messages, Opts).
|
Batch = #batch{messages = Messages},
|
||||||
|
emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
|
||||||
|
|
||||||
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||||
[{emqx_ds:stream_rank(), stream()}].
|
[{emqx_ds:stream_rank(), stream()}].
|
||||||
|
@ -202,12 +210,12 @@ do_drop_db_v1(DB) ->
|
||||||
-spec do_store_batch_v1(
|
-spec do_store_batch_v1(
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_replication_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
[emqx_types:message()],
|
batch(),
|
||||||
emqx_ds:message_store_opts()
|
emqx_ds:message_store_opts()
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
do_store_batch_v1(DB, Shard, Batch, Options) ->
|
do_store_batch_v1(DB, Shard, #batch{messages = Messages}, Options) ->
|
||||||
emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
|
emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
|
||||||
|
|
||||||
-spec do_get_streams_v1(
|
-spec do_get_streams_v1(
|
||||||
emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
|
|
|
@ -75,7 +75,7 @@ next(Node, DB, Shard, Iter, BatchSize) ->
|
||||||
node(),
|
node(),
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_replication_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
[emqx_types:message()],
|
emqx_ds_replication_layer:batch(),
|
||||||
emqx_ds:message_store_opts()
|
emqx_ds:message_store_opts()
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
|
|
Loading…
Reference in New Issue