feat(dsrepl): move shard allocation to separate process
That starts shard and egress processes only when shards are fully allocated.
This commit is contained in:
parent
4dafbf21f6
commit
3b59cf2ebf
|
@ -22,13 +22,18 @@
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
|
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
|
||||||
|
-export([status/1]).
|
||||||
|
|
||||||
%% behaviour callbacks:
|
%% behaviour callbacks:
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
-export([handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
-export([start_link_sup/2]).
|
-export([start_link_sup/2]).
|
||||||
|
|
||||||
|
%% FIXME
|
||||||
|
-export([lookup_shard_meta/2]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -43,6 +48,8 @@
|
||||||
-record(?shard_sup, {db}).
|
-record(?shard_sup, {db}).
|
||||||
-record(?egress_sup, {db}).
|
-record(?egress_sup, {db}).
|
||||||
|
|
||||||
|
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -79,6 +86,13 @@ ensure_shard(Shard) ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
status(DB) ->
|
||||||
|
State = sys:get_state(?via({allocator, DB})),
|
||||||
|
maps:get(status, State).
|
||||||
|
|
||||||
|
lookup_shard_meta(DB, Shard) ->
|
||||||
|
persistent_term:get(?shard_meta(DB, Shard)).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behaviour callbacks
|
%% behaviour callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -86,40 +100,59 @@ ensure_shard(Shard) ->
|
||||||
init({#?db_sup{db = DB}, DefaultOpts}) ->
|
init({#?db_sup{db = DB}, DefaultOpts}) ->
|
||||||
%% Spec for the top-level supervisor for the database:
|
%% Spec for the top-level supervisor for the database:
|
||||||
logger:notice("Starting DS DB ~p", [DB]),
|
logger:notice("Starting DS DB ~p", [DB]),
|
||||||
_ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
|
Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
|
||||||
Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
|
Children = [
|
||||||
|
sup_spec(#?shard_sup{db = DB}, []),
|
||||||
|
sup_spec(#?egress_sup{db = DB}, []),
|
||||||
|
shard_allocator_spec(DB, Opts)
|
||||||
|
],
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_all,
|
strategy => one_for_all,
|
||||||
intensity => 0,
|
intensity => 0,
|
||||||
period => 1
|
period => 1
|
||||||
},
|
},
|
||||||
{ok, {SupFlags, Children}};
|
{ok, {SupFlags, Children}};
|
||||||
init({#?shard_sup{db = DB}, _}) ->
|
init({#?shard_sup{db = _DB}, _}) ->
|
||||||
%% Spec for the supervisor that manages the worker processes for
|
%% Spec for the supervisor that manages the worker processes for
|
||||||
%% each local shard of the DB:
|
%% each local shard of the DB:
|
||||||
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
|
||||||
Children = [
|
|
||||||
Child
|
|
||||||
|| Shard <- MyShards,
|
|
||||||
Child <- [shard_spec(DB, Shard), shard_replication_spec(DB, Shard)]
|
|
||||||
],
|
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_one,
|
strategy => one_for_one,
|
||||||
intensity => 10,
|
intensity => 10,
|
||||||
period => 1
|
period => 1
|
||||||
},
|
},
|
||||||
{ok, {SupFlags, Children}};
|
{ok, {SupFlags, []}};
|
||||||
init({#?egress_sup{db = DB}, _}) ->
|
init({#?egress_sup{db = _DB}, _}) ->
|
||||||
%% Spec for the supervisor that manages the egress proxy processes
|
%% Spec for the supervisor that manages the egress proxy processes
|
||||||
%% managing traffic towards each of the shards of the DB:
|
%% managing traffic towards each of the shards of the DB:
|
||||||
Shards = emqx_ds_replication_layer_meta:shards(DB),
|
|
||||||
Children = [egress_spec(DB, Shard) || Shard <- Shards],
|
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_one,
|
strategy => one_for_one,
|
||||||
intensity => 0,
|
intensity => 0,
|
||||||
period => 1
|
period => 1
|
||||||
},
|
},
|
||||||
{ok, {SupFlags, Children}}.
|
{ok, {SupFlags, []}};
|
||||||
|
init({allocator, DB, Opts}) ->
|
||||||
|
_ = erlang:process_flag(trap_exit, true),
|
||||||
|
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
|
||||||
|
init_allocator(DB, Opts).
|
||||||
|
|
||||||
|
start_shards(DB, Shards) ->
|
||||||
|
SupRef = ?via(#?shard_sup{db = DB}),
|
||||||
|
lists:foreach(
|
||||||
|
fun(Shard) ->
|
||||||
|
{ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard)),
|
||||||
|
{ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard))
|
||||||
|
end,
|
||||||
|
Shards
|
||||||
|
).
|
||||||
|
|
||||||
|
start_egresses(DB, Shards) ->
|
||||||
|
SupRef = ?via(#?egress_sup{db = DB}),
|
||||||
|
lists:foreach(
|
||||||
|
fun(Shard) ->
|
||||||
|
{ok, _} = supervisor:start_child(SupRef, egress_spec(DB, Shard))
|
||||||
|
end,
|
||||||
|
Shards
|
||||||
|
).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
|
@ -143,7 +176,7 @@ sup_spec(Id, Options) ->
|
||||||
shard_spec(DB, Shard) ->
|
shard_spec(DB, Shard) ->
|
||||||
Options = emqx_ds_replication_layer_meta:get_options(DB),
|
Options = emqx_ds_replication_layer_meta:get_options(DB),
|
||||||
#{
|
#{
|
||||||
id => Shard,
|
id => {Shard, storage},
|
||||||
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
|
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
|
||||||
shutdown => 5_000,
|
shutdown => 5_000,
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
|
@ -158,6 +191,15 @@ shard_replication_spec(DB, Shard) ->
|
||||||
type => worker
|
type => worker
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
shard_allocator_spec(DB, Opts) ->
|
||||||
|
#{
|
||||||
|
id => shard_allocator,
|
||||||
|
start =>
|
||||||
|
{gen_server, start_link, [?via({allocator, DB}), ?MODULE, {allocator, DB, Opts}, []]},
|
||||||
|
restart => permanent,
|
||||||
|
type => worker
|
||||||
|
}.
|
||||||
|
|
||||||
egress_spec(DB, Shard) ->
|
egress_spec(DB, Shard) ->
|
||||||
#{
|
#{
|
||||||
id => Shard,
|
id => Shard,
|
||||||
|
@ -166,3 +208,78 @@ egress_spec(DB, Shard) ->
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
type => worker
|
type => worker
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% Allocator
|
||||||
|
|
||||||
|
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
|
||||||
|
|
||||||
|
init_allocator(DB, Opts) ->
|
||||||
|
State = #{db => DB, opts => Opts, status => allocating},
|
||||||
|
case allocate_shards(State) of
|
||||||
|
NState = #{} ->
|
||||||
|
{ok, NState};
|
||||||
|
{error, Data} ->
|
||||||
|
_ = logger:notice(
|
||||||
|
Data#{
|
||||||
|
msg => "Shard allocation still in progress",
|
||||||
|
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||||
|
}
|
||||||
|
),
|
||||||
|
{ok, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||||
|
end.
|
||||||
|
|
||||||
|
handle_call(_Call, _From, State) ->
|
||||||
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast(_Cast, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(timeout, State) ->
|
||||||
|
case allocate_shards(State) of
|
||||||
|
NState = #{} ->
|
||||||
|
{noreply, NState};
|
||||||
|
{error, Data} ->
|
||||||
|
_ = logger:notice(
|
||||||
|
Data#{
|
||||||
|
msg => "Shard allocation still in progress",
|
||||||
|
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||||
|
}
|
||||||
|
),
|
||||||
|
{noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||||
|
end.
|
||||||
|
|
||||||
|
terminate(_Reason, #{db := DB, shards := Shards}) ->
|
||||||
|
erase_shards_meta(DB, Shards).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
allocate_shards(State = #{db := DB, opts := Opts}) ->
|
||||||
|
case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
|
||||||
|
{ok, Shards} ->
|
||||||
|
logger:notice(#{msg => "Shards allocated", shards => Shards}),
|
||||||
|
ok = save_shards_meta(DB, Shards),
|
||||||
|
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
|
||||||
|
logger:notice(#{
|
||||||
|
msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB)
|
||||||
|
}),
|
||||||
|
ok = start_egresses(DB, Shards),
|
||||||
|
logger:notice(#{msg => "Egresses started", shards => Shards}),
|
||||||
|
State#{shards => Shards, status := ready};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
save_shards_meta(DB, Shards) ->
|
||||||
|
lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
|
||||||
|
|
||||||
|
save_shard_meta(DB, Shard) ->
|
||||||
|
Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
|
||||||
|
persistent_term:put(?shard_meta(DB, Shard), #{
|
||||||
|
servers => Servers
|
||||||
|
}).
|
||||||
|
|
||||||
|
erase_shards_meta(DB, Shards) ->
|
||||||
|
lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
|
||||||
|
|
||||||
|
erase_shard_meta(DB, Shard) ->
|
||||||
|
persistent_term:erase(?shard_meta(DB, Shard)).
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
-export([
|
-export([
|
||||||
shards/1,
|
shards/1,
|
||||||
my_shards/1,
|
my_shards/1,
|
||||||
shard_meta/2,
|
allocate_shards/2,
|
||||||
replica_set/2,
|
replica_set/2,
|
||||||
sites/0,
|
sites/0,
|
||||||
node/1,
|
node/1,
|
||||||
|
@ -47,6 +47,7 @@
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
-export([
|
-export([
|
||||||
open_db_trans/2,
|
open_db_trans/2,
|
||||||
|
allocate_shards_trans/2,
|
||||||
update_db_config_trans/2,
|
update_db_config_trans/2,
|
||||||
drop_db_trans/1,
|
drop_db_trans/1,
|
||||||
claim_site/2,
|
claim_site/2,
|
||||||
|
@ -97,8 +98,6 @@
|
||||||
%% Peristent term key:
|
%% Peristent term key:
|
||||||
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
|
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
|
||||||
|
|
||||||
-define(DB_META(DB), {?MODULE, DB}).
|
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -155,10 +154,14 @@ my_shards(DB) ->
|
||||||
lists:member(Site, ReplicaSet)
|
lists:member(Site, ReplicaSet)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
shard_meta(DB, Shard) ->
|
allocate_shards(DB, Opts) ->
|
||||||
case get_db_meta(DB) of
|
case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of
|
||||||
#{Shard := Meta} -> Meta;
|
{atomic, Shards} ->
|
||||||
#{} -> undefined
|
{ok, Shards};
|
||||||
|
{aborted, {shards_already_allocated, Shards}} ->
|
||||||
|
{ok, Shards};
|
||||||
|
{aborted, {insufficient_sites_online, Needed, Sites}} ->
|
||||||
|
{error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
|
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
|
||||||
|
@ -186,24 +189,18 @@ node(Site) ->
|
||||||
|
|
||||||
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
|
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
|
||||||
get_options(DB) ->
|
get_options(DB) ->
|
||||||
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]),
|
case mnesia:dirty_read(?META_TAB, DB) of
|
||||||
Opts.
|
[#?META_TAB{db_props = Opts}] ->
|
||||||
|
Opts;
|
||||||
|
[] ->
|
||||||
|
#{}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||||
emqx_ds_replication_layer:builtin_db_opts().
|
emqx_ds_replication_layer:builtin_db_opts().
|
||||||
open_db(DB, DefaultOpts) ->
|
open_db(DB, DefaultOpts) ->
|
||||||
case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of
|
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
|
||||||
{atomic, Opts} ->
|
Opts.
|
||||||
Opts;
|
|
||||||
{aborted, {insufficient_sites_online, NNeeded, Sites}} ->
|
|
||||||
%% TODO: Still ugly, it blocks the whole node startup.
|
|
||||||
logger:notice(
|
|
||||||
"Shard allocation still in progress, not enough sites: ~p, need: ~p",
|
|
||||||
[Sites, NNeeded]
|
|
||||||
),
|
|
||||||
ok = timer:sleep(1000),
|
|
||||||
open_db(DB, DefaultOpts)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||||
ok | {error, _}.
|
ok | {error, _}.
|
||||||
|
@ -263,22 +260,53 @@ terminate(_Reason, #s{}) ->
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) ->
|
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||||
emqx_ds_replication_layer:builtin_db_opts().
|
emqx_ds_replication_layer:builtin_db_opts().
|
||||||
open_db_trans(DB, CreateOpts) ->
|
open_db_trans(DB, CreateOpts) ->
|
||||||
case mnesia:wread({?META_TAB, DB}) of
|
case mnesia:wread({?META_TAB, DB}) of
|
||||||
[] when is_map(CreateOpts) ->
|
[] ->
|
||||||
NShards = maps:get(n_shards, CreateOpts),
|
|
||||||
NSites = maps:get(n_sites, CreateOpts),
|
|
||||||
ReplicationFactor = maps:get(replication_factor, CreateOpts),
|
|
||||||
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
|
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
|
||||||
create_shards(DB, NSites, NShards, ReplicationFactor),
|
|
||||||
save_db_meta(DB),
|
|
||||||
CreateOpts;
|
CreateOpts;
|
||||||
[#?META_TAB{db_props = Opts}] ->
|
[#?META_TAB{db_props = Opts}] ->
|
||||||
Opts
|
Opts
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard].
|
||||||
|
allocate_shards_trans(DB, Opts) ->
|
||||||
|
NShards = maps:get(n_shards, Opts),
|
||||||
|
NSites = maps:get(n_sites, Opts),
|
||||||
|
ReplicationFactor = maps:get(replication_factor, Opts),
|
||||||
|
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
|
||||||
|
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
|
||||||
|
case length(AllSites) of
|
||||||
|
N when N >= NSites ->
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
mnesia:abort({insufficient_sites_online, NSites, AllSites})
|
||||||
|
end,
|
||||||
|
case mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{shard = {DB, '_'}, _ = '_'}, write) of
|
||||||
|
[] ->
|
||||||
|
ok;
|
||||||
|
Records ->
|
||||||
|
ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
|
||||||
|
mnesia:abort({shards_already_allocated, ShardsAllocated})
|
||||||
|
end,
|
||||||
|
lists:map(
|
||||||
|
fun(Shard) ->
|
||||||
|
Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites],
|
||||||
|
Hashes = lists:sort(Hashes0),
|
||||||
|
{_, Sites} = lists:unzip(Hashes),
|
||||||
|
ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
|
||||||
|
Record = #?SHARD_TAB{
|
||||||
|
shard = {DB, Shard},
|
||||||
|
replica_set = ReplicaSet
|
||||||
|
},
|
||||||
|
ok = mnesia:write(Record),
|
||||||
|
Shard
|
||||||
|
end,
|
||||||
|
Shards
|
||||||
|
).
|
||||||
|
|
||||||
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||||
ok | {error, database}.
|
ok | {error, database}.
|
||||||
update_db_config_trans(DB, CreateOpts) ->
|
update_db_config_trans(DB, CreateOpts) ->
|
||||||
|
@ -364,53 +392,6 @@ ensure_site() ->
|
||||||
persistent_term:put(?emqx_ds_builtin_site, Site),
|
persistent_term:put(?emqx_ds_builtin_site, Site),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer(), pos_integer()) -> ok.
|
|
||||||
create_shards(DB, NSites, NShards, ReplicationFactor) ->
|
|
||||||
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
|
|
||||||
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
|
|
||||||
case length(AllSites) of
|
|
||||||
N when N >= NSites ->
|
|
||||||
ok;
|
|
||||||
_ ->
|
|
||||||
mnesia:abort({insufficient_sites_online, NSites, AllSites})
|
|
||||||
end,
|
|
||||||
lists:foreach(
|
|
||||||
fun(Shard) ->
|
|
||||||
Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites],
|
|
||||||
Hashes = lists:sort(Hashes0),
|
|
||||||
{_, Sites} = lists:unzip(Hashes),
|
|
||||||
ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
|
|
||||||
Record = #?SHARD_TAB{
|
|
||||||
shard = {DB, Shard},
|
|
||||||
replica_set = ReplicaSet
|
|
||||||
},
|
|
||||||
mnesia:write(Record)
|
|
||||||
end,
|
|
||||||
Shards
|
|
||||||
).
|
|
||||||
|
|
||||||
save_db_meta(DB) ->
|
|
||||||
Shards = mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{_ = '_'}, read),
|
|
||||||
Meta = maps:from_list([mk_shard_meta(Shard) || Shard <- Shards]),
|
|
||||||
persistent_term:put(?DB_META(DB), Meta).
|
|
||||||
|
|
||||||
get_db_meta(DB) ->
|
|
||||||
persistent_term:get(?DB_META(DB)).
|
|
||||||
|
|
||||||
% erase_db_meta(DB) ->
|
|
||||||
% persistent_term:erase(?DB_META(DB)).
|
|
||||||
|
|
||||||
mk_shard_meta(#?SHARD_TAB{shard = {DB, Shard}, replica_set = ReplicaSet}) ->
|
|
||||||
%% FIXME: Wrong place.
|
|
||||||
Servers = [
|
|
||||||
{emqx_ds_replication_layer_shard:server_name(DB, Shard, Site), Node#?NODE_TAB.node}
|
|
||||||
|| Site <- ReplicaSet,
|
|
||||||
Node <- mnesia:read(?NODE_TAB, Site)
|
|
||||||
],
|
|
||||||
{Shard, #{
|
|
||||||
servers => Servers
|
|
||||||
}}.
|
|
||||||
|
|
||||||
-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
|
-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
|
||||||
hash(Shard, Site) ->
|
hash(Shard, Site) ->
|
||||||
erlang:phash2({Shard, Site}).
|
erlang:phash2({Shard, Site}).
|
||||||
|
|
|
@ -124,7 +124,7 @@ get_local_server(DB, Shard) ->
|
||||||
?MEMOIZE(DB, Shard, local_server(DB, Shard)).
|
?MEMOIZE(DB, Shard, local_server(DB, Shard)).
|
||||||
|
|
||||||
get_shard_servers(DB, Shard) ->
|
get_shard_servers(DB, Shard) ->
|
||||||
maps:get(servers, emqx_ds_replication_layer_meta:get_shard_meta(DB, Shard)).
|
maps:get(servers, emqx_ds_builtin_db_sup:lookup_shard_meta(DB, Shard)).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue