diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 93502df8c..6cee4877e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -22,13 +22,18 @@ %% API: -export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]). +-export([status/1]). %% behaviour callbacks: -export([init/1]). +-export([handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: -export([start_link_sup/2]). +%% FIXME +-export([lookup_shard_meta/2]). + %%================================================================================ %% Type declarations %%================================================================================ @@ -43,6 +48,8 @@ -record(?shard_sup, {db}). -record(?egress_sup, {db}). +-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}). + %%================================================================================ %% API funcions %%================================================================================ @@ -79,6 +86,13 @@ ensure_shard(Shard) -> {error, Reason} end. +status(DB) -> + State = sys:get_state(?via({allocator, DB})), + maps:get(status, State). + +lookup_shard_meta(DB, Shard) -> + persistent_term:get(?shard_meta(DB, Shard)). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -86,40 +100,59 @@ ensure_shard(Shard) -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), - _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), - Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])], + Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), + Children = [ + sup_spec(#?shard_sup{db = DB}, []), + sup_spec(#?egress_sup{db = DB}, []), + shard_allocator_spec(DB, Opts) + ], SupFlags = #{ strategy => one_for_all, intensity => 0, period => 1 }, {ok, {SupFlags, Children}}; -init({#?shard_sup{db = DB}, _}) -> +init({#?shard_sup{db = _DB}, _}) -> %% Spec for the supervisor that manages the worker processes for %% each local shard of the DB: - MyShards = emqx_ds_replication_layer_meta:my_shards(DB), - Children = [ - Child - || Shard <- MyShards, - Child <- [shard_spec(DB, Shard), shard_replication_spec(DB, Shard)] - ], SupFlags = #{ strategy => one_for_one, intensity => 10, period => 1 }, - {ok, {SupFlags, Children}}; -init({#?egress_sup{db = DB}, _}) -> + {ok, {SupFlags, []}}; +init({#?egress_sup{db = _DB}, _}) -> %% Spec for the supervisor that manages the egress proxy processes %% managing traffic towards each of the shards of the DB: - Shards = emqx_ds_replication_layer_meta:shards(DB), - Children = [egress_spec(DB, Shard) || Shard <- Shards], SupFlags = #{ strategy => one_for_one, intensity => 0, period => 1 }, - {ok, {SupFlags, Children}}. + {ok, {SupFlags, []}}; +init({allocator, DB, Opts}) -> + _ = erlang:process_flag(trap_exit, true), + _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), + init_allocator(DB, Opts). + +start_shards(DB, Shards) -> + SupRef = ?via(#?shard_sup{db = DB}), + lists:foreach( + fun(Shard) -> + {ok, _} = supervisor:start_child(SupRef, shard_spec(DB, Shard)), + {ok, _} = supervisor:start_child(SupRef, shard_replication_spec(DB, Shard)) + end, + Shards + ). + +start_egresses(DB, Shards) -> + SupRef = ?via(#?egress_sup{db = DB}), + lists:foreach( + fun(Shard) -> + {ok, _} = supervisor:start_child(SupRef, egress_spec(DB, Shard)) + end, + Shards + ). %%================================================================================ %% Internal exports @@ -143,7 +176,7 @@ sup_spec(Id, Options) -> shard_spec(DB, Shard) -> Options = emqx_ds_replication_layer_meta:get_options(DB), #{ - id => Shard, + id => {Shard, storage}, start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]}, shutdown => 5_000, restart => permanent, @@ -158,6 +191,15 @@ shard_replication_spec(DB, Shard) -> type => worker }. +shard_allocator_spec(DB, Opts) -> + #{ + id => shard_allocator, + start => + {gen_server, start_link, [?via({allocator, DB}), ?MODULE, {allocator, DB, Opts}, []]}, + restart => permanent, + type => worker + }. + egress_spec(DB, Shard) -> #{ id => Shard, @@ -166,3 +208,78 @@ egress_spec(DB, Shard) -> restart => permanent, type => worker }. + +%% Allocator + +-define(ALLOCATE_RETRY_TIMEOUT, 1_000). + +init_allocator(DB, Opts) -> + State = #{db => DB, opts => Opts, status => allocating}, + case allocate_shards(State) of + NState = #{} -> + {ok, NState}; + {error, Data} -> + _ = logger:notice( + Data#{ + msg => "Shard allocation still in progress", + retry_in => ?ALLOCATE_RETRY_TIMEOUT + } + ), + {ok, State, ?ALLOCATE_RETRY_TIMEOUT} + end. + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_info(timeout, State) -> + case allocate_shards(State) of + NState = #{} -> + {noreply, NState}; + {error, Data} -> + _ = logger:notice( + Data#{ + msg => "Shard allocation still in progress", + retry_in => ?ALLOCATE_RETRY_TIMEOUT + } + ), + {noreply, State, ?ALLOCATE_RETRY_TIMEOUT} + end. + +terminate(_Reason, #{db := DB, shards := Shards}) -> + erase_shards_meta(DB, Shards). + +%% + +allocate_shards(State = #{db := DB, opts := Opts}) -> + case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of + {ok, Shards} -> + logger:notice(#{msg => "Shards allocated", shards => Shards}), + ok = save_shards_meta(DB, Shards), + ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), + logger:notice(#{ + msg => "Shards started", shards => emqx_ds_replication_layer_meta:my_shards(DB) + }), + ok = start_egresses(DB, Shards), + logger:notice(#{msg => "Egresses started", shards => Shards}), + State#{shards => Shards, status := ready}; + {error, Reason} -> + {error, Reason} + end. + +save_shards_meta(DB, Shards) -> + lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards). + +save_shard_meta(DB, Shard) -> + Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard), + persistent_term:put(?shard_meta(DB, Shard), #{ + servers => Servers + }). + +erase_shards_meta(DB, Shards) -> + lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards). + +erase_shard_meta(DB, Shard) -> + persistent_term:erase(?shard_meta(DB, Shard)). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index ddecd4f94..7ca345367 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,7 +29,7 @@ -export([ shards/1, my_shards/1, - shard_meta/2, + allocate_shards/2, replica_set/2, in_sync_replicas/2, sites/0, @@ -48,6 +48,7 @@ %% internal exports: -export([ open_db_trans/2, + allocate_shards_trans/2, update_db_config_trans/2, drop_db_trans/1, claim_site/2, @@ -101,8 +102,6 @@ %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). --define(DB_META(DB), {?MODULE, DB}). - %%================================================================================ %% API funcions %%================================================================================ @@ -159,10 +158,14 @@ my_shards(DB) -> lists:member(Site, ReplicaSet) end). -shard_meta(DB, Shard) -> - case get_db_meta(DB) of - #{Shard := Meta} -> Meta; - #{} -> undefined +allocate_shards(DB, Opts) -> + case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of + {atomic, Shards} -> + {ok, Shards}; + {aborted, {shards_already_allocated, Shards}} -> + {ok, Shards}; + {aborted, {insufficient_sites_online, Needed, Sites}} -> + {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}} end. -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> @@ -201,24 +204,18 @@ node(Site) -> -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). get_options(DB) -> - {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]), - Opts. + case mnesia:dirty_read(?META_TAB, DB) of + [#?META_TAB{db_props = Opts}] -> + Opts; + [] -> + #{} + end. -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db(DB, DefaultOpts) -> - case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of - {atomic, Opts} -> - Opts; - {aborted, {insufficient_sites_online, NNeeded, Sites}} -> - %% TODO: Still ugly, it blocks the whole node startup. - logger:notice( - "Shard allocation still in progress, not enough sites: ~p, need: ~p", - [Sites, NNeeded] - ), - ok = timer:sleep(1000), - open_db(DB, DefaultOpts) - end. + {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]), + Opts. -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> ok | {error, _}. @@ -279,22 +276,53 @@ terminate(_Reason, #s{}) -> %% Internal exports %%================================================================================ --spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) -> +-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db_trans(DB, CreateOpts) -> case mnesia:wread({?META_TAB, DB}) of - [] when is_map(CreateOpts) -> - NShards = maps:get(n_shards, CreateOpts), - NSites = maps:get(n_sites, CreateOpts), - ReplicationFactor = maps:get(replication_factor, CreateOpts), + [] -> mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), - create_shards(DB, NSites, NShards, ReplicationFactor), - save_db_meta(DB), CreateOpts; [#?META_TAB{db_props = Opts}] -> Opts end. +-spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard]. +allocate_shards_trans(DB, Opts) -> + NShards = maps:get(n_shards, Opts), + NSites = maps:get(n_sites, Opts), + ReplicationFactor = maps:get(replication_factor, Opts), + Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], + AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), + case length(AllSites) of + N when N >= NSites -> + ok; + _ -> + mnesia:abort({insufficient_sites_online, NSites, AllSites}) + end, + case mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{shard = {DB, '_'}, _ = '_'}, write) of + [] -> + ok; + Records -> + ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], + mnesia:abort({shards_already_allocated, ShardsAllocated}) + end, + lists:map( + fun(Shard) -> + Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites], + Hashes = lists:sort(Hashes0), + {_, Sites} = lists:unzip(Hashes), + ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + Record = #?SHARD_TAB{ + shard = {DB, Shard}, + replica_set = ReplicaSet + }, + ok = mnesia:write(Record), + Shard + end, + Shards + ). + -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> ok | {error, database}. update_db_config_trans(DB, CreateOpts) -> @@ -390,53 +418,6 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec create_shards(emqx_ds:db(), pos_integer(), pos_integer(), pos_integer()) -> ok. -create_shards(DB, NSites, NShards, ReplicationFactor) -> - Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], - AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), - case length(AllSites) of - N when N >= NSites -> - ok; - _ -> - mnesia:abort({insufficient_sites_online, NSites, AllSites}) - end, - lists:foreach( - fun(Shard) -> - Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites], - Hashes = lists:sort(Hashes0), - {_, Sites} = lists:unzip(Hashes), - ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), - Record = #?SHARD_TAB{ - shard = {DB, Shard}, - replica_set = ReplicaSet - }, - mnesia:write(Record) - end, - Shards - ). - -save_db_meta(DB) -> - Shards = mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{_ = '_'}, read), - Meta = maps:from_list([mk_shard_meta(Shard) || Shard <- Shards]), - persistent_term:put(?DB_META(DB), Meta). - -get_db_meta(DB) -> - persistent_term:get(?DB_META(DB)). - -% erase_db_meta(DB) -> -% persistent_term:erase(?DB_META(DB)). - -mk_shard_meta(#?SHARD_TAB{shard = {DB, Shard}, replica_set = ReplicaSet}) -> - %% FIXME: Wrong place. - Servers = [ - {emqx_ds_replication_layer_shard:server_name(DB, Shard, Site), Node#?NODE_TAB.node} - || Site <- ReplicaSet, - Node <- mnesia:read(?NODE_TAB, Site) - ], - {Shard, #{ - servers => Servers - }}. - -spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). hash(Shard, Site) -> erlang:phash2({Shard, Site}). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 9c66650d3..c5bbff8e0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -124,7 +124,7 @@ get_local_server(DB, Shard) -> ?MEMOIZE(DB, Shard, local_server(DB, Shard)). get_shard_servers(DB, Shard) -> - maps:get(servers, emqx_ds_replication_layer_meta:get_shard_meta(DB, Shard)). + maps:get(servers, emqx_ds_builtin_db_sup:lookup_shard_meta(DB, Shard)). %%