feat(dsrepl): add APIs to manage DB replication sites
This commit is contained in:
parent
879709e686
commit
ad52f7838e
|
@ -476,7 +476,7 @@ t_replication_options(_Config) ->
|
|||
resend_window := 60
|
||||
}
|
||||
},
|
||||
emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB)
|
||||
emqx_ds_replication_layer_meta:db_config(?PERSISTENT_MESSAGE_DB)
|
||||
),
|
||||
?assertMatch(
|
||||
#{
|
||||
|
|
|
@ -116,7 +116,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
|
|||
Children = [
|
||||
sup_spec(#?shards_sup{db = DB}, []),
|
||||
sup_spec(#?egress_sup{db = DB}, []),
|
||||
shard_allocator_spec(DB, Opts)
|
||||
shard_allocator_spec(DB)
|
||||
],
|
||||
SupFlags = #{
|
||||
strategy => one_for_all,
|
||||
|
@ -148,7 +148,7 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
|
|||
intensity => 10,
|
||||
period => 100
|
||||
},
|
||||
Opts = emqx_ds_replication_layer_meta:get_options(DB),
|
||||
Opts = emqx_ds_replication_layer_meta:db_config(DB),
|
||||
Children = [
|
||||
shard_storage_spec(DB, Shard, Opts),
|
||||
shard_replication_spec(DB, Shard, Opts)
|
||||
|
@ -228,10 +228,10 @@ shard_replication_spec(DB, Shard, Opts) ->
|
|||
type => worker
|
||||
}.
|
||||
|
||||
shard_allocator_spec(DB, Opts) ->
|
||||
shard_allocator_spec(DB) ->
|
||||
#{
|
||||
id => shard_allocator,
|
||||
start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]},
|
||||
start => {emqx_ds_replication_shard_allocator, start_link, [DB]},
|
||||
restart => permanent,
|
||||
type => worker
|
||||
}.
|
||||
|
|
|
@ -189,8 +189,7 @@ add_generation(DB) ->
|
|||
|
||||
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
||||
update_db_config(DB, CreateOpts) ->
|
||||
ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
|
||||
Opts = emqx_ds_replication_layer_meta:get_options(DB),
|
||||
Opts = #{} = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
|
||||
foreach_shard(
|
||||
DB,
|
||||
fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
|
||||
|
|
|
@ -29,25 +29,39 @@
|
|||
-export([
|
||||
shards/1,
|
||||
my_shards/1,
|
||||
allocate_shards/2,
|
||||
replica_set/2,
|
||||
allocate_shards/1,
|
||||
sites/0,
|
||||
node/1,
|
||||
open_db/2,
|
||||
get_options/1,
|
||||
update_db_config/2,
|
||||
drop_db/1,
|
||||
this_site/0,
|
||||
print_status/0
|
||||
]).
|
||||
|
||||
%% DB API:
|
||||
-export([
|
||||
open_db/2,
|
||||
db_config/1,
|
||||
update_db_config/2,
|
||||
drop_db/1
|
||||
]).
|
||||
|
||||
%% Site / shard allocation:
|
||||
-export([
|
||||
assign_db_sites/2,
|
||||
replica_set_transitions/2,
|
||||
update_replica_set/3,
|
||||
replica_set/2,
|
||||
target_set/2
|
||||
]).
|
||||
|
||||
%% gen_server
|
||||
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||
|
||||
%% internal exports:
|
||||
-export([
|
||||
open_db_trans/2,
|
||||
allocate_shards_trans/2,
|
||||
allocate_shards_trans/1,
|
||||
assign_db_sites_trans/2,
|
||||
update_replica_set_trans/3,
|
||||
update_db_config_trans/2,
|
||||
drop_db_trans/1,
|
||||
claim_site/2,
|
||||
|
@ -86,15 +100,20 @@
|
|||
|
||||
-record(?SHARD_TAB, {
|
||||
shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
|
||||
%% Sites that currently contain the data:
|
||||
replica_set :: [site()],
|
||||
%% Sites that should contain the data when the cluster is in the
|
||||
%% stable state (no nodes are being added or removed from it):
|
||||
replica_set :: [site()],
|
||||
target_set :: [site()] | undefined,
|
||||
misc = #{} :: map()
|
||||
}).
|
||||
|
||||
%% Persistent ID of the node (independent from the IP/FQDN):
|
||||
-type site() :: binary().
|
||||
|
||||
%% Membership transition of shard's replica set:
|
||||
-type transition() :: {add | del, site()}.
|
||||
|
||||
%% Persistent term key:
|
||||
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
|
||||
|
||||
|
@ -156,17 +175,17 @@ start_link() ->
|
|||
|
||||
-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
|
||||
shards(DB) ->
|
||||
filter_shards(DB).
|
||||
Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
|
||||
[Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
|
||||
|
||||
-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
|
||||
my_shards(DB) ->
|
||||
Site = this_site(),
|
||||
filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) ->
|
||||
lists:member(Site, ReplicaSet)
|
||||
end).
|
||||
Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
|
||||
[Shard || #?SHARD_TAB{shard = {_, Shard}, replica_set = RS} <- Recs, lists:member(Site, RS)].
|
||||
|
||||
allocate_shards(DB, Opts) ->
|
||||
case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of
|
||||
allocate_shards(DB) ->
|
||||
case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]) of
|
||||
{atomic, Shards} ->
|
||||
{ok, Shards};
|
||||
{aborted, {shards_already_allocated, Shards}} ->
|
||||
|
@ -175,16 +194,6 @@ allocate_shards(DB, Opts) ->
|
|||
{error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
|
||||
end.
|
||||
|
||||
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
|
||||
{ok, [site()]} | {error, _}.
|
||||
replica_set(DB, Shard) ->
|
||||
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
|
||||
[#?SHARD_TAB{replica_set = ReplicaSet}] ->
|
||||
{ok, ReplicaSet};
|
||||
[] ->
|
||||
{error, no_shard}
|
||||
end.
|
||||
|
||||
%% @doc List the site IDs of all records found in the ?NODE_TAB table.
-spec sites() -> [site()].
sites() ->
    Query = qlc:q([Id || #?NODE_TAB{site = Id} <- mnesia:table(?NODE_TAB)]),
    eval_qlc(Query).
|
||||
|
@ -198,8 +207,12 @@ node(Site) ->
|
|||
undefined
|
||||
end.
|
||||
|
||||
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
|
||||
get_options(DB) ->
|
||||
%%===============================================================================
|
||||
%% DB API
|
||||
%%===============================================================================
|
||||
|
||||
-spec db_config(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
|
||||
db_config(DB) ->
|
||||
case mnesia:dirty_read(?META_TAB, DB) of
|
||||
[#?META_TAB{db_props = Opts}] ->
|
||||
Opts;
|
||||
|
@ -210,21 +223,64 @@ get_options(DB) ->
|
|||
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||
emqx_ds_replication_layer:builtin_db_opts().
|
||||
open_db(DB, DefaultOpts) ->
|
||||
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
|
||||
Opts.
|
||||
transaction(fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]).
|
||||
|
||||
-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||
ok | {error, _}.
|
||||
emqx_ds_replication_layer:builtin_db_opts() | {error, _}.
|
||||
update_db_config(DB, DefaultOpts) ->
|
||||
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:update_db_config_trans/2, [
|
||||
DB, DefaultOpts
|
||||
]),
|
||||
Opts.
|
||||
transaction(fun ?MODULE:update_db_config_trans/2, [DB, DefaultOpts]).
|
||||
|
||||
-spec drop_db(emqx_ds:db()) -> ok.
|
||||
drop_db(DB) ->
|
||||
_ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
|
||||
ok.
|
||||
transaction(fun ?MODULE:drop_db_trans/1, [DB]).
|
||||
|
||||
%% @doc Assign the given set of sites to the DB.
%%
%% Runs `assign_db_sites_trans/2' in an mria transaction. Returns `ok' when
%% the transaction commits, or `{error, Reason}' carrying the abort reason
%% when it is aborted (the spec previously omitted the error return).
-spec assign_db_sites(emqx_ds:db(), [site()]) -> ok | {error, _}.
assign_db_sites(DB, Sites) ->
    case mria:transaction(?SHARD, fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]) of
        {atomic, ok} ->
            ok;
        {aborted, Reason} ->
            {error, Reason}
    end.
|
||||
|
||||
%% @doc List the membership transitions that lead from the shard's current
%% replica set to its target set, as computed by `compute_transitions/2'.
%% Returns `undefined' when no record exists for the shard.
-spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [transition()] | undefined.
replica_set_transitions(DB, Shard) ->
    Key = {DB, Shard},
    case mnesia:dirty_read(?SHARD_TAB, Key) of
        [] ->
            undefined;
        [#?SHARD_TAB{replica_set = Current, target_set = Target}] ->
            compute_transitions(Target, Current)
    end.
|
||||
|
||||
%% @doc Apply a single membership transition to the shard's replica set.
%%
%% Runs `update_replica_set_trans/3' in an mria transaction. Returns `ok'
%% when the transaction commits, or `{error, Reason}' carrying the abort
%% reason when it is aborted (the spec previously omitted the error return).
-spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) ->
    ok | {error, _}.
update_replica_set(DB, Shard, Trans) ->
    case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
        {atomic, ok} ->
            ok;
        {aborted, Reason} ->
            {error, Reason}
    end.
|
||||
|
||||
%% @doc Dirty-read the replica set stored for the shard.
%% Returns `undefined' when no record exists for `{DB, Shard}'.
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [site()] | undefined.
replica_set(DB, Shard) ->
    Key = {DB, Shard},
    case mnesia:dirty_read(?SHARD_TAB, Key) of
        [] ->
            undefined;
        [#?SHARD_TAB{replica_set = Sites}] ->
            Sites
    end.
|
||||
|
||||
%% @doc Dirty-read the target set stored for the shard.
%% Returns `undefined' when no record exists for `{DB, Shard}'; the stored
%% field itself may also be `undefined'.
-spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [site()] | undefined.
target_set(DB, Shard) ->
    Key = {DB, Shard},
    case mnesia:dirty_read(?SHARD_TAB, Key) of
        [] ->
            undefined;
        [#?SHARD_TAB{target_set = Sites}] ->
            Sites
    end.
|
||||
|
||||
%%================================================================================
|
||||
%% behavior callbacks
|
||||
|
@ -268,19 +324,15 @@ open_db_trans(DB, CreateOpts) ->
|
|||
Opts
|
||||
end.
|
||||
|
||||
-spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard].
|
||||
allocate_shards_trans(DB, Opts) ->
|
||||
NShards = maps:get(n_shards, Opts),
|
||||
NSites = maps:get(n_sites, Opts),
|
||||
ReplicationFactor = maps:get(replication_factor, Opts),
|
||||
NReplicas = min(NSites, ReplicationFactor),
|
||||
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
|
||||
AllSites = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
|
||||
case length(AllSites) of
|
||||
-spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
|
||||
allocate_shards_trans(DB) ->
|
||||
Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB),
|
||||
Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
|
||||
case length(Nodes) of
|
||||
N when N >= NSites ->
|
||||
ok;
|
||||
_ ->
|
||||
mnesia:abort({insufficient_sites_online, NSites, AllSites})
|
||||
mnesia:abort({insufficient_sites_online, NSites, Nodes})
|
||||
end,
|
||||
case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
|
||||
[] ->
|
||||
|
@ -289,18 +341,11 @@ allocate_shards_trans(DB, Opts) ->
|
|||
ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
|
||||
mnesia:abort({shards_already_allocated, ShardsAllocated})
|
||||
end,
|
||||
{Allocation, _} = lists:mapfoldl(
|
||||
fun(Shard, SSites) ->
|
||||
{Sites, _} = emqx_utils_stream:consume(NReplicas, SSites),
|
||||
{_, SRest} = emqx_utils_stream:consume(1, SSites),
|
||||
{{Shard, Sites}, SRest}
|
||||
end,
|
||||
emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)),
|
||||
Shards
|
||||
),
|
||||
Shards = gen_shards(NShards),
|
||||
Sites = [S || #?NODE_TAB{site = S} <- Nodes],
|
||||
Allocation = compute_allocation(Shards, Sites, Opts),
|
||||
lists:map(
|
||||
fun({Shard, Sites}) ->
|
||||
ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites],
|
||||
fun({Shard, ReplicaSet}) ->
|
||||
Record = #?SHARD_TAB{
|
||||
shard = {DB, Shard},
|
||||
replica_set = ReplicaSet
|
||||
|
@ -311,31 +356,71 @@ allocate_shards_trans(DB, Opts) ->
|
|||
Allocation
|
||||
).
|
||||
|
||||
%% Transaction body for assign_db_sites/2.
%% Aborts with {nonexistent_sites, _} when any of the requested sites has no
%% record in ?NODE_TAB; otherwise writes a freshly computed target set into
%% every shard record of the DB.
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok.
assign_db_sites_trans(DB, Sites) ->
    Opts = db_config_trans(DB),
    Unknown = [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []],
    case Unknown of
        [] ->
            ok;
        [_ | _] ->
            mnesia:abort({nonexistent_sites, Unknown})
    end,
    %% TODO
    %% Optimize reallocation. The goals are:
    %% 1. Minimize the number of membership transitions.
    %% 2. Ensure that sites are responsible for roughly the same number of shards.
    ShardRecords = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
    SetTarget = fun({ShardRecord, TargetSet}) ->
        ok = mnesia:write(ShardRecord#?SHARD_TAB{target_set = TargetSet})
    end,
    lists:foreach(SetTarget, compute_allocation(ShardRecords, Sites, Opts)).
|
||||
|
||||
%% Transaction body for update_replica_set/3.
%% Applies the transition to the shard's replica set and writes the record
%% back. The target set is reset to `undefined' once the replica set contains
%% exactly the target sites. Aborts with {nonexistent_shard, _} when the
%% shard has no record.
update_replica_set_trans(DB, Shard, Trans) ->
    case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
        [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
            ReplicaSet = apply_transition(Trans, ReplicaSet0),
            TargetSet = next_target_set(TargetSet0, ReplicaSet),
            mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet});
        [] ->
            mnesia:abort({nonexistent_shard, {DB, Shard}})
    end.

%% Decide which target set to store after the replica set has changed.
%% The record type allows `target_set' to be `undefined' (no reallocation
%% pending); in that case there is nothing to compare against.
next_target_set(undefined, _ReplicaSet) ->
    undefined;
next_target_set(TargetSet, ReplicaSet) ->
    Target = lists:usort(TargetSet),
    %% Compare as sets: the stored replica set is not guaranteed to be sorted.
    case lists:usort(ReplicaSet) of
        Target ->
            undefined;
        _ ->
            Target
    end.
|
||||
|
||||
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||
ok | {error, database}.
|
||||
update_db_config_trans(DB, CreateOpts) ->
|
||||
ok | {error, _}.
|
||||
update_db_config_trans(DB, UpdateOpts) ->
|
||||
case mnesia:wread({?META_TAB, DB}) of
|
||||
[#?META_TAB{db_props = Opts}] ->
|
||||
%% Since this is an update and not a reopen,
|
||||
%% we should keep the shard number and replication factor
|
||||
%% and not create a new shard server
|
||||
#{
|
||||
n_shards := NShards,
|
||||
replication_factor := ReplicationFactor
|
||||
} = Opts,
|
||||
|
||||
ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts),
|
||||
EffectiveOpts = maps:merge(Opts, ChangeableOpts),
|
||||
mnesia:write(#?META_TAB{
|
||||
db = DB,
|
||||
db_props = CreateOpts#{
|
||||
n_shards := NShards,
|
||||
replication_factor := ReplicationFactor
|
||||
}
|
||||
db_props = EffectiveOpts
|
||||
}),
|
||||
ok;
|
||||
EffectiveOpts;
|
||||
[] ->
|
||||
{error, no_database}
|
||||
end.
|
||||
|
||||
%% Read the DB properties from ?META_TAB inside a transaction.
%% Aborts the transaction with {nonexistent_db, DB} when there is no record.
-spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
db_config_trans(DB) ->
    case mnesia:read(?META_TAB, DB, read) of
        [] ->
            mnesia:abort({nonexistent_db, DB});
        [#?META_TAB{db_props = Props}] ->
            Props
    end.
|
||||
|
||||
-spec drop_db_trans(emqx_ds:db()) -> ok.
|
||||
drop_db_trans(DB) ->
|
||||
mnesia:delete({?META_TAB, DB}),
|
||||
|
@ -391,6 +476,38 @@ ensure_site() ->
|
|||
persistent_term:put(?emqx_ds_builtin_site, Site),
|
||||
ok.
|
||||
|
||||
%% Pair every shard with a replica set drawn from the sorted list of sites.
%% Each replica set holds min(length(Sites), replication_factor) sites.
%% Shards are processed in sorted order; for every shard the replica set is
%% consumed from the current position of the site stream, and the next shard
%% continues from the stream returned by consuming a single element.
%% NOTE(review): presumably emqx_utils_stream:repeat/1 cycles the site list
%% endlessly, which makes this a round-robin assignment -- confirm.
compute_allocation(Shards, Sites, Opts) ->
    NSites = length(Sites),
    NReplicas = min(NSites, maps:get(replication_factor, Opts)),
    SortedShards = lists:sort(Shards),
    SiteStream = emqx_utils_stream:repeat(emqx_utils_stream:list(lists:sort(Sites))),
    Assign = fun(Shard, Stream) ->
        {ReplicaSet, _} = emqx_utils_stream:consume(NReplicas, Stream),
        {_, NextStream} = emqx_utils_stream:consume(1, Stream),
        {{Shard, ReplicaSet}, NextStream}
    end,
    {Allocation, _} = lists:mapfoldl(Assign, SiteStream, SortedShards),
    Allocation.
|
||||
|
||||
%% Compute the transitions that turn the current replica set into the target
%% set: an `add' for every site only in the target, a `del' for every site
%% only in the current set, with additions and deletions alternating.
%% An `undefined' target set yields no transitions.
compute_transitions(undefined, _Current) ->
    [];
compute_transitions(Target, Current) ->
    ToAdd = [{add, Site} || Site <- Target -- Current],
    ToDel = [{del, Site} || Site <- Current -- Target],
    intersperse(ToAdd, ToDel).
|
||||
|
||||
%% Apply a single membership transition to a list of sites.
%% `add' yields a sorted list without duplicates; `del' removes the first
%% occurrence of the site and leaves the remaining order untouched.
apply_transition({add, Site}, Current) ->
    Extended = [Site | Current],
    lists:usort(Extended);
apply_transition({del, Site}, Current) ->
    lists:delete(Site, Current).
|
||||
|
||||
%% Generate shard IDs: the integers 0..NShards-1 rendered as binaries.
gen_shards(NShards) ->
    Indices = lists:seq(0, NShards - 1),
    lists:map(fun erlang:integer_to_binary/1, Indices).
|
||||
|
||||
eval_qlc(Q) ->
|
||||
case mnesia:is_transaction() of
|
||||
true ->
|
||||
|
@ -400,29 +517,16 @@ eval_qlc(Q) ->
|
|||
Result
|
||||
end.
|
||||
|
||||
filter_shards(DB) ->
|
||||
filter_shards(DB, const(true)).
|
||||
%% Run Fun with Args in an mria transaction and return the bare result.
%% Anything other than {atomic, _} fails the match and crashes the caller.
transaction(Fun, Args) ->
    Outcome = mria:transaction(?SHARD, Fun, Args),
    {atomic, Value} = Outcome,
    Value.
|
||||
|
||||
-spec filter_shards(emqx_ds:db(), fun((_) -> boolean())) ->
|
||||
[emqx_ds_replication_layer:shard_id()].
|
||||
filter_shards(DB, Predicte) ->
|
||||
filter_shards(DB, Predicte, fun(#?SHARD_TAB{shard = {_, ShardId}}) ->
|
||||
ShardId
|
||||
end).
|
||||
|
||||
filter_shards(DB, Predicate, Mapper) ->
|
||||
eval_qlc(
|
||||
qlc:q([
|
||||
Mapper(Shard)
|
||||
|| #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table(
|
||||
?SHARD_TAB
|
||||
),
|
||||
D =:= DB,
|
||||
Predicate(Shard)
|
||||
])
|
||||
).
|
||||
|
||||
const(Result) ->
|
||||
fun(_) ->
|
||||
Result
|
||||
end.
|
||||
%% @doc Interleave two lists, alternating between them and starting with the
%% first; once either list runs out, the remainder of the other is appended.
%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
-spec intersperse([X], [Y]) -> [X | Y].
intersperse([], Right) ->
    Right;
intersperse(Left, []) ->
    Left;
intersperse([Head | Tail], Right) ->
    %% Swap the arguments so that the next element comes from the other list.
    [Head | intersperse(Right, Tail)].
|
||||
|
|
|
@ -44,7 +44,7 @@ start_link(DB, Shard, Opts) ->
|
|||
gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
|
||||
|
||||
shard_servers(DB, Shard) ->
|
||||
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
|
||||
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
|
||||
[
|
||||
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
|
||||
|| Site <- ReplicaSet
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
-module(emqx_ds_replication_shard_allocator).
|
||||
|
||||
-export([start_link/2]).
|
||||
-export([start_link/1]).
|
||||
|
||||
-export([n_shards/1]).
|
||||
-export([shard_meta/2]).
|
||||
|
@ -35,8 +35,8 @@
|
|||
|
||||
%%
|
||||
|
||||
start_link(DB, Opts) ->
|
||||
gen_server:start_link(?MODULE, {DB, Opts}, []).
|
||||
start_link(DB) ->
|
||||
gen_server:start_link(?MODULE, DB, []).
|
||||
|
||||
n_shards(DB) ->
|
||||
Meta = persistent_term:get(?db_meta(DB)),
|
||||
|
@ -49,22 +49,11 @@ shard_meta(DB, Shard) ->
|
|||
|
||||
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
|
||||
|
||||
init({DB, Opts}) ->
|
||||
init(DB) ->
|
||||
_ = erlang:process_flag(trap_exit, true),
|
||||
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
|
||||
State = #{db => DB, opts => Opts, status => allocating},
|
||||
case allocate_shards(State) of
|
||||
{ok, NState} ->
|
||||
{ok, NState};
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
msg => "Shard allocation still in progress",
|
||||
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||
}
|
||||
),
|
||||
{ok, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||
end.
|
||||
State = #{db => DB, status => allocating},
|
||||
handle_allocate_shards(State, ok).
|
||||
|
||||
handle_call(_Call, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
@ -73,18 +62,7 @@ handle_cast(_Cast, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
case allocate_shards(State) of
|
||||
{ok, NState} ->
|
||||
{noreply, NState};
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
msg => "Shard allocation still in progress",
|
||||
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||
}
|
||||
),
|
||||
{noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||
end;
|
||||
handle_allocate_shards(State, noreply);
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
@ -96,8 +74,24 @@ terminate(_Reason, #{}) ->
|
|||
|
||||
%%
|
||||
|
||||
allocate_shards(State = #{db := DB, opts := Opts}) ->
|
||||
case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
|
||||
%% Attempt shard allocation and build a gen_server callback return tagged
%% with Ret (`ok' from init/1, `noreply' from handle_info/2).
%% On failure a notice is logged and ?ALLOCATE_RETRY_TIMEOUT is added to the
%% returned tuple as the gen_server timeout, with the state left unchanged.
handle_allocate_shards(State, Ret) ->
    case allocate_shards(State) of
        {error, Data} ->
            Report = Data#{
                msg => "Shard allocation still in progress",
                retry_in => ?ALLOCATE_RETRY_TIMEOUT
            },
            _ = logger:notice(Report),
            {Ret, State, ?ALLOCATE_RETRY_TIMEOUT};
        {ok, NewState} ->
            {Ret, NewState}
    end.
|
||||
|
||||
%%
|
||||
|
||||
allocate_shards(State = #{db := DB}) ->
|
||||
case emqx_ds_replication_layer_meta:allocate_shards(DB) of
|
||||
{ok, Shards} ->
|
||||
logger:notice(#{msg => "Shards allocated", shards => Shards}),
|
||||
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
|
||||
|
|
|
@ -53,7 +53,7 @@ t_00_smoke_open_drop(_Config) ->
|
|||
lists:foreach(
|
||||
fun(Shard) ->
|
||||
?assertEqual(
|
||||
{ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
|
||||
[Site], emqx_ds_replication_layer_meta:replica_set(DB, Shard)
|
||||
)
|
||||
end,
|
||||
Shards
|
||||
|
|
Loading…
Reference in New Issue