test(ds): Improve stability of replication test suite

This commit is contained in:
ieQu1 2024-07-04 18:42:53 +02:00
parent dc4ae82798
commit e70c1cfea3
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 10 additions and 8 deletions

View File

@ -163,6 +163,7 @@ start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
% 4. Setup isolated mnesia directory
ok = emqx_common_test_helpers:load(mnesia),
ok = application:set_env(mnesia, dir, filename:join([WorkDir, mnesia])),
ok = application:set_env(emqx_durable_storage, db_data_dir, filename:join([WorkDir, ds])),
% 5. Start ekka separately.
% For some reason it's designed to be started in non-regular way, so we have to track
% applications started in the process manually.

View File

@ -458,7 +458,7 @@ t_rebalance_chaotic_converges(Config) ->
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?defer_assert(
?assertEqual(
@ -726,11 +726,11 @@ t_store_batch_fail('end', Config) ->
Config.
t_store_batch_fail(Config) ->
DB = ?FUNCTION_NAME,
?check_trace(
#{timetrap => 15_000},
try
meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
DB = ?FUNCTION_NAME,
ok = meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
%% Success:
Batch1 = [
@ -739,7 +739,7 @@ t_store_batch_fail(Config) ->
],
?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
%% Inject unrecoverable error:
meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
ok = meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
{error, unrecoverable, mock}
end),
Batch2 = [
@ -749,10 +749,10 @@ t_store_batch_fail(Config) ->
?assertMatch(
{error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
),
meck:unload(emqx_ds_storage_layer),
ok = meck:unload(emqx_ds_storage_layer),
%% Inject a recoveralbe error:
meck:new(ra, [passthrough, no_history]),
meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
ok = meck:new(ra, [passthrough, no_history]),
ok = meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}),
{timeout, mock}
end),
@ -768,7 +768,7 @@ t_store_batch_fail(Config) ->
{error, recoverable, {timeout, mock}},
emqx_ds:store_batch(DB, Batch3, #{sync => true})
),
meck:unload(ra),
ok = meck:unload(ra),
?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 0))
after
@ -1027,5 +1027,6 @@ init_per_testcase(TCName, Config0) ->
end_per_testcase(TCName, Config) ->
ok = snabbkaffe:stop(),
Result = emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config),
catch emqx_ds:drop_db(TCName),
emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
Result.