From 5cc02463511cf236c11cbef8c87431ab1cad1316 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 28 Feb 2024 18:40:12 +0100 Subject: [PATCH] feat(dsrepl): allow to tune select ra options --- apps/emqx/src/emqx_ds_schema.erl | 26 ++++++++--- .../test/emqx_persistent_messages_SUITE.erl | 41 +++++++++++++++++ .../src/emqx_ds_builtin_db_sup.erl | 46 ++++++++++++++++--- .../src/emqx_ds_replication_layer.erl | 3 +- .../src/emqx_ds_replication_layer_meta.erl | 14 ------ .../src/emqx_ds_replication_layer_shard.erl | 27 ++++++----- .../test/emqx_ds_SUITE.erl | 4 +- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 4 +- 8 files changed, 123 insertions(+), 42 deletions(-) diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index ef1f81b2b..4b9eb8773 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -36,13 +36,15 @@ %% API %%================================================================================ -translate_builtin(#{ - backend := builtin, - n_shards := NShards, - n_sites := NSites, - replication_factor := ReplFactor, - layout := Layout -}) -> +translate_builtin( + Backend = #{ + backend := builtin, + n_shards := NShards, + n_sites := NSites, + replication_factor := ReplFactor, + layout := Layout + } +) -> Storage = case Layout of #{ @@ -64,6 +66,7 @@ translate_builtin(#{ n_shards => NShards, n_sites => NSites, replication_factor => ReplFactor, + replication_options => maps:get(replication_options, Backend, #{}), storage => Storage }. @@ -144,6 +147,15 @@ fields(builtin) -> importance => ?IMPORTANCE_HIDDEN } )}, + %% TODO: Elaborate. + {"replication_options", + sc( + hoconsc:map(name, any()), + #{ + default => #{}, + importance => ?IMPORTANCE_HIDDEN + } + )}, {local_write_buffer, sc( ref(builtin_local_write_buffer), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index b2a41b0d2..33fa17ab5 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -50,6 +50,20 @@ init_per_testcase(t_message_gc = TestCase, Config) -> "\n durable_storage.messages.n_shards = 3" }, common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts); +init_per_testcase(t_replication_options = TestCase, Config) -> + Opts = #{ + extra_emqx_conf => + "\n durable_storage.messages.replication_options {" + "\n wal_max_size_bytes = 16000000" + "\n wal_max_batch_size = 1024" + "\n wal_write_strategy = o_sync" + "\n wal_sync_method = datasync" + "\n wal_compute_checksums = false" + "\n snapshot_interval = 64" + "\n resend_window = 60" + "\n}" + }, + common_init_per_testcase(TestCase, Config, Opts); init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config, _Opts = #{}). @@ -448,6 +462,33 @@ t_metrics_not_dropped(_Config) -> ok. +t_replication_options(_Config) -> + ?assertMatch( + #{ + backend := builtin, + replication_options := #{ + wal_max_size_bytes := 16000000, + wal_max_batch_size := 1024, + wal_write_strategy := o_sync, + wal_sync_method := datasync, + wal_compute_checksums := false, + snapshot_interval := 64, + resend_window := 60 + } + }, + emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB) + ), + ?assertMatch( + #{ + wal_max_size_bytes := 16000000, + wal_max_batch_size := 1024, + wal_write_strategy := o_sync, + wal_compute_checksums := false, + wal_sync_method := datasync + }, + ra_system:fetch(?PERSISTENT_MESSAGE_DB) + ). + %% connect(ClientId, CleanStart, EI) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 6cee4877e..aa74e2080 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -101,6 +101,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), + ok = start_ra_system(DB, Opts), Children = [ sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, []), @@ -135,12 +136,40 @@ init({allocator, DB, Opts}) -> _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), init_allocator(DB, Opts). -start_shards(DB, Shards) -> +start_ra_system(DB, #{replication_options := ReplicationOpts}) -> + DataDir = filename:join([emqx:data_dir(), DB, dsrepl]), + Config = lists:foldr(fun maps:merge/2, #{}, [ + ra_system:default_config(), + #{ + name => DB, + data_dir => DataDir, + wal_data_dir => DataDir, + names => ra_system:derive_names(DB) + }, + maps:with( + [ + wal_max_size_bytes, + wal_max_batch_size, + wal_write_strategy, + wal_sync_method, + wal_compute_checksums + ], + ReplicationOpts + ) + ]), + case ra_system:start(Config) of + {ok, _System} -> + ok; + {error, {already_started, _System}} -> + ok + end. + +start_shards(DB, Shards, Opts) -> SupRef = ?via(#?shard_sup{db = DB}), lists:foreach( fun(Shard) -> - {ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard)), - {ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard)) + {ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard, Opts)), + {ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard, Opts)) end, Shards ). @@ -174,7 +203,9 @@ sup_spec(Id, Options) -> }. shard_spec(DB, Shard) -> - Options = emqx_ds_replication_layer_meta:get_options(DB), + shard_spec(DB, Shard, emqx_ds_replication_layer_meta:get_options(DB)). + +shard_spec(DB, Shard, Options) -> #{ id => {Shard, storage}, start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]}, @@ -183,10 +214,10 @@ shard_spec(DB, Shard) -> type => worker }. -shard_replication_spec(DB, Shard) -> +shard_replication_spec(DB, Shard, Opts) -> #{ id => {Shard, replication}, - start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard]}, + start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]}, restart => transient, type => worker }. @@ -249,6 +280,7 @@ handle_info(timeout, State) -> end. terminate(_Reason, #{db := DB, shards := Shards}) -> + %% FIXME erase_shards_meta(DB, Shards). %% @@ -258,7 +290,7 @@ allocate_shards(State = #{db := DB, opts := Opts}) -> {ok, Shards} -> logger:notice(#{msg => "Shards allocated", shards => Shards}), ok = save_shards_meta(DB, Shards), - ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), + ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB), Opts), logger:notice(#{ msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB) }), diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index a19f376fe..c34c8d49d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -92,7 +92,8 @@ storage := emqx_ds_storage_layer:prototype(), n_shards => pos_integer(), n_sites => pos_integer(), - replication_factor => pos_integer() + replication_factor => pos_integer(), + replication_options => _TODO :: #{} }. %% This enapsulates the stream entity from the replication level. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index a4331cdea..75b1a225a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -224,25 +224,11 @@ drop_db(DB) -> init([]) -> process_flag(trap_exit, true), logger:set_process_metadata(#{domain => [ds, meta]}), - init_ra(), ensure_tables(), ensure_site(), S = #s{}, {ok, S}. -init_ra() -> - DataDir = filename:join([emqx:data_dir(), "dsrepl"]), - Config = maps:merge(ra_system:default_config(), #{ - data_dir => DataDir, - wal_data_dir => DataDir - }), - case ra_system:start(Config) of - {ok, _System} -> - ok; - {error, {already_started, _System}} -> - ok - end. - handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index d00e8fd9d..3bf528325 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -16,7 +16,7 @@ -module(emqx_ds_replication_layer_shard). --export([start_link/2]). +-export([start_link/3]). -export([shard_servers/2]). -export([ @@ -45,8 +45,8 @@ %% -start_link(DB, Shard) -> - gen_server:start_link(?MODULE, {DB, Shard}, []). +start_link(DB, Shard, Opts) -> + gen_server:start_link(?MODULE, {DB, Shard, Opts}, []). shard_servers(DB, Shard) -> {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), @@ -125,9 +125,9 @@ get_shard_servers(DB, Shard) -> %% -init({DB, Shard}) -> +init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - _Meta = start_shard(DB, Shard), + _Meta = start_shard(DB, Shard, Opts), {ok, {DB, Shard}}. handle_call(_Call, _From, State) -> @@ -138,28 +138,33 @@ handle_cast(_Msg, State) -> terminate(_Reason, {DB, Shard}) -> LocalServer = get_local_server(DB, Shard), - ok = ra:stop_server(LocalServer). + ok = ra:stop_server(DB, LocalServer). %% -start_shard(DB, Shard) -> - System = default, +start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> Site = emqx_ds_replication_layer_meta:this_site(), ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), - case ra:restart_server(System, LocalServer) of + case ra:restart_server(DB, LocalServer) of ok -> Bootstrap = false; {error, name_not_registered} -> Bootstrap = true, - ok = ra:start_server(System, #{ + ok = ra:start_server(DB, #{ id => LocalServer, uid => <>, cluster_name => ClusterName, initial_members => Servers, machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, - log_init_args => #{} + log_init_args => maps:with( + [ + snapshot_interval, + resend_window + ], + ReplicationOpts + ) }) end, case Servers of diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 0c48019e3..64d81307c 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -31,7 +31,9 @@ opts() -> backend => builtin, storage => {emqx_ds_storage_reference, #{}}, n_shards => ?N_SHARDS, - replication_factor => 3 + n_sites => 1, + replication_factor => 3, + replication_options => #{} }. %% A simple smoke test that verifies that opening/closing the DB diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 9a47b2b5f..6e7d8629e 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -29,7 +29,9 @@ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, n_shards => 1, - replication_factor => 1 + n_sites => 1, + replication_factor => 1, + replication_options => #{} }). -define(COMPACT_CONFIG, #{