diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 50331a378..5128188b8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -39,7 +39,7 @@ do_next_v1/4 ]). --export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]). +-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). -include_lib("emqx_utils/include/emqx_message.hrl"). @@ -91,6 +91,13 @@ -type message_id() :: emqx_ds_storage_layer:message_id(). +-record(batch, { + messages :: [emqx_types:message()], + misc = #{} :: map() +}). + +-type batch() :: #batch{}. + %%================================================================================ %% API functions %%================================================================================ @@ -123,7 +130,8 @@ drop_db(DB) -> store_batch(DB, Messages, Opts) -> Shard = shard_of_messages(DB, Messages), Node = node_of_shard(DB, Shard), - emqx_ds_proto_v1:store_batch(Node, DB, Shard, Messages, Opts). + Batch = #batch{messages = Messages}, + emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -202,12 +210,12 @@ do_drop_db_v1(DB) -> -spec do_store_batch_v1( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - [emqx_types:message()], + batch(), emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -do_store_batch_v1(DB, Shard, Batch, Options) -> - emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options). +do_store_batch_v1(DB, Shard, #batch{messages = Messages}, Options) -> + emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). -spec do_get_streams_v1( emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 758b5148b..0d7972466 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -75,7 +75,7 @@ next(Node, DB, Shard, Iter, BatchSize) -> node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - [emqx_types:message()], + emqx_ds_replication_layer:batch(), emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result().