diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
index 5128188b8..7a26b696d 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -53,6 +53,7 @@
 %% tags:
 -define(STREAM, 1).
 -define(IT, 2).
+-define(BATCH, 3).
 
 %% keys:
 -define(tag, 1).
@@ -91,12 +92,12 @@
 
 -type message_id() :: emqx_ds_storage_layer:message_id().
 
--record(batch, {
-    messages :: [emqx_types:message()],
-    misc = #{} :: map()
-}).
+-define(batch_messages, 2).
 
--type batch() :: #batch{}.
+-type batch() :: #{
+    ?tag := ?BATCH,
+    ?batch_messages := [emqx_types:message()]
+}.
 
 %%================================================================================
 %% API functions
@@ -130,7 +131,7 @@ drop_db(DB) ->
 store_batch(DB, Messages, Opts) ->
     Shard = shard_of_messages(DB, Messages),
     Node = node_of_shard(DB, Shard),
-    Batch = #batch{messages = Messages},
+    Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
     emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
 
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@@ -214,7 +215,7 @@ do_drop_db_v1(DB) ->
     emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
-do_store_batch_v1(DB, Shard, #batch{messages = Messages}, Options) ->
+do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
     emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
 
 -spec do_get_streams_v1(
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
index 0f250022d..f7dbc828f 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
@@ -81,7 +81,8 @@
 
 -record(?SHARD_TAB, {
     shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
-    %% Sites that the
+    %% Sites that should contain the data when the cluster is in the
+    %% stable state (no nodes are being added or removed from it):
     replica_set :: [site()],
     %% Sites that contain the actual data:
     in_sync_replicas :: [site()],
@@ -99,6 +100,10 @@
 %% API funcions
 %%================================================================================
 
+-spec this_site() -> site().
+this_site() ->
+    persistent_term:get(?emqx_ds_builtin_site).
+
 -spec n_shards(emqx_ds:db()) -> pos_integer().
 n_shards(DB) ->
     [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB),
@@ -301,17 +306,13 @@ ensure_site() ->
     persistent_term:put(?emqx_ds_builtin_site, Site),
     ok.
 
--spec this_site() -> site().
-this_site() ->
-    persistent_term:get(?emqx_ds_builtin_site).
-
 -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
 create_shards(DB, NShards, ReplicationFactor) ->
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
-    Sites = sites(),
+    AllSites = sites(),
    lists:foreach(
        fun(Shard) ->
-            Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites],
+            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
            Hashes = lists:sort(Hashes0),
            {_, Sites} = lists:unzip(Hashes),
            [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
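Note (not part of the patch): a minimal sketch of how the new integer-keyed batch term is built on the sending side and matched on the receiving side. The macro values come from the diff above (?tag = 1, ?batch_messages = 2, ?BATCH = 3, alongside the existing ?STREAM and ?IT tags); make_batch/1 and unpack_batch/1 are hypothetical helper names used only for illustration.

%% Values as defined in the patch above:
-define(tag, 1).
-define(batch_messages, 2).
-define(BATCH, 3).

%% Hypothetical helper mirroring the encoding in store_batch/3:
make_batch(Messages) ->
    #{?tag => ?BATCH, ?batch_messages => Messages}.

%% Hypothetical helper mirroring the match in do_store_batch_v1/4:
unpack_batch(#{?tag := ?BATCH, ?batch_messages := Messages}) ->
    Messages.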