fix(dsrepl): anticipate and handle nodes leaving the cluster
Also make `claim_site/2` safer by refusing to claim a site for a node that is already there.
This commit is contained in:
parent
faf3492e58
commit
7e86e3e61c
|
@ -35,6 +35,7 @@
|
|||
sites/0,
|
||||
node/1,
|
||||
this_site/0,
|
||||
forget_site/1,
|
||||
print_status/0
|
||||
]).
|
||||
|
||||
|
@ -75,7 +76,8 @@
|
|||
update_replica_set_trans/3,
|
||||
update_db_config_trans/2,
|
||||
drop_db_trans/1,
|
||||
claim_site/2,
|
||||
claim_site_trans/2,
|
||||
forget_site_trans/1,
|
||||
n_shards/1
|
||||
]).
|
||||
|
||||
|
@ -153,6 +155,11 @@
|
|||
erlang:make_tuple(record_info(size, ?NODE_TAB), '_')
|
||||
).
|
||||
|
||||
-define(NODE_PAT(NODE),
|
||||
%% Equivalent of `#?NODE_TAB{node = NODE, _ = '_'}`:
|
||||
erlang:make_tuple(record_info(size, ?NODE_TAB), '_', [{#?NODE_TAB.node, NODE}])
|
||||
).
|
||||
|
||||
-define(SHARD_PAT(SHARD),
|
||||
%% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}`
|
||||
erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}])
|
||||
|
@ -256,6 +263,15 @@ node(Site) ->
|
|||
undefined
|
||||
end.
|
||||
|
||||
-spec forget_site(site()) -> ok | {error, _}.
|
||||
forget_site(Site) ->
|
||||
case mnesia:dirty_read(?NODE_TAB, Site) of
|
||||
[] ->
|
||||
{error, nonexistent_site};
|
||||
[Record] ->
|
||||
transaction(fun ?MODULE:forget_site_trans/1, [Record])
|
||||
end.
|
||||
|
||||
%%===============================================================================
|
||||
%% DB API
|
||||
%%===============================================================================
|
||||
|
@ -374,6 +390,7 @@ unsubscribe(Pid) ->
|
|||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
logger:set_process_metadata(#{domain => [ds, meta]}),
|
||||
ok = ekka:monitor(membership),
|
||||
ensure_tables(),
|
||||
ensure_site(),
|
||||
S = #s{},
|
||||
|
@ -395,6 +412,9 @@ handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}},
|
|||
{noreply, S};
|
||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) ->
|
||||
{noreply, handle_unsubscribe(Pid, S)};
|
||||
handle_info({membership, {node, leaving, Node}}, S) ->
|
||||
forget_node(Node),
|
||||
{noreply, S};
|
||||
handle_info(_Info, S) ->
|
||||
{noreply, S}.
|
||||
|
||||
|
@ -464,7 +484,7 @@ assign_db_sites_trans(DB, Sites) ->
|
|||
%% Optimize reallocation. The goals are:
|
||||
%% 1. Minimize the number of membership transitions.
|
||||
%% 2. Ensure that sites are responsible for roughly the same number of shards.
|
||||
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
|
||||
Shards = db_shards_trans(DB),
|
||||
Reallocation = compute_allocation(Shards, Sites, Opts),
|
||||
ok = lists:foreach(
|
||||
fun({Record, ReplicaSet}) ->
|
||||
|
@ -476,7 +496,7 @@ assign_db_sites_trans(DB, Sites) ->
|
|||
|
||||
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}.
|
||||
modify_db_sites_trans(DB, Modifications) ->
|
||||
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
|
||||
Shards = db_shards_trans(DB),
|
||||
Sites0 = list_db_target_sites(Shards),
|
||||
Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
|
||||
case Sites of
|
||||
|
@ -532,15 +552,40 @@ db_config_trans(DB, LockType) ->
|
|||
mnesia:abort({nonexistent_db, DB})
|
||||
end.
|
||||
|
||||
db_shards_trans(DB) ->
|
||||
mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write).
|
||||
|
||||
-spec drop_db_trans(emqx_ds:db()) -> ok.
|
||||
drop_db_trans(DB) ->
|
||||
mnesia:delete({?META_TAB, DB}),
|
||||
[mnesia:delete({?SHARD_TAB, Shard}) || Shard <- shards(DB)],
|
||||
ok.
|
||||
|
||||
-spec claim_site(site(), node()) -> ok.
|
||||
claim_site(Site, Node) ->
|
||||
mnesia:write(#?NODE_TAB{site = Site, node = Node}).
|
||||
-spec claim_site_trans(site(), node()) -> ok.
|
||||
claim_site_trans(Site, Node) ->
|
||||
case node_sites(Node) of
|
||||
[] ->
|
||||
mnesia:write(#?NODE_TAB{site = Site, node = Node});
|
||||
[#?NODE_TAB{site = Site}] ->
|
||||
ok;
|
||||
Records ->
|
||||
ExistingSites = [S || #?NODE_TAB{site = S} <- Records],
|
||||
mnesia:abort({conflicting_node_site, ExistingSites})
|
||||
end.
|
||||
|
||||
-spec forget_site_trans(_Record :: tuple()) -> ok.
|
||||
forget_site_trans(Record = #?NODE_TAB{site = Site}) ->
|
||||
DBs = mnesia:all_keys(?META_TAB),
|
||||
SiteDBs = [DB || DB <- DBs, S <- list_db_target_sites(db_shards_trans(DB)), S == Site],
|
||||
case SiteDBs of
|
||||
[] ->
|
||||
mnesia:delete_object(?NODE_TAB, Record, write);
|
||||
[_ | _] ->
|
||||
mnesia:abort({member_of_replica_sets, SiteDBs})
|
||||
end.
|
||||
|
||||
node_sites(Node) ->
|
||||
mnesia:dirty_match_object(?NODE_TAB, ?NODE_PAT(Node)).
|
||||
|
||||
%%================================================================================
|
||||
%% Internal functions
|
||||
|
@ -583,9 +628,22 @@ ensure_site() ->
|
|||
io:format(FD, "~p.", [Site]),
|
||||
file:close(FD)
|
||||
end,
|
||||
{atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
|
||||
persistent_term:put(?emqx_ds_builtin_site, Site),
|
||||
ok.
|
||||
case transaction(fun ?MODULE:claim_site_trans/2, [Site, node()]) of
|
||||
ok ->
|
||||
persistent_term:put(?emqx_ds_builtin_site, Site);
|
||||
{error, Reason} ->
|
||||
logger:error("Attempt to claim site with ID=~s failed: ~p", [Site, Reason])
|
||||
end.
|
||||
|
||||
forget_node(Node) ->
|
||||
Sites = node_sites(Node),
|
||||
Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
|
||||
case [Reason || {error, Reason} <- Results] of
|
||||
[] ->
|
||||
ok;
|
||||
Errors ->
|
||||
logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors])
|
||||
end.
|
||||
|
||||
%% @doc Returns sorted list of sites shards are replicated across.
|
||||
-spec list_db_sites([_Shard]) -> [site()].
|
||||
|
|
|
@ -191,7 +191,7 @@ handle_shard_transitions(Shard, [Trans | _Rest], State) ->
|
|||
end.
|
||||
|
||||
transition_handler(Shard, Trans, _State = #{db := DB}) ->
|
||||
ThisSite = emqx_ds_replication_layer_meta:this_site(),
|
||||
ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
|
||||
case Trans of
|
||||
{add, ThisSite} ->
|
||||
{Shard, fun trans_add_local/3};
|
||||
|
|
|
@ -35,7 +35,9 @@
|
|||
|
||||
update_db_sites/3,
|
||||
join/3,
|
||||
leave/3
|
||||
leave/3,
|
||||
|
||||
forget/2
|
||||
]).
|
||||
|
||||
%% behavior callbacks:
|
||||
|
@ -376,6 +378,14 @@ leave(DB, Site, Via) ->
|
|||
}),
|
||||
meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)).
|
||||
|
||||
-spec forget(emqx_ds_replication_layer_meta:site(), rest | cli) ->
|
||||
ok | {error, _}.
|
||||
forget(Site, Via) ->
|
||||
?SLOG(warning, #{
|
||||
msg => "durable_storage_forget_request", site => Site, via => Via
|
||||
}),
|
||||
meta_result_to_binary(emqx_ds_replication_layer_meta:forget_site(Site)).
|
||||
|
||||
%%================================================================================
|
||||
%% Internal functions
|
||||
%%================================================================================
|
||||
|
@ -466,14 +476,20 @@ list_shards(DB) ->
|
|||
|| Shard <- emqx_ds_replication_layer_meta:shards(DB)
|
||||
].
|
||||
|
||||
meta_result_to_binary({ok, Result}) ->
|
||||
{ok, Result};
|
||||
meta_result_to_binary(Ok) when Ok == ok orelse element(1, Ok) == ok ->
|
||||
Ok;
|
||||
meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
|
||||
Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
|
||||
{error, iolist_to_binary(Msg)};
|
||||
meta_result_to_binary({error, {nonexistent_db, DB}}) ->
|
||||
IOList = io_lib:format("Unknown storage: ~p", [DB]),
|
||||
{error, iolist_to_binary(IOList)};
|
||||
meta_result_to_binary({error, nonexistent_site}) ->
|
||||
{error, <<"Unknown site">>};
|
||||
meta_result_to_binary({error, {member_of_replica_sets, DBNames}}) ->
|
||||
DBs = lists:map(fun atom_to_binary/1, DBNames),
|
||||
Msg = ["Site is still a member of replica sets of: " | lists:join(", ", DBs)],
|
||||
{error, iolist_to_binary(Msg)};
|
||||
meta_result_to_binary({error, Err}) ->
|
||||
IOList = io_lib:format("Error: ~p", [Err]),
|
||||
{error, iolist_to_binary(IOList)}.
|
||||
|
|
|
@ -891,13 +891,21 @@ do_ds(["leave", DBStr, Site]) ->
|
|||
{error, _} ->
|
||||
emqx_ctl:print("Unknown durable storage~n")
|
||||
end;
|
||||
do_ds(["forget", Site]) ->
|
||||
case emqx_mgmt_api_ds:forget(list_to_binary(Site), cli) of
|
||||
ok ->
|
||||
emqx_ctl:print("ok~n");
|
||||
{error, Description} ->
|
||||
emqx_ctl:print("Unable to forget site: ~s~n", [Description])
|
||||
end;
|
||||
do_ds(_) ->
|
||||
emqx_ctl:usage([
|
||||
{"ds info", "Show overview of the embedded durable storage state"},
|
||||
{"ds set_replicas <storage> <site1> <site2> ...",
|
||||
"Change the replica set of the durable storage"},
|
||||
{"ds join <storage> <site>", "Add site to the replica set of the storage"},
|
||||
{"ds leave <storage> <site>", "Remove site from the replica set of the storage"}
|
||||
{"ds leave <storage> <site>", "Remove site from the replica set of the storage"},
|
||||
{"ds forget <site>", "Forcefully remove a site from the list of known sites"}
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue