Merge pull request #11990 from ieQu1/dev/ds-shards

Implement a prototype of durable message sharding

Commit: ee191803ea
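
Overview of the change: the builtin durable-storage backend now splits each DB into a configurable number of shards (n_shards). Shard metadata (replica sets, in-sync replicas, and a leader node per shard) lives in a new mria-backed module, emqx_ds_replication_layer_meta, and every stored batch is routed to the leader of the shard derived from the publishing client. A minimal sketch of that routing rule, mirroring shard_of_messages/2 from the diff below (pick_shard/2 is a hypothetical name used only for illustration):

    %% Sketch only: same hashing rule as shard_of_messages/2 in this PR.
    %% pick_shard/2 is a hypothetical helper, not part of the change.
    -spec pick_shard(binary(), pos_integer()) -> binary().
    pick_shard(ClientId, NShards) ->
        %% All messages from one client land in the same shard; shard ids
        %% are binaries "0".."NShards-1", as produced by create_shards/3.
        integer_to_binary(erlang:phash2(ClientId, NShards)).
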
@@ -54,10 +54,14 @@ is_persistence_enabled() ->
 storage_backend() ->
     storage_backend(emqx_config:get([session_persistence, storage])).
 
-storage_backend(#{builtin := #{enable := true}}) ->
+storage_backend(#{
+    builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor}
+}) ->
     #{
         backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}}
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        n_shards => NShards,
+        replication_factor => ReplicationFactor
     }.
 
 %%--------------------------------------------------------------------
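
For reference, with the schema defaults added later in this diff (n_shards = 16, replication_factor = 3), the map returned by the new storage_backend/1 clause would look roughly like this (illustrative values, not output captured from a running node):

    #{
        backend => builtin,
        storage => {emqx_ds_storage_bitfield_lts, #{}},
        n_shards => 16,
        replication_factor => 3
    }
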
@@ -440,10 +440,6 @@ del_subscription(TopicFilter, DSSessionId) ->
 %%--------------------------------------------------------------------
 
 create_tables() ->
-    ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
-        backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}}
-    }),
     ok = mria:create_table(
         ?SESSION_TAB,
         [
@@ -1791,6 +1791,22 @@ fields("session_storage_backend_builtin") ->
                     desc => ?DESC(session_storage_backend_enable),
                     default => true
                 }
+            )},
+        {"n_shards",
+            sc(
+                pos_integer(),
+                #{
+                    desc => ?DESC(session_builtin_n_shards),
+                    default => 16
+                }
+            )},
+        {"replication_factor",
+            sc(
+                pos_integer(),
+                #{
+                    default => 3,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )}
     ].
 
@@ -35,7 +35,6 @@
 
 -export_type([
     create_db_opts/0,
-    builtin_db_opts/0,
     db/0,
     time/0,
     topic_filter/0,
@@ -87,14 +86,8 @@
 
 -type message_store_opts() :: #{}.
 
--type builtin_db_opts() ::
-    #{
-        backend := builtin,
-        storage := emqx_ds_storage_layer:prototype()
-    }.
-
 -type create_db_opts() ::
-    builtin_db_opts().
+    emqx_ds_replication_layer:builtin_db_opts().
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 
@@ -32,15 +32,16 @@
 
 %% internal exports:
 -export([
-    do_open_shard_v1/3,
-    do_drop_shard_v1/2,
+    do_drop_db_v1/1,
     do_store_batch_v1/4,
     do_get_streams_v1/4,
     do_make_iterator_v1/5,
     do_next_v1/4
 ]).
 
--export_type([shard_id/0, stream/0, iterator/0, message_id/0]).
+-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
 
+-include_lib("emqx_utils/include/emqx_message.hrl").
+
 %%================================================================================
 %% Type declarations
@@ -52,13 +53,22 @@
 %% tags:
 -define(STREAM, 1).
 -define(IT, 2).
+-define(BATCH, 3).
 
 %% keys:
 -define(tag, 1).
 -define(shard, 2).
 -define(enc, 3).
 
--type shard_id() :: atom().
+-type shard_id() :: binary().
 
+-type builtin_db_opts() ::
+    #{
+        backend := builtin,
+        storage := emqx_ds_storage_layer:prototype(),
+        n_shards => pos_integer(),
+        replication_factor => pos_integer()
+    }.
+
 %% This enapsulates the stream entity from the replication level.
 %%
@@ -82,42 +92,46 @@
 
 -type message_id() :: emqx_ds_storage_layer:message_id().
 
+-define(batch_messages, 2).
+
+-type batch() :: #{
+    ?tag := ?BATCH,
+    ?batch_messages := [emqx_types:message()]
+}.
+
 %%================================================================================
 %% API functions
 %%================================================================================
 
 -spec list_shards(emqx_ds:db()) -> [shard_id()].
-list_shards(_DB) ->
-    %% TODO: milestone 5
-    list_nodes().
+list_shards(DB) ->
+    emqx_ds_replication_layer_meta:shards(DB).
 
--spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
-open_db(DB, Opts) ->
-    %% TODO: improve error reporting, don't just crash
+-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
+open_db(DB, CreateOpts) ->
+    Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
     lists:foreach(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts)
+            emqx_ds_storage_layer:open_shard({DB, Shard}, Opts),
+            maybe_set_myself_as_leader(DB, Shard)
         end,
-        list_shards(DB)
+        MyShards
     ).
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
-    lists:foreach(
-        fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard)
-        end,
-        list_shards(DB)
-    ).
+    Nodes = list_nodes(),
+    _ = emqx_ds_proto_v1:drop_db(Nodes, DB),
+    _ = emqx_ds_replication_layer_meta:drop_db(DB),
+    ok.
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Batch, Opts) ->
-    %% TODO: Currently we store messages locally.
-    Shard = node(),
+store_batch(DB, Messages, Opts) ->
+    Shard = shard_of_messages(DB, Messages),
     Node = node_of_shard(DB, Shard),
+    Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
     emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
 
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
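
open_db/2 now goes through the metadata layer first: the creation options are persisted (or the previously stored options are returned), only the shards assigned to the local site are opened, and the node opportunistically claims leadership for them. A hedged usage sketch, with a made-up DB name and the reference storage backend that the test suite below uses:

    %% Sketch: opening a sharded builtin DB. 'my_db' and the option values
    %% are illustrative; the keys match builtin_db_opts() from this PR.
    ok = emqx_ds:open_db(my_db, #{
        backend => builtin,
        storage => {emqx_ds_storage_reference, #{}},
        n_shards => 4,
        replication_factor => 3
    }).
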
@@ -184,26 +198,25 @@ next(DB, Iter0, BatchSize) ->
 %% Internal exports (RPC targets)
 %%================================================================================
 
--spec do_open_shard_v1(
-    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()
-) ->
-    ok | {error, _}.
-do_open_shard_v1(DB, Shard, Opts) ->
-    emqx_ds_storage_layer:open_shard({DB, Shard}, Opts).
-
--spec do_drop_shard_v1(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}.
-do_drop_shard_v1(DB, Shard) ->
-    emqx_ds_storage_layer:drop_shard({DB, Shard}).
+-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
+do_drop_db_v1(DB) ->
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
+    lists:foreach(
+        fun(Shard) ->
+            emqx_ds_storage_layer:drop_shard({DB, Shard})
+        end,
+        MyShards
+    ).
 
 -spec do_store_batch_v1(
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
-    [emqx_types:message()],
+    batch(),
     emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
-do_store_batch_v1(DB, Shard, Batch, Options) ->
-    emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
+do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
+    emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
 
 -spec do_get_streams_v1(
     emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
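
On the wire, a batch is the tagged map built by store_batch/3 above and unwrapped here by do_store_batch_v1/4. A sketch of the term with the macros expanded to the literal values defined in this file (?tag = 1, ?batch_messages = 2, ?BATCH = 3); example_batch/1 is only an illustration, not part of the PR:

    %% Hypothetical helper showing the expanded batch() term.
    example_batch(Messages) when is_list(Messages) ->
        #{1 => 3, 2 => Messages}.
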
@@ -237,9 +250,34 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 %% Internal functions
 %%================================================================================
 
+%% TODO: there's no real leader election right now
+-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
+maybe_set_myself_as_leader(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
+        [Site | _] ->
+            %% Currently the first in-sync replica always becomes the
+            %% leader
+            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
+        _Sites ->
+            ok
+    end.
+
 -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(_DB, Node) ->
-    Node.
+node_of_shard(DB, Shard) ->
+    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
+        {ok, Leader} ->
+            Leader;
+        {error, no_leader_for_shard} ->
+            %% TODO: use optvar
+            timer:sleep(500),
+            node_of_shard(DB, Shard)
+    end.
+
+%% Here we assume that all messages in the batch come from the same client
+shard_of_messages(DB, [#message{from = From} | _]) ->
+    N = emqx_ds_replication_layer_meta:n_shards(DB),
+    integer_to_binary(erlang:phash2(From, N)).
 
 list_nodes() ->
     mria:running_nodes().
@@ -0,0 +1,340 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Metadata storage for the builtin sharded database.
+%%
+%% Currently metadata is stored in mria; that's not ideal, but
+%% eventually we'll replace it, so it's important not to leak
+%% implementation details from this module.
+-module(emqx_ds_replication_layer_meta).
+
+-behaviour(gen_server).
+
+%% API:
+-export([
+    shards/1,
+    my_shards/1,
+    replica_set/2,
+    in_sync_replicas/2,
+    sites/0,
+    open_db/2,
+    drop_db/1,
+    shard_leader/2,
+    this_site/0,
+    set_leader/3
+]).
+
+%% gen_server
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+
+%% internal exports:
+-export([
+    open_db_trans/2,
+    drop_db_trans/1,
+    claim_site/2,
+    in_sync_replicas_trans/2,
+    set_leader_trans/3,
+    n_shards/1
+]).
+
+-export_type([site/0]).
+
+-include_lib("stdlib/include/qlc.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(SERVER, ?MODULE).
+
+-define(SHARD, emqx_ds_builtin_metadata_shard).
+%% DS database metadata:
+-define(META_TAB, emqx_ds_builtin_metadata_tab).
+%% Mapping from Site to the actual Erlang node:
+-define(NODE_TAB, emqx_ds_builtin_node_tab).
+%% Shard metadata:
+-define(SHARD_TAB, emqx_ds_builtin_shard_tab).
+
+-record(?META_TAB, {
+    db :: emqx_ds:db(),
+    db_props :: emqx_ds_replication_layer:builtin_db_opts()
+}).
+
+-record(?NODE_TAB, {
+    site :: site(),
+    node :: node(),
+    misc = #{} :: map()
+}).
+
+-record(?SHARD_TAB, {
+    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    %% Sites that should contain the data when the cluster is in the
+    %% stable state (no nodes are being added or removed from it):
+    replica_set :: [site()],
+    %% Sites that contain the actual data:
+    in_sync_replicas :: [site()],
+    leader :: node() | undefined,
+    misc = #{} :: map()
+}).
+
+%% Persistent ID of the node (independent from the IP/FQDN):
+-type site() :: binary().
+
+%% Peristent term key:
+-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec this_site() -> site().
+this_site() ->
+    persistent_term:get(?emqx_ds_builtin_site).
+
+-spec n_shards(emqx_ds:db()) -> pos_integer().
+n_shards(DB) ->
+    [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB),
+    NShards.
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+shards(DB) ->
+    eval_qlc(
+        qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
+    ).
+
+-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+my_shards(DB) ->
+    Site = this_site(),
+    eval_qlc(
+        qlc:q([
+            Shard
+         || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table(
+                ?SHARD_TAB
+            ),
+            D =:= DB,
+            lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+        ])
+    ).
+
+-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, _}.
+replica_set(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{replica_set = ReplicaSet}] ->
+            {ok, ReplicaSet};
+        [] ->
+            {error, no_shard}
+    end.
+
+-spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    [site()].
+in_sync_replicas(DB, ShardId) ->
+    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]),
+    case Result of
+        {ok, InSync} ->
+            InSync;
+        {error, _} ->
+            []
+    end.
+
+-spec sites() -> [site()].
+sites() ->
+    eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
+
+-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, node()} | {error, no_leader_for_shard}.
+shard_leader(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{leader = Leader}] ->
+            {ok, Leader};
+        [] ->
+            {error, no_leader_for_shard}
+    end.
+
+-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
+    ok.
+set_leader(DB, Shard, Node) ->
+    {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
+    ok.
+
+-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db(DB, DefaultOpts) ->
+    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
+    Opts.
+
+-spec drop_db(emqx_ds:db()) -> ok.
+drop_db(DB) ->
+    _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
+    ok.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+-record(s, {}).
+
+init([]) ->
+    process_flag(trap_exit, true),
+    logger:set_process_metadata(#{domain => [ds, meta]}),
+    ensure_tables(),
+    ensure_site(),
+    S = #s{},
+    {ok, S}.
+
+handle_call(_Call, _From, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_Cast, S) ->
+    {noreply, S}.
+
+handle_info(_Info, S) ->
+    {noreply, S}.
+
+terminate(_Reason, #s{}) ->
+    persistent_term:erase(?emqx_ds_builtin_site),
+    ok.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db_trans(DB, CreateOpts) ->
+    case mnesia:wread({?META_TAB, DB}) of
+        [] ->
+            NShards = maps:get(n_shards, CreateOpts),
+            ReplicationFactor = maps:get(replication_factor, CreateOpts),
+            mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
+            create_shards(DB, NShards, ReplicationFactor),
+            CreateOpts;
+        [#?META_TAB{db_props = Opts}] ->
+            Opts
+    end.
+
+-spec drop_db_trans(emqx_ds:db()) -> ok.
+drop_db_trans(DB) ->
+    mnesia:delete({?META_TAB, DB}),
+    [mnesia:delete({?SHARD_TAB, Shard}) || Shard <- shards(DB)],
+    ok.
+
+-spec claim_site(site(), node()) -> ok.
+claim_site(Site, Node) ->
+    mnesia:write(#?NODE_TAB{site = Site, node = Node}).
+
+-spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, no_shard}.
+in_sync_replicas_trans(DB, Shard) ->
+    case mnesia:read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{in_sync_replicas = InSync}] ->
+            {ok, InSync};
+        [] ->
+            {error, no_shard}
+    end.
+
+-spec set_leader_trans(emqx_ds:ds(), emqx_ds_replication_layer:shard_id(), node()) ->
+    ok.
+set_leader_trans(DB, Shard, Node) ->
+    [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
+    Record = Record0#?SHARD_TAB{leader = Node},
+    mnesia:write(Record).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+ensure_tables() ->
+    %% TODO: seems like it may introduce flakiness
+    Majority = false,
+    ok = mria:create_table(?META_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, rocksdb_copies},
+        {record_name, ?META_TAB},
+        {attributes, record_info(fields, ?META_TAB)}
+    ]),
+    ok = mria:create_table(?NODE_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, rocksdb_copies},
+        {record_name, ?NODE_TAB},
+        {attributes, record_info(fields, ?NODE_TAB)}
+    ]),
+    ok = mria:create_table(?SHARD_TAB, [
+        {rlog_shard, ?SHARD},
+        {majority, Majority},
+        {type, ordered_set},
+        {storage, ram_copies},
+        {record_name, ?SHARD_TAB},
+        {attributes, record_info(fields, ?SHARD_TAB)}
+    ]),
+    ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
+
+ensure_site() ->
+    Filename = filename:join(emqx:data_dir(), "emqx_ds_builtin_site.eterm"),
+    case file:consult(Filename) of
+        {ok, [Site]} ->
+            ok;
+        _ ->
+            Site = crypto:strong_rand_bytes(8),
+            ok = filelib:ensure_dir(Filename),
+            {ok, FD} = file:open(Filename, [write]),
+            io:format(FD, "~p.", [Site]),
+            file:close(FD)
+    end,
+    {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
+    persistent_term:put(?emqx_ds_builtin_site, Site),
+    ok.
+
+-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
+create_shards(DB, NShards, ReplicationFactor) ->
+    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
+    AllSites = sites(),
+    lists:foreach(
+        fun(Shard) ->
+            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
+            Hashes = lists:sort(Hashes0),
+            {_, Sites} = lists:unzip(Hashes),
+            [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
+            Record = #?SHARD_TAB{
+                shard = {DB, Shard},
+                replica_set = ReplicaSet,
+                in_sync_replicas = [First]
+            },
+            mnesia:write(Record)
+        end,
+        Shards
+    ).
+
+-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
+hash(Shard, Site) ->
+    erlang:phash2({Shard, Site}).
+
+eval_qlc(Q) ->
+    case mnesia:is_transaction() of
+        true ->
+            qlc:eval(Q);
+        false ->
+            {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end),
+            Result
+    end.
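
Taken together, the metadata module exposes a small API for inspecting the shard layout of a DB. A hedged inspection sketch built only from the functions exported above (inspect/1 is not part of the PR):

    %% Sketch: summarize shard metadata for a DB opened with the builtin backend.
    inspect(DB) ->
        [
            #{
                shard => Shard,
                replica_set => emqx_ds_replication_layer_meta:replica_set(DB, Shard),
                in_sync => emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard),
                leader => emqx_ds_replication_layer_meta:shard_leader(DB, Shard),
                local => lists:member(Shard, emqx_ds_replication_layer_meta:my_shards(DB))
            }
         || Shard <- emqx_ds_replication_layer_meta:shards(DB)
        ].
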
@@ -384,7 +384,7 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]).
+    filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
 %%--------------------------------------------------------------------------------
 %% Schema access
@@ -29,8 +29,15 @@ start_link() ->
 %% behaviour callbacks
 %%================================================================================
 
+-dialyzer({nowarn_function, init/1}).
 init([]) ->
-    Children = [storage_layer_sup()],
+    %% TODO: technically, we don't need rocksDB for the alternative
+    %% backends. But right now we have any:
+    Children =
+        case mria:rocksdb_backend_available() of
+            true -> [meta(), storage_layer_sup()];
+            false -> []
+        end,
     SupFlags = #{
         strategy => one_for_all,
         intensity => 0,
@@ -42,6 +49,15 @@ init([]) ->
 %% Internal functions
 %%================================================================================
 
+meta() ->
+    #{
+        id => emqx_ds_replication_layer_meta,
+        start => {emqx_ds_replication_layer_meta, start_link, []},
+        restart => permanent,
+        type => worker,
+        shutdown => 5000
+    }.
+
 storage_layer_sup() ->
     #{
         id => local_store_shard_sup,
@@ -19,7 +19,7 @@
 
 -include_lib("emqx_utils/include/bpapi.hrl").
 %% API:
--export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]).
+-export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]).
 
 %% behavior callbacks:
 -export([introduced_in/0]).
@@ -28,20 +28,10 @@
 %% API funcions
 %%================================================================================
 
--spec open_shard(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds:create_db_opts()
-) ->
-    ok.
-open_shard(Node, DB, Shard, Opts) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]).
-
--spec drop_shard(node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    ok.
-drop_shard(Node, DB, Shard) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]).
+-spec drop_db([node()], emqx_ds:db()) ->
+    [{ok, ok} | erpc:caught_call_exception()].
+drop_db(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
 
 -spec get_streams(
     node(),
@@ -85,7 +75,7 @@ next(Node, DB, Shard, Iter, BatchSize) ->
     node(),
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
-    [emqx_types:message()],
+    emqx_ds_replication_layer:batch(),
     emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
@@ -23,10 +23,14 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-define(N_SHARDS, 1).
+
 opts() ->
     #{
         backend => builtin,
-        storage => {emqx_ds_storage_reference, #{}}
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => ?N_SHARDS,
+        replication_factor => 3
     }.
 
 %% A simple smoke test that verifies that opening/closing the DB
@@ -34,7 +38,32 @@ opts() ->
 t_00_smoke_open_drop(_Config) ->
     DB = 'DB',
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    %% Check metadata:
+    %% We have only one site:
+    [Site] = emqx_ds_replication_layer_meta:sites(),
+    %% Check all shards:
+    Shards = emqx_ds_replication_layer_meta:shards(DB),
+    %% Since there is only one site all shards should be allocated
+    %% to this site:
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
+    ?assertEqual(?N_SHARDS, length(Shards)),
+    lists:foreach(
+        fun(Shard) ->
+            ?assertEqual(
+                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+            ),
+            ?assertEqual(
+                [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
+            ),
+            %% Check that the leader is eleected;
+            ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard))
+        end,
+        Shards
+    ),
+    ?assertEqual(lists:sort(Shards), lists:sort(MyShards)),
+    %% Reopen the DB and make sure the operation is idempotent:
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    %% Close the DB:
     ?assertMatch(ok, emqx_ds:drop_db(DB)).
 
 %% A simple smoke test that verifies that storing the messages doesn't
@@ -138,9 +167,11 @@ end_per_suite(Config) ->
     ok.
 
 init_per_testcase(_TC, Config) ->
-    %% snabbkaffe:fix_ct_logging(),
     application:ensure_all_started(emqx_durable_storage),
     Config.
 
 end_per_testcase(_TC, _Config) ->
-    ok = application:stop(emqx_durable_storage).
+    ok = application:stop(emqx_durable_storage),
+    mria:stop(),
+    _ = mnesia:delete_schema([node()]),
+    ok.
@@ -15,7 +15,9 @@
 
 -define(DEFAULT_CONFIG, #{
     backend => builtin,
-    storage => {emqx_ds_storage_bitfield_lts, #{}}
+    storage => {emqx_ds_storage_bitfield_lts, #{}},
+    n_shards => 16,
+    replication_factor => 3
 }).
 
 -define(COMPACT_CONFIG, #{
@@ -23,7 +25,9 @@
     storage =>
         {emqx_ds_storage_bitfield_lts, #{
            bits_per_wildcard_level => 8
-        }}
+        }},
+    n_shards => 16,
+    replication_factor => 3
 }).
 
 %% Smoke test for opening and reopening the database
|
||||||
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
|
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
|
||||||
|
|
||||||
shard(TC) ->
|
shard(TC) ->
|
||||||
{?MODULE, TC}.
|
{?MODULE, atom_to_binary(TC)}.
|
||||||
|
|
||||||
keyspace(TC) ->
|
keyspace(TC) ->
|
||||||
TC.
|
TC.
|
||||||
|
|
|
@@ -1565,6 +1565,9 @@ session_persistence_storage.desc:
 session_storage_backend_enable.desc:
 """Enable this backend."""
 
+session_builtin_n_shards.desc:
+"""Number of shards used for storing the messages."""
+
 session_storage_backend_builtin.desc:
 """Builtin session storage backend utilizing embedded RocksDB key-value store."""
 