From 7e86e3e61ccaf010e0e8cace30986bf18f896af8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 15 May 2024 13:52:56 +0200 Subject: [PATCH 1/4] fix(dsrepl): anticipate and handle nodes leaving the cluster Also make `claim_site/2` safer by refusing to claim a site for a node that is already there. --- .../src/emqx_ds_replication_layer_meta.erl | 76 ++++++++++++++++--- .../emqx_ds_replication_shard_allocator.erl | 2 +- apps/emqx_management/src/emqx_mgmt_api_ds.erl | 22 +++++- apps/emqx_management/src/emqx_mgmt_cli.erl | 10 ++- 4 files changed, 96 insertions(+), 14 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f04de7d90..f5df02098 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -35,6 +35,7 @@ sites/0, node/1, this_site/0, + forget_site/1, print_status/0 ]). @@ -75,7 +76,8 @@ update_replica_set_trans/3, update_db_config_trans/2, drop_db_trans/1, - claim_site/2, + claim_site_trans/2, + forget_site_trans/1, n_shards/1 ]). @@ -153,6 +155,11 @@ erlang:make_tuple(record_info(size, ?NODE_TAB), '_') ). +-define(NODE_PAT(NODE), + %% Equivalent of `#?NODE_TAB{node = NODE, _ = '_'}`: + erlang:make_tuple(record_info(size, ?NODE_TAB), '_', [{#?NODE_TAB.node, NODE}]) +). + -define(SHARD_PAT(SHARD), %% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}` erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}]) @@ -256,6 +263,15 @@ node(Site) -> undefined end. +-spec forget_site(site()) -> ok | {error, _}. +forget_site(Site) -> + case mnesia:dirty_read(?NODE_TAB, Site) of + [] -> + {error, nonexistent_site}; + [Record] -> + transaction(fun ?MODULE:forget_site_trans/1, [Record]) + end. + %%=============================================================================== %% DB API %%=============================================================================== @@ -374,6 +390,7 @@ unsubscribe(Pid) -> init([]) -> process_flag(trap_exit, true), logger:set_process_metadata(#{domain => [ds, meta]}), + ok = ekka:monitor(membership), ensure_tables(), ensure_site(), S = #s{}, @@ -395,6 +412,9 @@ handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, {noreply, S}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) -> {noreply, handle_unsubscribe(Pid, S)}; +handle_info({membership, {node, leaving, Node}}, S) -> + forget_node(Node), + {noreply, S}; handle_info(_Info, S) -> {noreply, S}. @@ -464,7 +484,7 @@ assign_db_sites_trans(DB, Sites) -> %% Optimize reallocation. The goals are: %% 1. Minimize the number of membership transitions. %% 2. Ensure that sites are responsible for roughly the same number of shards. - Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write), + Shards = db_shards_trans(DB), Reallocation = compute_allocation(Shards, Sites, Opts), ok = lists:foreach( fun({Record, ReplicaSet}) -> @@ -476,7 +496,7 @@ assign_db_sites_trans(DB, Sites) -> -spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}. modify_db_sites_trans(DB, Modifications) -> - Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write), + Shards = db_shards_trans(DB), Sites0 = list_db_target_sites(Shards), Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications), case Sites of @@ -532,15 +552,40 @@ db_config_trans(DB, LockType) -> mnesia:abort({nonexistent_db, DB}) end. +db_shards_trans(DB) -> + mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write). + -spec drop_db_trans(emqx_ds:db()) -> ok. drop_db_trans(DB) -> mnesia:delete({?META_TAB, DB}), [mnesia:delete({?SHARD_TAB, Shard}) || Shard <- shards(DB)], ok. --spec claim_site(site(), node()) -> ok. -claim_site(Site, Node) -> - mnesia:write(#?NODE_TAB{site = Site, node = Node}). +-spec claim_site_trans(site(), node()) -> ok. +claim_site_trans(Site, Node) -> + case node_sites(Node) of + [] -> + mnesia:write(#?NODE_TAB{site = Site, node = Node}); + [#?NODE_TAB{site = Site}] -> + ok; + Records -> + ExistingSites = [S || #?NODE_TAB{site = S} <- Records], + mnesia:abort({conflicting_node_site, ExistingSites}) + end. + +-spec forget_site_trans(_Record :: tuple()) -> ok. +forget_site_trans(Record = #?NODE_TAB{site = Site}) -> + DBs = mnesia:all_keys(?META_TAB), + SiteDBs = [DB || DB <- DBs, S <- list_db_target_sites(db_shards_trans(DB)), S == Site], + case SiteDBs of + [] -> + mnesia:delete_object(?NODE_TAB, Record, write); + [_ | _] -> + mnesia:abort({member_of_replica_sets, SiteDBs}) + end. + +node_sites(Node) -> + mnesia:dirty_match_object(?NODE_TAB, ?NODE_PAT(Node)). %%================================================================================ %% Internal functions @@ -583,9 +628,22 @@ ensure_site() -> io:format(FD, "~p.", [Site]), file:close(FD) end, - {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]), - persistent_term:put(?emqx_ds_builtin_site, Site), - ok. + case transaction(fun ?MODULE:claim_site_trans/2, [Site, node()]) of + ok -> + persistent_term:put(?emqx_ds_builtin_site, Site); + {error, Reason} -> + logger:error("Attempt to claim site with ID=~s failed: ~p", [Site, Reason]) + end. + +forget_node(Node) -> + Sites = node_sites(Node), + Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]), + case [Reason || {error, Reason} <- Results] of + [] -> + ok; + Errors -> + logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors]) + end. %% @doc Returns sorted list of sites shards are replicated across. -spec list_db_sites([_Shard]) -> [site()]. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index f02335a10..6d8db94e3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -191,7 +191,7 @@ handle_shard_transitions(Shard, [Trans | _Rest], State) -> end. transition_handler(Shard, Trans, _State = #{db := DB}) -> - ThisSite = emqx_ds_replication_layer_meta:this_site(), + ThisSite = catch emqx_ds_replication_layer_meta:this_site(), case Trans of {add, ThisSite} -> {Shard, fun trans_add_local/3}; diff --git a/apps/emqx_management/src/emqx_mgmt_api_ds.erl b/apps/emqx_management/src/emqx_mgmt_api_ds.erl index 5f36bdce7..ad3f3c9e2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_ds.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_ds.erl @@ -35,7 +35,9 @@ update_db_sites/3, join/3, - leave/3 + leave/3, + + forget/2 ]). %% behavior callbacks: @@ -376,6 +378,14 @@ leave(DB, Site, Via) -> }), meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)). +-spec forget(emqx_ds_replication_layer_meta:site(), rest | cli) -> + ok | {error, _}. +forget(Site, Via) -> + ?SLOG(warning, #{ + msg => "durable_storage_forget_request", site => Site, via => Via + }), + meta_result_to_binary(emqx_ds_replication_layer_meta:forget_site(Site)). + %%================================================================================ %% Internal functions %%================================================================================ @@ -466,14 +476,20 @@ list_shards(DB) -> || Shard <- emqx_ds_replication_layer_meta:shards(DB) ]. -meta_result_to_binary({ok, Result}) -> - {ok, Result}; +meta_result_to_binary(Ok) when Ok == ok orelse element(1, Ok) == ok -> + Ok; meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) -> Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)], {error, iolist_to_binary(Msg)}; meta_result_to_binary({error, {nonexistent_db, DB}}) -> IOList = io_lib:format("Unknown storage: ~p", [DB]), {error, iolist_to_binary(IOList)}; +meta_result_to_binary({error, nonexistent_site}) -> + {error, <<"Unknown site">>}; +meta_result_to_binary({error, {member_of_replica_sets, DBNames}}) -> + DBs = lists:map(fun atom_to_binary/1, DBNames), + Msg = ["Site is still a member of replica sets of: " | lists:join(", ", DBs)], + {error, iolist_to_binary(Msg)}; meta_result_to_binary({error, Err}) -> IOList = io_lib:format("Error: ~p", [Err]), {error, iolist_to_binary(IOList)}. diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index fef2ff217..7dc614c6d 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -891,13 +891,21 @@ do_ds(["leave", DBStr, Site]) -> {error, _} -> emqx_ctl:print("Unknown durable storage~n") end; +do_ds(["forget", Site]) -> + case emqx_mgmt_api_ds:forget(list_to_binary(Site), cli) of + ok -> + emqx_ctl:print("ok~n"); + {error, Description} -> + emqx_ctl:print("Unable to forget site: ~s~n", [Description]) + end; do_ds(_) -> emqx_ctl:usage([ {"ds info", "Show overview of the embedded durable storage state"}, {"ds set_replicas ...", "Change the replica set of the durable storage"}, {"ds join ", "Add site to the replica set of the storage"}, - {"ds leave ", "Remove site from the replica set of the storage"} + {"ds leave ", "Remove site from the replica set of the storage"}, + {"ds forget ", "Forcefully remove a site from the list of known sites"} ]). %%-------------------------------------------------------------------- From 26c4a4f597fdfa3e356a5ca3a73f8f6856ff83b9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 15 May 2024 14:22:05 +0200 Subject: [PATCH 2/4] feat(dsrepl): reflect conflicts and inconsistencies in ds status --- .../src/emqx_ds_replication_layer_meta.erl | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f5df02098..71c25c97e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -171,32 +171,55 @@ -spec print_status() -> ok. print_status() -> - io:format("THIS SITE:~n~s~n", [this_site()]), + io:format("THIS SITE:~n"), + try this_site() of + Site -> io:format("~s~n", [Site]) + catch + error:badarg -> + io:format( + "(!) UNCLAIMED~n" + "(!) Likely this node's name is already known as another site in the cluster.~n" + "(!) Please resolve conflicts manually.~n" + ) + end, io:format("~nSITES:~n", []), - Nodes = [node() | nodes()], lists:foreach( fun(#?NODE_TAB{site = Site, node = Node}) -> Status = - case lists:member(Node, Nodes) of - true -> up; - false -> down + case mria:cluster_status(Node) of + running -> " up"; + stopped -> "(x) down"; + false -> "(!) UNIDENTIFIED" end, - io:format("~s ~p ~p~n", [Site, Node, Status]) + io:format("~s ~p ~s~n", [Site, Node, Status]) end, eval_qlc(mnesia:table(?NODE_TAB)) ), io:format( - "~nSHARDS:~nId Replicas~n", [] + "~nSHARDS:~n~s~s~n", + [string:pad("Id", 30), "Replicas"] ), lists:foreach( fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) -> - ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30), - ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40), - io:format("~s ~s~n", [ShardStr, ReplicasStr]) + ShardStr = io_lib:format("~p/~s", [DB, Shard]), + ReplicasStr = string:join([format_replica(R) || R <- RS], " "), + io:format( + "~s~s~n", + [string:pad(ShardStr, 30), ReplicasStr] + ) end, eval_qlc(mnesia:table(?SHARD_TAB)) ). +format_replica(Site) -> + Marker = + case mria:cluster_status(?MODULE:node(Site)) of + running -> " "; + stopped -> "(x)"; + false -> "(!)" + end, + io_lib:format("~s ~s", [Marker, Site]). + -spec this_site() -> site(). this_site() -> persistent_term:get(?emqx_ds_builtin_site). From 0119728d451e4fdb1d113327c1e4714a0ec04efa Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 16 May 2024 18:56:21 +0200 Subject: [PATCH 3/4] feat(dsrepl): also reflect pending transitions in ds status --- .../src/emqx_ds_replication_layer_meta.erl | 49 ++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 71c25c97e..481a47ca1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -195,22 +195,51 @@ print_status() -> end, eval_qlc(mnesia:table(?NODE_TAB)) ), + Shards = eval_qlc(mnesia:table(?SHARD_TAB)), io:format( "~nSHARDS:~n~s~s~n", - [string:pad("Id", 30), "Replicas"] + [string:pad("Shard", 30), "Replicas"] ), lists:foreach( - fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) -> - ShardStr = io_lib:format("~p/~s", [DB, Shard]), + fun(#?SHARD_TAB{shard = DBShard, replica_set = RS}) -> + ShardStr = format_shard(DBShard), ReplicasStr = string:join([format_replica(R) || R <- RS], " "), io:format( "~s~s~n", [string:pad(ShardStr, 30), ReplicasStr] ) end, - eval_qlc(mnesia:table(?SHARD_TAB)) + Shards + ), + PendingTransitions = lists:filtermap( + fun(Record = #?SHARD_TAB{shard = DBShard}) -> + case compute_transitions(Record) of + [] -> false; + Transitions -> {true, {DBShard, Transitions}} + end + end, + Shards + ), + PendingTransitions /= [] andalso + io:format( + "~nREPLICA TRANSITIONS:~n~s~s~n", + [string:pad("Shard", 30), "Transitions"] + ), + lists:foreach( + fun({DBShard, Transitions}) -> + ShardStr = format_shard(DBShard), + TransStr = string:join(lists:map(fun format_transition/1, Transitions), " "), + io:format( + "~s~s~n", + [string:pad(ShardStr, 30), TransStr] + ) + end, + PendingTransitions ). +format_shard({DB, Shard}) -> + io_lib:format("~p/~s", [DB, Shard]). + format_replica(Site) -> Marker = case mria:cluster_status(?MODULE:node(Site)) of @@ -220,6 +249,11 @@ format_replica(Site) -> end, io_lib:format("~s ~s", [Marker, Site]). +format_transition({add, Site}) -> + io_lib:format("+~s", [Site]); +format_transition({del, Site}) -> + io_lib:format("-~s", [Site]). + -spec this_site() -> site(). this_site() -> persistent_term:get(?emqx_ds_builtin_site). @@ -353,8 +387,8 @@ db_sites(DB) -> [transition()] | undefined. replica_set_transitions(DB, Shard) -> case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}] -> - compute_transitions(TargetSet, ReplicaSet); + [Record] -> + compute_transitions(Record); [] -> undefined end. @@ -706,6 +740,9 @@ compute_allocation(Shards, Sites, Opts) -> ), Allocation. +compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) -> + compute_transitions(TargetSet, ReplicaSet). + compute_transitions(undefined, _ReplicaSet) -> []; compute_transitions(TargetSet, ReplicaSet) -> From 5157e614187e59b4bcffc6695098ddc254449c38 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 16 May 2024 18:56:54 +0200 Subject: [PATCH 4/4] fix(dsrepl): verify if shards already allocated first --- .../src/emqx_ds_replication_layer_meta.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 481a47ca1..fa53ecced 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -497,13 +497,6 @@ open_db_trans(DB, CreateOpts) -> -spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. allocate_shards_trans(DB) -> Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB), - Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read), - case length(Nodes) of - N when N >= NSites -> - ok; - _ -> - mnesia:abort({insufficient_sites_online, NSites, Nodes}) - end, case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of [] -> ok; @@ -511,6 +504,13 @@ allocate_shards_trans(DB) -> ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], mnesia:abort({shards_already_allocated, ShardsAllocated}) end, + Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read), + case length(Nodes) of + N when N >= NSites -> + ok; + _ -> + mnesia:abort({insufficient_sites_online, NSites, Nodes}) + end, Shards = gen_shards(NShards), Sites = [S || #?NODE_TAB{site = S} <- Nodes], Allocation = compute_allocation(Shards, Sites, Opts),