diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 1a150d5b1..492fcaa6b 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -476,7 +476,7 @@ t_replication_options(_Config) -> resend_window := 60 } }, - emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB) + emqx_ds_replication_layer_meta:db_config(?PERSISTENT_MESSAGE_DB) ), ?assertMatch( #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 79e2f6120..b230e4b89 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -116,7 +116,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) -> Children = [ sup_spec(#?shards_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, []), - shard_allocator_spec(DB, Opts) + shard_allocator_spec(DB) ], SupFlags = #{ strategy => one_for_all, @@ -148,7 +148,7 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) -> intensity => 10, period => 100 }, - Opts = emqx_ds_replication_layer_meta:get_options(DB), + Opts = emqx_ds_replication_layer_meta:db_config(DB), Children = [ shard_storage_spec(DB, Shard, Opts), shard_replication_spec(DB, Shard, Opts) @@ -228,10 +228,10 @@ shard_replication_spec(DB, Shard, Opts) -> type => worker }. -shard_allocator_spec(DB, Opts) -> +shard_allocator_spec(DB) -> #{ id => shard_allocator, - start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]}, + start => {emqx_ds_replication_shard_allocator, start_link, [DB]}, restart => permanent, type => worker }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 14c2268b8..6ce86e9cc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -189,8 +189,7 @@ add_generation(DB) -> -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. update_db_config(DB, CreateOpts) -> - ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts), - Opts = emqx_ds_replication_layer_meta:get_options(DB), + Opts = #{} = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts), foreach_shard( DB, fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f84863c03..723675699 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,25 +29,39 @@ -export([ shards/1, my_shards/1, - allocate_shards/2, - replica_set/2, + allocate_shards/1, sites/0, node/1, - open_db/2, - get_options/1, - update_db_config/2, - drop_db/1, this_site/0, print_status/0 ]). +%% DB API: +-export([ + open_db/2, + db_config/1, + update_db_config/2, + drop_db/1 +]). + +%% Site / shard allocation: +-export([ + assign_db_sites/2, + replica_set_transitions/2, + update_replica_set/3, + replica_set/2, + target_set/2 +]). + %% gen_server -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: -export([ open_db_trans/2, - allocate_shards_trans/2, + allocate_shards_trans/1, + assign_db_sites_trans/2, + update_replica_set_trans/3, update_db_config_trans/2, drop_db_trans/1, claim_site/2, @@ -86,15 +100,20 @@ -record(?SHARD_TAB, { shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, + %% Sites that currently contain the data: + replica_set :: [site()], %% Sites that should contain the data when the cluster is in the %% stable state (no nodes are being added or removed from it): - replica_set :: [site()], + target_set :: [site()] | undefined, misc = #{} :: map() }). %% Persistent ID of the node (independent from the IP/FQDN): -type site() :: binary(). +%% Membership transition of shard's replica set: +-type transition() :: {add | del, site()}. + %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). @@ -156,17 +175,17 @@ start_link() -> -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. shards(DB) -> - filter_shards(DB). + Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})), + [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs]. -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. my_shards(DB) -> Site = this_site(), - filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) -> - lists:member(Site, ReplicaSet) - end). + Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})), + [Shard || #?SHARD_TAB{shard = {_, Shard}, replica_set = RS} <- Recs, lists:member(Site, RS)]. -allocate_shards(DB, Opts) -> - case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of +allocate_shards(DB) -> + case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]) of {atomic, Shards} -> {ok, Shards}; {aborted, {shards_already_allocated, Shards}} -> @@ -175,16 +194,6 @@ allocate_shards(DB, Opts) -> {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}} end. --spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - {ok, [site()]} | {error, _}. -replica_set(DB, Shard) -> - case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{replica_set = ReplicaSet}] -> - {ok, ReplicaSet}; - [] -> - {error, no_shard} - end. - -spec sites() -> [site()]. sites() -> eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). @@ -198,8 +207,12 @@ node(Site) -> undefined end. --spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). -get_options(DB) -> +%%=============================================================================== +%% DB API +%%=============================================================================== + +-spec db_config(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). +db_config(DB) -> case mnesia:dirty_read(?META_TAB, DB) of [#?META_TAB{db_props = Opts}] -> Opts; @@ -210,21 +223,64 @@ get_options(DB) -> -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db(DB, DefaultOpts) -> - {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]), - Opts. + transaction(fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]). -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> - ok | {error, _}. + emqx_ds_replication_layer:builtin_db_opts() | {error, _}. update_db_config(DB, DefaultOpts) -> - {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:update_db_config_trans/2, [ - DB, DefaultOpts - ]), - Opts. + transaction(fun ?MODULE:update_db_config_trans/2, [DB, DefaultOpts]). -spec drop_db(emqx_ds:db()) -> ok. drop_db(DB) -> - _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]), - ok. + transaction(fun ?MODULE:drop_db_trans/1, [DB]). + +-spec assign_db_sites(emqx_ds:db(), [site()]) -> ok. +assign_db_sites(DB, Sites) -> + case mria:transaction(?SHARD, fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + [transition()] | undefined. +replica_set_transitions(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}] -> + compute_transitions(TargetSet, ReplicaSet); + [] -> + undefined + end. + +-spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok. +update_replica_set(DB, Shard, Trans) -> + case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + [site()] | undefined. +replica_set(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{replica_set = ReplicaSet}] -> + ReplicaSet; + [] -> + undefined + end. + +-spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + [site()] | undefined. +target_set(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{target_set = TargetSet}] -> + TargetSet; + [] -> + undefined + end. %%================================================================================ %% behavior callbacks @@ -268,19 +324,15 @@ open_db_trans(DB, CreateOpts) -> Opts end. --spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard]. -allocate_shards_trans(DB, Opts) -> - NShards = maps:get(n_shards, Opts), - NSites = maps:get(n_sites, Opts), - ReplicationFactor = maps:get(replication_factor, Opts), - NReplicas = min(NSites, ReplicationFactor), - Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], - AllSites = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read), - case length(AllSites) of +-spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. +allocate_shards_trans(DB) -> + Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB), + Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read), + case length(Nodes) of N when N >= NSites -> ok; _ -> - mnesia:abort({insufficient_sites_online, NSites, AllSites}) + mnesia:abort({insufficient_sites_online, NSites, Nodes}) end, case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of [] -> @@ -289,18 +341,11 @@ allocate_shards_trans(DB, Opts) -> ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], mnesia:abort({shards_already_allocated, ShardsAllocated}) end, - {Allocation, _} = lists:mapfoldl( - fun(Shard, SSites) -> - {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites), - {_, SRest} = emqx_utils_stream:consume(1, SSites), - {{Shard, Sites}, SRest} - end, - emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)), - Shards - ), + Shards = gen_shards(NShards), + Sites = [S || #?NODE_TAB{site = S} <- Nodes], + Allocation = compute_allocation(Shards, Sites, Opts), lists:map( - fun({Shard, Sites}) -> - ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites], + fun({Shard, ReplicaSet}) -> Record = #?SHARD_TAB{ shard = {DB, Shard}, replica_set = ReplicaSet @@ -311,31 +356,71 @@ allocate_shards_trans(DB, Opts) -> Allocation ). +-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok. +assign_db_sites_trans(DB, Sites) -> + Opts = db_config_trans(DB), + case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of + [] -> + ok; + NonexistentSites -> + mnesia:abort({nonexistent_sites, NonexistentSites}) + end, + %% TODO + %% Optimize reallocation. The goals are: + %% 1. Minimize the number of membership transitions. + %% 2. Ensure that sites are responsible for roughly the same number of shards. + Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write), + Reallocation = compute_allocation(Shards, Sites, Opts), + lists:foreach( + fun({Record, ReplicaSet}) -> + ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet}) + end, + Reallocation + ). + +update_replica_set_trans(DB, Shard, Trans) -> + case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of + [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] -> + ReplicaSet = apply_transition(Trans, ReplicaSet0), + case lists:usort(TargetSet0) of + ReplicaSet -> + TargetSet = undefined; + TS -> + TargetSet = TS + end, + mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet}); + [] -> + mnesia:abort({nonexistent_shard, {DB, Shard}}) + end. + -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> - ok | {error, database}. -update_db_config_trans(DB, CreateOpts) -> + ok | {error, _}. +update_db_config_trans(DB, UpdateOpts) -> case mnesia:wread({?META_TAB, DB}) of [#?META_TAB{db_props = Opts}] -> %% Since this is an update and not a reopen, %% we should keep the shard number and replication factor %% and not create a new shard server - #{ - n_shards := NShards, - replication_factor := ReplicationFactor - } = Opts, - + ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts), + EffectiveOpts = maps:merge(Opts, ChangeableOpts), mnesia:write(#?META_TAB{ db = DB, - db_props = CreateOpts#{ - n_shards := NShards, - replication_factor := ReplicationFactor - } + db_props = EffectiveOpts }), - ok; + EffectiveOpts; [] -> {error, no_database} end. +-spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). +db_config_trans(DB) -> + case mnesia:read(?META_TAB, DB, read) of + [#?META_TAB{db_props = Config}] -> + Config; + [] -> + mnesia:abort({nonexistent_db, DB}) + end. + -spec drop_db_trans(emqx_ds:db()) -> ok. drop_db_trans(DB) -> mnesia:delete({?META_TAB, DB}), @@ -391,6 +476,38 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. +compute_allocation(Shards, Sites, Opts) -> + NSites = length(Sites), + ReplicationFactor = maps:get(replication_factor, Opts), + NReplicas = min(NSites, ReplicationFactor), + ShardsSorted = lists:sort(Shards), + SitesSorted = lists:sort(Sites), + {Allocation, _} = lists:mapfoldl( + fun(Shard, SSites) -> + {ReplicaSet, _} = emqx_utils_stream:consume(NReplicas, SSites), + {_, SRest} = emqx_utils_stream:consume(1, SSites), + {{Shard, ReplicaSet}, SRest} + end, + emqx_utils_stream:repeat(emqx_utils_stream:list(SitesSorted)), + ShardsSorted + ), + Allocation. + +compute_transitions(undefined, _ReplicaSet) -> + []; +compute_transitions(TargetSet, ReplicaSet) -> + Additions = TargetSet -- ReplicaSet, + Deletions = ReplicaSet -- TargetSet, + intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]). + +apply_transition({add, S}, Sites) -> + lists:usort([S | Sites]); +apply_transition({del, S}, Sites) -> + lists:delete(S, Sites). + +gen_shards(NShards) -> + [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)]. + eval_qlc(Q) -> case mnesia:is_transaction() of true -> @@ -400,29 +517,16 @@ eval_qlc(Q) -> Result end. -filter_shards(DB) -> - filter_shards(DB, const(true)). +transaction(Fun, Args) -> + {atomic, Result} = mria:transaction(?SHARD, Fun, Args), + Result. --spec filter_shards(emqx_ds:db(), fun((_) -> boolean())) -> - [emqx_ds_replication_layer:shard_id()]. -filter_shards(DB, Predicte) -> - filter_shards(DB, Predicte, fun(#?SHARD_TAB{shard = {_, ShardId}}) -> - ShardId - end). - -filter_shards(DB, Predicate, Mapper) -> - eval_qlc( - qlc:q([ - Mapper(Shard) - || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table( - ?SHARD_TAB - ), - D =:= DB, - Predicate(Shard) - ]) - ). - -const(Result) -> - fun(_) -> - Result - end. +%% @doc Intersperse elements of two lists. +%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5]. +-spec intersperse([X], [Y]) -> [X | Y]. +intersperse(L1, []) -> + L1; +intersperse([], L2) -> + L2; +intersperse([H1 | T1], L2) -> + [H1 | intersperse(L2, T1)]. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 62a6edab2..45739fbe3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -44,7 +44,7 @@ start_link(DB, Shard, Opts) -> gen_server:start_link(?MODULE, {DB, Shard, Opts}, []). shard_servers(DB, Shard) -> - {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), [ {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)} || Site <- ReplicaSet diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 6da33f09f..7393da692 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -16,7 +16,7 @@ -module(emqx_ds_replication_shard_allocator). --export([start_link/2]). +-export([start_link/1]). -export([n_shards/1]). -export([shard_meta/2]). @@ -35,8 +35,8 @@ %% -start_link(DB, Opts) -> - gen_server:start_link(?MODULE, {DB, Opts}, []). +start_link(DB) -> + gen_server:start_link(?MODULE, DB, []). n_shards(DB) -> Meta = persistent_term:get(?db_meta(DB)), @@ -49,22 +49,11 @@ shard_meta(DB, Shard) -> -define(ALLOCATE_RETRY_TIMEOUT, 1_000). -init({DB, Opts}) -> +init(DB) -> _ = erlang:process_flag(trap_exit, true), _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), - State = #{db => DB, opts => Opts, status => allocating}, - case allocate_shards(State) of - {ok, NState} -> - {ok, NState}; - {error, Data} -> - _ = logger:notice( - Data#{ - msg => "Shard allocation still in progress", - retry_in => ?ALLOCATE_RETRY_TIMEOUT - } - ), - {ok, State, ?ALLOCATE_RETRY_TIMEOUT} - end. + State = #{db => DB, status => allocating}, + handle_allocate_shards(State, ok). handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -73,18 +62,7 @@ handle_cast(_Cast, State) -> {noreply, State}. handle_info(timeout, State) -> - case allocate_shards(State) of - {ok, NState} -> - {noreply, NState}; - {error, Data} -> - _ = logger:notice( - Data#{ - msg => "Shard allocation still in progress", - retry_in => ?ALLOCATE_RETRY_TIMEOUT - } - ), - {noreply, State, ?ALLOCATE_RETRY_TIMEOUT} - end; + handle_allocate_shards(State, noreply); handle_info(_Info, State) -> {noreply, State}. @@ -96,8 +74,24 @@ terminate(_Reason, #{}) -> %% -allocate_shards(State = #{db := DB, opts := Opts}) -> - case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of +handle_allocate_shards(State, Ret) -> + case allocate_shards(State) of + {ok, NState} -> + {Ret, NState}; + {error, Data} -> + _ = logger:notice( + Data#{ + msg => "Shard allocation still in progress", + retry_in => ?ALLOCATE_RETRY_TIMEOUT + } + ), + {Ret, State, ?ALLOCATE_RETRY_TIMEOUT} + end. + +%% + +allocate_shards(State = #{db := DB}) -> + case emqx_ds_replication_layer_meta:allocate_shards(DB) of {ok, Shards} -> logger:notice(#{msg => "Shards allocated", shards => Shards}), ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 3df16dc1c..18053ee7d 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -53,7 +53,7 @@ t_00_smoke_open_drop(_Config) -> lists:foreach( fun(Shard) -> ?assertEqual( - {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + [Site], emqx_ds_replication_layer_meta:replica_set(DB, Shard) ) end, Shards