diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 82a345eef..30ebe7417 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -54,10 +54,14 @@ is_persistence_enabled() -> storage_backend() -> storage_backend(emqx_config:get([session_persistence, storage])). -storage_backend(#{builtin := #{enable := true}}) -> +storage_backend(#{ + builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor} +}) -> #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => NShards, + replication_factor => ReplicationFactor }. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7ba5aa527..928115a52 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -440,10 +440,6 @@ del_subscription(TopicFilter, DSSessionId) -> %%-------------------------------------------------------------------- create_tables() -> - ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} - }), ok = mria:create_table( ?SESSION_TAB, [ diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ef34d3e8f..8e401a442 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1791,6 +1791,22 @@ fields("session_storage_backend_builtin") -> desc => ?DESC(session_storage_backend_enable), default => true } + )}, + {"n_shards", + sc( + pos_integer(), + #{ + desc => ?DESC(session_builtin_n_shards), + default => 16 + } + )}, + {"replication_factor", + sc( + pos_integer(), + #{ + default => 3, + importance => ?IMPORTANCE_HIDDEN + } )} ]. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 725d62673..84631f38e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -35,7 +35,6 @@ -export_type([ create_db_opts/0, - builtin_db_opts/0, db/0, time/0, topic_filter/0, @@ -87,14 +86,8 @@ -type message_store_opts() :: #{}. --type builtin_db_opts() :: - #{ - backend := builtin, - storage := emqx_ds_storage_layer:prototype() - }. - -type create_db_opts() :: - builtin_db_opts(). + emqx_ds_replication_layer:builtin_db_opts(). -type message_id() :: emqx_ds_replication_layer:message_id(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index a06af104d..7a26b696d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -32,15 +32,16 @@ %% internal exports: -export([ - do_open_shard_v1/3, - do_drop_shard_v1/2, + do_drop_db_v1/1, do_store_batch_v1/4, do_get_streams_v1/4, do_make_iterator_v1/5, do_next_v1/4 ]). --export_type([shard_id/0, stream/0, iterator/0, message_id/0]). +-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). + +-include_lib("emqx_utils/include/emqx_message.hrl"). %%================================================================================ %% Type declarations @@ -52,13 +53,22 @@ %% tags: -define(STREAM, 1). -define(IT, 2). +-define(BATCH, 3). %% keys: -define(tag, 1). -define(shard, 2). -define(enc, 3). --type shard_id() :: atom(). +-type shard_id() :: binary(). + +-type builtin_db_opts() :: + #{ + backend := builtin, + storage := emqx_ds_storage_layer:prototype(), + n_shards => pos_integer(), + replication_factor => pos_integer() + }. %% This enapsulates the stream entity from the replication level. %% @@ -82,42 +92,46 @@ -type message_id() :: emqx_ds_storage_layer:message_id(). 
+-define(batch_messages, 2).
+
+-type batch() :: #{
+    ?tag := ?BATCH,
+    ?batch_messages := [emqx_types:message()]
+}.
+
 %%================================================================================
 %% API functions
 %%================================================================================
 
+%% List the IDs of all shards of `DB' recorded in the metadata store.
 -spec list_shards(emqx_ds:db()) -> [shard_id()].
-list_shards(_DB) ->
-    %% TODO: milestone 5
-    list_nodes().
+list_shards(DB) ->
+    emqx_ds_replication_layer_meta:shards(DB).
 
--spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
-open_db(DB, Opts) ->
-    %% TODO: improve error reporting, don't just crash
+%% Register `DB' in the metadata store and open the local storage of
+%% every shard assigned to this site.
+%%
+%% `Opts' are the options returned by the metadata store; for a DB
+%% that already exists they are the stored options rather than
+%% `CreateOpts'.
+-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
+open_db(DB, CreateOpts) ->
+    Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
     lists:foreach(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts)
+            %% Assert success, as the RPC-based code this replaces
+            %% did: a shard that failed to open must not be
+            %% advertised as led by this node.
+            ok = emqx_ds_storage_layer:open_shard({DB, Shard}, Opts),
+            maybe_set_myself_as_leader(DB, Shard)
         end,
-        list_shards(DB)
+        MyShards
     ).
 
+%% Ask every running node to drop its local shards of `DB', then
+%% remove the DB from the metadata store. The results of both steps
+%% are deliberately ignored (best effort).
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
-    lists:foreach(
-        fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard)
-        end,
-        list_shards(DB)
-    ).
+    Nodes = list_nodes(),
+    _ = emqx_ds_proto_v1:drop_db(Nodes, DB),
+    _ = emqx_ds_replication_layer_meta:drop_db(DB),
+    ok.
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+%% Store a non-empty batch of messages. The target shard is derived
+%% from the first message of the batch, and the batch is sent to the
+%% node currently registered as the leader of that shard.
+-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Batch, Opts) ->
-    %% TODO: Currently we store messages locally.
-    Shard = node(),
+store_batch(DB, Messages, Opts) ->
+    Shard = shard_of_messages(DB, Messages),
     Node = node_of_shard(DB, Shard),
+    Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
     emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -184,26 +198,25 @@ next(DB, Iter0, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_open_shard_v1( - emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts() -) -> - ok | {error, _}. -do_open_shard_v1(DB, Shard, Opts) -> - emqx_ds_storage_layer:open_shard({DB, Shard}, Opts). - --spec do_drop_shard_v1(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}. -do_drop_shard_v1(DB, Shard) -> - emqx_ds_storage_layer:drop_shard({DB, Shard}). +-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}. +do_drop_db_v1(DB) -> + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + lists:foreach( + fun(Shard) -> + emqx_ds_storage_layer:drop_shard({DB, Shard}) + end, + MyShards + ). -spec do_store_batch_v1( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - [emqx_types:message()], + batch(), emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -do_store_batch_v1(DB, Shard, Batch, Options) -> - emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options). +do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) -> + emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). -spec do_get_streams_v1( emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() @@ -237,9 +250,34 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> %% Internal functions %%================================================================================ +%% TODO: there's no real leader election right now +-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. 
+maybe_set_myself_as_leader(DB, Shard) -> + Site = emqx_ds_replication_layer_meta:this_site(), + case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of + [Site | _] -> + %% Currently the first in-sync replica always becomes the + %% leader + ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); + _Sites -> + ok + end. + -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(_DB, Node) -> - Node. +node_of_shard(DB, Shard) -> + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + %% TODO: use optvar + timer:sleep(500), + node_of_shard(DB, Shard) + end. + +%% Here we assume that all messages in the batch come from the same client +shard_of_messages(DB, [#message{from = From} | _]) -> + N = emqx_ds_replication_layer_meta:n_shards(DB), + integer_to_binary(erlang:phash2(From, N)). list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl new file mode 100644 index 000000000..f7dbc828f --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -0,0 +1,340 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. 
+%%--------------------------------------------------------------------
+
+%% @doc Metadata storage for the builtin sharded database.
+%%
+%% Currently metadata is stored in mria; that's not ideal, but
+%% eventually we'll replace it, so it's important not to leak
+%% implementation details from this module.
+-module(emqx_ds_replication_layer_meta).
+
+-behaviour(gen_server).
+
+%% API:
+-export([
+    shards/1,
+    my_shards/1,
+    replica_set/2,
+    in_sync_replicas/2,
+    sites/0,
+    open_db/2,
+    drop_db/1,
+    shard_leader/2,
+    this_site/0,
+    set_leader/3
+]).
+
+%% gen_server
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+
+%% internal exports:
+%% (the `*_trans' functions and `claim_site/2' are passed to
+%% `mria:transaction/3' as `fun ?MODULE:F/A', which requires them to
+%% be exported)
+-export([
+    open_db_trans/2,
+    drop_db_trans/1,
+    claim_site/2,
+    in_sync_replicas_trans/2,
+    set_leader_trans/3,
+    n_shards/1
+]).
+
+-export_type([site/0]).
+
+-include_lib("stdlib/include/qlc.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(SERVER, ?MODULE).
+
+%% Mria rlog shard that all tables created by this module belong to:
+-define(SHARD, emqx_ds_builtin_metadata_shard).
+%% DS database metadata:
+-define(META_TAB, emqx_ds_builtin_metadata_tab).
+%% Mapping from Site to the actual Erlang node:
+-define(NODE_TAB, emqx_ds_builtin_node_tab).
+%% Shard metadata:
+-define(SHARD_TAB, emqx_ds_builtin_shard_tab).
+
+%% One record per database; `db' is the table key and `db_props'
+%% holds the options the database was created with.
+-record(?META_TAB, {
+    db :: emqx_ds:db(),
+    db_props :: emqx_ds_replication_layer:builtin_db_opts()
+}).
+
+%% One record per site; `site' is the table key.
+-record(?NODE_TAB, {
+    site :: site(),
+    node :: node(),
+    %% NOTE(review): not read anywhere in this module; presumably
+    %% reserved for future extensions -- confirm.
+    misc = #{} :: map()
+}).
+
+%% One record per shard; the `{DB, ShardId}' tuple is the table key.
+-record(?SHARD_TAB, {
+    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    %% Sites that should contain the data when the cluster is in the
+    %% stable state (no nodes are being added or removed from it):
+    replica_set :: [site()],
+    %% Sites that contain the actual data:
+    in_sync_replicas :: [site()],
+    %% Node acting as the leader of the shard; stays `undefined'
+    %% until `set_leader/3' is called for the shard:
+    leader :: node() | undefined,
+    %% NOTE(review): not read anywhere in this module; presumably
+    %% reserved for future extensions -- confirm.
+    misc = #{} :: map()
+}).
+
+%% Persistent ID of the node (independent from the IP/FQDN):
+-type site() :: binary().
+
+%% Persistent term key:
+-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% Return the site ID of the local node. The value is read from a
+%% persistent term that this server stores during `init/1' and erases
+%% in `terminate/2', so the call fails while the server is not running.
+-spec this_site() -> site().
+this_site() ->
+    persistent_term:get(?emqx_ds_builtin_site).
+
+%% Return the number of shards `DB' was created with (dirty read).
+%% Crashes with `badmatch' if there is no metadata record for `DB'.
+-spec n_shards(emqx_ds:db()) -> pos_integer().
+n_shards(DB) ->
+    [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB),
+    NShards.
+
+%% Start the metadata server, registered locally under the module name.
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%% Return the IDs of all shards of `DB' (without the DB component of
+%% the table key).
+-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+shards(DB) ->
+    eval_qlc(
+        qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
+    ).
+
+%% Return the IDs of the shards of `DB' that list the local site
+%% either in their replica set or among their in-sync replicas.
+-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+my_shards(DB) ->
+    Site = this_site(),
+    eval_qlc(
+        qlc:q([
+            Shard
+         || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table(
+                ?SHARD_TAB
+            ),
+            D =:= DB,
+            lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+        ])
+    ).
+
+%% Return the replica set of a shard (dirty read), or
+%% `{error, no_shard}' when the shard has no metadata record.
+-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, _}.
+replica_set(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{replica_set = ReplicaSet}] ->
+            {ok, ReplicaSet};
+        [] ->
+            {error, no_shard}
+    end.
+
+%% Return the in-sync replicas of a shard, read inside a mria
+%% transaction. An unknown shard yields the empty list.
+-spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    [site()].
+in_sync_replicas(DB, ShardId) ->
+    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]),
+    case Result of
+        {ok, InSync} ->
+            InSync;
+        {error, _} ->
+            []
+    end.
+
+%% Return the IDs of all sites recorded in the node table.
+-spec sites() -> [site()].
+sites() ->
+    eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
+
+%% Return the node registered as the leader of a shard (dirty read).
+%%
+%% `{error, no_leader_for_shard}' is returned both when the shard has
+%% no metadata record and when the record exists but its leader has
+%% not been set yet. Shard records are created with an unset leader,
+%% so the second case must not be reported as `{ok, undefined}':
+%% callers treat the `ok' payload as a node name.
+-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, node()} | {error, no_leader_for_shard}.
+shard_leader(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{leader = undefined}] ->
+            {error, no_leader_for_shard};
+        [#?SHARD_TAB{leader = Leader}] ->
+            {ok, Leader};
+        [] ->
+            {error, no_leader_for_shard}
+    end.
+
+%% Record `Node' as the leader of the shard. Crashes if the
+%% transaction is aborted (e.g. the shard record does not exist).
+-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
+    ok.
+set_leader(DB, Shard, Node) ->
+    {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
+    ok.
+
+%% Create the metadata of `DB' unless it already exists, and return
+%% the options that are in effect: `DefaultOpts' for a newly created
+%% DB, the previously stored options otherwise.
+-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db(DB, DefaultOpts) ->
+    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
+    Opts.
+
+%% Remove the metadata of `DB'. The outcome of the transaction is
+%% ignored, so this always returns `ok'.
+-spec drop_db(emqx_ds:db()) -> ok.
+drop_db(DB) ->
+    _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
+    ok.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%% The server keeps no state of its own; it exists to create the
+%% tables and to own the site ID persistent term.
+-record(s, {}).
+
+init([]) ->
+    %% Trap exits so that terminate/2 runs on shutdown and erases the
+    %% persistent term:
+    process_flag(trap_exit, true),
+    logger:set_process_metadata(#{domain => [ds, meta]}),
+    ensure_tables(),
+    ensure_site(),
+    S = #s{},
+    {ok, S}.
+
+handle_call(_Call, _From, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_Cast, S) ->
+    {noreply, S}.
+
+handle_info(_Info, S) ->
+    {noreply, S}.
+
+terminate(_Reason, #s{}) ->
+    persistent_term:erase(?emqx_ds_builtin_site),
+    ok.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db_trans(DB, CreateOpts) ->
+    %% Transaction body: create the DB record and its shards on first
+    %% open; on later opens return the stored options untouched.
+    case mnesia:wread({?META_TAB, DB}) of
+        [] ->
+            NShards = maps:get(n_shards, CreateOpts),
+            ReplicationFactor = maps:get(replication_factor, CreateOpts),
+            mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
+            create_shards(DB, NShards, ReplicationFactor),
+            CreateOpts;
+        [#?META_TAB{db_props = Opts}] ->
+            Opts
+    end.
+
+%% Transaction body: delete the DB record and all shard records of
+%% `DB'.
+-spec drop_db_trans(emqx_ds:db()) -> ok.
+drop_db_trans(DB) ->
+    mnesia:delete({?META_TAB, DB}),
+    %% shards/1 returns bare shard IDs, while the shard table is keyed
+    %% by `{DB, ShardId}'; the full key is needed for the delete to
+    %% hit the record.
+    lists:foreach(
+        fun(Shard) ->
+            mnesia:delete({?SHARD_TAB, {DB, Shard}})
+        end,
+        shards(DB)
+    ),
+    ok.
+
+%% Transaction body: map `Site' to `Node', replacing any previous
+%% mapping of the site.
+-spec claim_site(site(), node()) -> ok.
+claim_site(Site, Node) ->
+    mnesia:write(#?NODE_TAB{site = Site, node = Node}).
+
+%% Transaction body: read the in-sync replicas of a shard.
+-spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, no_shard}.
+in_sync_replicas_trans(DB, Shard) ->
+    case mnesia:read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{in_sync_replicas = InSync}] ->
+            {ok, InSync};
+        [] ->
+            {error, no_shard}
+    end.
+
+%% Transaction body: set the leader of an existing shard. Fails with
+%% `badmatch' (aborting the transaction) if the shard record is absent.
+-spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
+    ok.
+set_leader_trans(DB, Shard, Node) ->
+    [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
+    Record = Record0#?SHARD_TAB{leader = Node},
+    mnesia:write(Record).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Create the three metadata tables in the `?SHARD' rlog shard and
+%% wait until they are available.
+%%
+%% NOTE(review): the DB and node tables use `rocksdb_copies' while the
+%% shard table uses `ram_copies', and open_db_trans/2 only creates
+%% shard records when the DB record is absent -- confirm that shard
+%% records are still present after a restart of the whole cluster.
+ensure_tables() ->
+    %% TODO: seems like it may introduce flakiness
+    Majority = false,
+    ok = mria:create_table(?META_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, rocksdb_copies},
+        {record_name, ?META_TAB},
+        {attributes, record_info(fields, ?META_TAB)}
+    ]),
+    ok = mria:create_table(?NODE_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, rocksdb_copies},
+        {record_name, ?NODE_TAB},
+        {attributes, record_info(fields, ?NODE_TAB)}
+    ]),
+    ok = mria:create_table(?SHARD_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, ram_copies},
+        {record_name, ?SHARD_TAB},
+        {attributes, record_info(fields, ?SHARD_TAB)}
+    ]),
+    ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
+
+%% Load the site ID of this node from the data directory, generating
+%% and persisting a new random one when the file is missing or
+%% unreadable; then claim the site for the current node name and
+%% publish the ID as a persistent term.
+ensure_site() ->
+    Filename = filename:join(emqx:data_dir(), "emqx_ds_builtin_site.eterm"),
+    case file:consult(Filename) of
+        {ok, [Site]} ->
+            ok;
+        _ ->
+            Site = crypto:strong_rand_bytes(8),
+            ok = filelib:ensure_dir(Filename),
+            %% `~w' always prints the binary as a list of byte values,
+            %% which is plain ASCII and reads back unchanged. `~p'
+            %% may print a binary that happens to look printable as
+            %% a quoted string with non-ASCII bytes, which
+            %% file:consult/1 may then fail to decode. The write is
+            %% asserted: if the ID is not persisted the node would
+            %% come back under a different site after a restart.
+            ok = file:write_file(Filename, io_lib:format("~w.", [Site]))
+    end,
+    {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
+    persistent_term:put(?emqx_ds_builtin_site, Site),
+    ok.
+
+%% Write the metadata records for shards `0..NShards-1' of `DB'. For
+%% each shard the known sites are ranked by `hash(Shard, Site)'; the
+%% first `ReplicationFactor' of them become the replica set, and the
+%% top-ranked one is the only initial in-sync replica. Uses
+%% `mnesia:write/1', so it must be called inside a transaction.
+-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
+create_shards(DB, NShards, ReplicationFactor) ->
+    %% Shard IDs are the decimal numbers 0..NShards-1 as binaries:
+    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
+    AllSites = sites(),
+    lists:foreach(
+        fun(Shard) ->
+            %% Rank the sites for this shard by the hash of the
+            %% {Shard, Site} pair:
+            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
+            Hashes = lists:sort(Hashes0),
+            {_, Sites} = lists:unzip(Hashes),
+            %% Fails with `badmatch' when no site is known:
+            [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
+            Record = #?SHARD_TAB{
+                shard = {DB, Shard},
+                replica_set = ReplicaSet,
+                in_sync_replicas = [First]
+            },
+            mnesia:write(Record)
+        end,
+        Shards
+    ).
+
+%% Hash of a {Shard, Site} pair, used to rank sites per shard.
+-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
+hash(Shard, Site) ->
+    erlang:phash2({Shard, Site}).
+
+%% Evaluate a QLC query: directly when already inside an mnesia
+%% transaction, otherwise wrapped in a read-only mria transaction.
+eval_qlc(Q) ->
+    case mnesia:is_transaction() of
+        true ->
+            qlc:eval(Q);
+        false ->
+            {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end),
+            Result
+    end.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
index 0fe719dbc..54530f428 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
@@ -384,7 +384,7 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]).
+    filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
 %%--------------------------------------------------------------------------------
 %% Schema access
diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl
index d371a2346..081557a46 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl
@@ -29,8 +29,15 @@ start_link() ->
 %% behaviour callbacks
 %%================================================================================
 
+-dialyzer({nowarn_function, init/1}).
init([]) -> - Children = [storage_layer_sup()], + %% TODO: technically, we don't need rocksDB for the alternative + %% backends. But right now we have any: + Children = + case mria:rocksdb_backend_available() of + true -> [meta(), storage_layer_sup()]; + false -> [] + end, SupFlags = #{ strategy => one_for_all, intensity => 0, @@ -42,6 +49,15 @@ init([]) -> %% Internal functions %%================================================================================ +meta() -> + #{ + id => emqx_ds_replication_layer_meta, + start => {emqx_ds_replication_layer_meta, start_link, []}, + restart => permanent, + type => worker, + shutdown => 5000 + }. + storage_layer_sup() -> #{ id => local_store_shard_sup, diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 10d1ed7a5..0d7972466 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]). +-export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]). %% behavior callbacks: -export([introduced_in/0]). @@ -28,20 +28,10 @@ %% API funcions %%================================================================================ --spec open_shard( - node(), - emqx_ds:db(), - emqx_ds_replication_layer:shard_id(), - emqx_ds:create_db_opts() -) -> - ok. -open_shard(Node, DB, Shard, Opts) -> - erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]). - --spec drop_shard(node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - ok. -drop_shard(Node, DB, Shard) -> - erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]). +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. 
+drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). -spec get_streams( node(), @@ -85,7 +75,7 @@ next(Node, DB, Shard, Iter, BatchSize) -> node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - [emqx_types:message()], + emqx_ds_replication_layer:batch(), emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 9b74e3227..8a46804b0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -23,10 +23,14 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(N_SHARDS, 1). + opts() -> #{ backend => builtin, - storage => {emqx_ds_storage_reference, #{}} + storage => {emqx_ds_storage_reference, #{}}, + n_shards => ?N_SHARDS, + replication_factor => 3 }. %% A simple smoke test that verifies that opening/closing the DB @@ -34,7 +38,32 @@ opts() -> t_00_smoke_open_drop(_Config) -> DB = 'DB', ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + %% Check metadata: + %% We have only one site: + [Site] = emqx_ds_replication_layer_meta:sites(), + %% Check all shards: + Shards = emqx_ds_replication_layer_meta:shards(DB), + %% Since there is only one site all shards should be allocated + %% to this site: + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + ?assertEqual(?N_SHARDS, length(Shards)), + lists:foreach( + fun(Shard) -> + ?assertEqual( + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + ), + ?assertEqual( + [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) + ), + %% Check that the leader is eleected; + ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) + end, + Shards + ), + ?assertEqual(lists:sort(Shards), lists:sort(MyShards)), + %% Reopen the DB and make sure the operation is idempotent: ?assertMatch(ok, 
emqx_ds:open_db(DB, opts())), + %% Close the DB: ?assertMatch(ok, emqx_ds:drop_db(DB)). %% A simple smoke test that verifies that storing the messages doesn't @@ -138,9 +167,11 @@ end_per_suite(Config) -> ok. init_per_testcase(_TC, Config) -> - %% snabbkaffe:fix_ct_logging(), application:ensure_all_started(emqx_durable_storage), Config. end_per_testcase(_TC, _Config) -> - ok = application:stop(emqx_durable_storage). + ok = application:stop(emqx_durable_storage), + mria:stop(), + _ = mnesia:delete_schema([node()]), + ok. diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 6dc24a269..7b733406d 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -15,7 +15,9 @@ -define(DEFAULT_CONFIG, #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 16, + replication_factor => 3 }). -define(COMPACT_CONFIG, #{ @@ -23,7 +25,9 @@ storage => {emqx_ds_storage_bitfield_lts, #{ bits_per_wildcard_level => 8 - }} + }}, + n_shards => 16, + replication_factor => 3 }). %% Smoke test for opening and reopening the database @@ -387,7 +391,7 @@ end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). shard(TC) -> - {?MODULE, TC}. + {?MODULE, atom_to_binary(TC)}. keyspace(TC) -> TC. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index d12f6a2d1..389a1f91b 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1565,6 +1565,9 @@ session_persistence_storage.desc: session_storage_backend_enable.desc: """Enable this backend.""" +session_builtin_n_shards.desc: +"""Number of shards used for storing the messages.""" + session_storage_backend_builtin.desc: """Builtin session storage backend utilizing embedded RocksDB key-value store."""