wip: allow to tune select ra options
This commit is contained in:
parent
da6844957e
commit
475efc93b3
|
@ -61,7 +61,7 @@ force_ds() ->
|
|||
emqx_config:get([session_persistence, force_persistence]).
|
||||
|
||||
storage_backend(#{
|
||||
builtin := #{
|
||||
builtin := Backend = #{
|
||||
enable := true,
|
||||
n_shards := NShards,
|
||||
n_sites := NSites,
|
||||
|
@ -73,7 +73,8 @@ storage_backend(#{
|
|||
storage => {emqx_ds_storage_bitfield_lts, #{}},
|
||||
n_shards => NShards,
|
||||
n_sites => NSites,
|
||||
replication_factor => ReplicationFactor
|
||||
replication_factor => ReplicationFactor,
|
||||
replication_options => maps:get(replication_options, Backend, #{})
|
||||
};
|
||||
storage_backend(#{
|
||||
fdb := #{enable := true} = FDBConfig
|
||||
|
|
|
@ -1953,6 +1953,14 @@ fields("session_storage_backend_builtin") ->
|
|||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"replication_options",
|
||||
sc(
|
||||
map(name, any()),
|
||||
#{
|
||||
default => #{},
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
)},
|
||||
{"egress_batch_size",
|
||||
sc(
|
||||
pos_integer(),
|
||||
|
|
|
@ -101,6 +101,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
|
|||
%% Spec for the top-level supervisor for the database:
|
||||
logger:notice("Starting DS DB ~p", [DB]),
|
||||
Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
|
||||
ok = start_ra_system(DB, Opts),
|
||||
Children = [
|
||||
sup_spec(#?shard_sup{db = DB}, []),
|
||||
sup_spec(#?egress_sup{db = DB}, []),
|
||||
|
@ -135,12 +136,39 @@ init({allocator, DB, Opts}) ->
|
|||
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
|
||||
init_allocator(DB, Opts).
|
||||
|
||||
start_shards(DB, Shards) ->
|
||||
start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
|
||||
DataDir = filename:join([emqx:data_dir(), "dsrepl"]),
|
||||
Config = lists:foldr(fun maps:merge/2, #{}, [
|
||||
ra_system:default_config(),
|
||||
#{
|
||||
name => DB,
|
||||
data_dir => DataDir,
|
||||
wal_data_dir => DataDir
|
||||
},
|
||||
maps:with(
|
||||
[
|
||||
wal_max_size_bytes,
|
||||
wal_max_batch_size,
|
||||
wal_write_strategy,
|
||||
wal_sync_method,
|
||||
wal_compute_checksums
|
||||
],
|
||||
ReplicationOpts
|
||||
)
|
||||
]),
|
||||
case ra_system:start(Config) of
|
||||
{ok, _System} ->
|
||||
ok;
|
||||
{error, {already_started, _System}} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
start_shards(DB, Shards, Opts) ->
|
||||
SupRef = ?via(#?shard_sup{db = DB}),
|
||||
lists:foreach(
|
||||
fun(Shard) ->
|
||||
{ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard)),
|
||||
{ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard))
|
||||
{ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard, Opts)),
|
||||
{ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard, Opts))
|
||||
end,
|
||||
Shards
|
||||
).
|
||||
|
@ -174,7 +202,9 @@ sup_spec(Id, Options) ->
|
|||
}.
|
||||
|
||||
shard_spec(DB, Shard) ->
|
||||
Options = emqx_ds_replication_layer_meta:get_options(DB),
|
||||
shard_spec(DB, Shard, emqx_ds_replication_layer_meta:get_options(DB)).
|
||||
|
||||
shard_spec(DB, Shard, Options) ->
|
||||
#{
|
||||
id => {Shard, storage},
|
||||
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
|
||||
|
@ -183,10 +213,10 @@ shard_spec(DB, Shard) ->
|
|||
type => worker
|
||||
}.
|
||||
|
||||
shard_replication_spec(DB, Shard) ->
|
||||
shard_replication_spec(DB, Shard, Opts) ->
|
||||
#{
|
||||
id => {Shard, replication},
|
||||
start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard]},
|
||||
start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
|
||||
restart => transient,
|
||||
type => worker
|
||||
}.
|
||||
|
@ -249,6 +279,7 @@ handle_info(timeout, State) ->
|
|||
end.
|
||||
|
||||
terminate(_Reason, #{db := DB, shards := Shards}) ->
|
||||
%% FIXME
|
||||
erase_shards_meta(DB, Shards).
|
||||
|
||||
%%
|
||||
|
@ -258,7 +289,7 @@ allocate_shards(State = #{db := DB, opts := Opts}) ->
|
|||
{ok, Shards} ->
|
||||
logger:notice(#{msg => "Shards allocated", shards => Shards}),
|
||||
ok = save_shards_meta(DB, Shards),
|
||||
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
|
||||
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB), Opts),
|
||||
logger:notice(#{
|
||||
msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB)
|
||||
}),
|
||||
|
|
|
@ -78,7 +78,8 @@
|
|||
storage := emqx_ds_storage_layer:prototype(),
|
||||
n_shards => pos_integer(),
|
||||
n_sites => pos_integer(),
|
||||
replication_factor => pos_integer()
|
||||
replication_factor => pos_integer(),
|
||||
replication_options => _TODO :: #{}
|
||||
}.
|
||||
|
||||
%% This enapsulates the stream entity from the replication level.
|
||||
|
|
|
@ -239,25 +239,11 @@ drop_db(DB) ->
|
|||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
logger:set_process_metadata(#{domain => [ds, meta]}),
|
||||
init_ra(),
|
||||
ensure_tables(),
|
||||
ensure_site(),
|
||||
S = #s{},
|
||||
{ok, S}.
|
||||
|
||||
init_ra() ->
|
||||
DataDir = filename:join([emqx:data_dir(), "dsrepl"]),
|
||||
Config = maps:merge(ra_system:default_config(), #{
|
||||
data_dir => DataDir,
|
||||
wal_data_dir => DataDir
|
||||
}),
|
||||
case ra_system:start(Config) of
|
||||
{ok, _System} ->
|
||||
ok;
|
||||
{error, {already_started, _System}} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
handle_call(_Call, _From, S) ->
|
||||
{reply, {error, unknown_call}, S}.
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
-module(emqx_ds_replication_layer_shard).
|
||||
|
||||
-export([start_link/2]).
|
||||
-export([start_link/3]).
|
||||
-export([shard_servers/2]).
|
||||
|
||||
-export([
|
||||
|
@ -45,8 +45,8 @@
|
|||
|
||||
%%
|
||||
|
||||
start_link(DB, Shard) ->
|
||||
gen_server:start_link(?MODULE, {DB, Shard}, []).
|
||||
start_link(DB, Shard, Opts) ->
|
||||
gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
|
||||
|
||||
shard_servers(DB, Shard) ->
|
||||
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
|
||||
|
@ -125,9 +125,9 @@ get_shard_servers(DB, Shard) ->
|
|||
|
||||
%%
|
||||
|
||||
init({DB, Shard}) ->
|
||||
init({DB, Shard, Opts}) ->
|
||||
_ = process_flag(trap_exit, true),
|
||||
_Meta = start_shard(DB, Shard),
|
||||
_Meta = start_shard(DB, Shard, Opts),
|
||||
{ok, {DB, Shard}}.
|
||||
|
||||
handle_call(_Call, _From, State) ->
|
||||
|
@ -138,28 +138,33 @@ handle_cast(_Msg, State) ->
|
|||
|
||||
terminate(_Reason, {DB, Shard}) ->
|
||||
LocalServer = get_local_server(DB, Shard),
|
||||
ok = ra:stop_server(LocalServer).
|
||||
ok = ra:stop_server(DB, LocalServer).
|
||||
|
||||
%%
|
||||
|
||||
start_shard(DB, Shard) ->
|
||||
System = default,
|
||||
start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
||||
Site = emqx_ds_replication_layer_meta:this_site(),
|
||||
ClusterName = cluster_name(DB, Shard),
|
||||
LocalServer = local_server(DB, Shard),
|
||||
Servers = shard_servers(DB, Shard),
|
||||
case ra:restart_server(System, LocalServer) of
|
||||
case ra:restart_server(DB, LocalServer) of
|
||||
ok ->
|
||||
Bootstrap = false;
|
||||
{error, name_not_registered} ->
|
||||
Bootstrap = true,
|
||||
ok = ra:start_server(System, #{
|
||||
ok = ra:start_server(DB, #{
|
||||
id => LocalServer,
|
||||
uid => <<ClusterName/binary, "_", Site/binary>>,
|
||||
cluster_name => ClusterName,
|
||||
initial_members => Servers,
|
||||
machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
|
||||
log_init_args => #{}
|
||||
log_init_args => maps:with(
|
||||
[
|
||||
snapshot_interval,
|
||||
resend_window
|
||||
],
|
||||
ReplicationOpts
|
||||
)
|
||||
})
|
||||
end,
|
||||
case Servers of
|
||||
|
|
Loading…
Reference in New Issue