fix(ds): Apply review remarks

This commit is contained in:
ieQu1 2023-11-21 18:33:20 +01:00
parent 4d47490734
commit 3d823beb11
2 changed files with 16 additions and 14 deletions

View File

@ -53,6 +53,7 @@
%% tags: %% tags:
-define(STREAM, 1). -define(STREAM, 1).
-define(IT, 2). -define(IT, 2).
-define(BATCH, 3).
%% keys: %% keys:
-define(tag, 1). -define(tag, 1).
@ -91,12 +92,12 @@
-type message_id() :: emqx_ds_storage_layer:message_id(). -type message_id() :: emqx_ds_storage_layer:message_id().
-record(batch, { -define(batch_messages, 2).
messages :: [emqx_types:message()],
misc = #{} :: map()
}).
-type batch() :: #batch{}. -type batch() :: #{
?tag := ?BATCH,
?batch_messages := [emqx_types:message()]
}.
%%================================================================================ %%================================================================================
%% API functions %% API functions
@ -130,7 +131,7 @@ drop_db(DB) ->
store_batch(DB, Messages, Opts) -> store_batch(DB, Messages, Opts) ->
Shard = shard_of_messages(DB, Messages), Shard = shard_of_messages(DB, Messages),
Node = node_of_shard(DB, Shard), Node = node_of_shard(DB, Shard),
Batch = #batch{messages = Messages}, Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@ -214,7 +215,7 @@ do_drop_db_v1(DB) ->
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
do_store_batch_v1(DB, Shard, #batch{messages = Messages}, Options) -> do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
-spec do_get_streams_v1( -spec do_get_streams_v1(

View File

@ -81,7 +81,8 @@
-record(?SHARD_TAB, { -record(?SHARD_TAB, {
shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
%% Sites that the %% Sites that should contain the data when the cluster is in the
%% stable state (no nodes are being added or removed from it):
replica_set :: [site()], replica_set :: [site()],
%% Sites that contain the actual data: %% Sites that contain the actual data:
in_sync_replicas :: [site()], in_sync_replicas :: [site()],
@ -99,6 +100,10 @@
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
-spec this_site() -> site().
this_site() ->
persistent_term:get(?emqx_ds_builtin_site).
-spec n_shards(emqx_ds:db()) -> pos_integer(). -spec n_shards(emqx_ds:db()) -> pos_integer().
n_shards(DB) -> n_shards(DB) ->
[#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB), [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB),
@ -301,17 +306,13 @@ ensure_site() ->
persistent_term:put(?emqx_ds_builtin_site, Site), persistent_term:put(?emqx_ds_builtin_site, Site),
ok. ok.
-spec this_site() -> site().
this_site() ->
persistent_term:get(?emqx_ds_builtin_site).
-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
create_shards(DB, NShards, ReplicationFactor) -> create_shards(DB, NShards, ReplicationFactor) ->
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
Sites = sites(), AllSites = sites(),
lists:foreach( lists:foreach(
fun(Shard) -> fun(Shard) ->
Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites], Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
Hashes = lists:sort(Hashes0), Hashes = lists:sort(Hashes0),
{_, Sites} = lists:unzip(Hashes), {_, Sites} = lists:unzip(Hashes),
[First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),