From df6c5b35fead5d9f778efc3b510aa6ac423fd019 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 3 Apr 2024 15:02:32 +0200 Subject: [PATCH] feat(dsrepl): add more primitive operations to modify DB sites --- .../src/emqx_ds_replication_layer_meta.erl | 138 +++++++++++++----- 1 file changed, 103 insertions(+), 35 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 5e0a2798b..b81b21d61 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -46,6 +46,8 @@ %% Site / shard allocation: -export([ + join_db_site/2, + leave_db_site/2, assign_db_sites/2, replica_set_transitions/2, update_replica_set/3, @@ -62,6 +64,7 @@ open_db_trans/2, allocate_shards_trans/1, assign_db_sites_trans/2, + modify_db_sites_trans/2, update_replica_set_trans/3, update_db_config_trans/2, drop_db_trans/1, @@ -227,7 +230,7 @@ open_db(DB, DefaultOpts) -> transaction(fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]). -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> - emqx_ds_replication_layer:builtin_db_opts() | {error, _}. + emqx_ds_replication_layer:builtin_db_opts() | {error, nonexistent_db}. update_db_config(DB, DefaultOpts) -> transaction(fun ?MODULE:update_db_config_trans/2, [DB, DefaultOpts]). @@ -235,26 +238,36 @@ update_db_config(DB, DefaultOpts) -> drop_db(DB) -> transaction(fun ?MODULE:drop_db_trans/1, [DB]). --spec assign_db_sites(emqx_ds:db(), [site()]) -> ok. -assign_db_sites(DB, Sites) -> - case mria:transaction(?SHARD, fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. 
+%%=============================================================================== +%% Site / shard allocation API +%%=============================================================================== +%% @doc Join a site to the set of sites the DB is replicated across. +-spec join_db_site(emqx_ds:db(), site()) -> + ok | {error, nonexistent_db | nonexistent_sites}. +join_db_site(DB, Site) -> + transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{add, Site}]]). + +%% @doc Make a site leave the set of sites the DB is replicated across. +-spec leave_db_site(emqx_ds:db(), site()) -> + ok | {error, nonexistent_db | nonexistent_sites}. +leave_db_site(DB, Site) -> + transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{del, Site}]]). + +%% @doc Assign a set of sites to the DB for replication. +-spec assign_db_sites(emqx_ds:db(), [site()]) -> + ok | {error, nonexistent_db | nonexistent_sites}. +assign_db_sites(DB, Sites) -> + transaction(fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]). + +%% @doc List the sites the DB is replicated across. -spec db_sites(emqx_ds:db()) -> [site()]. db_sites(DB) -> Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})), - lists:foldl( - fun(#?SHARD_TAB{replica_set = RS}, Acc) -> - ordsets:union(ordsets:from_list(RS), Acc) - end, - ordsets:new(), - Recs - ). + list_db_sites(Recs). +%% @doc List the sequence of transitions that should be conducted in order to +%% bring the set of replicas for a DB shard in line with the target set. -spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [transition()] | undefined. replica_set_transitions(DB, Shard) -> @@ -265,6 +278,8 @@ replica_set_transitions(DB, Shard) -> undefined end. +%% @doc Update the set of replication sites for a shard. +%% To be called after a `transition()` has been conducted successfully. -spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok. 
update_replica_set(DB, Shard, Trans) -> case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of @@ -274,6 +289,7 @@ update_replica_set(DB, Shard, Trans) -> {error, Reason} end. +%% @doc Get the current set of replication sites for a shard. -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [site()] | undefined. replica_set(DB, Shard) -> @@ -284,6 +300,9 @@ replica_set(DB, Shard) -> undefined end. +%% @doc Get the target set of replication sites for a DB shard. +%% Target set is updated every time the set of replication sites for the DB changes. +%% See `join_db_site/2`, `leave_db_site/2`, `assign_db_sites/2`. -spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [site()] | undefined. target_set(DB, Shard) -> @@ -390,6 +409,18 @@ assign_db_sites_trans(DB, Sites) -> Reallocation ). +-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok. +modify_db_sites_trans(DB, Modifications) -> + Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write), + Sites0 = list_db_target_sites(Shards), + Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications), + case Sites of + Sites0 -> + ok; + _Changed -> + assign_db_sites_trans(DB, Sites) + end. + update_replica_set_trans(DB, Shard, Trans) -> case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] -> @@ -406,27 +437,26 @@ update_replica_set_trans(DB, Shard, Trans) -> end. -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> - ok | {error, _}. + emqx_ds_replication_layer:builtin_db_opts(). 
update_db_config_trans(DB, UpdateOpts) -> - case mnesia:wread({?META_TAB, DB}) of - [#?META_TAB{db_props = Opts}] -> - %% Since this is an update and not a reopen, - %% we should keep the shard number and replication factor - %% and not create a new shard server - ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts), - EffectiveOpts = maps:merge(Opts, ChangeableOpts), - mnesia:write(#?META_TAB{ - db = DB, - db_props = EffectiveOpts - }), - EffectiveOpts; - [] -> - {error, no_database} - end. + Opts = db_config_trans(DB, write), + %% Since this is an update and not a reopen, + %% we should keep the shard number and replication factor + %% and not create a new shard server + ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts), + EffectiveOpts = maps:merge(Opts, ChangeableOpts), + ok = mnesia:write(#?META_TAB{ + db = DB, + db_props = EffectiveOpts + }), + EffectiveOpts. -spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). db_config_trans(DB) -> - case mnesia:read(?META_TAB, DB, read) of + db_config_trans(DB, read). + +db_config_trans(DB, LockType) -> + case mnesia:read(?META_TAB, DB, LockType) of [#?META_TAB{db_props = Config}] -> Config; [] -> @@ -488,6 +518,27 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. +%% @doc Returns sorted list of sites shards are replicated across. +-spec list_db_sites([_Shard]) -> [site()]. +list_db_sites(Shards) -> + flatmap_sorted_set(fun get_shard_sites/1, Shards). + +-spec list_db_target_sites([_Shard]) -> [site()]. +list_db_target_sites(Shards) -> + flatmap_sorted_set(fun get_shard_target_sites/1, Shards). + +-spec get_shard_sites(_Shard) -> [site()]. +get_shard_sites(#?SHARD_TAB{replica_set = ReplicaSet}) -> + ReplicaSet. + +-spec get_shard_target_sites(_Shard) -> [site()]. 
+get_shard_target_sites(#?SHARD_TAB{target_set = Sites}) when is_list(Sites) -> + Sites; +get_shard_target_sites(#?SHARD_TAB{target_set = undefined} = Shard) -> + get_shard_sites(Shard). + +-spec compute_allocation([Shard], [Site], emqx_ds_replication_layer:builtin_db_opts()) -> + [{Shard, [Site, ...]}]. compute_allocation(Shards, Sites, Opts) -> NSites = length(Sites), ReplicationFactor = maps:get(replication_factor, Opts), @@ -512,6 +563,8 @@ compute_transitions(TargetSet, ReplicaSet) -> Deletions = ReplicaSet -- TargetSet, intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]). +%% @doc Apply a transition to a list of sites, preserving sort order. +-spec apply_transition(transition(), [site()]) -> [site()]. apply_transition({add, S}, Sites) -> lists:usort([S | Sites]); apply_transition({del, S}, Sites) -> @@ -530,8 +583,12 @@ eval_qlc(Q) -> end. transaction(Fun, Args) -> - {atomic, Result} = mria:transaction(?SHARD, Fun, Args), - Result. + case mria:transaction(?SHARD, Fun, Args) of + {atomic, Result} -> + Result; + {aborted, Reason} -> + {error, Reason} + end. %% @doc Intersperse elements of two lists. %% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5]. @@ -542,3 +599,14 @@ intersperse([], L2) -> L2; intersperse([H1 | T1], L2) -> [H1 | intersperse(L2, T1)]. + +%% @doc Map list into a list of sets and return union, as a sorted list. +-spec flatmap_sorted_set(fun((X) -> [Y]), [X]) -> [Y]. +flatmap_sorted_set(Fun, L) -> + ordsets:to_list( + lists:foldl( + fun(X, Acc) -> ordsets:union(ordsets:from_list(Fun(X)), Acc) end, + ordsets:new(), + L + ) + ).