diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl index 2348d7c2d..7bf838e6f 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl @@ -41,6 +41,7 @@ leave_db_site/2, assign_db_sites/2, replica_set_transitions/2, + claim_transition/3, update_replica_set/3, db_sites/1, target_set/2 @@ -61,6 +62,7 @@ allocate_shards_trans/1, assign_db_sites_trans/2, modify_db_sites_trans/2, + claim_transition_trans/3, update_replica_set_trans/3, update_db_config_trans/2, drop_db_trans/1, @@ -92,6 +94,8 @@ -define(NODE_TAB, emqx_ds_builtin_node_tab). %% Shard metadata: -define(SHARD_TAB, emqx_ds_builtin_shard_tab). +%% Membership transitions: +-define(TRANSITION_TAB, emqx_ds_builtin_trans_tab). -record(?META_TAB, { db :: emqx_ds:db(), @@ -111,6 +115,13 @@ %% Sites that should contain the data when the cluster is in the %% stable state (no nodes are being added or removed from it): target_set :: [site()] | undefined, + % target_set :: [transition() | site()] | undefined, + misc = #{} :: map() +}). + +-record(?TRANSITION_TAB, { + shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, + transition :: transition(), misc = #{} :: map() }). 
@@ -184,6 +195,7 @@ print_status() -> eval_qlc(mnesia:table(?NODE_TAB)) ), Shards = eval_qlc(mnesia:table(?SHARD_TAB)), + Transitions = eval_qlc(mnesia:table(?TRANSITION_TAB)), io:format( "~nSHARDS:~n~s~s~n", [string:pad("Shard", 30), "Replicas"] @@ -201,9 +213,10 @@ print_status() -> ), PendingTransitions = lists:filtermap( fun(Record = #?SHARD_TAB{shard = DBShard}) -> - case compute_transitions(Record) of + ClaimedTs = [T || T = #?TRANSITION_TAB{shard = S} <- Transitions, S == DBShard], + case compute_transitions(Record, ClaimedTs) of [] -> false; - Transitions -> {true, {DBShard, Transitions}} + ShardTransitions -> {true, {DBShard, ShardTransitions}} end end, Shards @@ -214,9 +227,9 @@ print_status() -> [string:pad("Shard", 30), "Transitions"] ), lists:foreach( - fun({DBShard, Transitions}) -> + fun({DBShard, ShardTransitions}) -> ShardStr = format_shard(DBShard), - TransStr = string:join(lists:map(fun format_transition/1, Transitions), " "), + TransStr = string:join(lists:map(fun format_transition/1, ShardTransitions), " "), io:format( "~s~s~n", [string:pad(ShardStr, 30), TransStr] @@ -381,21 +394,25 @@ db_sites(DB) -> replica_set_transitions(DB, Shard) -> case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of [Record] -> - compute_transitions(Record); + PendingTransitions = mnesia:dirty_read(?TRANSITION_TAB, {DB, Shard}), + compute_transitions(Record, PendingTransitions); [] -> undefined end. +%% @doc Claim the intention to start the replica set transition for the given shard. +%% To be called before starting to act on the transition, so that the information about +%% it does not get lost. Once the transition finishes, call `update_replica_set/3`. +-spec claim_transition(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> + ok | {error, {conflict, transition()} | {outdated, _Expected :: [transition()]}}. +claim_transition(DB, Shard, Trans) -> + transaction(fun ?MODULE:claim_transition_trans/3, [DB, Shard, Trans]). 
+ %% @doc Update the set of replication sites for a shard. %% To be called after a `transition()` has been conducted successfully. -spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok. update_replica_set(DB, Shard, Trans) -> - case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. + transaction(fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]). %% @doc Get the current set of replication sites for a shard. -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> @@ -556,6 +573,28 @@ modify_db_sites_trans(DB, Modifications) -> assign_db_sites_trans(DB, Sites) end. +claim_transition_trans(DB, Shard, Trans) -> + ShardRecord = + case mnesia:read(?SHARD_TAB, {DB, Shard}, read) of + [Record] -> + Record; + [] -> + mnesia:abort({nonexistent_shard, {DB, Shard}}) + end, + case mnesia:read(?TRANSITION_TAB, {DB, Shard}, write) of + [#?TRANSITION_TAB{transition = Trans}] -> + ok; + [#?TRANSITION_TAB{transition = Conflict}] -> + mnesia:abort({conflict, Conflict}); + [] -> + case compute_transitions(ShardRecord) of + [Trans | _] -> + mnesia:write(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans}); + Expected -> + mnesia:abort({outdated, Expected}) + end + end. + update_replica_set_trans(DB, Shard, Trans) -> case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] -> @@ -570,6 +609,8 @@ update_replica_set_trans(DB, Shard, Trans) -> TS -> TargetSet = TS end, + %% NOTE: The claim's existence is not enforced at this level, doing so makes little sense. 
+ mnesia:delete_object(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans}), mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet}); [] -> mnesia:abort({nonexistent_shard, {DB, Shard}}) @@ -663,6 +704,13 @@ ensure_tables() -> {record_name, ?SHARD_TAB}, {attributes, record_info(fields, ?SHARD_TAB)} ]), + ok = mria:create_table(?TRANSITION_TAB, [ + {rlog_shard, ?SHARD}, + {type, bag}, + {storage, disc_copies}, + {record_name, ?TRANSITION_TAB}, + {attributes, record_info(fields, ?TRANSITION_TAB)} + ]), ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]). ensure_site() -> @@ -733,12 +781,17 @@ compute_allocation(Shards, Sites, Opts) -> ), Allocation. -compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) -> - compute_transitions(TargetSet, ReplicaSet). +compute_transitions(Shard, []) -> + compute_transitions(Shard); +compute_transitions(Shard, [#?TRANSITION_TAB{transition = Trans}]) -> + [Trans | lists:delete(Trans, compute_transitions(Shard))]. -compute_transitions(undefined, _ReplicaSet) -> +compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) -> + do_compute_transitions(TargetSet, ReplicaSet). + +do_compute_transitions(undefined, _ReplicaSet) -> []; -compute_transitions(TargetSet, ReplicaSet) -> +do_compute_transitions(TargetSet, ReplicaSet) -> Additions = TargetSet -- ReplicaSet, Deletions = ReplicaSet -- TargetSet, intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]). diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl index 699237227..5984f75d9 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl @@ -32,7 +32,6 @@ -define(TRIGGER_PENDING_TIMEOUT, 60_000). -define(TRANS_RETRY_TIMEOUT, 5_000). --define(CRASH_RETRY_DELAY, 20_000). 
-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). -ifdef(TEST). @@ -176,7 +175,7 @@ handle_shard_transitions(_Shard, [], State) -> handle_shard_transitions(Shard, [Trans | _Rest], State) -> case transition_handler(Shard, Trans, State) of {Track, Handler} -> - ensure_transition_handler(Track, Shard, Trans, Handler, State); + ensure_transition(Track, Shard, Trans, Handler, State); undefined -> State end. @@ -185,9 +184,9 @@ transition_handler(Shard, Trans, _State = #{db := DB}) -> ThisSite = catch emqx_ds_replication_layer_meta:this_site(), case Trans of {add, ThisSite} -> - {Shard, fun trans_add_local/3}; + {Shard, {fun trans_claim/4, [fun trans_add_local/3]}}; {del, ThisSite} -> - {Shard, fun trans_drop_local/3}; + {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}}; {del, Site} -> ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), case lists:member(Site, ReplicaSet) of @@ -198,7 +197,7 @@ transition_handler(Shard, Trans, _State = #{db := DB}) -> %% unresponsive. Handler = {fun trans_delay/5, [ ?REMOVE_REPLICA_DELAY, - fun trans_rm_unresponsive/3 + {fun trans_claim/4, [fun trans_rm_unresponsive/3]} ]}, %% NOTE %% Putting this transition handler on separate "track" so that it @@ -231,6 +230,20 @@ apply_handler({Fun, Args}, DB, Shard, Trans) -> apply_handler(Fun, DB, Shard, Trans) -> erlang:apply(Fun, [DB, Shard, Trans]). +trans_claim(DB, Shard, Trans, TransHandler) -> + case claim_transition(DB, Shard, Trans) of + ok -> + apply_handler(TransHandler, DB, Shard, Trans); + {error, {outdated, Expected}} -> + ?tp(debug, "Transition became outdated", #{ + db => DB, + shard => Shard, + trans => Trans, + expected => Expected + }), + exit({shutdown, skipped}) + end. 
+ trans_add_local(DB, Shard, {add, Site}) -> ?tp(info, "Adding new local shard replica", #{ site => Site, @@ -331,7 +344,7 @@ trans_delay(DB, Shard, Trans, Delay, NextHandler) -> %% -ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> +ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> case maps:get(Track, Ts, undefined) of undefined -> Pid = start_transition_handler(Shard, Trans, Handler, State), @@ -342,6 +355,12 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := State end. +claim_transition(DB, Shard, Trans) -> + emqx_ds_replication_layer_meta:claim_transition(DB, Shard, Trans). + +commit_transition(Shard, Trans, #{db := DB}) -> + emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans). + start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). @@ -364,32 +383,25 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> State0 end. -handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) -> +handle_transition_exit(Shard, Trans, normal, State) -> %% NOTE: This will trigger the next transition if any. - ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans), + ok = commit_transition(Shard, Trans, State), State; handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> State; handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) -> + %% NOTE + %% In case of `{add, Site}` transition failure, we have no choice but to retry: + %% no other node can perform the transition and make progress towards the desired + %% state. Assuming `?TRIGGER_PENDING_TIMEOUT` timer will take care of that. 
?tp(warning, "Shard membership transition failed", #{ db => DB, shard => Shard, transition => Trans, reason => Reason, - retry_in => ?CRASH_RETRY_DELAY + retry_in => ?TRIGGER_PENDING_TIMEOUT }), - %% NOTE - %% In case of `{add, Site}` transition failure, we have no choice but to retry: - %% no other node can perform the transition and make progress towards the desired - %% state. - case Trans of - {add, _ThisSite} -> - {Track, Handler} = transition_handler(Shard, Trans, State), - RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]}, - ensure_transition_handler(Track, Shard, Trans, RetryHandler, State); - _Another -> - State - end. + State. %%