test(dsbackend): add shared tests for atomic batches + preconditions

This commit is contained in:
Andrew Mayorov 2024-07-30 16:57:21 +02:00
parent 7b243ef7ad
commit 810a4d3cf9
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 150 additions and 14 deletions

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all).
-include("../../emqx/include/emqx.hrl").
-include("../../emqx_durable_storage/include/emqx_ds.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include("../../emqx/include/asserts.hrl").
@ -145,7 +146,7 @@ t_06_smoke_add_generation(Config) ->
?assertMatch(ok, emqx_ds:add_generation(DB)),
[
{Gen1, #{created_at := Created1, since := Since1, until := Until1}},
{Gen2, #{created_at := Created2, since := Since2, until := undefined}}
{_Gen2, #{created_at := Created2, since := Since2, until := undefined}}
] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)),
%% Check units of the return values (+/- 10s from test begin time):
?give_or_take(BeginTime, 10_000, Created1),
@ -234,8 +235,8 @@ t_09_atomic_store_batch(Config) ->
DB = ?FUNCTION_NAME,
?check_trace(
begin
application:set_env(emqx_durable_storage, egress_batch_size, 1),
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
DBOpts = (opts(Config))#{atomic_batches => true},
?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
Msgs = [
message(<<"1">>, <<"1">>, 0),
message(<<"2">>, <<"2">>, 1),
@ -243,13 +244,8 @@ t_09_atomic_store_batch(Config) ->
],
?assertEqual(
ok,
emqx_ds:store_batch(DB, Msgs, #{
atomic => true,
sync => true
})
),
{ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}),
?assertMatch(#{batch := [_, _, _]}, Flush)
emqx_ds:store_batch(DB, Msgs, #{sync => true})
)
end,
[]
),
@ -289,6 +285,124 @@ t_10_non_atomic_store_batch(Config) ->
),
ok.
t_11_batch_preconditions(Config) ->
DB = ?FUNCTION_NAME,
?check_trace(
begin
DBOpts = (opts(Config))#{
atomic_batches => true,
force_monotonic_timestamps => false
},
?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
%% Conditional delete
TS = 42,
Batch1 = #dsbatch{
preconditions = [{if_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
operations = [{delete, matcher(<<"c1">>, <<"t/a">>, '_', TS)}]
},
%% Conditional insert
M1 = message(<<"c1">>, <<"t/a">>, <<"M1">>, TS),
Batch2 = #dsbatch{
preconditions = [{unless_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
operations = [M1]
},
%% No such message yet, precondition fails:
?assertEqual(
{error, unrecoverable, {precondition_failed, not_found}},
emqx_ds:store_batch(DB, Batch1)
),
%% No such message yet, `unless` precondition holds:
?assertEqual(
ok,
emqx_ds:store_batch(DB, Batch2)
),
%% Now there's such message, `unless` precondition now fails:
?assertEqual(
{error, unrecoverable, {precondition_failed, M1}},
emqx_ds:store_batch(DB, Batch2)
),
%% On the other hand, `if` precondition now holds:
?assertEqual(
ok,
emqx_ds:store_batch(DB, Batch1)
)
end,
fun(_Trace) ->
%% Wait at least until current epoch ends.
ct:sleep(1000),
%% There's no messages in the DB.
?assertEqual(
[],
emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>))
)
end
).
t_12_batch_precondition_conflicts(Config) ->
DB = ?FUNCTION_NAME,
NBatches = 50,
NMessages = 10,
?check_trace(
begin
DBOpts = (opts(Config))#{
atomic_batches => true,
force_monotonic_timestamps => false
},
?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
ConflictBatches = [
#dsbatch{
%% If the slot is free...
preconditions = [{if_exists, matcher(<<"c1">>, <<"t/slot">>, _Free = <<>>, 0)}],
%% Take it and write NMessages extra messages, so that batches take longer to
%% process and have higher chances to conflict with each other.
operations =
[
message(<<"c1">>, <<"t/slot">>, integer_to_binary(I), _TS = 0)
| [
message(<<"c1">>, {"t/owner/~p/~p", [I, J]}, <<>>, I * 100 + J)
|| J <- lists:seq(1, NMessages)
]
]
}
|| I <- lists:seq(1, NBatches)
],
%% Run those batches concurrently.
ok = emqx_ds:store_batch(DB, [message(<<"c1">>, <<"t/slot">>, <<>>, 0)]),
Results = emqx_utils:pmap(
fun(B) -> emqx_ds:store_batch(DB, B) end,
ConflictBatches,
infinity
),
%% Only one should have succeeded.
?assertEqual([ok], [Ok || Ok = ok <- Results]),
%% While other failed with an identical `precondition_failed`.
Failures = lists:usort([PreconditionFailed || {error, _, PreconditionFailed} <- Results]),
?assertMatch(
[{precondition_failed, #message{topic = <<"t/slot">>, payload = <<_/bytes>>}}],
Failures
),
%% Wait at least until current epoch ends.
ct:sleep(1000),
%% Storage should contain single batch's messages.
[{precondition_failed, #message{payload = IOwner}}] = Failures,
WinnerBatch = lists:nth(binary_to_integer(IOwner), ConflictBatches),
BatchMessages = lists:sort(WinnerBatch#dsbatch.operations),
DBMessages = emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)),
?assertEqual(
BatchMessages,
DBMessages
)
end,
[]
).
t_smoke_delete_next(Config) ->
DB = ?FUNCTION_NAME,
?check_trace(
@ -534,12 +648,25 @@ message(ClientId, Topic, Payload, PublishedAt) ->
message(Topic, Payload, PublishedAt) ->
#message{
topic = Topic,
payload = Payload,
topic = try_format(Topic),
payload = try_format(Payload),
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
matcher(ClientID, Topic, Payload, Timestamp) ->
#message_matcher{
from = ClientID,
topic = try_format(Topic),
timestamp = Timestamp,
payload = Payload
}.
try_format({Fmt, Args}) ->
emqx_utils:format(Fmt, Args);
try_format(String) ->
String.
delete(DB, It, Selector, BatchSize) ->
delete(DB, It, Selector, BatchSize, 0).
@ -562,9 +689,18 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
%% TODO: Remove once builtin-local supports preconditions + atomic batches.
BuiltinLocalTCs =
TCs --
[
{builtin_local, TCs},
{builtin_raft, TCs}
t_09_atomic_store_batch,
t_11_batch_preconditions,
t_12_batch_precondition_conflicts
],
BuiltinRaftTCs = TCs,
[
{builtin_local, BuiltinLocalTCs},
{builtin_raft, BuiltinRaftTCs}
].
init_per_group(builtin_local, Config) ->