From 62542e58442b0d0d902601bfbcd0b36520653f98 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 Nov 2023 22:09:46 +0100 Subject: [PATCH 1/6] feat(ds): Metadata storage for the replication layer --- apps/emqx/src/emqx_persistent_message.erl | 4 +- apps/emqx/src/emqx_persistent_session_ds.erl | 4 +- apps/emqx_durable_storage/src/emqx_ds.erl | 9 +- apps/emqx_durable_storage/src/emqx_ds_app.erl | 1 + .../src/emqx_ds_replication_layer.erl | 26 ++- .../src/emqx_ds_replication_layer_meta.erl | 217 ++++++++++++++++++ .../src/emqx_ds_storage_layer.erl | 2 +- .../test/emqx_ds_SUITE.erl | 21 +- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 8 +- 9 files changed, 269 insertions(+), 23 deletions(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 82a345eef..50e25e0be 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -57,7 +57,9 @@ storage_backend() -> storage_backend(#{builtin := #{enable := true}}) -> #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 16, + replication_factor => 3 }. 
%%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7ba5aa527..34abba2cc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -442,7 +442,9 @@ del_subscription(TopicFilter, DSSessionId) -> create_tables() -> ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 255, + replication_factor => 3 }), ok = mria:create_table( ?SESSION_TAB, diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 725d62673..84631f38e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -35,7 +35,6 @@ -export_type([ create_db_opts/0, - builtin_db_opts/0, db/0, time/0, topic_filter/0, @@ -87,14 +86,8 @@ -type message_store_opts() :: #{}. --type builtin_db_opts() :: - #{ - backend := builtin, - storage := emqx_ds_storage_layer:prototype() - }. - -type create_db_opts() :: - builtin_db_opts(). + emqx_ds_replication_layer:builtin_db_opts(). -type message_id() :: emqx_ds_replication_layer:message_id(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 858855b6f..0528a0a2c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -7,4 +7,5 @@ -export([start/2]). start(_Type, _Args) -> + emqx_ds_replication_layer_meta:init(), emqx_ds_sup:start_link(). 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index a06af104d..f359846eb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -40,7 +40,7 @@ do_next_v1/4 ]). --export_type([shard_id/0, stream/0, iterator/0, message_id/0]). +-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]). %%================================================================================ %% Type declarations @@ -58,7 +58,15 @@ -define(shard, 2). -define(enc, 3). --type shard_id() :: atom(). +-type shard_id() :: binary(). + +-type builtin_db_opts() :: + #{ + backend := builtin, + storage := emqx_ds_storage_layer:prototype(), + n_shards => pos_integer(), + replication_factor => pos_integer() + }. %% This enapsulates the stream entity from the replication level. %% @@ -89,11 +97,12 @@ -spec list_shards(emqx_ds:db()) -> [shard_id()]. list_shards(_DB) -> %% TODO: milestone 5 - list_nodes(). + lists:map(fun atom_to_binary/1, list_nodes()). --spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}. -open_db(DB, Opts) -> +-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. +open_db(DB, CreateOpts) -> %% TODO: improve error reporting, don't just crash + Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts), lists:foreach( fun(Shard) -> Node = node_of_shard(DB, Shard), @@ -104,6 +113,7 @@ open_db(DB, Opts) -> -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> + _ = emqx_ds_replication_layer_meta:drop_db(DB), lists:foreach( fun(Shard) -> Node = node_of_shard(DB, Shard), @@ -116,7 +126,7 @@ drop_db(DB) -> emqx_ds:store_batch_result(). store_batch(DB, Batch, Opts) -> %% TODO: Currently we store messages locally. - Shard = node(), + Shard = atom_to_binary(node()), Node = node_of_shard(DB, Shard), emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). 
@@ -238,8 +248,8 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> %%================================================================================ -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(_DB, Node) -> - Node. +node_of_shard(_DB, Shard) -> + binary_to_atom(Shard). list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl new file mode 100644 index 000000000..0bfa2a3ee --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -0,0 +1,217 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Metadata storage for the builtin sharded database. +%% +%% Currently metadata is stored in mria; that's not ideal, but +%% eventually we'll replace it, so it's important not to leak +%% implementation details from this module. +-module(emqx_ds_replication_layer_meta). + +%% API: +-export([init/0, shards/1, replica_set/2, sites/0, open_db/2, drop_db/1]). + +%% internal exports: +-export([open_db_trans/2, drop_db_trans/1, claim_site/2]). + +-export_type([site/0]). + +-include_lib("stdlib/include/qlc.hrl"). 
+ +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(SHARD, emqx_ds_builtin_metadata_shard). +%% DS database metadata: +-define(META_TAB, emqx_ds_builtin_metadata_tab). +%% Mapping from Site to the actual Erlang node: +-define(NODE_TAB, emqx_ds_builtin_node_tab). +%% Shard metadata: +-define(SHARD_TAB, emqx_ds_builtin_shard_tab). + +-record(?META_TAB, { + db :: emqx_ds:db(), + db_props :: emqx_ds_replication_layer:builtin_db_opts() +}). + +-record(?NODE_TAB, { + site :: site(), + node :: node(), + misc = #{} :: map() +}). + +-record(?SHARD_TAB, { + shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, + replica_set :: [site()], + leader :: node() | undefined, + misc = #{} :: map() +}). + +%% Persistent ID of the node (independent from the IP/FQDN): +-type site() :: binary(). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec init() -> ok. +init() -> + ensure_tables(), + ensure_site(). + +-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. +shards(DB) -> + eval_qlc( + qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB]) + ). + +-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + {ok, [site()]} | {error, _}. +replica_set(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{replica_set = ReplicaSet}] -> + {ok, ReplicaSet}; + [] -> + {error, no_shard} + end. + +-spec sites() -> [site()]. +sites() -> + eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). + +-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> + emqx_ds_replication_layer:builtin_db_opts(). 
+open_db(DB, DefaultOpts) -> + {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]), + Opts. + +-spec drop_db(emqx_ds:db()) -> ok. +drop_db(DB) -> + _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]), + ok. + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +%%================================================================================ +%% Internal exports +%%================================================================================ + +-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> + emqx_ds_replication_layer:builtin_db_opts(). +open_db_trans(DB, CreateOpts) -> + case mnesia:wread({?META_TAB, DB}) of + [] -> + NShards = maps:get(n_shards, CreateOpts), + ReplicationFactor = maps:get(replication_factor, CreateOpts), + mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), + create_shards(DB, NShards, ReplicationFactor), + CreateOpts; + [#?META_TAB{db_props = Opts}] -> + Opts + end. + +-spec drop_db_trans(emqx_ds:db()) -> ok. +drop_db_trans(DB) -> + mnesia:delete({?META_TAB, DB}), + [mnesia:delete({?SHARD_TAB, Shard}) || Shard <- shards(DB)], + ok. + +-spec claim_site(site(), node()) -> ok. +claim_site(Site, Node) -> + mnesia:write(#?NODE_TAB{site = Site, node = Node}). 
+ +%%================================================================================ +%% Internal functions +%%================================================================================ + +ensure_tables() -> + %% TODO: seems like it may introduce flakiness + Majority = false, + ok = mria:create_table(?META_TAB, [ + {rlog_shard, ?SHARD}, + {majority, Majority}, + {type, ordered_set}, + {storage, rocksdb_copies}, + {record_name, ?META_TAB}, + {attributes, record_info(fields, ?META_TAB)} + ]), + ok = mria:create_table(?NODE_TAB, [ + {rlog_shard, ?SHARD}, + {majority, Majority}, + {type, ordered_set}, + {storage, rocksdb_copies}, + {record_name, ?NODE_TAB}, + {attributes, record_info(fields, ?NODE_TAB)} + ]), + ok = mria:create_table(?SHARD_TAB, [ + {rlog_shard, ?SHARD}, + {majority, Majority}, + {type, ordered_set}, + {storage, ram_copies}, + {record_name, ?SHARD_TAB}, + {attributes, record_info(fields, ?SHARD_TAB)} + ]), + ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]). + +ensure_site() -> + Filename = filename:join(emqx:data_dir(), "emqx_ds_builtin_site.eterm"), + case file:consult(Filename) of + {ok, [Site]} -> + ok; + _ -> + Site = crypto:strong_rand_bytes(8), + ok = filelib:ensure_dir(Filename), + {ok, FD} = file:open(Filename, [write]), + io:format(FD, "~p.", [Site]), + file:close(FD) + end, + {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]), + ok. + +-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. +create_shards(DB, NShards, ReplicationFactor) -> + Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], + Sites = sites(), + lists:foreach( + fun(Shard) -> + Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites], + Hashes = lists:sort(Hashes0), + {_, Sites} = lists:unzip(Hashes), + ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + Record = #?SHARD_TAB{ + shard = {DB, Shard}, + replica_set = ReplicaSet + }, + mnesia:write(Record) + end, + Shards + ). 
+ +-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). +hash(Shard, Site) -> + erlang:phash2({Shard, Site}). + +eval_qlc(Q) -> + case mnesia:is_transaction() of + true -> + qlc:eval(Q); + false -> + {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end), + Result + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 0fe719dbc..54530f428 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -384,7 +384,7 @@ rocksdb_open(Shard, Options) -> -spec db_dir(shard_id()) -> file:filename(). db_dir({DB, ShardId}) -> - filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]). + filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]). %%-------------------------------------------------------------------------------- %% Schema access diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 9b74e3227..e42f576b2 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -23,10 +23,14 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(N_SHARDS, 8). + opts() -> #{ backend => builtin, - storage => {emqx_ds_storage_reference, #{}} + storage => {emqx_ds_storage_reference, #{}}, + n_shards => ?N_SHARDS, + replication_factor => 3 }. 
%% A simple smoke test that verifies that opening/closing the DB @@ -34,6 +38,17 @@ opts() -> t_00_smoke_open_drop(_Config) -> DB = 'DB', ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + [Site] = emqx_ds_replication_layer_meta:sites(), + Shards = emqx_ds_replication_layer_meta:shards(DB), + ?assertEqual(?N_SHARDS, length(Shards)), + lists:foreach( + fun(Shard) -> + ?assertEqual( + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard), {DB, Shard} + ) + end, + Shards + ), ?assertMatch(ok, emqx_ds:open_db(DB, opts())), ?assertMatch(ok, emqx_ds:drop_db(DB)). @@ -143,4 +158,6 @@ init_per_testcase(_TC, Config) -> Config. end_per_testcase(_TC, _Config) -> - ok = application:stop(emqx_durable_storage). + ok = application:stop(emqx_durable_storage), + mnesia:delete_schema([node()]), + mria:stop(). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 6dc24a269..b6b84bce2 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -15,7 +15,9 @@ -define(DEFAULT_CONFIG, #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 255, + replication_factor => 3 }). -define(COMPACT_CONFIG, #{ @@ -23,7 +25,9 @@ storage => {emqx_ds_storage_bitfield_lts, #{ bits_per_wildcard_level => 8 - }} + }}, + n_shards => 255, + replication_factor => 3 }). 
%% Smoke test for opening and reopening the database From 2a1f7d946a9552f60acc13d3a5b4fca1bfb0a722 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 20 Nov 2023 22:27:10 +0100 Subject: [PATCH 2/6] feat(ds): Shard messages by publisher client ID --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- apps/emqx/src/emqx_schema.erl | 16 ++ apps/emqx_durable_storage/src/emqx_ds_app.erl | 1 - .../src/emqx_ds_replication_layer.erl | 85 ++++++----- .../src/emqx_ds_replication_layer_meta.erl | 138 +++++++++++++++++- apps/emqx_durable_storage/src/emqx_ds_sup.erl | 11 +- .../src/proto/emqx_ds_proto_v1.erl | 20 +-- .../test/emqx_ds_SUITE.erl | 26 +++- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 6 +- 9 files changed, 237 insertions(+), 68 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 34abba2cc..1e3ac69c8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -443,7 +443,7 @@ create_tables() -> ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 255, + n_shards => 16, replication_factor => 3 }), ok = mria:create_table( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ef34d3e8f..1e6db805b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -341,6 +341,22 @@ fields("persistent_session_store") -> importance => ?IMPORTANCE_HIDDEN } )}, + {"n_shards", + sc( + pos_integer(), + #{ + default => 16, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"replication_factor", + sc( + pos_integer(), + #{ + default => 3, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"on_disc", sc( boolean(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 0528a0a2c..858855b6f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ 
b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -7,5 +7,4 @@ -export([start/2]). start(_Type, _Args) -> - emqx_ds_replication_layer_meta:init(), emqx_ds_sup:start_link(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index f359846eb..50331a378 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -32,8 +32,7 @@ %% internal exports: -export([ - do_open_shard_v1/3, - do_drop_shard_v1/2, + do_drop_db_v1/1, do_store_batch_v1/4, do_get_streams_v1/4, do_make_iterator_v1/5, @@ -42,6 +41,8 @@ -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]). +-include_lib("emqx_utils/include/emqx_message.hrl"). + %%================================================================================ %% Type declarations %%================================================================================ @@ -95,40 +96,34 @@ %%================================================================================ -spec list_shards(emqx_ds:db()) -> [shard_id()]. -list_shards(_DB) -> - %% TODO: milestone 5 - lists:map(fun atom_to_binary/1, list_nodes()). +list_shards(DB) -> + emqx_ds_replication_layer_meta:shards(DB). -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. open_db(DB, CreateOpts) -> - %% TODO: improve error reporting, don't just crash Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts), + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), lists:foreach( fun(Shard) -> - Node = node_of_shard(DB, Shard), - ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts) + emqx_ds_storage_layer:open_shard({DB, Shard}, Opts), + maybe_set_myself_as_leader(DB, Shard) end, - list_shards(DB) + MyShards ). -spec drop_db(emqx_ds:db()) -> ok | {error, _}. 
drop_db(DB) -> + Nodes = list_nodes(), + _ = emqx_ds_proto_v1:drop_db(Nodes, DB), _ = emqx_ds_replication_layer_meta:drop_db(DB), - lists:foreach( - fun(Shard) -> - Node = node_of_shard(DB, Shard), - ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard) - end, - list_shards(DB) - ). + ok. --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(DB, Batch, Opts) -> - %% TODO: Currently we store messages locally. - Shard = atom_to_binary(node()), +store_batch(DB, Messages, Opts) -> + Shard = shard_of_messages(DB, Messages), Node = node_of_shard(DB, Shard), - emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). + emqx_ds_proto_v1:store_batch(Node, DB, Shard, Messages, Opts). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -194,16 +189,15 @@ next(DB, Iter0, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_open_shard_v1( - emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts() -) -> - ok | {error, _}. -do_open_shard_v1(DB, Shard, Opts) -> - emqx_ds_storage_layer:open_shard({DB, Shard}, Opts). - --spec do_drop_shard_v1(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}. -do_drop_shard_v1(DB, Shard) -> - emqx_ds_storage_layer:drop_shard({DB, Shard}). +-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}. +do_drop_db_v1(DB) -> + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + lists:foreach( + fun(Shard) -> + emqx_ds_storage_layer:drop_shard({DB, Shard}) + end, + MyShards + ). 
-spec do_store_batch_v1( emqx_ds:db(), @@ -247,9 +241,34 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> %% Internal functions %%================================================================================ +%% TODO: there's no real leader election right now +-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. +maybe_set_myself_as_leader(DB, Shard) -> + Site = emqx_ds_replication_layer_meta:this_site(), + case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of + [Site | _] -> + %% Currently the first in-sync replica always becomes the + %% leader + ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); + _Sites -> + ok + end. + -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(_DB, Shard) -> - binary_to_atom(Shard). +node_of_shard(DB, Shard) -> + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + %% TODO: use optvar + timer:sleep(500), + node_of_shard(DB, Shard) + end. + +%% Here we assume that all messages in the batch come from the same client +shard_of_messages(DB, [#message{from = From} | _]) -> + N = emqx_ds_replication_layer_meta:n_shards(DB), + integer_to_binary(erlang:phash2(From, N)). list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 0bfa2a3ee..0f250022d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -21,11 +21,34 @@ %% implementation details from this module. -module(emqx_ds_replication_layer_meta). +-behaviour(gen_server). + %% API: --export([init/0, shards/1, replica_set/2, sites/0, open_db/2, drop_db/1]). +-export([ + shards/1, + my_shards/1, + replica_set/2, + in_sync_replicas/2, + sites/0, + open_db/2, + drop_db/1, + shard_leader/2, + this_site/0, + set_leader/3 +]). 
+ +%% gen_server +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([open_db_trans/2, drop_db_trans/1, claim_site/2]). +-export([ + open_db_trans/2, + drop_db_trans/1, + claim_site/2, + in_sync_replicas_trans/2, + set_leader_trans/3, + n_shards/1 +]). -export_type([site/0]). @@ -35,6 +58,8 @@ %% Type declarations %%================================================================================ +-define(SERVER, ?MODULE). + -define(SHARD, emqx_ds_builtin_metadata_shard). %% DS database metadata: -define(META_TAB, emqx_ds_builtin_metadata_tab). @@ -56,7 +81,10 @@ -record(?SHARD_TAB, { shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, + %% Sites that the replica_set :: [site()], + %% Sites that contain the actual data: + in_sync_replicas :: [site()], leader :: node() | undefined, misc = #{} :: map() }). @@ -64,14 +92,21 @@ %% Persistent ID of the node (independent from the IP/FQDN): -type site() :: binary(). +%% Peristent term key: +-define(emqx_ds_builtin_site, emqx_ds_builtin_site). + %%================================================================================ %% API funcions %%================================================================================ --spec init() -> ok. -init() -> - ensure_tables(), - ensure_site(). +-spec n_shards(emqx_ds:db()) -> pos_integer(). +n_shards(DB) -> + [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB), + NShards. + +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. shards(DB) -> @@ -79,6 +114,20 @@ shards(DB) -> qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB]) ). +-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. 
+my_shards(DB) -> + Site = this_site(), + eval_qlc( + qlc:q([ + Shard + || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table( + ?SHARD_TAB + ), + D =:= DB, + lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync) + ]) + ). + -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, [site()]} | {error, _}. replica_set(DB, Shard) -> @@ -89,10 +138,37 @@ replica_set(DB, Shard) -> {error, no_shard} end. +-spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + [site()]. +in_sync_replicas(DB, ShardId) -> + {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]), + case Result of + {ok, InSync} -> + InSync; + {error, _} -> + [] + end. + -spec sites() -> [site()]. sites() -> eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). +-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + {ok, node()} | {error, no_leader_for_shard}. +shard_leader(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{leader = Leader}] -> + {ok, Leader}; + [] -> + {error, no_leader_for_shard} + end. + +-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> + ok. +set_leader(DB, Shard, Node) -> + {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]), + ok. + -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db(DB, DefaultOpts) -> @@ -108,6 +184,29 @@ drop_db(DB) -> %% behavior callbacks %%================================================================================ +-record(s, {}). + +init([]) -> + process_flag(trap_exit, true), + logger:set_process_metadata(#{domain => [ds, meta]}), + ensure_tables(), + ensure_site(), + S = #s{}, + {ok, S}. + +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. 
+ +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #s{}) -> + persistent_term:erase(?emqx_ds_builtin_site), + ok. + %%================================================================================ %% Internal exports %%================================================================================ @@ -136,6 +235,23 @@ drop_db_trans(DB) -> claim_site(Site, Node) -> mnesia:write(#?NODE_TAB{site = Site, node = Node}). +-spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + {ok, [site()]} | {error, no_shard}. +in_sync_replicas_trans(DB, Shard) -> + case mnesia:read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{in_sync_replicas = InSync}] -> + {ok, InSync}; + [] -> + {error, no_shard} + end. + +-spec set_leader_trans(emqx_ds:ds(), emqx_ds_replication_layer:shard_id(), node()) -> + ok. +set_leader_trans(DB, Shard, Node) -> + [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}), + Record = Record0#?SHARD_TAB{leader = Node}, + mnesia:write(Record). + %%================================================================================ %% Internal functions %%================================================================================ @@ -182,8 +298,13 @@ ensure_site() -> file:close(FD) end, {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]), + persistent_term:put(?emqx_ds_builtin_site, Site), ok. +-spec this_site() -> site(). +this_site() -> + persistent_term:get(?emqx_ds_builtin_site). + -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. 
create_shards(DB, NShards, ReplicationFactor) -> Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], @@ -193,10 +314,11 @@ create_shards(DB, NShards, ReplicationFactor) -> Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites], Hashes = lists:sort(Hashes0), {_, Sites} = lists:unzip(Hashes), - ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), Record = #?SHARD_TAB{ shard = {DB, Shard}, - replica_set = ReplicaSet + replica_set = ReplicaSet, + in_sync_replicas = [First] }, mnesia:write(Record) end, diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index d371a2346..82e2711be 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -30,7 +30,7 @@ start_link() -> %%================================================================================ init([]) -> - Children = [storage_layer_sup()], + Children = [meta(), storage_layer_sup()], SupFlags = #{ strategy => one_for_all, intensity => 0, @@ -42,6 +42,15 @@ init([]) -> %% Internal functions %%================================================================================ +meta() -> + #{ + id => emqx_ds_replication_layer_meta, + start => {emqx_ds_replication_layer_meta, start_link, []}, + restart => permanent, + type => worker, + shutdown => 5000 + }. + storage_layer_sup() -> #{ id => local_store_shard_sup, diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 10d1ed7a5..758b5148b 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]). 
+-export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]). %% behavior callbacks: -export([introduced_in/0]). @@ -28,20 +28,10 @@ %% API funcions %%================================================================================ --spec open_shard( - node(), - emqx_ds:db(), - emqx_ds_replication_layer:shard_id(), - emqx_ds:create_db_opts() -) -> - ok. -open_shard(Node, DB, Shard, Opts) -> - erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]). - --spec drop_shard(node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - ok. -drop_shard(Node, DB, Shard) -> - erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]). +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. +drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). -spec get_streams( node(), diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index e42f576b2..8a46804b0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(N_SHARDS, 8). +-define(N_SHARDS, 1). 
opts() -> #{ @@ -38,18 +38,32 @@ opts() -> t_00_smoke_open_drop(_Config) -> DB = 'DB', ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + %% Check metadata: + %% We have only one site: [Site] = emqx_ds_replication_layer_meta:sites(), + %% Check all shards: Shards = emqx_ds_replication_layer_meta:shards(DB), + %% Since there is only one site all shards should be allocated + %% to this site: + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), ?assertEqual(?N_SHARDS, length(Shards)), lists:foreach( fun(Shard) -> ?assertEqual( - {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard), {DB, Shard} - ) + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + ), + ?assertEqual( + [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) + ), + %% Check that the leader is eleected; + ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) end, Shards ), + ?assertEqual(lists:sort(Shards), lists:sort(MyShards)), + %% Reopen the DB and make sure the operation is idempotent: ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + %% Close the DB: ?assertMatch(ok, emqx_ds:drop_db(DB)). %% A simple smoke test that verifies that storing the messages doesn't @@ -153,11 +167,11 @@ end_per_suite(Config) -> ok. init_per_testcase(_TC, Config) -> - %% snabbkaffe:fix_ct_logging(), application:ensure_all_started(emqx_durable_storage), Config. end_per_testcase(_TC, _Config) -> ok = application:stop(emqx_durable_storage), - mnesia:delete_schema([node()]), - mria:stop(). + mria:stop(), + _ = mnesia:delete_schema([node()]), + ok. 
diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index b6b84bce2..7b733406d 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -16,7 +16,7 @@ -define(DEFAULT_CONFIG, #{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 255, + n_shards => 16, replication_factor => 3 }). @@ -26,7 +26,7 @@ {emqx_ds_storage_bitfield_lts, #{ bits_per_wildcard_level => 8 }}, - n_shards => 255, + n_shards => 16, replication_factor => 3 }). @@ -391,7 +391,7 @@ end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). shard(TC) -> - {?MODULE, TC}. + {?MODULE, atom_to_binary(TC)}. keyspace(TC) -> TC. From f5c71e80680ca833ae5599145f63d52392eb7de9 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 20 Nov 2023 23:58:23 +0100 Subject: [PATCH 3/6] refactor(ds): Add a wrapper to the store batch API --- .../src/emqx_ds_replication_layer.erl | 18 +++++++++++++----- .../src/proto/emqx_ds_proto_v1.erl | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 50331a378..5128188b8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -39,7 +39,7 @@ do_next_v1/4 ]). --export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]). +-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). -include_lib("emqx_utils/include/emqx_message.hrl"). @@ -91,6 +91,13 @@ -type message_id() :: emqx_ds_storage_layer:message_id(). +-record(batch, { + messages :: [emqx_types:message()], + misc = #{} :: map() +}). 
+ +-type batch() :: #batch{}. + %%================================================================================ %% API functions %%================================================================================ @@ -123,7 +130,8 @@ drop_db(DB) -> store_batch(DB, Messages, Opts) -> Shard = shard_of_messages(DB, Messages), Node = node_of_shard(DB, Shard), - emqx_ds_proto_v1:store_batch(Node, DB, Shard, Messages, Opts). + Batch = #batch{messages = Messages}, + emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -202,12 +210,12 @@ do_drop_db_v1(DB) -> -spec do_store_batch_v1( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - [emqx_types:message()], + batch(), emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -do_store_batch_v1(DB, Shard, Batch, Options) -> - emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options). +do_store_batch_v1(DB, Shard, #batch{messages = Messages}, Options) -> + emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). -spec do_get_streams_v1( emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 758b5148b..0d7972466 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -75,7 +75,7 @@ next(Node, DB, Shard, Iter, BatchSize) -> node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id(), - [emqx_types:message()], + emqx_ds_replication_layer:batch(), emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). 
From 4d474907348b45bc1385ba15285719fc6ac015b8 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:06:17 +0100 Subject: [PATCH 4/6] chore(ds): Rebase configuration --- apps/emqx/src/emqx_persistent_message.erl | 8 +++--- apps/emqx/src/emqx_schema.erl | 32 +++++++++++------------ rel/i18n/emqx_schema.hocon | 3 +++ 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 50e25e0be..30ebe7417 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -54,12 +54,14 @@ is_persistence_enabled() -> storage_backend() -> storage_backend(emqx_config:get([session_persistence, storage])). -storage_backend(#{builtin := #{enable := true}}) -> +storage_backend(#{ + builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor} +}) -> #{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 16, - replication_factor => 3 + n_shards => NShards, + replication_factor => ReplicationFactor }. 
%%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 1e6db805b..8e401a442 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -341,22 +341,6 @@ fields("persistent_session_store") -> importance => ?IMPORTANCE_HIDDEN } )}, - {"n_shards", - sc( - pos_integer(), - #{ - default => 16, - importance => ?IMPORTANCE_HIDDEN - } - )}, - {"replication_factor", - sc( - pos_integer(), - #{ - default => 3, - importance => ?IMPORTANCE_HIDDEN - } - )}, {"on_disc", sc( boolean(), @@ -1807,6 +1791,22 @@ fields("session_storage_backend_builtin") -> desc => ?DESC(session_storage_backend_enable), default => true } + )}, + {"n_shards", + sc( + pos_integer(), + #{ + desc => ?DESC(session_builtin_n_shards), + default => 16 + } + )}, + {"replication_factor", + sc( + pos_integer(), + #{ + default => 3, + importance => ?IMPORTANCE_HIDDEN + } )} ]. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index d12f6a2d1..389a1f91b 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1565,6 +1565,9 @@ session_persistence_storage.desc: session_storage_backend_enable.desc: """Enable this backend.""" +session_builtin_n_shards.desc: +"""Number of shards used for storing the messages.""" + session_storage_backend_builtin.desc: """Builtin session storage backend utilizing embedded RocksDB key-value store.""" From 3d823beb11c8efd30257607bff2f799de31a269e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 Nov 2023 18:33:20 +0100 Subject: [PATCH 5/6] fix(ds): Apply review remarks --- .../src/emqx_ds_replication_layer.erl | 15 ++++++++------- .../src/emqx_ds_replication_layer_meta.erl | 15 ++++++++------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 
5128188b8..7a26b696d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -53,6 +53,7 @@ %% tags: -define(STREAM, 1). -define(IT, 2). +-define(BATCH, 3). %% keys: -define(tag, 1). @@ -91,12 +92,12 @@ -type message_id() :: emqx_ds_storage_layer:message_id(). --record(batch, { - messages :: [emqx_types:message()], - misc = #{} :: map() -}). +-define(batch_messages, 2). --type batch() :: #batch{}. +-type batch() :: #{ + ?tag := ?BATCH, + ?batch_messages := [emqx_types:message()] +}. %%================================================================================ %% API functions @@ -130,7 +131,7 @@ drop_db(DB) -> store_batch(DB, Messages, Opts) -> Shard = shard_of_messages(DB, Messages), Node = node_of_shard(DB, Shard), - Batch = #batch{messages = Messages}, + Batch = #{?tag => ?BATCH, ?batch_messages => Messages}, emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -214,7 +215,7 @@ do_drop_db_v1(DB) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -do_store_batch_v1(DB, Shard, #batch{messages = Messages}, Options) -> +do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) -> emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options). 
-spec do_get_streams_v1( diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 0f250022d..f7dbc828f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -81,7 +81,8 @@ -record(?SHARD_TAB, { shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, - %% Sites that the + %% Sites that should contain the data when the cluster is in the + %% stable state (no nodes are being added or removed from it): replica_set :: [site()], %% Sites that contain the actual data: in_sync_replicas :: [site()], @@ -99,6 +100,10 @@ %% API funcions %%================================================================================ +-spec this_site() -> site(). +this_site() -> + persistent_term:get(?emqx_ds_builtin_site). + -spec n_shards(emqx_ds:db()) -> pos_integer(). n_shards(DB) -> [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB), @@ -301,17 +306,13 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec this_site() -> site(). -this_site() -> - persistent_term:get(?emqx_ds_builtin_site). - -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. 
create_shards(DB, NShards, ReplicationFactor) -> Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], - Sites = sites(), + AllSites = sites(), lists:foreach( fun(Shard) -> - Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites], + Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites], Hashes = lists:sort(Hashes0), {_, Sites} = lists:unzip(Hashes), [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), From 3165b4f645b1b110abfc553a8ed6d2491a3e33e7 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 Nov 2023 18:53:34 +0100 Subject: [PATCH 6/6] fix(ds): Abort application startup when rocksdb is not available --- apps/emqx/src/emqx_persistent_session_ds.erl | 6 ------ apps/emqx_durable_storage/src/emqx_ds_sup.erl | 9 ++++++++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1e3ac69c8..928115a52 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -440,12 +440,6 @@ del_subscription(TopicFilter, DSSessionId) -> %%-------------------------------------------------------------------- create_tables() -> - ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 16, - replication_factor => 3 - }), ok = mria:create_table( ?SESSION_TAB, [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index 82e2711be..081557a46 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -29,8 +29,15 @@ start_link() -> %% behaviour callbacks %%================================================================================ +-dialyzer({nowarn_function, init/1}).
init([]) -> - Children = [meta(), storage_layer_sup()], + %% TODO: technically, we don't need rocksDB for the alternative + %% backends. But right now we don't have any: + Children = + case mria:rocksdb_backend_available() of + true -> [meta(), storage_layer_sup()]; + false -> [] + end, SupFlags = #{ strategy => one_for_all, intensity => 0,