fix(dsrepl): split shard allocator into a separate module
This commit is contained in:
parent
1b647035d0
commit
ac9700dd28
|
@ -21,19 +21,15 @@
|
|||
-behaviour(supervisor).
|
||||
|
||||
%% API:
|
||||
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
|
||||
-export([status/1]).
|
||||
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
|
||||
-export([which_shards/1]).
|
||||
|
||||
%% behaviour callbacks:
|
||||
-export([init/1]).
|
||||
-export([handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||
|
||||
%% internal exports:
|
||||
-export([start_link_sup/2]).
|
||||
|
||||
%% FIXME
|
||||
-export([lookup_shard_meta/2]).
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
@ -41,14 +37,14 @@
|
|||
-define(via(REC), {via, gproc, {n, l, REC}}).
|
||||
|
||||
-define(db_sup, ?MODULE).
|
||||
-define(shard_sup, emqx_ds_builtin_db_shard_sup).
|
||||
-define(shards_sup, emqx_ds_builtin_db_shards_sup).
|
||||
-define(egress_sup, emqx_ds_builtin_db_egress_sup).
|
||||
-define(shard_sup, emqx_ds_builtin_db_shard_sup).
|
||||
|
||||
-record(?db_sup, {db}).
|
||||
-record(?shard_sup, {db}).
|
||||
-record(?shards_sup, {db}).
|
||||
-record(?egress_sup, {db}).
|
||||
|
||||
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
|
||||
-record(?shard_sup, {db, shard}).
|
||||
|
||||
%%================================================================================
|
||||
%% API funcions
|
||||
|
@ -60,8 +56,8 @@ start_db(DB, Opts) ->
|
|||
|
||||
-spec start_shard(emqx_ds_storage_layer:shard_id()) ->
|
||||
supervisor:startchild_ret().
|
||||
start_shard(Shard = {DB, _}) ->
|
||||
supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)).
|
||||
start_shard({DB, Shard}) ->
|
||||
supervisor:start_child(?via(#?shards_sup{db = DB}), shard_spec(DB, Shard)).
|
||||
|
||||
-spec start_egress(emqx_ds_storage_layer:shard_id()) ->
|
||||
supervisor:startchild_ret().
|
||||
|
@ -70,28 +66,24 @@ start_egress({DB, Shard}) ->
|
|||
|
||||
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
|
||||
stop_shard(Shard = {DB, _}) ->
|
||||
Sup = ?via(#?shard_sup{db = DB}),
|
||||
Sup = ?via(#?shards_sup{db = DB}),
|
||||
ok = supervisor:terminate_child(Sup, Shard),
|
||||
ok = supervisor:delete_child(Sup, Shard).
|
||||
|
||||
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
|
||||
ok | {error, _Reason}.
|
||||
ensure_shard(Shard) ->
|
||||
case start_shard(Shard) of
|
||||
{ok, _Pid} ->
|
||||
ok;
|
||||
{error, {already_started, _Pid}} ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
ensure_started(start_shard(Shard)).
|
||||
|
||||
status(DB) ->
|
||||
State = sys:get_state(?via({allocator, DB})),
|
||||
maps:get(status, State).
|
||||
-spec ensure_egress(emqx_ds_storage_layer:shard_id()) ->
|
||||
ok | {error, _Reason}.
|
||||
ensure_egress(Shard) ->
|
||||
ensure_started(start_egress(Shard)).
|
||||
|
||||
lookup_shard_meta(DB, Shard) ->
|
||||
persistent_term:get(?shard_meta(DB, Shard)).
|
||||
-spec which_shards(emqx_ds:db()) ->
|
||||
[_Child].
|
||||
which_shards(DB) ->
|
||||
supervisor:which_children(?via(#?shards_sup{db = DB})).
|
||||
|
||||
%%================================================================================
|
||||
%% behaviour callbacks
|
||||
|
@ -103,7 +95,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
|
|||
Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
|
||||
ok = start_ra_system(DB, Opts),
|
||||
Children = [
|
||||
sup_spec(#?shard_sup{db = DB}, []),
|
||||
sup_spec(#?shards_sup{db = DB}, []),
|
||||
sup_spec(#?egress_sup{db = DB}, []),
|
||||
shard_allocator_spec(DB, Opts)
|
||||
],
|
||||
|
@ -113,8 +105,8 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
|
|||
period => 1
|
||||
},
|
||||
{ok, {SupFlags, Children}};
|
||||
init({#?shard_sup{db = _DB}, _}) ->
|
||||
%% Spec for the supervisor that manages the worker processes for
|
||||
init({#?shards_sup{db = _DB}, _}) ->
|
||||
%% Spec for the supervisor that manages the supervisors for
|
||||
%% each local shard of the DB:
|
||||
SupFlags = #{
|
||||
strategy => one_for_one,
|
||||
|
@ -131,10 +123,18 @@ init({#?egress_sup{db = _DB}, _}) ->
|
|||
period => 1
|
||||
},
|
||||
{ok, {SupFlags, []}};
|
||||
init({allocator, DB, Opts}) ->
|
||||
_ = erlang:process_flag(trap_exit, true),
|
||||
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
|
||||
init_allocator(DB, Opts).
|
||||
init({#?shard_sup{db = DB, shard = Shard}, _}) ->
|
||||
SupFlags = #{
|
||||
strategy => rest_for_one,
|
||||
intensity => 10,
|
||||
period => 100
|
||||
},
|
||||
Opts = emqx_ds_replication_layer_meta:get_options(DB),
|
||||
Children = [
|
||||
shard_storage_spec(DB, Shard, Opts),
|
||||
shard_replication_spec(DB, Shard, Opts)
|
||||
],
|
||||
{ok, {SupFlags, Children}}.
|
||||
|
||||
start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
|
||||
DataDir = filename:join([emqx:data_dir(), DB, dsrepl]),
|
||||
|
@ -164,25 +164,6 @@ start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
start_shards(DB, Shards, Opts) ->
|
||||
SupRef = ?via(#?shard_sup{db = DB}),
|
||||
lists:foreach(
|
||||
fun(Shard) ->
|
||||
{ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard, Opts)),
|
||||
{ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard, Opts))
|
||||
end,
|
||||
Shards
|
||||
).
|
||||
|
||||
start_egresses(DB, Shards) ->
|
||||
SupRef = ?via(#?egress_sup{db = DB}),
|
||||
lists:foreach(
|
||||
fun(Shard) ->
|
||||
{ok, _} = supervisor:start_child(SupRef, egress_spec(DB, Shard))
|
||||
end,
|
||||
Shards
|
||||
).
|
||||
|
||||
%%================================================================================
|
||||
%% Internal exports
|
||||
%%================================================================================
|
||||
|
@ -203,12 +184,18 @@ sup_spec(Id, Options) ->
|
|||
}.
|
||||
|
||||
shard_spec(DB, Shard) ->
|
||||
shard_spec(DB, Shard, emqx_ds_replication_layer_meta:get_options(DB)).
|
||||
#{
|
||||
id => {shard, Shard},
|
||||
start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]},
|
||||
shutdown => infinity,
|
||||
restart => permanent,
|
||||
type => supervisor
|
||||
}.
|
||||
|
||||
shard_spec(DB, Shard, Options) ->
|
||||
shard_storage_spec(DB, Shard, Opts) ->
|
||||
#{
|
||||
id => {Shard, storage},
|
||||
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
|
||||
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Opts]},
|
||||
shutdown => 5_000,
|
||||
restart => permanent,
|
||||
type => worker
|
||||
|
@ -225,8 +212,7 @@ shard_replication_spec(DB, Shard, Opts) ->
|
|||
shard_allocator_spec(DB, Opts) ->
|
||||
#{
|
||||
id => shard_allocator,
|
||||
start =>
|
||||
{gen_server, start_link, [?via({allocator, DB}), ?MODULE, {allocator, DB, Opts}, []]},
|
||||
start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]},
|
||||
restart => permanent,
|
||||
type => worker
|
||||
}.
|
||||
|
@ -240,78 +226,12 @@ egress_spec(DB, Shard) ->
|
|||
type => worker
|
||||
}.
|
||||
|
||||
%% Allocator
|
||||
|
||||
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
|
||||
|
||||
init_allocator(DB, Opts) ->
|
||||
State = #{db => DB, opts => Opts, status => allocating},
|
||||
case allocate_shards(State) of
|
||||
NState = #{} ->
|
||||
{ok, NState};
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
msg => "Shard allocation still in progress",
|
||||
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||
}
|
||||
),
|
||||
{ok, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||
end.
|
||||
|
||||
handle_call(_Call, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(_Cast, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
case allocate_shards(State) of
|
||||
NState = #{} ->
|
||||
{noreply, NState};
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
msg => "Shard allocation still in progress",
|
||||
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||
}
|
||||
),
|
||||
{noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||
end.
|
||||
|
||||
terminate(_Reason, #{db := DB, shards := Shards}) ->
|
||||
%% FIXME
|
||||
erase_shards_meta(DB, Shards).
|
||||
|
||||
%%
|
||||
|
||||
allocate_shards(State = #{db := DB, opts := Opts}) ->
|
||||
case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
|
||||
{ok, Shards} ->
|
||||
logger:notice(#{msg => "Shards allocated", shards => Shards}),
|
||||
ok = save_shards_meta(DB, Shards),
|
||||
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB), Opts),
|
||||
logger:notice(#{
|
||||
msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB)
|
||||
}),
|
||||
ok = start_egresses(DB, Shards),
|
||||
logger:notice(#{msg => "Egresses started", shards => Shards}),
|
||||
State#{shards => Shards, status := ready};
|
||||
ensure_started(Res) ->
|
||||
case Res of
|
||||
{ok, _Pid} ->
|
||||
ok;
|
||||
{error, {already_started, _Pid}} ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
save_shards_meta(DB, Shards) ->
|
||||
lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
|
||||
|
||||
save_shard_meta(DB, Shard) ->
|
||||
Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
|
||||
persistent_term:put(?shard_meta(DB, Shard), #{
|
||||
servers => Servers
|
||||
}).
|
||||
|
||||
erase_shards_meta(DB, Shards) ->
|
||||
lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
|
||||
|
||||
erase_shard_meta(DB, Shard) ->
|
||||
persistent_term:erase(?shard_meta(DB, Shard)).
|
||||
|
|
|
@ -332,7 +332,7 @@ delete_next(DB, Iter0, Selector, BatchSize) ->
|
|||
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
|
||||
emqx_ds_replication_layer:shard_id().
|
||||
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
|
||||
N = emqx_ds_replication_layer_meta:n_shards(DB),
|
||||
N = emqx_ds_replication_shard_allocator:n_shards(DB),
|
||||
Hash =
|
||||
case SerializeBy of
|
||||
clientid -> erlang:phash2(From, N);
|
||||
|
|
|
@ -127,7 +127,7 @@ get_local_server(DB, Shard) ->
|
|||
?MEMOIZE(DB, Shard, local_server(DB, Shard)).
|
||||
|
||||
get_shard_servers(DB, Shard) ->
|
||||
maps:get(servers, emqx_ds_builtin_db_sup:lookup_shard_meta(DB, Shard)).
|
||||
maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).
|
||||
|
||||
%%
|
||||
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ds_replication_shard_allocator).
|
||||
|
||||
-export([start_link/2]).
|
||||
|
||||
-export([n_shards/1]).
|
||||
-export([shard_meta/2]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-export([
|
||||
init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2
|
||||
]).
|
||||
|
||||
-define(db_meta(DB), {?MODULE, DB}).
|
||||
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
|
||||
|
||||
%%
|
||||
|
||||
start_link(DB, Opts) ->
|
||||
gen_server:start_link(?MODULE, {DB, Opts}, []).
|
||||
|
||||
n_shards(DB) ->
|
||||
Meta = persistent_term:get(?db_meta(DB)),
|
||||
maps:get(n_shards, Meta).
|
||||
|
||||
shard_meta(DB, Shard) ->
|
||||
persistent_term:get(?shard_meta(DB, Shard)).
|
||||
|
||||
%%
|
||||
|
||||
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
|
||||
|
||||
init({DB, Opts}) ->
|
||||
_ = erlang:process_flag(trap_exit, true),
|
||||
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
|
||||
State = #{db => DB, opts => Opts, status => allocating},
|
||||
case allocate_shards(State) of
|
||||
NState = #{} ->
|
||||
{ok, NState};
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
msg => "Shard allocation still in progress",
|
||||
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||
}
|
||||
),
|
||||
{ok, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||
end.
|
||||
|
||||
handle_call(_Call, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(_Cast, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
case allocate_shards(State) of
|
||||
NState = #{} ->
|
||||
{noreply, NState};
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
msg => "Shard allocation still in progress",
|
||||
retry_in => ?ALLOCATE_RETRY_TIMEOUT
|
||||
}
|
||||
),
|
||||
{noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
|
||||
end.
|
||||
|
||||
terminate(_Reason, #{db := DB, shards := Shards}) ->
|
||||
erase_db_meta(DB),
|
||||
erase_shards_meta(DB, Shards);
|
||||
terminate(_Reason, #{}) ->
|
||||
ok.
|
||||
|
||||
%%
|
||||
|
||||
allocate_shards(State = #{db := DB, opts := Opts}) ->
|
||||
case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
|
||||
{ok, Shards} ->
|
||||
logger:notice(#{msg => "Shards allocated", shards => Shards}),
|
||||
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
|
||||
ok = start_egresses(DB, Shards),
|
||||
ok = save_db_meta(DB, Shards),
|
||||
ok = save_shards_meta(DB, Shards),
|
||||
State#{shards => Shards, status := ready};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
start_shards(DB, Shards) ->
|
||||
ok = lists:foreach(
|
||||
fun(Shard) ->
|
||||
ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard})
|
||||
end,
|
||||
Shards
|
||||
),
|
||||
ok = logger:info(#{msg => "Shards started", shards => Shards}),
|
||||
ok.
|
||||
|
||||
start_egresses(DB, Shards) ->
|
||||
ok = lists:foreach(
|
||||
fun(Shard) ->
|
||||
ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard})
|
||||
end,
|
||||
Shards
|
||||
),
|
||||
logger:info(#{msg => "Egresses started", shards => Shards}),
|
||||
ok.
|
||||
|
||||
save_db_meta(DB, Shards) ->
|
||||
persistent_term:put(?db_meta(DB), #{
|
||||
shards => Shards,
|
||||
n_shards => length(Shards)
|
||||
}).
|
||||
|
||||
save_shards_meta(DB, Shards) ->
|
||||
lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
|
||||
|
||||
save_shard_meta(DB, Shard) ->
|
||||
Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
|
||||
persistent_term:put(?shard_meta(DB, Shard), #{
|
||||
servers => Servers
|
||||
}).
|
||||
|
||||
erase_db_meta(DB) ->
|
||||
persistent_term:erase(?db_meta(DB)).
|
||||
|
||||
erase_shards_meta(DB, Shards) ->
|
||||
lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
|
||||
|
||||
erase_shard_meta(DB, Shard) ->
|
||||
persistent_term:erase(?shard_meta(DB, Shard)).
|
Loading…
Reference in New Issue