From 137535a82140313b1b3334e75717862cb91bac48 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Jan 2024 22:45:56 +0100 Subject: [PATCH 1/4] feat(ds): Introduce egress process for the builtin backend --- .../emqx_persistent_session_ds_SUITE.erl | 11 +- apps/emqx/src/emqx_persistent_message.erl | 4 +- .../test/emqx_persistent_messages_SUITE.erl | 4 +- .../test/emqx_persistent_session_SUITE.erl | 2 +- apps/emqx_durable_storage/src/emqx_ds.erl | 8 +- apps/emqx_durable_storage/src/emqx_ds_app.erl | 2 +- .../src/emqx_ds_builtin_db_sup.erl | 152 +++++++++++++++ .../src/emqx_ds_builtin_sup.erl | 132 +++++++++++++ .../src/emqx_ds_replication_layer.erl | 112 ++++++----- .../src/emqx_ds_replication_layer.hrl | 33 ++++ .../src/emqx_ds_replication_layer_egress.erl | 175 ++++++++++++++++++ .../src/emqx_ds_replication_layer_meta.erl | 12 +- .../src/emqx_ds_storage_layer.erl | 6 +- .../src/emqx_ds_storage_layer_sup.erl | 4 +- apps/emqx_durable_storage/src/emqx_ds_sup.erl | 66 +++---- .../test/emqx_ds_SUITE.erl | 2 +- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 31 ++-- 17 files changed, 615 insertions(+), 141 deletions(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl create mode 100644 apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl create mode 100644 apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl create mode 100644 apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index b5beb9ae7..fba36601f 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_persistent_session_ds_SUITE). @@ -18,6 +18,9 @@ %% CT boilerplate %%------------------------------------------------------------------------------ +suite() -> + [{timetrap, {seconds, 60}}]. + all() -> emqx_common_test_helpers:all(?MODULE). @@ -191,6 +194,7 @@ t_non_persistent_session_subscription(_Config) -> ClientId = atom_to_binary(?FUNCTION_NAME), SubTopicFilter = <<"t/#">>, ?check_trace( + #{timetrap => 30_000}, begin ?tp(notice, "starting", #{}), Client = start_client(#{ @@ -220,6 +224,7 @@ t_session_subscription_idempotency(Config) -> SubTopicFilter = <<"t/+">>, ClientId = <<"myclientid">>, ?check_trace( + #{timetrap => 30_000}, begin ?force_ordering( #{?snk_kind := persistent_session_ds_subscription_added}, @@ -281,6 +286,7 @@ t_session_unsubscription_idempotency(Config) -> SubTopicFilter = <<"t/+">>, ClientId = <<"myclientid">>, ?check_trace( + #{timetrap => 30_000}, begin ?force_ordering( #{ @@ -385,6 +391,7 @@ do_t_session_discard(Params) -> ReconnectOpts = ReconnectOpts0#{clientid => ClientId}, SubTopicFilter = <<"t/+">>, ?check_trace( + #{timetrap => 30_000}, begin ?tp(notice, "starting", #{}), Client0 = start_client(#{ @@ -472,6 +479,7 @@ do_t_session_expiration(_Config, Opts) -> } = Opts, CommonParams = #{proto_ver => v5, clientid => ClientId}, ?check_trace( + #{timetrap => 30_000}, begin Topic = <<"some/topic">>, Params0 = maps:merge(CommonParams, FirstConn), @@ -539,6 +547,7 @@ t_session_gc(Config) -> end, ?check_trace( + #{timetrap => 30_000}, begin ClientId0 = <<"session_gc0">>, Client0 = StartClient(ClientId0, Port1, 30), diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 8e3755fdb..effad17dd 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -99,7 +99,7 @@ needs_persistence(Msg) -> -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> - emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]). + emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}). has_subscribers(#message{topic = Topic}) -> emqx_persistent_session_ds_router:has_any_route(Topic). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 36c8848cf..0c0eaac28 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -376,7 +376,7 @@ t_publish_empty_topic_levels(_Config) -> {<<"t/3/bar">>, <<"6">>} ], [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages], - Received = receive_messages(length(Messages), 1_500), + Received = receive_messages(length(Messages)), ?assertMatch( [ #{topic := <<"t//1/">>, payload := <<"2">>}, diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 09cbf306d..f2a42332e 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 434169520..1402a19e3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -111,11 +111,15 @@ %% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). --type message_store_opts() :: #{}. +-type message_store_opts() :: + #{ + sync => boolean() + }. -type generic_db_opts() :: #{ backend := atom(), + serialize_by => clientid | topic, _ => _ }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 858855b6f..dcf353a99 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_ds_app). diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl new file mode 100644 index 000000000..9df07eb18 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -0,0 +1,152 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% @doc Supervisor that contains all the processes that belong to a +%% given builtin DS database. +-module(emqx_ds_builtin_db_sup). + +-behaviour(supervisor). + +%% API: +-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]). + +%% behaviour callbacks: +-export([init/1]). + +%% internal exports: +-export([start_link_sup/2]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(via(REC), {via, gproc, {n, l, REC}}). + +-define(db_sup, ?MODULE). +-define(shard_sup, emqx_ds_builtin_db_shard_sup). +-define(egress_sup, emqx_ds_builtin_db_egress_sup). + +-record(?db_sup, {db}). +-record(?shard_sup, {db}). +-record(?egress_sup, {db}). + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> {ok, pid()}. +start_db(DB, Opts) -> + start_link_sup(#?db_sup{db = DB}, Opts). + +-spec start_shard(emqx_ds_storage_layer:shard_id()) -> + supervisor:startchild_ret(). +start_shard(Shard = {DB, _}) -> + supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)). + +-spec start_egress(emqx_ds_storage_layer:shard_id()) -> + supervisor:startchild_ret(). +start_egress({DB, Shard}) -> + supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)). + +-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}. +stop_shard(Shard = {DB, _}) -> + Sup = ?via(#?shard_sup{db = DB}), + ok = supervisor:terminate_child(Sup, Shard), + ok = supervisor:delete_child(Sup, Shard). + +-spec ensure_shard(emqx_ds_storage_layer:shard_id()) -> + ok | {error, _Reason}. +ensure_shard(Shard) -> + case start_shard(Shard) of + {ok, _Pid} -> + ok; + {error, {already_started, _Pid}} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +%%================================================================================ +%% behaviour callbacks +%%================================================================================ + +init({#?db_sup{db = DB}, DefaultOpts}) -> + %% Spec for the top-level supervisor for the database: + logger:notice("Starting DS DB ~p", [DB]), + _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), + %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards: + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + lists:foreach( + fun(Shard) -> + emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard) + end, + MyShards + ), + Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])], + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, Children}}; +init({#?shard_sup{db = DB}, _}) -> + %% Spec for the supervisor that manages the worker processes for + %% each local shard of the DB: + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + Children = [shard_spec(DB, Shard) || Shard <- MyShards], + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 1 + }, + {ok, {SupFlags, Children}}; +init({#?egress_sup{db = DB}, _}) -> + %% Spec for the supervisor that manages the egress proxy processes + %% managing traffic towards each of the shards of the DB: + Shards = emqx_ds_replication_layer_meta:shards(DB), + Children = [egress_spec(DB, Shard) || Shard <- Shards], + SupFlags = #{ + strategy => one_for_one, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, Children}}. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +start_link_sup(Id, Options) -> + supervisor:start_link(?via(Id), ?MODULE, {Id, Options}). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +sup_spec(Id, Options) -> + #{ + id => element(1, Id), + start => {?MODULE, start_link_sup, [Id, Options]}, + type => supervisor, + shutdown => infinity + }. + +shard_spec(DB, Shard) -> + Options = emqx_ds_replication_layer_meta:get_options(DB), + #{ + id => Shard, + start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. + +egress_spec(DB, Shard) -> + #{ + id => Shard, + start => {emqx_ds_replication_layer_egress, start_link, [DB, Shard]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl new file mode 100644 index 000000000..50ed18de1 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -0,0 +1,132 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This supervisor manages the global worker processes needed for +%% the functioning of builtin databases, and all builtin database +%% attach to it. +-module(emqx_ds_builtin_sup). + +-behaviour(supervisor). + +%% API: +-export([start_db/2, stop_db/1]). + +%% behavior callbacks: +-export([init/1]). + +%% internal exports: +-export([start_top/0, start_databases_sup/0]). + +-export_type([]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(top, ?MODULE). +-define(databases, emqx_ds_builtin_databases_sup). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> + supervisor:startchild_ret(). +start_db(DB, Opts) -> + ensure_top(), + ChildSpec = #{ + id => DB, + start => {emqx_ds_builtin_db_sup, start_db, [DB, Opts]}, + type => supervisor, + shutdown => infinity + }, + supervisor:start_child(?databases, ChildSpec). + +-spec stop_db(emqx_ds:db()) -> ok. +stop_db(DB) -> + case whereis(?databases) of + Pid when is_pid(Pid) -> + _ = supervisor:terminate_child(?databases, DB), + _ = supervisor:delete_child(?databases, DB), + ok; + undefined -> + ok + end. + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +%% There are two layers of supervision: +%% +%% 1. top supervisor for the builtin backend. It contains the global +%% worker processes (like the metadata server), and `?databases' +%% supervisior. +%% +%% 2. `?databases': a `one_for_one' supervisor where each child is a +%% `db' supervisor that contains processes that represent the DB. +%% Chidren are attached dynamically to this one. +init(?top) -> + %% Children: + MetadataServer = #{ + id => metadata_server, + start => {emqx_ds_replication_layer_meta, start_link, []}, + restart => permanent, + type => worker, + shutdown => 5000 + }, + DBsSup = #{ + id => ?databases, + start => {?MODULE, start_databases_sup, []}, + restart => permanent, + type => supervisor, + shutdown => infinity + }, + %% + SupFlags = #{ + strategy => one_for_all, + intensity => 1, + period => 1, + auto_shutdown => never + }, + {ok, {SupFlags, [MetadataServer, DBsSup]}}; +init(?databases) -> + %% Children are added dynamically: + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 1 + }, + {ok, {SupFlags, []}}. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +-spec start_top() -> {ok, pid()}. +start_top() -> + supervisor:start_link({local, ?top}, ?MODULE, ?top). + +start_databases_sup() -> + supervisor:start_link({local, ?databases}, ?MODULE, ?databases). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +ensure_top() -> + {ok, _} = emqx_ds_sup:attach_backend(builtin, {?MODULE, start_top, []}), + ok. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 387587570..7f696e3ce 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -32,7 +32,10 @@ get_streams/3, make_iterator/4, update_iterator/3, - next/3 + next/3, + node_of_shard/2, + shard_of_message/3, + maybe_set_myself_as_leader/2 ]). %% internal exports: @@ -51,24 +54,12 @@ -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include("emqx_ds_replication_layer.hrl"). %%================================================================================ %% Type declarations %%================================================================================ -%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending -%% records over the wire. - -%% tags: --define(STREAM, 1). --define(IT, 2). --define(BATCH, 3). - -%% keys: --define(tag, 1). --define(shard, 2). --define(enc, 3). - -type shard_id() :: binary(). -type builtin_db_opts() :: @@ -101,8 +92,6 @@ -type message_id() :: emqx_ds:message_id(). --define(batch_messages, 2). - -type batch() :: #{ ?tag := ?BATCH, ?batch_messages := [emqx_types:message()] @@ -120,16 +109,14 @@ list_shards(DB) -> -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. open_db(DB, CreateOpts) -> - ok = emqx_ds_sup:ensure_workers(), - Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts), - MyShards = emqx_ds_replication_layer_meta:my_shards(DB), - lists:foreach( - fun(Shard) -> - emqx_ds_storage_layer:open_shard({DB, Shard}, Opts), - maybe_set_myself_as_leader(DB, Shard) - end, - MyShards - ). + case emqx_ds_builtin_sup:start_db(DB, CreateOpts) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ok; + {error, Err} -> + {error, Err} + end. -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> @@ -170,17 +157,15 @@ drop_generation(DB, {Shard, GenId}) -> -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), - _ = emqx_ds_proto_v1:drop_db(Nodes, DB), + _ = emqx_ds_proto_v2:drop_db(Nodes, DB), _ = emqx_ds_replication_layer_meta:drop_db(DB), + emqx_ds_builtin_sup:stop_db(DB), ok. -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> - Shard = shard_of_messages(DB, Messages), - Node = node_of_shard(DB, Shard), - Batch = #{?tag => ?BATCH, ?batch_messages => Messages}, - emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). + emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -262,6 +247,41 @@ next(DB, Iter0, BatchSize) -> Other end. +-spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). +node_of_shard(DB, Shard) -> + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + %% TODO: use optvar + timer:sleep(500), + node_of_shard(DB, Shard) + end. + +-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> + emqx_ds_replication_layer:shard_id(). +shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> + N = emqx_ds_replication_layer_meta:n_shards(DB), + Hash = + case SerializeBy of + clientid -> erlang:phash2(From, N); + topic -> erlang:phash2(Topic, N) + end, + integer_to_binary(Hash). + +%% TODO: there's no real leader election right now +-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. +maybe_set_myself_as_leader(DB, Shard) -> + Site = emqx_ds_replication_layer_meta:this_site(), + case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of + [Site | _] -> + %% Currently the first in-sync replica always becomes the + %% leader + ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); + _Sites -> + ok + end. + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -273,6 +293,7 @@ next(DB, Iter0, BatchSize) -> -spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}. do_drop_db_v1(DB) -> MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + emqx_ds_builtin_sup:stop_db(DB), lists:foreach( fun(Shard) -> emqx_ds_storage_layer:drop_shard({DB, Shard}) @@ -354,34 +375,5 @@ do_drop_generation_v3(DB, ShardId, GenId) -> %% Internal functions %%================================================================================ -%% TODO: there's no real leader election right now --spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. -maybe_set_myself_as_leader(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of - [Site | _] -> - %% Currently the first in-sync replica always becomes the - %% leader - ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); - _Sites -> - ok - end. - --spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(DB, Shard) -> - case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of - {ok, Leader} -> - Leader; - {error, no_leader_for_shard} -> - %% TODO: use optvar - timer:sleep(500), - node_of_shard(DB, Shard) - end. - -%% Here we assume that all messages in the batch come from the same client -shard_of_messages(DB, [#message{from = From} | _]) -> - N = emqx_ds_replication_layer_meta:n_shards(DB), - integer_to_binary(erlang:phash2(From, N)). - list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl new file mode 100644 index 000000000..42e72f258 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022, 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_REPLICATION_LAYER_HRL). +-define(EMQX_DS_REPLICATION_LAYER_HRL, true). + +%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending +%% records over the wire. + +%% tags: +-define(STREAM, 1). +-define(IT, 2). +-define(BATCH, 3). + +%% keys: +-define(tag, 1). +-define(shard, 2). +-define(enc, 3). +-define(batch_messages, 2). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl new file mode 100644 index 000000000..3b264d9d1 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -0,0 +1,175 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Egress servers are responsible for proxing the outcoming +%% `store_batch' requests towards EMQX DS shards. +%% +%% They re-assemble messages from different local processes into +%% fixed-sized batches, and introduce centralized channels between the +%% nodes. They are also responsible for maintaining backpressure +%% towards the local publishers. +%% +%% There is (currently) one egress process for each shard running on +%% each node, but it should be possible to have a pool of egress +%% servers, if needed. +-module(emqx_ds_replication_layer_egress). + +-behaviour(gen_server). + +%% API: +-export([start_link/2, store_batch/3]). + +%% behavior callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +%% internal exports: +-export([]). + +-export_type([]). + +-include("emqx_ds_replication_layer.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). +-define(flush, flush). + +-record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}. +start_link(DB, Shard) -> + gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). + +-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + ok. +store_batch(DB, Messages, Opts) -> + Sync = maps:get(sync, Opts, true), + lists:foreach( + fun(Message) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync}) + end, + Messages + ). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +-record(s, { + db :: emqx_ds:db(), + shard :: emqx_ds_replication_layer:shard_id(), + leader :: node(), + n = 0 :: non_neg_integer(), + tref :: reference(), + batch = [] :: [emqx_types:message()], + pending_replies = [] :: [gen_server:from()] +}). + +init([DB, Shard]) -> + process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), + %% TODO: adjust leader dynamically + {ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard), + S = #s{ + db = DB, + shard = Shard, + leader = Leader, + tref = start_timer() + }, + {ok, S}. + +handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) -> + do_enqueue(From, Sync, Msg, S); +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(?flush, S) -> + {noreply, do_flush(S)}; +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ + +do_flush(S = #s{batch = []}) -> + S#s{tref = start_timer()}; +do_flush( + S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader} +) -> + Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)}, + ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}), + [gen_server:reply(From, ok) || From <- lists:reverse(Replies)], + ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}), + erlang:garbage_collect(), + S#s{ + n = 0, + batch = [], + pending_replies = [], + tref = start_timer() + }. + +do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> + NMax = 1000, + S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, + S2 = + case N >= NMax of + true -> + _ = erlang:cancel_timer(S0#s.tref), + do_flush(S1); + false -> + S1 + end, + %% TODO: later we may want to delay the reply until the message is + %% replicated, but it requies changes to the PUBACK/PUBREC flow to + %% allow for async replies. For now, we ack when the message is + %% _buffered_ rather than stored. + %% + %% Otherwise, the client would freeze for at least flush interval, + %% or until the buffer is filled. + S = + case Sync of + true -> + S2#s{pending_replies = [From | Replies]}; + false -> + gen_server:reply(From, ok), + S2 + end, + %% TODO: add a backpressure mechanism for the server to avoid + %% building a long message queue. + {noreply, S}. + +start_timer() -> + Interval = 10, + erlang:send_after(Interval, self(), flush). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 16c52f20e..b49b0e8f7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ in_sync_replicas/2, sites/0, open_db/2, + get_options/1, update_db_config/2, drop_db/1, shard_leader/2, @@ -230,6 +231,11 @@ is_leader(Node) -> {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]), Result. +-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). +get_options(DB) -> + {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]), + Opts. + -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db(DB, DefaultOpts) -> @@ -293,11 +299,11 @@ terminate(_Reason, #s{}) -> %% Internal exports %%================================================================================ --spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> +-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) -> emqx_ds_replication_layer:builtin_db_opts(). open_db_trans(DB, CreateOpts) -> case mnesia:wread({?META_TAB, DB}) of - [] -> + [] when is_map(CreateOpts) -> NShards = maps:get(n_shards, CreateOpts), ReplicationFactor = maps:get(replication_factor, CreateOpts), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 0dcb8ce52..85a94a846 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -199,7 +199,6 @@ open_shard(Shard, Options) -> -spec drop_shard(shard_id()) -> ok. drop_shard(Shard) -> - catch emqx_ds_storage_layer_sup:stop_shard(Shard), case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of undefined -> ok; @@ -586,7 +585,8 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB rocksdb_open(Shard, Options) -> DBOptions = [ {create_if_missing, true}, - {create_missing_column_families, true} + {create_missing_column_families, true}, + {enable_write_thread_adaptive_yield, false} | maps:get(db_options, Options, []) ], DataDir = maps:get(data_dir, Options, emqx:data_dir()), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index fd8cf289f..424b35133 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_ds_storage_layer_sup). @@ -23,7 +23,7 @@ -spec start_link() -> {ok, pid()}. start_link() -> - supervisor:start_link({local, ?SUP}, ?MODULE, []). + supervisor:start_link(?MODULE, []). -spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> supervisor:startchild_ret(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index 819d7d874..e863e74ce 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, ensure_workers/0]). +-export([start_link/0, attach_backend/2]). %% behaviour callbacks: -export([init/1]). @@ -25,64 +25,40 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, top). --spec ensure_workers() -> ok. -ensure_workers() -> - ChildSpec = #{ - id => workers_sup, - restart => temporary, - type => supervisor, - start => {supervisor, start_link, [?MODULE, workers]} +%% @doc Attach a child backend-specific supervisor to the top +%% application supervisor, if not yet present +-spec attach_backend(_BackendId, {module(), atom(), list()}) -> + {ok, pid()} | {error, _}. +attach_backend(Backend, Start) -> + Spec = #{ + id => Backend, + start => Start, + significant => false, + shutdown => infinity, + type => supervisor }, - case supervisor:start_child(?SUP, ChildSpec) of - {ok, _} -> - ok; - {error, already_present} -> - ok; - {error, {already_started, _}} -> - ok + case supervisor:start_child(?SUP, Spec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, Err} -> + {error, Err} end. %%================================================================================ %% behaviour callbacks %%================================================================================ --dialyzer({nowarn_function, init/1}). init(top) -> + Children = [], SupFlags = #{ - strategy => one_for_all, + strategy => one_for_one, intensity => 10, period => 1 }, - {ok, {SupFlags, []}}; -init(workers) -> - %% TODO: technically, we don't need rocksDB for the alternative - %% backends. But right now we have any: - Children = [meta(), storage_layer_sup()], - SupFlags = #{ - strategy => one_for_all, - intensity => 0, - period => 1 - }, {ok, {SupFlags, Children}}. %%================================================================================ %% Internal functions %%================================================================================ - -meta() -> - #{ - id => emqx_ds_replication_layer_meta, - start => {emqx_ds_replication_layer_meta, start_link, []}, - restart => permanent, - type => worker, - shutdown => 5000 - }. - -storage_layer_sup() -> - #{ - id => local_store_shard_sup, - start => {emqx_ds_storage_layer_sup, start_link, []}, - restart => permanent, - type => supervisor, - shutdown => infinity - }. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index d7dccccf5..9dae8e699 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 5d32143a7..03d86dd88 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -16,8 +16,8 @@ -define(DEFAULT_CONFIG, #{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 16, - replication_factor => 3 + n_shards => 1, + replication_factor => 1 }). -define(COMPACT_CONFIG, #{ @@ -26,15 +26,10 @@ {emqx_ds_storage_bitfield_lts, #{ bits_per_wildcard_level => 8 }}, - n_shards => 16, - replication_factor => 3 + n_shards => 1, + replication_factor => 1 }). -%% Smoke test for opening and reopening the database -t_open(_Config) -> - ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). - %% Smoke test of store function t_store(_Config) -> MessageID = emqx_guid:gen(), @@ -98,8 +93,8 @@ t_get_streams(_Config) -> [FooBarBaz] = GetStream(<<"foo/bar/baz">>), [A] = GetStream(<<"a">>), %% Restart shard to make sure trie is persisted and restored: - ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}), + ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME), + {ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}), %% Verify that there are no "ghost streams" for topics that don't %% have any messages: [] = GetStream(<<"bar/foo">>), @@ -196,9 +191,9 @@ t_replay(_Config) -> ?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)), - %% Restart shard to make sure trie is persisted and restored: - ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}), + %% Restart the DB to make sure trie is persisted and restored: + ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME), + {ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}), %% Learned wildcard topics: ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])), ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)), @@ -412,21 +407,21 @@ suite() -> [{timetrap, {seconds, 20}}]. init_per_suite(Config) -> {ok, _} = application:ensure_all_started(emqx_durable_storage), - emqx_ds_sup:ensure_workers(), Config. end_per_suite(_Config) -> ok = application:stop(emqx_durable_storage). init_per_testcase(TC, Config) -> - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), ?DEFAULT_CONFIG), + ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG), Config. end_per_testcase(TC, _Config) -> - ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). + emqx_ds:drop_db(TC), + ok. shard(TC) -> - {?MODULE, atom_to_binary(TC)}. + {TC, <<"0">>}. keyspace(TC) -> TC. From eee221f1d018f2fef7a528f3ba16a20fd78784f9 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Jan 2024 21:29:37 +0100 Subject: [PATCH 2/4] feat(ds): Make egress batching configurable --- apps/emqx/src/emqx_schema.erl | 20 ++++++++++++++++++- .../src/emqx_ds_replication_layer_egress.erl | 4 ++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index bbca13172..b03cfe72e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -1915,6 +1915,24 @@ fields("session_storage_backend_builtin") -> default => 3, importance => ?IMPORTANCE_HIDDEN } + )}, + {"egress_batch_size", + sc( + pos_integer(), + #{ + default => 1000, + mapping => "emqx_durable_storage.egress_batch_size", + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"egress_flush_interval", + sc( + timeout_duration_ms(), + #{ + default => 100, + mapping => "emqx_durable_storage.egress_flush_interval", + importance => ?IMPORTANCE_HIDDEN + } )} ]. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 3b264d9d1..842e8e5ed 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -141,7 +141,7 @@ do_flush( }. do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> - NMax = 1000, + NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, S2 = case N >= NMax of @@ -171,5 +171,5 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl {noreply, S}. start_timer() -> - Interval = 10, + Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), flush). From 305a54f646d7539fa6820a721430130486735e87 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 24 Jan 2024 14:43:09 +0100 Subject: [PATCH 3/4] chore(ds): Update BPAPI version --- .../src/emqx_ds_replication_layer.erl | 12 ++++++------ .../src/emqx_ds_replication_layer_egress.erl | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 7f696e3ce..7432fe3c7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -121,7 +121,7 @@ open_db(DB, CreateOpts) -> -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v2:add_generation(Nodes, DB), + _ = emqx_ds_proto_v3:add_generation(Nodes, DB), ok. -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. @@ -157,7 +157,7 @@ drop_generation(DB, {Shard, GenId}) -> -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), - _ = emqx_ds_proto_v2:drop_db(Nodes, DB), + _ = emqx_ds_proto_v3:drop_db(Nodes, DB), _ = emqx_ds_replication_layer_meta:drop_db(DB), emqx_ds_builtin_sup:stop_db(DB), ok. @@ -174,7 +174,7 @@ get_streams(DB, TopicFilter, StartTime) -> lists:flatmap( fun(Shard) -> Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime), lists:map( fun({RankY, Stream}) -> RankX = Shard, @@ -196,7 +196,7 @@ get_streams(DB, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) -> #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream, Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of + case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> @@ -213,7 +213,7 @@ update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, Node = node_of_shard(DB, Shard), case - emqx_ds_proto_v2:update_iterator( + emqx_ds_proto_v3:update_iterator( Node, DB, Shard, @@ -239,7 +239,7 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of + case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 842e8e5ed..8b37b29cb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -172,4 +172,4 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - erlang:send_after(Interval, self(), flush). + erlang:send_after(Interval, self(), ?flush). From 7b5f2948fe127c85dd70b3df693769b315251cbe Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 24 Jan 2024 18:38:15 +0100 Subject: [PATCH 4/4] test(ds): Fix flaky testcase --- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 0c0eaac28..8a63d46be 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -475,7 +475,7 @@ t_metrics_not_dropped(_Config) -> {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1), emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1), - ?assertMatch([_], receive_messages(1, 1_500)), + ?assertMatch([_], receive_messages(1)), DroppedAfter = emqx_metrics:val('messages.dropped'), DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),