feat(ds): Pass store_batch through RPC

This commit is contained in:
ieQu1 2023-11-10 03:10:13 +01:00
parent 8dc8237331
commit feef23fc08
3 changed files with 27 additions and 7 deletions

View File

@ -34,6 +34,7 @@
-export([
do_open_shard_v1/3,
do_drop_shard_v1/2,
do_store_batch_v1/4,
do_get_streams_v1/4,
do_make_iterator_v1/5,
do_next_v1/4
@ -115,10 +116,11 @@ drop_db(DB) ->
-spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(DB, Msg, Opts) ->
store_batch(DB, Batch, Opts) ->
%% TODO: Currently we store messages locally.
Shard = {DB, node()},
emqx_ds_storage_layer:store_batch(Shard, Msg, Opts).
Shard = node(),
Node = node_of_shard(DB, Shard),
emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
-spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{emqx_ds:stream_rank(), stream()}].
@ -184,15 +186,22 @@ next(DB, Iter0, BatchSize) ->
%% Internal exports (RPC targets)
%%================================================================================
-spec do_open_shard_v1(db(), emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
-spec do_open_shard_v1(db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
ok | {error, _}.
do_open_shard_v1(DB, Shard, Opts) ->
emqx_ds_storage_layer:open_shard({DB, Shard}, Opts).
-spec do_drop_shard_v1(db(), emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
-spec do_drop_shard_v1(db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}.
do_drop_shard_v1(DB, Shard) ->
emqx_ds_storage_layer:drop_shard({DB, Shard}).
-spec do_store_batch_v1(
db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()
) ->
emqx_ds:store_batch_result().
do_store_batch_v1(DB, Shard, Batch, Options) ->
emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
-spec do_get_streams_v1(
emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
) ->

View File

@ -33,7 +33,7 @@ start_shard(Shard, Options) ->
-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard),
Ok = supervisor:delete_child(?SUP, Shard).
ok = supervisor:delete_child(?SUP, Shard).
-spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
ok | {error, _Reason}.

View File

@ -19,7 +19,7 @@
-include_lib("emqx_utils/include/bpapi.hrl").
%% API:
-export([open_shard/4, drop_shard/3, get_streams/5, make_iterator/6, next/5]).
-export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]).
%% behavior callbacks:
-export([introduced_in/0]).
@ -81,6 +81,17 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
next(Node, DB, Shard, Iter, BatchSize) ->
erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
-spec store_batch(
node(),
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
[emqx_types:message()],
emqx_ds:message_store_opts()
) ->
emqx_ds:store_batch_result().
store_batch(Node, DB, Shard, Batch, Options) ->
erpc:call(Node, emqx_ds_replication_layer, do_store_batch_v1, [DB, Shard, Batch, Options]).
%%================================================================================
%% behavior callbacks
%%================================================================================