diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl
index 82a345eef..50e25e0be 100644
--- a/apps/emqx/src/emqx_persistent_message.erl
+++ b/apps/emqx/src/emqx_persistent_message.erl
@@ -57,7 +57,9 @@ storage_backend() ->
 storage_backend(#{builtin := #{enable := true}}) ->
     #{
         backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}}
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        %% NOTE(review): emqx_persistent_session_ds:create_tables/0 opens
+        %% ?PERSISTENT_MESSAGE_DB with n_shards => 255. If this map is used for
+        %% the same DB, only the options of the first open_db call are kept
+        %% (emqx_ds_replication_layer_meta:open_db_trans/2) -- confirm that the
+        %% two values are meant to differ.
+        n_shards => 16,
+        replication_factor => 3
     }.
 
 %%--------------------------------------------------------------------
diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl
index 7ba5aa527..34abba2cc 100644
--- a/apps/emqx/src/emqx_persistent_session_ds.erl
+++ b/apps/emqx/src/emqx_persistent_session_ds.erl
@@ -442,7 +442,9 @@ del_subscription(TopicFilter, DSSessionId) ->
 create_tables() ->
     ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
         backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}}
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        %% Sharding parameters; they are read by
+        %% emqx_ds_replication_layer_meta:open_db_trans/2 when the DB metadata
+        %% is created for the first time:
+        n_shards => 255,
+        replication_factor => 3
     }),
     ok = mria:create_table(
         ?SESSION_TAB,
diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl
index 725d62673..84631f38e 100644
--- a/apps/emqx_durable_storage/src/emqx_ds.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds.erl
@@ -35,7 +35,6 @@
 
 -export_type([
     create_db_opts/0,
-    builtin_db_opts/0,
     db/0,
     time/0,
     topic_filter/0,
@@ -87,14 +86,8 @@
 
 -type message_store_opts() :: #{}.
 
--type builtin_db_opts() ::
-    #{
-        backend := builtin,
-        storage := emqx_ds_storage_layer:prototype()
-    }.
-
 -type create_db_opts() ::
-    builtin_db_opts().
+    %% The builtin backend's option type is defined by the replication layer:
+    emqx_ds_replication_layer:builtin_db_opts().
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl
index 858855b6f..0528a0a2c 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_app.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl
@@ -7,4 +7,5 @@
 -export([start/2]).
 
 start(_Type, _Args) ->
+    %% Create the metadata tables and register this node's site before
+    %% starting the supervision tree:
+    emqx_ds_replication_layer_meta:init(),
     emqx_ds_sup:start_link().
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
index a06af104d..f359846eb 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -40,7 +40,7 @@
     do_next_v1/4
 ]).
 
--export_type([shard_id/0, stream/0, iterator/0, message_id/0]).
+-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]).
 
 %%================================================================================
 %% Type declarations
@@ -58,7 +58,15 @@
 -define(shard, 2).
 -define(enc, 3).
 
--type shard_id() :: atom().
+%% Shard ids are binaries; list_shards/1 currently derives them from node names.
+-type shard_id() :: binary().
+
+%% Options accepted by open_db/2.
+%% NOTE(review): n_shards and replication_factor are typed as optional here,
+%% but emqx_ds_replication_layer_meta:open_db_trans/2 reads both with
+%% maps:get/2 when the DB is first created, so omitting them aborts that
+%% transaction -- confirm whether they should be mandatory (`:=').
+-type builtin_db_opts() ::
+    #{
+        backend := builtin,
+        storage := emqx_ds_storage_layer:prototype(),
+        n_shards => pos_integer(),
+        replication_factor => pos_integer()
+    }.
 
 %% This enapsulates the stream entity from the replication level.
 %%
@@ -89,11 +97,12 @@
 -spec list_shards(emqx_ds:db()) -> [shard_id()].
 list_shards(_DB) ->
     %% TODO: milestone 5
-    list_nodes().
+    %% Each running node is reported as a shard whose id is the node name
+    %% converted to a binary:
+    lists:map(fun atom_to_binary/1, list_nodes()).
 
--spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
-open_db(DB, Opts) ->
+-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
+open_db(DB, CreateOpts) ->
     %% TODO: improve error reporting, don't just crash
+    %% Opts are the options stored in the metadata; for a DB that already
+    %% exists they may differ from CreateOpts
+    %% (see emqx_ds_replication_layer_meta:open_db_trans/2):
+    Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
     lists:foreach(
         fun(Shard) ->
             Node = node_of_shard(DB, Shard),
@@ -104,6 +113,7 @@ open_db(DB, Opts) ->
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
drop_db(DB) ->
+    %% Remove the DB metadata first; the return value is ignored:
+    _ = emqx_ds_replication_layer_meta:drop_db(DB),
     lists:foreach(
         fun(Shard) ->
             Node = node_of_shard(DB, Shard),
@@ -116,7 +126,7 @@ drop_db(DB) ->
     emqx_ds:store_batch_result().
 store_batch(DB, Batch, Opts) ->
     %% TODO: Currently we store messages locally.
-    Shard = node(),
+    %% The shard id is the local node name as a binary, matching the ids
+    %% produced by list_shards/1:
+    Shard = atom_to_binary(node()),
     Node = node_of_shard(DB, Shard),
     emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
 
@@ -238,8 +248,8 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 %%================================================================================
 
+%% Inverse of the conversion done in list_shards/1: the shard id is the name
+%% of the node that owns the shard.
+%% NOTE(review): binary_to_atom/1 creates atoms, and the shard ids written by
+%% emqx_ds_replication_layer_meta:create_shards/3 (<<"0">>, <<"1">>, ...) are
+%% not node names -- confirm that only ids from list_shards/1 reach this
+%% function.
 -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(_DB, Node) ->
-    Node.
+node_of_shard(_DB, Shard) ->
+    binary_to_atom(Shard).
 
 list_nodes() ->
     mria:running_nodes().
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
new file mode 100644
index 000000000..0bfa2a3ee
--- /dev/null
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
@@ -0,0 +1,217 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Metadata storage for the builtin sharded database.
+%%
+%% Currently metadata is stored in mria; that's not ideal, but
+%% eventually we'll replace it, so it's important not to leak
+%% implementation details from this module.
+-module(emqx_ds_replication_layer_meta).
+
+%% API:
+-export([init/0, shards/1, replica_set/2, sites/0, open_db/2, drop_db/1]).
+
+%% internal exports (passed to mria:transaction/3 as `fun ?MODULE:F/A'):
+-export([open_db_trans/2, drop_db_trans/1, claim_site/2]).
+
+-export_type([site/0]).
+
+-include_lib("stdlib/include/qlc.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% Name of the mria (rlog) shard that all three metadata tables belong to:
+-define(SHARD, emqx_ds_builtin_metadata_shard).
+%% DS database metadata:
+-define(META_TAB, emqx_ds_builtin_metadata_tab).
+%% Mapping from Site to the actual Erlang node:
+-define(NODE_TAB, emqx_ds_builtin_node_tab).
+%% Shard metadata:
+-define(SHARD_TAB, emqx_ds_builtin_shard_tab).
+
+%% One record per database, keyed by the DB name. `db_props' holds the
+%% options that were passed when the database was first opened.
+-record(?META_TAB, {
+    db :: emqx_ds:db(),
+    db_props :: emqx_ds_replication_layer:builtin_db_opts()
+}).
+
+%% One record per site, keyed by the site id; written by claim_site/2.
+-record(?NODE_TAB, {
+    site :: site(),
+    node :: node(),
+    misc = #{} :: map()
+}).
+
+%% One record per shard, keyed by {DB, ShardId}. `replica_set' is filled in
+%% by create_shards/3; `leader' is not assigned anywhere in this module and
+%% therefore keeps its default (`undefined').
+-record(?SHARD_TAB, {
+    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    replica_set :: [site()],
+    leader :: node() | undefined,
+    misc = #{} :: map()
+}).
+
+%% Persistent ID of the node (independent from the IP/FQDN):
+-type site() :: binary().
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% Creates the metadata tables (and waits for them), then makes sure the
+%% local node has a site id registered in ?NODE_TAB.
+-spec init() -> ok.
+init() ->
+    ensure_tables(),
+    ensure_site().
+
+%% Returns the ids of all shards recorded for `DB' in ?SHARD_TAB. May be
+%% called inside or outside of a transaction (see eval_qlc/1).
+-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+shards(DB) ->
+    eval_qlc(
+        qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
+    ).
+
+%% Returns the sites that host the given shard, or `{error, no_shard}' when
+%% there is no record for `{DB, Shard}'.
+-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, _}.
+replica_set(DB, Shard) ->
+    %% Dirty read: this lookup does not need (or start) a transaction.
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{replica_set = ReplicaSet}] ->
+            {ok, ReplicaSet};
+        [] ->
+            {error, no_shard}
+    end.
+
+%% Returns the ids of all sites registered in ?NODE_TAB.
+-spec sites() -> [site()].
+sites() ->
+    eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
+
+%% Returns the options stored for `DB'. When the DB is not known yet, its
+%% metadata and shards are created from `DefaultOpts' and `DefaultOpts' is
+%% returned. Crashes with `badmatch' if the transaction is aborted.
+-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db(DB, DefaultOpts) ->
+    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
+    Opts.
+
+%% Removes the metadata of `DB' together with its shard records. The outcome
+%% of the transaction is ignored; `ok' is returned in every case.
+-spec drop_db(emqx_ds:db()) -> ok.
+drop_db(DB) ->
+    _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
+    ok.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%% Transaction body of open_db/2. Takes a write lock on the ?META_TAB entry
+%% so that concurrent openers of the same DB are serialized.
+-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db_trans(DB, CreateOpts) ->
+    case mnesia:wread({?META_TAB, DB}) of
+        [] ->
+            %% First open: both sharding parameters must be present in
+            %% CreateOpts (maps:get/2 raises `badkey' otherwise).
+            NShards = maps:get(n_shards, CreateOpts),
+            ReplicationFactor = maps:get(replication_factor, CreateOpts),
+            mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
+            create_shards(DB, NShards, ReplicationFactor),
+            CreateOpts;
+        [#?META_TAB{db_props = Opts}] ->
+            %% The DB already exists: the stored options win over CreateOpts.
+            Opts
+    end.
+
+%% Transaction body of drop_db/1: deletes the ?META_TAB entry of `DB' and
+%% every ?SHARD_TAB record that belongs to it.
+-spec drop_db_trans(emqx_ds:db()) -> ok.
+drop_db_trans(DB) ->
+    mnesia:delete({?META_TAB, DB}),
+    %% shards/1 returns bare shard ids, whereas ?SHARD_TAB is keyed by
+    %% {DB, ShardId}. The full key has to be rebuilt here; deleting by the
+    %% bare id matches no record and leaves all shard metadata behind.
+    lists:foreach(
+        fun(Shard) -> mnesia:delete({?SHARD_TAB, {DB, Shard}}) end,
+        shards(DB)
+    ),
+    ok.
+
+%% Transaction body used by ensure_site/0: records (or overwrites) the
+%% mapping from `Site' to `Node' in ?NODE_TAB.
+-spec claim_site(site(), node()) -> ok.
+claim_site(Site, Node) ->
+    mnesia:write(#?NODE_TAB{site = Site, node = Node}).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Creates the three metadata tables in the ?SHARD rlog shard and blocks
+%% until all of them are available.
+ensure_tables() ->
+    %% TODO: seems like it may introduce flakiness
+    Majority = false,
+    ok = mria:create_table(?META_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, rocksdb_copies},
+        {record_name, ?META_TAB},
+        {attributes, record_info(fields, ?META_TAB)}
+    ]),
+    ok = mria:create_table(?NODE_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, rocksdb_copies},
+        {record_name, ?NODE_TAB},
+        {attributes, record_info(fields, ?NODE_TAB)}
+    ]),
+    %% NOTE(review): this table is ram_copies while ?META_TAB is
+    %% rocksdb_copies, and open_db_trans/2 only calls create_shards/3 when the
+    %% ?META_TAB entry is missing -- confirm how shard records are expected to
+    %% be restored after all nodes have been restarted.
+    ok = mria:create_table(?SHARD_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, ram_copies},
+        {record_name, ?SHARD_TAB},
+        {attributes, record_info(fields, ?SHARD_TAB)}
+    ]),
+    ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
+
+%% Loads this node's site id from `emqx_ds_builtin_site.eterm' in the EMQX
+%% data directory. When the file cannot be consulted as exactly one term, a
+%% random 8-byte id is generated and saved there. Finally the site -> node()
+%% mapping is written to ?NODE_TAB.
+ensure_site() ->
+    Filename = filename:join(emqx:data_dir(), "emqx_ds_builtin_site.eterm"),
+    case file:consult(Filename) of
+        {ok, [Site]} ->
+            ok;
+        _ ->
+            Site = crypto:strong_rand_bytes(8),
+            ok = filelib:ensure_dir(Filename),
+            %% Assert that the id was written: a silently failed write would
+            %% give the node a different site id on every start.
+            ok = file:write_file(Filename, io_lib:format("~p.", [Site]))
+    end,
+    {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
+    ok.
+
+%% Writes one ?SHARD_TAB record for each of the `NShards' shards of `DB'
+%% (ids <<"0">> .. integer_to_binary(NShards - 1)). For every shard all known
+%% sites are ranked by hash(Shard, Site) and the first `ReplicationFactor'
+%% of them become the replica set; with fewer sites than that, all sites are
+%% used. Must be called inside a mnesia transaction.
+-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
+create_shards(DB, NShards, ReplicationFactor) ->
+    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
+    %% This name must not be reused inside the fun below: a fun body matches
+    %% against variables that are already bound in the enclosing function, so
+    %% binding the ranked list to the same name would instead assert that
+    %% ranking never reorders the sites (and crash with `badmatch' when it
+    %% does).
+    AllSites = sites(),
+    lists:foreach(
+        fun(Shard) ->
+            Ranked = lists:sort([{hash(Shard, Site), Site} || Site <- AllSites]),
+            ReplicaSet = [Site || {_Hash, Site} <- lists:sublist(Ranked, ReplicationFactor)],
+            Record = #?SHARD_TAB{
+                shard = {DB, Shard},
+                replica_set = ReplicaSet
+            },
+            mnesia:write(Record)
+        end,
+        Shards
+    ).
+
+%% Hash of a (shard, site) pair; create_shards/3 sorts sites by this value.
+-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
+hash(Shard, Site) ->
+    Key = {Shard, Site},
+    erlang:phash2(Key).
+
+%% Evaluates a QLC query over mnesia tables: directly when the caller is
+%% already inside a transaction, otherwise wrapped in a read-only mria
+%% transaction on ?SHARD.
+eval_qlc(Query) ->
+    eval_qlc(mnesia:is_transaction(), Query).
+
+eval_qlc(true, Query) ->
+    qlc:eval(Query);
+eval_qlc(false, Query) ->
+    {atomic, Rows} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Query) end),
+    Rows.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
index 0fe719dbc..54530f428 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
@@ -384,7 +384,7 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]).
+    filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
 %%--------------------------------------------------------------------------------
 %% Schema access
diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
index 9b74e3227..e42f576b2 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
@@ -23,10 +23,14 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-define(N_SHARDS, 8).
+
 opts() ->
     #{
         backend => builtin,
-        storage => {emqx_ds_storage_reference, #{}}
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => ?N_SHARDS,
+        replication_factor => 3
     }.
 
%% A simple smoke test that verifies that opening/closing the DB
@@ -34,6 +38,17 @@ opts() ->
 t_00_smoke_open_drop(_Config) ->
     DB = 'DB',
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [Site] = emqx_ds_replication_layer_meta:sites(),
+    Shards = emqx_ds_replication_layer_meta:shards(DB),
+    ?assertEqual(?N_SHARDS, length(Shards)),
+    lists:foreach(
+        fun(Shard) ->
+            ?assertEqual(
+                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard), {DB, Shard}
+            )
+        end,
+        Shards
+    ),
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     ?assertMatch(ok, emqx_ds:drop_db(DB)).
 
@@ -143,4 +158,9 @@ init_per_testcase(_TC, Config) ->
     Config.
 
 end_per_testcase(_TC, _Config) ->
-    ok = application:stop(emqx_durable_storage).
+    ok = application:stop(emqx_durable_storage),
+    %% mnesia:delete_schema/1 refuses to run while Mnesia is started on the
+    %% node, so shut mria down before wiping the schema:
+    mria:stop(),
+    _ = mnesia:delete_schema([node()]),
+    ok.
diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
index 6dc24a269..b6b84bce2 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
@@ -15,7 +15,9 @@
 
 -define(DEFAULT_CONFIG, #{
     backend => builtin,
-    storage => {emqx_ds_storage_bitfield_lts, #{}}
+    storage => {emqx_ds_storage_bitfield_lts, #{}},
+    n_shards => 255,
+    replication_factor => 3
 }).
 
 -define(COMPACT_CONFIG, #{
@@ -23,7 +25,9 @@
     storage =>
         {emqx_ds_storage_bitfield_lts, #{
             bits_per_wildcard_level => 8
-        }}
+        }},
+    n_shards => 255,
+    replication_factor => 3
 }).
 
 %% Smoke test for opening and reopening the database