diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
index aa74e2080..f93c78980 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
@@ -21,19 +21,15 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
--export([status/1]).
+-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
+-export([which_shards/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
--export([handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
 -export([start_link_sup/2]).
 
-%% FIXME
--export([lookup_shard_meta/2]).
-
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -41,14 +37,14 @@
 -define(via(REC), {via, gproc, {n, l, REC}}).
 
 -define(db_sup, ?MODULE).
--define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(shards_sup, emqx_ds_builtin_db_shards_sup). %% per-DB supervisor of the shard supervisors
 -define(egress_sup, emqx_ds_builtin_db_egress_sup).
+-define(shard_sup, emqx_ds_builtin_db_shard_sup). %% supervisor of a single shard
 
 -record(?db_sup, {db}).
--record(?shard_sup, {db}).
+-record(?shards_sup, {db}).
 -record(?egress_sup, {db}).
-
--define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
+-record(?shard_sup, {db, shard}).
 
 %%================================================================================
 %% API funcions
@@ -60,8 +56,8 @@ start_db(DB, Opts) ->
 
 -spec start_shard(emqx_ds_storage_layer:shard_id()) ->
     supervisor:startchild_ret().
-start_shard(Shard = {DB, _}) ->
-    supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)).
+start_shard({DB, Shard}) ->
+    supervisor:start_child(?via(#?shards_sup{db = DB}), shard_spec(DB, Shard)).
 
 -spec start_egress(emqx_ds_storage_layer:shard_id()) ->
     supervisor:startchild_ret().
@@ -70,28 +66,24 @@ start_egress({DB, Shard}) ->
 
 -spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
-stop_shard(Shard = {DB, _}) ->
-    Sup = ?via(#?shard_sup{db = DB}),
-    ok = supervisor:terminate_child(Sup, Shard),
-    ok = supervisor:delete_child(Sup, Shard).
+stop_shard({DB, Shard}) ->
+    Sup = ?via(#?shards_sup{db = DB}),
+    ok = supervisor:terminate_child(Sup, {shard, Shard}), %% child id as registered by shard_spec/2
+    ok = supervisor:delete_child(Sup, {shard, Shard}).
 
 -spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
     ok | {error, _Reason}.
 ensure_shard(Shard) ->
-    case start_shard(Shard) of
-        {ok, _Pid} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok;
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    ensure_started(start_shard(Shard)).
 
-status(DB) ->
-    State = sys:get_state(?via({allocator, DB})),
-    maps:get(status, State).
+-spec ensure_egress(emqx_ds_storage_layer:shard_id()) ->
+    ok | {error, _Reason}.
+ensure_egress(Shard) ->
+    ensure_started(start_egress(Shard)).
 
-lookup_shard_meta(DB, Shard) ->
-    persistent_term:get(?shard_meta(DB, Shard)).
+-spec which_shards(emqx_ds:db()) ->
+    [_Child].
+which_shards(DB) ->
+    supervisor:which_children(?via(#?shards_sup{db = DB})).
 
 %%================================================================================
 %% behaviour callbacks
@@ -103,7 +95,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
     Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
     ok = start_ra_system(DB, Opts),
     Children = [
-        sup_spec(#?shard_sup{db = DB}, []),
+        sup_spec(#?shards_sup{db = DB}, []),
         sup_spec(#?egress_sup{db = DB}, []),
         shard_allocator_spec(DB, Opts)
     ],
@@ -113,8 +105,8 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
         period => 1
     },
     {ok, {SupFlags, Children}};
-init({#?shard_sup{db = _DB}, _}) ->
-    %% Spec for the supervisor that manages the worker processes for
+init({#?shards_sup{db = _DB}, _}) ->
+    %% Spec for the supervisor that manages the supervisors for
     %% each local shard of the DB:
     SupFlags = #{
         strategy => one_for_one,
@@ -131,10 +123,18 @@ init({#?egress_sup{db = _DB}, _}) ->
         period => 1
     },
     {ok, {SupFlags, []}};
-init({allocator, DB, Opts}) ->
-    _ = erlang:process_flag(trap_exit, true),
-    _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
-    init_allocator(DB, Opts).
+init({#?shard_sup{db = DB, shard = Shard}, _}) ->
+    SupFlags = #{
+        strategy => rest_for_one, %% storage is listed first: if it restarts, replication restarts too
+        intensity => 10,
+        period => 100
+    },
+    Opts = emqx_ds_replication_layer_meta:get_options(DB),
+    Children = [
+        shard_storage_spec(DB, Shard, Opts),
+        shard_replication_spec(DB, Shard, Opts)
+    ],
+    {ok, {SupFlags, Children}}.
 
 start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
     DataDir = filename:join([emqx:data_dir(), DB, dsrepl]),
@@ -164,25 +164,6 @@ start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
             ok
     end.
 
-start_shards(DB, Shards, Opts) ->
-    SupRef = ?via(#?shard_sup{db = DB}),
-    lists:foreach(
-        fun(Shard) ->
-            {ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard, Opts)),
-            {ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard, Opts))
-        end,
-        Shards
-    ).
-
-start_egresses(DB, Shards) ->
-    SupRef = ?via(#?egress_sup{db = DB}),
-    lists:foreach(
-        fun(Shard) ->
-            {ok, _} = supervisor:start_child(SupRef, egress_spec(DB, Shard))
-        end,
-        Shards
-    ).
-
 %%================================================================================
 %% Internal exports
 %%================================================================================
@@ -203,12 +184,18 @@ sup_spec(Id, Options) ->
     }.
 
 shard_spec(DB, Shard) ->
-    shard_spec(DB, Shard, emqx_ds_replication_layer_meta:get_options(DB)).
+    #{
+        id => {shard, Shard}, %% child id under the per-DB shards supervisor
+        start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]},
+        shutdown => infinity, %% the child is itself a supervisor
+        restart => permanent,
+        type => supervisor
+    }.
 
-shard_spec(DB, Shard, Options) ->
+shard_storage_spec(DB, Shard, Opts) ->
     #{
         id => {Shard, storage},
-        start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
+        start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Opts]},
         shutdown => 5_000,
         restart => permanent,
         type => worker
@@ -225,8 +212,7 @@ shard_replication_spec(DB, Shard, Opts) ->
 shard_allocator_spec(DB, Opts) ->
     #{
         id => shard_allocator,
-        start =>
-            {gen_server, start_link, [?via({allocator, DB}), ?MODULE, {allocator, DB, Opts}, []]},
+        start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]}, %% allocator is a separate module
         restart => permanent,
         type => worker
     }.
@@ -240,78 +226,12 @@ egress_spec(DB, Shard) ->
         type => worker
     }.
 
-%% Allocator
-
--define(ALLOCATE_RETRY_TIMEOUT, 1_000).
-
-init_allocator(DB, Opts) ->
-    State = #{db => DB, opts => Opts, status => allocating},
-    case allocate_shards(State) of
-        NState = #{} ->
-            {ok, NState};
-        {error, Data} ->
-            _ = logger:notice(
-                Data#{
-                    msg => "Shard allocation still in progress",
-                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
-                }
-            ),
-            {ok, State, ?ALLOCATE_RETRY_TIMEOUT}
-    end.
-
-handle_call(_Call, _From, State) ->
-    {reply, ignored, State}.
-
-handle_cast(_Cast, State) ->
-    {noreply, State}.
-
-handle_info(timeout, State) ->
-    case allocate_shards(State) of
-        NState = #{} ->
-            {noreply, NState};
-        {error, Data} ->
-            _ = logger:notice(
-                Data#{
-                    msg => "Shard allocation still in progress",
-                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
-                }
-            ),
-            {noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
-    end.
-
-terminate(_Reason, #{db := DB, shards := Shards}) ->
-    %% FIXME
-    erase_shards_meta(DB, Shards).
-
-%%
-
-allocate_shards(State = #{db := DB, opts := Opts}) ->
-    case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
-        {ok, Shards} ->
-            logger:notice(#{msg => "Shards allocated", shards => Shards}),
-            ok = save_shards_meta(DB, Shards),
-            ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB), Opts),
-            logger:notice(#{
-                msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB)
-            }),
-            ok = start_egresses(DB, Shards),
-            logger:notice(#{msg => "Egresses started", shards => Shards}),
-            State#{shards => Shards, status := ready};
+ensure_started(Res) -> %% maps a start_shard/1 or start_egress/1 result to ok | {error, Reason}
+    case Res of
+        {ok, _Pid} ->
+            ok;
+        {error, {already_started, _Pid}} -> %% already running: treated as success
+            ok;
         {error, Reason} ->
             {error, Reason}
     end.
-
-save_shards_meta(DB, Shards) ->
-    lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
-
-save_shard_meta(DB, Shard) ->
-    Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
-    persistent_term:put(?shard_meta(DB, Shard), #{
-        servers => Servers
-    }).
-
-erase_shards_meta(DB, Shards) ->
-    lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
-
-erase_shard_meta(DB, Shard) ->
-    persistent_term:erase(?shard_meta(DB, Shard)).
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
index a5bf0a875..9b5db7cf4 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -332,7 +332,7 @@ delete_next(DB, Iter0, Selector, BatchSize) ->
 -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
     emqx_ds_replication_layer:shard_id().
 shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
-    N = emqx_ds_replication_layer_meta:n_shards(DB),
+    N = emqx_ds_replication_shard_allocator:n_shards(DB), %% read from the allocator's persistent_term entry
     Hash =
         case SerializeBy of
             clientid -> erlang:phash2(From, N);
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
index 190b34610..0be5509bc 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
@@ -127,7 +127,7 @@ get_local_server(DB, Shard) ->
     ?MEMOIZE(DB, Shard, local_server(DB, Shard)).
 
 get_shard_servers(DB, Shard) ->
-    maps:get(servers, emqx_ds_builtin_db_sup:lookup_shard_meta(DB, Shard)).
+    maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).
 
 %%
 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
new file mode 100644
index 000000000..80181f032
--- /dev/null
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
@@ -0,0 +1,152 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_shard_allocator).
+
+-export([start_link/2]).
+
+-export([n_shards/1]).
+-export([shard_meta/2]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-define(db_meta(DB), {?MODULE, DB}). %% persistent_term key: DB-wide metadata
+-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}). %% persistent_term key: per-shard metadata
+
+%%
+
+start_link(DB, Opts) ->
+    gen_server:start_link(?MODULE, {DB, Opts}, []).
+
+n_shards(DB) ->
+    Meta = persistent_term:get(?db_meta(DB)), %% raises badarg while no metadata is stored for DB
+    maps:get(n_shards, Meta).
+
+shard_meta(DB, Shard) ->
+    persistent_term:get(?shard_meta(DB, Shard)). %% raises badarg while no metadata is stored
+
+%%
+
+-define(ALLOCATE_RETRY_TIMEOUT, 1_000). %% gen_server timeout (ms) before allocation is retried
+
+init({DB, Opts}) ->
+    _ = erlang:process_flag(trap_exit, true), %% lets terminate/2 run on shutdown
+    _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
+    State = #{db => DB, opts => Opts, status => allocating},
+    case allocate_shards(State) of
+        NState = #{} ->
+            {ok, NState};
+        {error, Data} ->
+            _ = logger:notice(
+                Data#{
+                    msg => "Shard allocation still in progress",
+                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
+                }
+            ),
+            {ok, State, ?ALLOCATE_RETRY_TIMEOUT} %% start anyway; retried from handle_info(timeout, _)
+    end.
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(timeout, State) ->
+    case allocate_shards(State) of
+        NState = #{} ->
+            {noreply, NState};
+        {error, Data} ->
+            _ = logger:notice(
+                Data#{
+                    msg => "Shard allocation still in progress",
+                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
+                }
+            ),
+            {noreply, State, ?ALLOCATE_RETRY_TIMEOUT} %% keep retrying until allocation succeeds
+    end.
+
+terminate(_Reason, #{db := DB, shards := Shards}) ->
+    erase_db_meta(DB),
+    erase_shards_meta(DB, Shards);
+terminate(_Reason, #{}) -> %% no `shards` key: allocation never succeeded, nothing was stored
+    ok.
+
+%%
+
+allocate_shards(State = #{db := DB, opts := Opts}) ->
+    case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
+        {ok, Shards} ->
+            logger:notice(#{msg => "Shards allocated", shards => Shards}),
+            ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
+            ok = start_egresses(DB, Shards),
+            ok = save_db_meta(DB, Shards),
+            ok = save_shards_meta(DB, Shards),
+            State#{shards => Shards, status := ready};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start_shards(DB, Shards) ->
+    ok = lists:foreach(
+        fun(Shard) ->
+            ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard})
+        end,
+        Shards
+    ),
+    ok = logger:info(#{msg => "Shards started", shards => Shards}),
+    ok.
+
+start_egresses(DB, Shards) ->
+    ok = lists:foreach(
+        fun(Shard) ->
+            ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard})
+        end,
+        Shards
+    ),
+    logger:info(#{msg => "Egresses started", shards => Shards}),
+    ok.
+
+save_db_meta(DB, Shards) ->
+    persistent_term:put(?db_meta(DB), #{
+        shards => Shards,
+        n_shards => length(Shards) %% value served by n_shards/1
+    }).
+
+save_shards_meta(DB, Shards) ->
+    lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
+
+save_shard_meta(DB, Shard) ->
+    Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
+    persistent_term:put(?shard_meta(DB, Shard), #{
+        servers => Servers
+    }).
+
+erase_db_meta(DB) ->
+    persistent_term:erase(?db_meta(DB)).
+
+erase_shards_meta(DB, Shards) ->
+    lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
+
+erase_shard_meta(DB, Shard) -> %% drops the persistent_term entry for one shard
+    persistent_term:erase(?shard_meta(DB, Shard)).