Merge pull request #12881 from keynslug/fix/ds-repl-flaky

fix(dsrepl): make replication-related tests more stable
Andrew Mayorov 2024-04-16 10:17:43 +02:00 committed by GitHub
commit 088c44465b
3 changed files with 55 additions and 48 deletions

View File

@@ -232,7 +232,8 @@ shard_replication_spec(DB, Shard, Opts) ->
     #{
         id => {Shard, replication},
         start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
-        restart => transient,
+        shutdown => 10_000,
+        restart => permanent,
         type => worker
     }.
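
In OTP supervisor terms, this makes the replication worker permanent (restarted regardless of exit reason, rather than only after an abnormal exit) and gives it an explicit 10-second shutdown budget, so the supervisor waits that long for a graceful stop before killing the child. Assembled from the hunk above, the resulting child spec reads roughly as follows (other fields unchanged):

    %% Sketch of the resulting child spec, assembled from the hunk above.
    shard_replication_spec(DB, Shard, Opts) ->
        #{
            id => {Shard, replication},
            start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
            %% Wait up to 10 s for a graceful stop before the supervisor kills the child.
            shutdown => 10_000,
            %% Always restart, even after a normal exit.
            restart => permanent,
            type => worker
        }.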

View File

@@ -189,22 +189,9 @@ add_local_server(DB, Shard) ->
 -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     ok | emqx_ds:error(_Reason).
 drop_local_server(DB, Shard) ->
-    ShardServers = shard_servers(DB, Shard),
+    %% NOTE: Timeouts are ignored, it's a best effort attempt.
+    _ = prep_stop_server(DB, Shard),
     LocalServer = local_server(DB, Shard),
-    case lookup_leader(DB, Shard) of
-        LocalServer ->
-            %% NOTE
-            %% Trigger leadership transfer *and* force to wait until the new leader
-            %% is elected and updated in the leaderboard. This should help to avoid
-            %% edge cases where entries appended right before removal are duplicated
-            %% due to client retries.
-            %% Timeouts are ignored, it's a best effort attempt.
-            [Candidate | _] = lists:delete(LocalServer, ShardServers),
-            _ = ra:transfer_leadership(LocalServer, Candidate),
-            _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end);
-        _Another ->
-            ok
-    end,
     case remove_server(DB, Shard, LocalServer) of
         ok ->
             ra:force_delete_server(DB, LocalServer);
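
For readability, drop_local_server/2 after this hunk reads as below; the leadership handoff that used to live here moves into the new prep_stop_server/2 helper further down. (Assembled from the lines above; the error clause of the remove_server/3 case sits outside the hunk and is elided here.)

    drop_local_server(DB, Shard) ->
        %% NOTE: Timeouts are ignored, it's a best effort attempt.
        _ = prep_stop_server(DB, Shard),
        LocalServer = local_server(DB, Shard),
        case remove_server(DB, Shard, LocalServer) of
            ok ->
                ra:force_delete_server(DB, LocalServer)
            %% (error clause elided in the diff)
        end.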
@@ -300,7 +287,7 @@ ra_overview(Server) ->

 init({DB, Shard, Opts}) ->
     _ = process_flag(trap_exit, true),
-    ok = start_shard(DB, Shard, Opts),
+    ok = start_server(DB, Shard, Opts),
     {ok, {DB, Shard}}.

 handle_call(_Call, _From, State) ->
@@ -310,18 +297,18 @@ handle_cast(_Msg, State) ->
     {noreply, State}.

 terminate(_Reason, {DB, Shard}) ->
+    %% NOTE: Timeouts are ignored, it's a best effort attempt.
+    catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),
     ok = ra:stop_server(DB, LocalServer).

 %%

-start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
+start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
     Servers = shard_servers(DB, Shard),
     case ra:restart_server(DB, LocalServer) of
-        ok ->
-            Bootstrap = false;
         {error, name_not_registered} ->
             Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
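
The catch prefix added in terminate/2 makes shutdown best effort: any exception raised by prep_stop_server/2 (for example a timeout while waiting for the leadership transfer) is converted into a term instead of crashing the terminating worker. A quick shell illustration of the semantics, not part of the patch:

    1> catch exit(timeout).
    {'EXIT',timeout}
    2> catch throw(not_leader).
    not_leader
    3> catch ok.
    ok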
@@ -339,31 +326,27 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 initial_members => Servers,
                 machine => Machine,
                 log_init_args => LogOpts
-            })
+            });
+        ok ->
+            Bootstrap = false;
+        {error, {already_started, _}} ->
+            Bootstrap = false
     end,
-    case Servers of
-        [LocalServer | _] ->
-            %% TODO
-            %% Not super robust, but we probably don't expect nodes to be down
-            %% when we bring up a fresh consensus group. Triggering election
-            %% is not really required otherwise.
-            %% TODO
-            %% Ensure that doing that on node restart does not disrupt consensus.
-            %% Edit: looks like it doesn't, this could actually be quite useful
-            %% to "steal" leadership from nodes that have too much leader load.
-            %% TODO
-            %% It doesn't really work that way. There's `ra:transfer_leadership/2`
-            %% for that.
-            try
-                ra:trigger_election(LocalServer, _Timeout = 1_000)
-            catch
-                %% TODO
-                %% Tolerating exceptions because server might be occupied with log
-                %% replay for a while.
-                exit:{timeout, _} when not Bootstrap ->
-                    ok
-            end;
-        _ ->
-            ok
-    end.
+    %% NOTE
+    %% Triggering election is necessary when a new consensus group is being brought up.
+    %% TODO
+    %% It's probably a good idea to rebalance leaders across the cluster from time to
+    %% time. There's `ra:transfer_leadership/2` for that.
+    try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
+        false ->
+            ok;
+        ok ->
+            ok
+    catch
+        %% TODO
+        %% Tolerating exceptions because server might be occupied with log replay for
+        %% a while.
+        exit:{timeout, _} when not Bootstrap ->
+            ok
+    end.

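
The reworked start_server/3 leans on short-circuit evaluation: when Bootstrap is false, `Bootstrap andalso ra:trigger_election(...)` never calls ra:trigger_election/2 and simply evaluates to false, which the `try ... of` clause maps to ok, so an election is triggered only for a freshly bootstrapped consensus group. A minimal standalone sketch of the same pattern (module and function names here are illustrative, not from the patch):

    -module(maybe_election_demo).
    -export([maybe_do/2]).

    %% Calls Action() only when Flag is true, mirroring the
    %% `Bootstrap andalso ra:trigger_election(...)` pattern above.
    maybe_do(Flag, Action) when is_boolean(Flag), is_function(Action, 0) ->
        try Flag andalso Action() of
            false -> skipped;          % Flag was false; Action() never ran
            Result -> {done, Result}
        catch
            exit:{timeout, _} -> timed_out
        end.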
@@ -379,6 +362,29 @@ server_uid(_DB, Shard) ->

 %%

+prep_stop_server(DB, Shard) ->
+    prep_stop_server(DB, Shard, 5_000).
+
+prep_stop_server(DB, Shard, Timeout) ->
+    LocalServer = get_local_server(DB, Shard),
+    Candidates = lists:delete(LocalServer, shard_servers(DB, Shard)),
+    case lookup_leader(DB, Shard) of
+        LocalServer when Candidates =/= [] ->
+            %% NOTE
+            %% Trigger leadership transfer *and* force to wait until the new leader
+            %% is elected and updated in the leaderboard. This should help to avoid
+            %% edge cases where entries appended right before removal are duplicated
+            %% due to client retries.
+            %% TODO: Candidate may be offline.
+            [Candidate | _] = Candidates,
+            _ = ra:transfer_leadership(LocalServer, Candidate),
+            wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end, Timeout);
+        _Another ->
+            ok
+    end.
+
+%%
+
 memoize(Fun, Args) ->
     %% NOTE: Assuming that the function is pure and never returns `undefined`.
     case persistent_term:get([Fun | Args], undefined) of
@@ -390,8 +396,8 @@ memoize(Fun, Args) ->
             Result
     end.

-wait_until(Fun) ->
-    wait_until(Fun, 5_000, 250).
+wait_until(Fun, Timeout) ->
+    wait_until(Fun, Timeout, 100).

 wait_until(Fun, Timeout, Sleep) ->
     Deadline = erlang:monotonic_time(millisecond) + Timeout,
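
The diff shows only the head and the deadline computation of wait_until/3. A deadline-based polling loop of this general shape would look roughly like the sketch below (illustrative only; loop_until/3 is a hypothetical helper, not code from the module):

    %% Sketch of a deadline-based polling helper; not the actual module code.
    wait_until(Fun, Timeout, Sleep) ->
        Deadline = erlang:monotonic_time(millisecond) + Timeout,
        loop_until(Fun, Deadline, Sleep).

    loop_until(Fun, Deadline, Sleep) ->
        case Fun() of
            true ->
                ok;
            false ->
                case erlang:monotonic_time(millisecond) < Deadline of
                    true ->
                        ok = timer:sleep(Sleep),
                        loop_until(Fun, Deadline, Sleep);
                    false ->
                        timeout
                end
        end.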

View File

@@ -435,8 +435,8 @@ t_rebalance_offline_restarts(Config) ->
         erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
     ),
     ?retry(
-        500,
-        10,
+        1000,
+        5,
         ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes])
     ),
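
The retry budget stays at roughly five seconds, but the test now polls less aggressively: 5 attempts spaced 1000 ms apart instead of 10 attempts every 500 ms, giving freshly restarted nodes more time to bring shards online between checks. Assuming ?retry(SleepMs, Attempts, Expr) re-evaluates Expr until it stops raising (an assumption about the helper macro, which is not shown here), an equivalent function-style helper could look like this sketch:

    %% Sketch of a retry helper: run Fun up to Attempts times, sleeping
    %% SleepMs between attempts; the last attempt lets the exception escape.
    retry(_SleepMs, 1, Fun) ->
        Fun();
    retry(SleepMs, Attempts, Fun) when Attempts > 1 ->
        try
            Fun()
        catch
            _Class:_Reason ->
                ok = timer:sleep(SleepMs),
                retry(SleepMs, Attempts - 1, Fun)
        end.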