diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 4a9240f95..df957a740 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -34,6 +34,7 @@ -export([ do_open_shard_v1/3, do_drop_shard_v1/2, + do_store_batch_v1/4, do_get_streams_v1/4, do_make_iterator_v1/5, do_next_v1/4 @@ -115,10 +116,11 @@ drop_db(DB) -> -spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(DB, Msg, Opts) -> +store_batch(DB, Batch, Opts) -> %% TODO: Currently we store messages locally. - Shard = {DB, node()}, - emqx_ds_storage_layer:store_batch(Shard, Msg, Opts). + Shard = node(), + Node = node_of_shard(DB, Shard), + emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). -spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -184,15 +186,22 @@ next(DB, Iter0, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_open_shard_v1(db(), emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> +-spec do_open_shard_v1(db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> ok | {error, _}. do_open_shard_v1(DB, Shard, Opts) -> emqx_ds_storage_layer:open_shard({DB, Shard}, Opts). --spec do_drop_shard_v1(db(), emqx_ds_storage_layer:shard_id()) -> ok | {error, _}. +-spec do_drop_shard_v1(db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}. do_drop_shard_v1(DB, Shard) -> emqx_ds_storage_layer:drop_shard({DB, Shard}). +-spec do_store_batch_v1( + db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()], emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +do_store_batch_v1(DB, Shard, Batch, Options) -> + emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options). + -spec do_get_streams_v1( emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() ) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index 174312c4e..c2eee8dcb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -33,7 +33,7 @@ start_shard(Shard, Options) -> -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> ok = supervisor:terminate_child(?SUP, Shard), - Ok = supervisor:delete_child(?SUP, Shard). + ok = supervisor:delete_child(?SUP, Shard). -spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index c5fee4757..ae6932072 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([open_shard/4, drop_shard/3, get_streams/5, make_iterator/6, next/5]). +-export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]). %% behavior callbacks: -export([introduced_in/0]). @@ -81,6 +81,17 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> next(Node, DB, Shard, Iter, BatchSize) -> erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). +-spec store_batch( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + [emqx_types:message()], + emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +store_batch(Node, DB, Shard, Batch, Options) -> + erpc:call(Node, emqx_ds_replication_layer, do_store_batch_v1, [DB, Shard, Batch, Options]). + %%================================================================================ %% behavior callbacks %%================================================================================