feat(dsrepl): allow tuning of selected Ra options

This commit is contained in:
Andrew Mayorov 2024-02-28 18:40:12 +01:00
parent 54b5adf868
commit 5cc0246351
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
8 changed files with 123 additions and 42 deletions

View File

@ -36,13 +36,15 @@
%% API %% API
%%================================================================================ %%================================================================================
translate_builtin(#{ translate_builtin(
backend := builtin, Backend = #{
n_shards := NShards, backend := builtin,
n_sites := NSites, n_shards := NShards,
replication_factor := ReplFactor, n_sites := NSites,
layout := Layout replication_factor := ReplFactor,
}) -> layout := Layout
}
) ->
Storage = Storage =
case Layout of case Layout of
#{ #{
@ -64,6 +66,7 @@ translate_builtin(#{
n_shards => NShards, n_shards => NShards,
n_sites => NSites, n_sites => NSites,
replication_factor => ReplFactor, replication_factor => ReplFactor,
replication_options => maps:get(replication_options, Backend, #{}),
storage => Storage storage => Storage
}. }.
@ -144,6 +147,15 @@ fields(builtin) ->
importance => ?IMPORTANCE_HIDDEN importance => ?IMPORTANCE_HIDDEN
} }
)}, )},
%% TODO: Elaborate.
{"replication_options",
sc(
hoconsc:map(name, any()),
#{
default => #{},
importance => ?IMPORTANCE_HIDDEN
}
)},
{local_write_buffer, {local_write_buffer,
sc( sc(
ref(builtin_local_write_buffer), ref(builtin_local_write_buffer),

View File

@ -50,6 +50,20 @@ init_per_testcase(t_message_gc = TestCase, Config) ->
"\n durable_storage.messages.n_shards = 3" "\n durable_storage.messages.n_shards = 3"
}, },
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts); common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
init_per_testcase(t_replication_options = TestCase, Config) ->
Opts = #{
extra_emqx_conf =>
"\n durable_storage.messages.replication_options {"
"\n wal_max_size_bytes = 16000000"
"\n wal_max_batch_size = 1024"
"\n wal_write_strategy = o_sync"
"\n wal_sync_method = datasync"
"\n wal_compute_checksums = false"
"\n snapshot_interval = 64"
"\n resend_window = 60"
"\n}"
},
common_init_per_testcase(TestCase, Config, Opts);
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config, _Opts = #{}). common_init_per_testcase(TestCase, Config, _Opts = #{}).
@ -448,6 +462,33 @@ t_metrics_not_dropped(_Config) ->
ok. ok.
t_replication_options(_Config) ->
?assertMatch(
#{
backend := builtin,
replication_options := #{
wal_max_size_bytes := 16000000,
wal_max_batch_size := 1024,
wal_write_strategy := o_sync,
wal_sync_method := datasync,
wal_compute_checksums := false,
snapshot_interval := 64,
resend_window := 60
}
},
emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB)
),
?assertMatch(
#{
wal_max_size_bytes := 16000000,
wal_max_batch_size := 1024,
wal_write_strategy := o_sync,
wal_compute_checksums := false,
wal_sync_method := datasync
},
ra_system:fetch(?PERSISTENT_MESSAGE_DB)
).
%% %%
connect(ClientId, CleanStart, EI) -> connect(ClientId, CleanStart, EI) ->

View File

@ -101,6 +101,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
%% Spec for the top-level supervisor for the database: %% Spec for the top-level supervisor for the database:
logger:notice("Starting DS DB ~p", [DB]), logger:notice("Starting DS DB ~p", [DB]),
Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
ok = start_ra_system(DB, Opts),
Children = [ Children = [
sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?shard_sup{db = DB}, []),
sup_spec(#?egress_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, []),
@ -135,12 +136,40 @@ init({allocator, DB, Opts}) ->
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
init_allocator(DB, Opts). init_allocator(DB, Opts).
start_shards(DB, Shards) -> start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
DataDir = filename:join([emqx:data_dir(), DB, dsrepl]),
Config = lists:foldr(fun maps:merge/2, #{}, [
ra_system:default_config(),
#{
name => DB,
data_dir => DataDir,
wal_data_dir => DataDir,
names => ra_system:derive_names(DB)
},
maps:with(
[
wal_max_size_bytes,
wal_max_batch_size,
wal_write_strategy,
wal_sync_method,
wal_compute_checksums
],
ReplicationOpts
)
]),
case ra_system:start(Config) of
{ok, _System} ->
ok;
{error, {already_started, _System}} ->
ok
end.
start_shards(DB, Shards, Opts) ->
SupRef = ?via(#?shard_sup{db = DB}), SupRef = ?via(#?shard_sup{db = DB}),
lists:foreach( lists:foreach(
fun(Shard) -> fun(Shard) ->
{ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard)), {ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard, Opts)),
{ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard)) {ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard, Opts))
end, end,
Shards Shards
). ).
@ -174,7 +203,9 @@ sup_spec(Id, Options) ->
}. }.
shard_spec(DB, Shard) -> shard_spec(DB, Shard) ->
Options = emqx_ds_replication_layer_meta:get_options(DB), shard_spec(DB, Shard, emqx_ds_replication_layer_meta:get_options(DB)).
shard_spec(DB, Shard, Options) ->
#{ #{
id => {Shard, storage}, id => {Shard, storage},
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]}, start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
@ -183,10 +214,10 @@ shard_spec(DB, Shard) ->
type => worker type => worker
}. }.
shard_replication_spec(DB, Shard) -> shard_replication_spec(DB, Shard, Opts) ->
#{ #{
id => {Shard, replication}, id => {Shard, replication},
start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard]}, start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
restart => transient, restart => transient,
type => worker type => worker
}. }.
@ -249,6 +280,7 @@ handle_info(timeout, State) ->
end. end.
terminate(_Reason, #{db := DB, shards := Shards}) -> terminate(_Reason, #{db := DB, shards := Shards}) ->
%% FIXME
erase_shards_meta(DB, Shards). erase_shards_meta(DB, Shards).
%% %%
@ -258,7 +290,7 @@ allocate_shards(State = #{db := DB, opts := Opts}) ->
{ok, Shards} -> {ok, Shards} ->
logger:notice(#{msg => "Shards allocated", shards => Shards}), logger:notice(#{msg => "Shards allocated", shards => Shards}),
ok = save_shards_meta(DB, Shards), ok = save_shards_meta(DB, Shards),
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB), Opts),
logger:notice(#{ logger:notice(#{
msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB) msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB)
}), }),

View File

@ -92,7 +92,8 @@
storage := emqx_ds_storage_layer:prototype(), storage := emqx_ds_storage_layer:prototype(),
n_shards => pos_integer(), n_shards => pos_integer(),
n_sites => pos_integer(), n_sites => pos_integer(),
replication_factor => pos_integer() replication_factor => pos_integer(),
replication_options => _TODO :: #{}
}. }.
%% This enapsulates the stream entity from the replication level. %% This enapsulates the stream entity from the replication level.

View File

@ -224,25 +224,11 @@ drop_db(DB) ->
init([]) -> init([]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
logger:set_process_metadata(#{domain => [ds, meta]}), logger:set_process_metadata(#{domain => [ds, meta]}),
init_ra(),
ensure_tables(), ensure_tables(),
ensure_site(), ensure_site(),
S = #s{}, S = #s{},
{ok, S}. {ok, S}.
init_ra() ->
DataDir = filename:join([emqx:data_dir(), "dsrepl"]),
Config = maps:merge(ra_system:default_config(), #{
data_dir => DataDir,
wal_data_dir => DataDir
}),
case ra_system:start(Config) of
{ok, _System} ->
ok;
{error, {already_started, _System}} ->
ok
end.
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.

View File

@ -16,7 +16,7 @@
-module(emqx_ds_replication_layer_shard). -module(emqx_ds_replication_layer_shard).
-export([start_link/2]). -export([start_link/3]).
-export([shard_servers/2]). -export([shard_servers/2]).
-export([ -export([
@ -45,8 +45,8 @@
%% %%
start_link(DB, Shard) -> start_link(DB, Shard, Opts) ->
gen_server:start_link(?MODULE, {DB, Shard}, []). gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
shard_servers(DB, Shard) -> shard_servers(DB, Shard) ->
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
@ -125,9 +125,9 @@ get_shard_servers(DB, Shard) ->
%% %%
init({DB, Shard}) -> init({DB, Shard, Opts}) ->
_ = process_flag(trap_exit, true), _ = process_flag(trap_exit, true),
_Meta = start_shard(DB, Shard), _Meta = start_shard(DB, Shard, Opts),
{ok, {DB, Shard}}. {ok, {DB, Shard}}.
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
@ -138,28 +138,33 @@ handle_cast(_Msg, State) ->
terminate(_Reason, {DB, Shard}) -> terminate(_Reason, {DB, Shard}) ->
LocalServer = get_local_server(DB, Shard), LocalServer = get_local_server(DB, Shard),
ok = ra:stop_server(LocalServer). ok = ra:stop_server(DB, LocalServer).
%% %%
start_shard(DB, Shard) -> start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
System = default,
Site = emqx_ds_replication_layer_meta:this_site(), Site = emqx_ds_replication_layer_meta:this_site(),
ClusterName = cluster_name(DB, Shard), ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard), LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard), Servers = shard_servers(DB, Shard),
case ra:restart_server(System, LocalServer) of case ra:restart_server(DB, LocalServer) of
ok -> ok ->
Bootstrap = false; Bootstrap = false;
{error, name_not_registered} -> {error, name_not_registered} ->
Bootstrap = true, Bootstrap = true,
ok = ra:start_server(System, #{ ok = ra:start_server(DB, #{
id => LocalServer, id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>, uid => <<ClusterName/binary, "_", Site/binary>>,
cluster_name => ClusterName, cluster_name => ClusterName,
initial_members => Servers, initial_members => Servers,
machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
log_init_args => #{} log_init_args => maps:with(
[
snapshot_interval,
resend_window
],
ReplicationOpts
)
}) })
end, end,
case Servers of case Servers of

View File

@ -31,7 +31,9 @@ opts() ->
backend => builtin, backend => builtin,
storage => {emqx_ds_storage_reference, #{}}, storage => {emqx_ds_storage_reference, #{}},
n_shards => ?N_SHARDS, n_shards => ?N_SHARDS,
replication_factor => 3 n_sites => 1,
replication_factor => 3,
replication_options => #{}
}. }.
%% A simple smoke test that verifies that opening/closing the DB %% A simple smoke test that verifies that opening/closing the DB

View File

@ -29,7 +29,9 @@
backend => builtin, backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}, storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 1, n_shards => 1,
replication_factor => 1 n_sites => 1,
replication_factor => 1,
replication_options => #{}
}). }).
-define(COMPACT_CONFIG, #{ -define(COMPACT_CONFIG, #{