feat(ds): Introduce egress process for the builtin backend

This commit is contained in:
ieQu1 2024-01-22 22:45:56 +01:00
parent ec93d8cf18
commit 137535a821
17 changed files with 615 additions and 141 deletions

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_ds_SUITE).
@ -18,6 +18,9 @@
%% CT boilerplate
%%------------------------------------------------------------------------------
suite() ->
[{timetrap, {seconds, 60}}].
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -191,6 +194,7 @@ t_non_persistent_session_subscription(_Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
SubTopicFilter = <<"t/#">>,
?check_trace(
#{timetrap => 30_000},
begin
?tp(notice, "starting", #{}),
Client = start_client(#{
@ -220,6 +224,7 @@ t_session_subscription_idempotency(Config) ->
SubTopicFilter = <<"t/+">>,
ClientId = <<"myclientid">>,
?check_trace(
#{timetrap => 30_000},
begin
?force_ordering(
#{?snk_kind := persistent_session_ds_subscription_added},
@ -281,6 +286,7 @@ t_session_unsubscription_idempotency(Config) ->
SubTopicFilter = <<"t/+">>,
ClientId = <<"myclientid">>,
?check_trace(
#{timetrap => 30_000},
begin
?force_ordering(
#{
@ -385,6 +391,7 @@ do_t_session_discard(Params) ->
ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
SubTopicFilter = <<"t/+">>,
?check_trace(
#{timetrap => 30_000},
begin
?tp(notice, "starting", #{}),
Client0 = start_client(#{
@ -472,6 +479,7 @@ do_t_session_expiration(_Config, Opts) ->
} = Opts,
CommonParams = #{proto_ver => v5, clientid => ClientId},
?check_trace(
#{timetrap => 30_000},
begin
Topic = <<"some/topic">>,
Params0 = maps:merge(CommonParams, FirstConn),
@ -539,6 +547,7 @@ t_session_gc(Config) ->
end,
?check_trace(
#{timetrap => 30_000},
begin
ClientId0 = <<"session_gc0">>,
Client0 = StartClient(ClientId0, Port1, 30),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -99,7 +99,7 @@ needs_persistence(Msg) ->
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
store_message(Msg) ->
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]).
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}).
has_subscribers(#message{topic = Topic}) ->
emqx_persistent_session_ds_router:has_any_route(Topic).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -376,7 +376,7 @@ t_publish_empty_topic_levels(_Config) ->
{<<"t/3/bar">>, <<"6">>}
],
[emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
Received = receive_messages(length(Messages), 1_500),
Received = receive_messages(length(Messages)),
?assertMatch(
[
#{topic := <<"t//1/">>, payload := <<"2">>},

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -111,11 +111,15 @@
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
-type time() :: non_neg_integer().
-type message_store_opts() :: #{}.
-type message_store_opts() ::
#{
sync => boolean()
}.
-type generic_db_opts() ::
#{
backend := atom(),
serialize_by => clientid | topic,
_ => _
}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_app).

View File

@ -0,0 +1,152 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc Supervisor that contains all the processes that belong to a
%% given builtin DS database.
-module(emqx_ds_builtin_db_sup).
-behaviour(supervisor).
%% API:
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
%% behaviour callbacks:
-export([init/1]).
%% internal exports:
-export([start_link_sup/2]).
%%================================================================================
%% Type declarations
%%================================================================================
%% Process name registered locally through gproc, keyed by the term REC.
-define(via(REC), {via, gproc, {n, l, REC}}).
%% Names of the three supervisor kinds implemented by this module. They
%% double as record names: the record passed to `init/1' selects which
%% kind of supervisor is being started.
-define(db_sup, ?MODULE).
-define(shard_sup, emqx_ds_builtin_db_shard_sup).
-define(egress_sup, emqx_ds_builtin_db_egress_sup).
%% Each record carries only the DB it belongs to; wrapped in `?via', it
%% serves as the gproc registration key of the corresponding supervisor.
-record(?db_sup, {db}).
-record(?shard_sup, {db}).
-record(?egress_sup, {db}).
%%================================================================================
%% API functions
%%================================================================================
%% @doc Start the top-level supervisor of the database `DB'. `Opts' is
%% passed through to `init/1' together with the supervisor id.
-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> {ok, pid()}.
start_db(DB, Opts) ->
    SupId = #?db_sup{db = DB},
    start_link_sup(SupId, Opts).
%% @doc Dynamically start the storage-layer worker of a shard under the
%% shard supervisor of its DB.
%%
%% The storage-layer shard id is a `{DB, Shard}' pair, while
%% `shard_spec/2' takes the bare shard name (it is used as the child id
%% and wrapped into `{DB, Shard}' for the worker's start arguments).
%% The pair is therefore destructured here, exactly as `start_egress/1'
%% does; this keeps the child ids identical to the ones created by
%% `init/1' for the shard supervisor.
-spec start_shard(emqx_ds_storage_layer:shard_id()) ->
    supervisor:startchild_ret().
start_shard({DB, Shard}) ->
    supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)).

%% @doc Dynamically start the egress server of a shard under the egress
%% supervisor of its DB.
-spec start_egress(emqx_ds_storage_layer:shard_id()) ->
    supervisor:startchild_ret().
start_egress({DB, Shard}) ->
    supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).

%% @doc Terminate the storage-layer worker of a shard and remove its
%% child specification from the shard supervisor.
%%
%% The child is looked up by the bare shard name, which is the id used
%% by `shard_spec/2'. Failures reported by the supervisor (for example
%% `not_found') are returned as `{error, Reason}', as the spec promises,
%% instead of crashing the caller with `badmatch'.
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
stop_shard({DB, Shard}) ->
    Sup = ?via(#?shard_sup{db = DB}),
    case supervisor:terminate_child(Sup, Shard) of
        ok ->
            supervisor:delete_child(Sup, Shard);
        {error, _} = Error ->
            Error
    end.
%% @doc Make sure the worker of the given shard is running. A shard
%% that is already started counts as success; any other start error is
%% returned to the caller untouched.
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
    ok | {error, _Reason}.
ensure_shard(Shard) ->
    StartResult = start_shard(Shard),
    case StartResult of
        {error, {already_started, _Pid}} ->
            ok;
        {error, _Reason} = Error ->
            Error;
        {ok, _Pid} ->
            ok
    end.
%%================================================================================
%% behaviour callbacks
%%================================================================================
%% @private A single callback serves three kinds of supervisors; the
%% record in the first element of the argument selects the kind.
init({#?db_sup{db = DB}, DefaultOpts}) ->
    %% Top-level supervisor of the database. It opens the DB metadata
    %% and then supervises the shard and egress supervisors.
    logger:notice("Starting DS DB ~p", [DB]),
    _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
    %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards:
    _ = [
        emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard)
     || Shard <- emqx_ds_replication_layer_meta:my_shards(DB)
    ],
    Flags = #{strategy => one_for_all, intensity => 0, period => 1},
    ChildSpecs = [
        sup_spec(#?shard_sup{db = DB}, []),
        sup_spec(#?egress_sup{db = DB}, [])
    ],
    {ok, {Flags, ChildSpecs}};
init({#?shard_sup{db = DB}, _}) ->
    %% Supervisor of the storage-layer workers: one child per shard
    %% reported by `my_shards/1' for this DB.
    Flags = #{strategy => one_for_one, intensity => 10, period => 1},
    LocalShards = emqx_ds_replication_layer_meta:my_shards(DB),
    ChildSpecs = lists:map(fun(Shard) -> shard_spec(DB, Shard) end, LocalShards),
    {ok, {Flags, ChildSpecs}};
init({#?egress_sup{db = DB}, _}) ->
    %% Supervisor of the egress proxy processes: one child per shard
    %% reported by `shards/1' for this DB.
    Flags = #{strategy => one_for_one, intensity => 0, period => 1},
    AllShards = emqx_ds_replication_layer_meta:shards(DB),
    ChildSpecs = lists:map(fun(Shard) -> egress_spec(DB, Shard) end, AllShards),
    {ok, {Flags, ChildSpecs}}.
%%================================================================================
%% Internal exports
%%================================================================================
%% @private Start a supervisor of the kind selected by the record `Id'.
%% The supervisor registers in gproc under `Id', and `init/1' receives
%% both the id and the options.
start_link_sup(Id, Options) ->
    RegName = ?via(Id),
    InitArg = {Id, Options},
    supervisor:start_link(RegName, ?MODULE, InitArg).
%%================================================================================
%% Internal functions
%%================================================================================
%% Child spec for a nested supervisor implemented by this module. The
%% child id is the first element of `Id' (the record tag).
sup_spec(Id, Options) ->
    ChildId = element(1, Id),
    StartMFA = {?MODULE, start_link_sup, [Id, Options]},
    #{
        shutdown => infinity,
        type => supervisor,
        start => StartMFA,
        id => ChildId
    }.
%% Child spec for the storage-layer worker of one shard. `Shard' is the
%% bare shard name: it is the child id, and it is paired with `DB' to
%% form the worker's first start argument. The DB options are read from
%% the metadata at the time the spec is built.
shard_spec(DB, Shard) ->
    StartArgs = [{DB, Shard}, emqx_ds_replication_layer_meta:get_options(DB)],
    #{
        id => Shard,
        type => worker,
        restart => permanent,
        shutdown => 5_000,
        start => {emqx_ds_storage_layer, start_link, StartArgs}
    }.
%% Child spec for the egress server of one shard; the shard name is
%% used as the child id.
egress_spec(DB, Shard) ->
    StartMFA = {emqx_ds_replication_layer_egress, start_link, [DB, Shard]},
    #{
        id => Shard,
        type => worker,
        restart => permanent,
        shutdown => 5_000,
        start => StartMFA
    }.

View File

@ -0,0 +1,132 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This supervisor manages the global worker processes needed for
%% the functioning of builtin databases, and all builtin databases
%% attach to it.
-module(emqx_ds_builtin_sup).
-behaviour(supervisor).
%% API:
-export([start_db/2, stop_db/1]).
%% behavior callbacks:
-export([init/1]).
%% internal exports:
-export([start_top/0, start_databases_sup/0]).
-export_type([]).
%%================================================================================
%% Type declarations
%%================================================================================
%% Locally registered names of the two supervisors implemented by this
%% module; the same atoms are passed to `init/1' to select the kind.
-define(top, ?MODULE).
-define(databases, emqx_ds_builtin_databases_sup).
%%================================================================================
%% API functions
%%================================================================================
%% @doc Attach the supervisor of the database `DB' to the `?databases'
%% supervisor, making sure the builtin backend's top supervisor is
%% attached first.
-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    supervisor:startchild_ret().
start_db(DB, Opts) ->
    ensure_top(),
    StartMFA = {emqx_ds_builtin_db_sup, start_db, [DB, Opts]},
    supervisor:start_child(?databases, #{
        id => DB,
        start => StartMFA,
        shutdown => infinity,
        type => supervisor
    }).
%% @doc Stop the supervisor of the database `DB' and forget its child
%% spec. This is best-effort: the results of both supervisor calls are
%% ignored, and nothing is done when `?databases' is not registered.
-spec stop_db(emqx_ds:db()) -> ok.
stop_db(DB) ->
    case whereis(?databases) of
        undefined ->
            ok;
        Sup when is_pid(Sup) ->
            _ = supervisor:terminate_child(?databases, DB),
            _ = supervisor:delete_child(?databases, DB),
            ok
    end.
%%================================================================================
%% behavior callbacks
%%================================================================================
%% There are two layers of supervision:
%%
%% 1. top supervisor for the builtin backend. It contains the global
%% worker processes (like the metadata server), and `?databases'
%% supervisor.
%%
%% 2. `?databases': a `one_for_one' supervisor where each child is a
%% `db' supervisor that contains processes that represent the DB.
%% Children are attached dynamically to this one.
%% @private `?top' starts the metadata server and the `?databases'
%% supervisor; `?databases' starts empty and gets its children
%% dynamically.
init(?top) ->
    Flags = #{
        strategy => one_for_all,
        intensity => 1,
        period => 1,
        auto_shutdown => never
    },
    ChildSpecs = [
        #{
            id => metadata_server,
            start => {emqx_ds_replication_layer_meta, start_link, []},
            restart => permanent,
            type => worker,
            shutdown => 5000
        },
        #{
            id => ?databases,
            start => {?MODULE, start_databases_sup, []},
            restart => permanent,
            type => supervisor,
            shutdown => infinity
        }
    ],
    {ok, {Flags, ChildSpecs}};
init(?databases) ->
    %% Children are added dynamically:
    Flags = #{strategy => one_for_one, intensity => 10, period => 1},
    {ok, {Flags, []}}.
%%================================================================================
%% Internal exports
%%================================================================================
%% @private Start the top supervisor of the builtin backend, registered
%% locally as `?top'.
-spec start_top() -> {ok, pid()}.
start_top() ->
    RegName = {local, ?top},
    supervisor:start_link(RegName, ?MODULE, ?top).
%% @private Start the supervisor that the per-DB supervisors attach to,
%% registered locally as `?databases'. It is started as a child of the
%% top supervisor (see the `?top' clause of `init/1').
-spec start_databases_sup() -> supervisor:startlink_ret().
start_databases_sup() ->
    supervisor:start_link({local, ?databases}, ?MODULE, ?databases).
%%================================================================================
%% Internal functions
%%================================================================================
%% Attach the top supervisor of the builtin backend to the application
%% supervisor. Crashes with `badmatch' unless `attach_backend/2'
%% returns `{ok, _}'.
ensure_top() ->
    StartMFA = {?MODULE, start_top, []},
    {ok, _Pid} = emqx_ds_sup:attach_backend(builtin, StartMFA),
    ok.

View File

@ -32,7 +32,10 @@
get_streams/3,
make_iterator/4,
update_iterator/3,
next/3
next/3,
node_of_shard/2,
shard_of_message/3,
maybe_set_myself_as_leader/2
]).
%% internal exports:
@ -51,24 +54,12 @@
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include("emqx_ds_replication_layer.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
%% records over the wire.
%% tags:
-define(STREAM, 1).
-define(IT, 2).
-define(BATCH, 3).
%% keys:
-define(tag, 1).
-define(shard, 2).
-define(enc, 3).
-type shard_id() :: binary().
-type builtin_db_opts() ::
@ -101,8 +92,6 @@
-type message_id() :: emqx_ds:message_id().
-define(batch_messages, 2).
-type batch() :: #{
?tag := ?BATCH,
?batch_messages := [emqx_types:message()]
@ -120,16 +109,14 @@ list_shards(DB) ->
-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
open_db(DB, CreateOpts) ->
ok = emqx_ds_sup:ensure_workers(),
Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
lists:foreach(
fun(Shard) ->
emqx_ds_storage_layer:open_shard({DB, Shard}, Opts),
maybe_set_myself_as_leader(DB, Shard)
end,
MyShards
).
case emqx_ds_builtin_sup:start_db(DB, CreateOpts) of
{ok, _} ->
ok;
{error, {already_started, _}} ->
ok;
{error, Err} ->
{error, Err}
end.
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
add_generation(DB) ->
@ -170,17 +157,15 @@ drop_generation(DB, {Shard, GenId}) ->
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
drop_db(DB) ->
Nodes = list_nodes(),
_ = emqx_ds_proto_v1:drop_db(Nodes, DB),
_ = emqx_ds_proto_v2:drop_db(Nodes, DB),
_ = emqx_ds_replication_layer_meta:drop_db(DB),
emqx_ds_builtin_sup:stop_db(DB),
ok.
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) ->
Shard = shard_of_messages(DB, Messages),
Node = node_of_shard(DB, Shard),
Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts).
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{emqx_ds:stream_rank(), stream()}].
@ -262,6 +247,41 @@ next(DB, Iter0, BatchSize) ->
Other
end.
%% @doc Return the node currently recorded as the leader of the shard.
%% While the shard has no leader, the lookup is retried every 500 ms,
%% without any upper bound on the number of attempts.
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
node_of_shard(DB, Shard) ->
    Lookup = emqx_ds_replication_layer_meta:shard_leader(DB, Shard),
    case Lookup of
        {error, no_leader_for_shard} ->
            %% TODO: use optvar
            timer:sleep(500),
            node_of_shard(DB, Shard);
        {ok, Leader} ->
            Leader
    end.
%% @doc Pick the shard of a message by hashing either its sender
%% (`clientid') or its topic (`topic') into the range of the DB's shard
%% count. The shard id is the decimal text of the hash.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
    emqx_ds_replication_layer:shard_id().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
    NShards = emqx_ds_replication_layer_meta:n_shards(DB),
    Key =
        case SerializeBy of
            clientid -> From;
            topic -> Topic
        end,
    integer_to_binary(erlang:phash2(Key, NShards)).
%% TODO: there's no real leader election right now
%%
%% @doc Record this node as the leader of the shard when the local site
%% is the first entry in the shard's list of in-sync replicas;
%% otherwise do nothing.
-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
maybe_set_myself_as_leader(DB, Shard) ->
    ThisSite = emqx_ds_replication_layer_meta:this_site(),
    InSyncReplicas = emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard),
    case InSyncReplicas of
        [ThisSite | _] ->
            %% Currently the first in-sync replica always becomes the
            %% leader
            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
        _Other ->
            ok
    end.
%%================================================================================
%% behavior callbacks
%%================================================================================
@ -273,6 +293,7 @@ next(DB, Iter0, BatchSize) ->
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
do_drop_db_v1(DB) ->
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
emqx_ds_builtin_sup:stop_db(DB),
lists:foreach(
fun(Shard) ->
emqx_ds_storage_layer:drop_shard({DB, Shard})
@ -354,34 +375,5 @@ do_drop_generation_v3(DB, ShardId, GenId) ->
%% Internal functions
%%================================================================================
%% TODO: there's no real leader election right now
-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
maybe_set_myself_as_leader(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
[Site | _] ->
%% Currently the first in-sync replica always becomes the
%% leader
ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
_Sites ->
ok
end.
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
node_of_shard(DB, Shard) ->
case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
{ok, Leader} ->
Leader;
{error, no_leader_for_shard} ->
%% TODO: use optvar
timer:sleep(500),
node_of_shard(DB, Shard)
end.
%% Here we assume that all messages in the batch come from the same client
shard_of_messages(DB, [#message{from = From} | _]) ->
N = emqx_ds_replication_layer_meta:n_shards(DB),
integer_to_binary(erlang:phash2(From, N)).
list_nodes() ->
mria:running_nodes().

View File

@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022, 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Include guard: the definitions below are processed once per module.
-ifndef(EMQX_DS_REPLICATION_LAYER_HRL).
-define(EMQX_DS_REPLICATION_LAYER_HRL, true).
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
%% records over the wire.
%% tags:
%% (values stored under the `?tag' key; they tell the kinds of maps apart)
-define(STREAM, 1).
-define(IT, 2).
-define(BATCH, 3).
%% keys:
-define(tag, 1).
-define(shard, 2).
-define(enc, 3).
%% Key holding the list of messages in a map tagged `?BATCH'.
%% NOTE(review): this reuses the integer 2 of `?shard'; presumably safe
%% because the two keys never occur in the same kind of map -- confirm
%% before adding `?shard' to batches.
-define(batch_messages, 2).
-endif.

View File

@ -0,0 +1,175 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Egress servers are responsible for proxying the outgoing
%% `store_batch' requests towards EMQX DS shards.
%%
%% They re-assemble messages from different local processes into
%% fixed-sized batches, and introduce centralized channels between the
%% nodes. They are also responsible for maintaining backpressure
%% towards the local publishers.
%%
%% There is (currently) one egress process for each shard running on
%% each node, but it should be possible to have a pool of egress
%% servers, if needed.
-module(emqx_ds_replication_layer_egress).
-behaviour(gen_server).
%% API:
-export([start_link/2, store_batch/3]).
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([]).
-export_type([]).
-include("emqx_ds_replication_layer.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
%% gproc-registered (local) name of the egress server of a DB shard.
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
%% Message that triggers a flush of the buffered batch (see handle_info/2).
-define(flush, flush).
%% Call request built by store_batch/3: one message, plus a flag telling
%% whether the caller is answered at flush time (`true') or right after
%% the message has been buffered (`false').
-record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
%%================================================================================
%% API functions
%%================================================================================
%% @doc Start the egress server of the given DB shard and register it in
%% gproc under the `?via(DB, Shard)' name.
-spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}.
start_link(DB, Shard) ->
    ServerName = ?via(DB, Shard),
    gen_server:start_link(ServerName, ?MODULE, [DB, Shard], []).
%% @doc Hand every message over to the egress server of its shard, one
%% `gen_server:call' per message, in list order. Shards are chosen by
%% client id. The `sync' option defaults to `true'.
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
    ok.
store_batch(DB, Messages, Opts) ->
    Sync = maps:get(sync, Opts, true),
    Enqueue = fun(Message) ->
        Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
        Request = #enqueue_req{message = Message, sync = Sync},
        gen_server:call(?via(DB, Shard), Request)
    end,
    lists:foreach(Enqueue, Messages).
%%================================================================================
%% behavior callbacks
%%================================================================================
%% Server state.
-record(s, {
    %% DB and shard served by this process (set in init/1):
    db :: emqx_ds:db(),
    shard :: emqx_ds_replication_layer:shard_id(),
    %% Node the batches are sent to; looked up once in init/1:
    leader :: node(),
    %% Number of messages in `batch'; reset to 0 on flush:
    n = 0 :: non_neg_integer(),
    %% Reference of the timer that delivers the next `flush' message:
    tref :: reference(),
    %% Buffered messages, newest first (reversed when flushed):
    batch = [] :: [emqx_types:message()],
    %% Callers to be answered by the next flush, newest first:
    pending_replies = [] :: [gen_server:from()]
}).
%% @private Look up the shard leader and arm the first flush timer.
%% Crashes with `badmatch' when the shard has no leader yet.
init([DB, Shard]) ->
    process_flag(trap_exit, true),
    process_flag(message_queue_data, off_heap),
    %% TODO: adjust leader dynamically
    {ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard),
    {ok, #s{db = DB, shard = Shard, leader = Leader, tref = start_timer()}}.
%% @private Enqueue requests are delegated to do_enqueue/4, which
%% decides when the caller is answered; any other call is rejected.
handle_call(Request = #enqueue_req{}, From, S) ->
    #enqueue_req{message = Msg, sync = Sync} = Request,
    do_enqueue(From, Sync, Msg, S);
handle_call(_Unknown, _From, S) ->
    {reply, {error, unknown_call}, S}.
%% @private No casts are part of the protocol; they are ignored.
handle_cast(_Ignored, State) ->
    {noreply, State}.
%% @private The `?flush' message flushes the buffer; every other message
%% is dropped.
handle_info(Info, S) ->
    case Info of
        ?flush ->
            {noreply, do_flush(S)};
        _Other ->
            {noreply, S}
    end.
%% @private Nothing is cleaned up on termination; in particular, the
%% buffered batch is not flushed here.
terminate(_Why, _State) ->
    ok.
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================
%% Internal functions
%%================================================================================
%% Send the buffered batch (if any) to the shard leader, answer the
%% callers that are waiting for the flush, and re-arm the flush timer.
%% Returns the updated state.
%%
%% Crashes with `badmatch' if the leader does not return `ok'.
%%
%% The outstanding timer is cancelled before a new one is armed. A
%% flush can be triggered by a full buffer while a `flush' message from
%% an already expired timer is still in the mailbox; when that stale
%% message is handled later, the timer armed by the size-triggered
%% flush is still pending. Re-arming without cancelling it would leave
%% two timers running, and each of them would keep re-arming itself.
do_flush(S = #s{batch = [], tref = TRef}) ->
    S#s{tref = restart_timer(TRef)};
do_flush(
    S = #s{
        batch = Messages,
        pending_replies = Replies,
        db = DB,
        shard = Shard,
        leader = Leader,
        tref = TRef
    }
) ->
    %% Both lists are accumulated newest-first; restore arrival order:
    Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
    ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
    lists:foreach(
        fun(From) -> gen_server:reply(From, ok) end,
        lists:reverse(Replies)
    ),
    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
    %% The batch that has just been sent is garbage now:
    erlang:garbage_collect(),
    S#s{
        n = 0,
        batch = [],
        pending_replies = [],
        tref = restart_timer(TRef)
    }.

%% Cancel the given flush timer (a no-op if it has already expired) and
%% arm a new one.
restart_timer(OldTRef) ->
    _ = erlang:cancel_timer(OldTRef),
    start_timer().
%% Buffer one message on behalf of the caller `From' and flush the
%% buffer once it has grown past `NMax' messages. Always returns
%% `{noreply, State}': the caller is answered either right away
%% (`Sync = false') or by the flush that sends its message
%% (`Sync = true').
%%
%% The caller is registered in (or answered from) the state *before*
%% the flush decision is made. Registering it afterwards, on top of the
%% pre-flush list of waiting callers, would put callers that the flush
%% has just answered back into `pending_replies' (so they would be
%% answered a second time by the next flush), and would delay the
%% current caller's reply until the next flush although its message has
%% already been sent.
do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies0}) ->
    NMax = 1000,
    %% TODO: later we may want to delay the reply until the message is
    %% replicated, but it requires changes to the PUBACK/PUBREC flow to
    %% allow for async replies. For now, we ack when the message is
    %% _buffered_ rather than stored.
    %%
    %% Otherwise, the client would freeze for at least flush interval,
    %% or until the buffer is filled.
    Replies =
        case Sync of
            true ->
                [From | Replies0];
            false ->
                gen_server:reply(From, ok),
                Replies0
        end,
    S1 = S0#s{n = N + 1, batch = [Msg | Batch], pending_replies = Replies},
    %% `N' is the buffer size before this message was added.
    S =
        case N >= NMax of
            true ->
                _ = erlang:cancel_timer(S0#s.tref),
                do_flush(S1);
            false ->
                S1
        end,
    %% TODO: add a backpressure mechanism for the server to avoid
    %% building a long message queue.
    {noreply, S}.
%% Arm the flush timer: the calling process receives `flush' after
%% 10 ms. Returns the timer reference.
start_timer() ->
    FlushIntervalMs = 10,
    Self = self(),
    erlang:send_after(FlushIntervalMs, Self, flush).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@
in_sync_replicas/2,
sites/0,
open_db/2,
get_options/1,
update_db_config/2,
drop_db/1,
shard_leader/2,
@ -230,6 +231,11 @@ is_leader(Node) ->
{atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
Result.
%% @doc Read the options of the database `DB' by running
%% `open_db_trans/2' with `undefined' in place of the creation options.
%% Crashes with `badmatch' when the transaction is aborted.
%% NOTE(review): presumably the transaction aborts for a DB that has
%% never been opened, since there are no options to create it with --
%% confirm against the full `open_db_trans/2'.
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
get_options(DB) ->
    TransArgs = [DB, undefined],
    {atomic, Options} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, TransArgs),
    Options.
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
emqx_ds_replication_layer:builtin_db_opts().
open_db(DB, DefaultOpts) ->
@ -293,11 +299,11 @@ terminate(_Reason, #s{}) ->
%% Internal exports
%%================================================================================
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) ->
emqx_ds_replication_layer:builtin_db_opts().
open_db_trans(DB, CreateOpts) ->
case mnesia:wread({?META_TAB, DB}) of
[] ->
[] when is_map(CreateOpts) ->
NShards = maps:get(n_shards, CreateOpts),
ReplicationFactor = maps:get(replication_factor, CreateOpts),
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -199,7 +199,6 @@ open_shard(Shard, Options) ->
-spec drop_shard(shard_id()) -> ok.
drop_shard(Shard) ->
catch emqx_ds_storage_layer_sup:stop_shard(Shard),
case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of
undefined ->
ok;
@ -586,7 +585,8 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
rocksdb_open(Shard, Options) ->
DBOptions = [
{create_if_missing, true},
{create_missing_column_families, true}
{create_missing_column_families, true},
{enable_write_thread_adaptive_yield, false}
| maps:get(db_options, Options, [])
],
DataDir = maps:get(data_dir, Options, emqx:data_dir()),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_layer_sup).
@ -23,7 +23,7 @@
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
supervisor:start_link(?MODULE, []).
-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:startchild_ret().

View File

@ -6,7 +6,7 @@
-behaviour(supervisor).
%% API:
-export([start_link/0, ensure_workers/0]).
-export([start_link/0, attach_backend/2]).
%% behaviour callbacks:
-export([init/1]).
@ -25,64 +25,40 @@
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, top).
-spec ensure_workers() -> ok.
ensure_workers() ->
ChildSpec = #{
id => workers_sup,
restart => temporary,
type => supervisor,
start => {supervisor, start_link, [?MODULE, workers]}
%% @doc Attach a child backend-specific supervisor to the top
%% application supervisor, if not yet present
-spec attach_backend(_BackendId, {module(), atom(), list()}) ->
{ok, pid()} | {error, _}.
attach_backend(Backend, Start) ->
Spec = #{
id => Backend,
start => Start,
significant => false,
shutdown => infinity,
type => supervisor
},
case supervisor:start_child(?SUP, ChildSpec) of
{ok, _} ->
ok;
{error, already_present} ->
ok;
{error, {already_started, _}} ->
ok
case supervisor:start_child(?SUP, Spec) of
{ok, Pid} ->
{ok, Pid};
{error, {already_started, Pid}} ->
{ok, Pid};
{error, Err} ->
{error, Err}
end.
%%================================================================================
%% behaviour callbacks
%%================================================================================
-dialyzer({nowarn_function, init/1}).
init(top) ->
Children = [],
SupFlags = #{
strategy => one_for_all,
strategy => one_for_one,
intensity => 10,
period => 1
},
{ok, {SupFlags, []}};
init(workers) ->
%% TODO: technically, we don't need rocksDB for the alternative
%% backends. But right now we have any:
Children = [meta(), storage_layer_sup()],
SupFlags = #{
strategy => one_for_all,
intensity => 0,
period => 1
},
{ok, {SupFlags, Children}}.
%%================================================================================
%% Internal functions
%%================================================================================
meta() ->
#{
id => emqx_ds_replication_layer_meta,
start => {emqx_ds_replication_layer_meta, start_link, []},
restart => permanent,
type => worker,
shutdown => 5000
}.
storage_layer_sup() ->
#{
id => local_store_shard_sup,
start => {emqx_ds_storage_layer_sup, start_link, []},
restart => permanent,
type => supervisor,
shutdown => infinity
}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -16,8 +16,8 @@
-define(DEFAULT_CONFIG, #{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 16,
replication_factor => 3
n_shards => 1,
replication_factor => 1
}).
-define(COMPACT_CONFIG, #{
@ -26,15 +26,10 @@
{emqx_ds_storage_bitfield_lts, #{
bits_per_wildcard_level => 8
}},
n_shards => 16,
replication_factor => 3
n_shards => 1,
replication_factor => 1
}).
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
%% Smoke test of store function
t_store(_Config) ->
MessageID = emqx_guid:gen(),
@ -98,8 +93,8 @@ t_get_streams(_Config) ->
[FooBarBaz] = GetStream(<<"foo/bar/baz">>),
[A] = GetStream(<<"a">>),
%% Restart shard to make sure trie is persisted and restored:
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME),
{ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}),
%% Verify that there are no "ghost streams" for topics that don't
%% have any messages:
[] = GetStream(<<"bar/foo">>),
@ -196,9 +191,9 @@ t_replay(_Config) ->
?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
%% Restart shard to make sure trie is persisted and restored:
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
%% Restart the DB to make sure trie is persisted and restored:
ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME),
{ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}),
%% Learned wildcard topics:
?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
@ -412,21 +407,21 @@ suite() -> [{timetrap, {seconds, 20}}].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_durable_storage),
emqx_ds_sup:ensure_workers(),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_durable_storage).
init_per_testcase(TC, Config) ->
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG),
Config.
end_per_testcase(TC, _Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
emqx_ds:drop_db(TC),
ok.
shard(TC) ->
{?MODULE, atom_to_binary(TC)}.
{TC, <<"0">>}.
keyspace(TC) ->
TC.