test(dsbackend): add shared tests for atomic batches + preconditions

This commit is contained in:
Andrew Mayorov 2024-07-30 16:57:21 +02:00 committed by Thales Macedo Garitezi
parent 68990f1538
commit 1559aac486
1 changed files with 150 additions and 14 deletions

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("../../emqx/include/emqx.hrl"). -include("../../emqx/include/emqx.hrl").
-include("../../emqx_durable_storage/include/emqx_ds.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include("../../emqx/include/asserts.hrl"). -include("../../emqx/include/asserts.hrl").
@ -145,7 +146,7 @@ t_06_smoke_add_generation(Config) ->
?assertMatch(ok, emqx_ds:add_generation(DB)), ?assertMatch(ok, emqx_ds:add_generation(DB)),
[ [
{Gen1, #{created_at := Created1, since := Since1, until := Until1}}, {Gen1, #{created_at := Created1, since := Since1, until := Until1}},
{Gen2, #{created_at := Created2, since := Since2, until := undefined}} {_Gen2, #{created_at := Created2, since := Since2, until := undefined}}
] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)), ] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)),
%% Check units of the return values (+/- 10s from test begin time): %% Check units of the return values (+/- 10s from test begin time):
?give_or_take(BeginTime, 10_000, Created1), ?give_or_take(BeginTime, 10_000, Created1),
@ -234,8 +235,8 @@ t_09_atomic_store_batch(Config) ->
DB = ?FUNCTION_NAME, DB = ?FUNCTION_NAME,
?check_trace( ?check_trace(
begin begin
application:set_env(emqx_durable_storage, egress_batch_size, 1), DBOpts = (opts(Config))#{atomic_batches => true},
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
Msgs = [ Msgs = [
message(<<"1">>, <<"1">>, 0), message(<<"1">>, <<"1">>, 0),
message(<<"2">>, <<"2">>, 1), message(<<"2">>, <<"2">>, 1),
@ -243,13 +244,8 @@ t_09_atomic_store_batch(Config) ->
], ],
?assertEqual( ?assertEqual(
ok, ok,
emqx_ds:store_batch(DB, Msgs, #{ emqx_ds:store_batch(DB, Msgs, #{sync => true})
atomic => true, )
sync => true
})
),
{ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}),
?assertMatch(#{batch := [_, _, _]}, Flush)
end, end,
[] []
), ),
@ -289,6 +285,124 @@ t_10_non_atomic_store_batch(Config) ->
), ),
ok. ok.
t_11_batch_preconditions(Config) ->
DB = ?FUNCTION_NAME,
?check_trace(
begin
DBOpts = (opts(Config))#{
atomic_batches => true,
force_monotonic_timestamps => false
},
?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
%% Conditional delete
TS = 42,
Batch1 = #dsbatch{
preconditions = [{if_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
operations = [{delete, matcher(<<"c1">>, <<"t/a">>, '_', TS)}]
},
%% Conditional insert
M1 = message(<<"c1">>, <<"t/a">>, <<"M1">>, TS),
Batch2 = #dsbatch{
preconditions = [{unless_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
operations = [M1]
},
%% No such message yet, precondition fails:
?assertEqual(
{error, unrecoverable, {precondition_failed, not_found}},
emqx_ds:store_batch(DB, Batch1)
),
%% No such message yet, `unless` precondition holds:
?assertEqual(
ok,
emqx_ds:store_batch(DB, Batch2)
),
%% Now there's such message, `unless` precondition now fails:
?assertEqual(
{error, unrecoverable, {precondition_failed, M1}},
emqx_ds:store_batch(DB, Batch2)
),
%% On the other hand, `if` precondition now holds:
?assertEqual(
ok,
emqx_ds:store_batch(DB, Batch1)
)
end,
fun(_Trace) ->
%% Wait at least until current epoch ends.
ct:sleep(1000),
%% There's no messages in the DB.
?assertEqual(
[],
emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>))
)
end
).
t_12_batch_precondition_conflicts(Config) ->
DB = ?FUNCTION_NAME,
NBatches = 50,
NMessages = 10,
?check_trace(
begin
DBOpts = (opts(Config))#{
atomic_batches => true,
force_monotonic_timestamps => false
},
?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
ConflictBatches = [
#dsbatch{
%% If the slot is free...
preconditions = [{if_exists, matcher(<<"c1">>, <<"t/slot">>, _Free = <<>>, 0)}],
%% Take it and write NMessages extra messages, so that batches take longer to
%% process and have higher chances to conflict with each other.
operations =
[
message(<<"c1">>, <<"t/slot">>, integer_to_binary(I), _TS = 0)
| [
message(<<"c1">>, {"t/owner/~p/~p", [I, J]}, <<>>, I * 100 + J)
|| J <- lists:seq(1, NMessages)
]
]
}
|| I <- lists:seq(1, NBatches)
],
%% Run those batches concurrently.
ok = emqx_ds:store_batch(DB, [message(<<"c1">>, <<"t/slot">>, <<>>, 0)]),
Results = emqx_utils:pmap(
fun(B) -> emqx_ds:store_batch(DB, B) end,
ConflictBatches,
infinity
),
%% Only one should have succeeded.
?assertEqual([ok], [Ok || Ok = ok <- Results]),
%% While other failed with an identical `precondition_failed`.
Failures = lists:usort([PreconditionFailed || {error, _, PreconditionFailed} <- Results]),
?assertMatch(
[{precondition_failed, #message{topic = <<"t/slot">>, payload = <<_/bytes>>}}],
Failures
),
%% Wait at least until current epoch ends.
ct:sleep(1000),
%% Storage should contain single batch's messages.
[{precondition_failed, #message{payload = IOwner}}] = Failures,
WinnerBatch = lists:nth(binary_to_integer(IOwner), ConflictBatches),
BatchMessages = lists:sort(WinnerBatch#dsbatch.operations),
DBMessages = emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)),
?assertEqual(
BatchMessages,
DBMessages
)
end,
[]
).
t_smoke_delete_next(Config) -> t_smoke_delete_next(Config) ->
DB = ?FUNCTION_NAME, DB = ?FUNCTION_NAME,
?check_trace( ?check_trace(
@ -534,12 +648,25 @@ message(ClientId, Topic, Payload, PublishedAt) ->
message(Topic, Payload, PublishedAt) -> message(Topic, Payload, PublishedAt) ->
#message{ #message{
topic = Topic, topic = try_format(Topic),
payload = Payload, payload = try_format(Payload),
timestamp = PublishedAt, timestamp = PublishedAt,
id = emqx_guid:gen() id = emqx_guid:gen()
}. }.
matcher(ClientID, Topic, Payload, Timestamp) ->
#message_matcher{
from = ClientID,
topic = try_format(Topic),
timestamp = Timestamp,
payload = Payload
}.
try_format({Fmt, Args}) ->
emqx_utils:format(Fmt, Args);
try_format(String) ->
String.
delete(DB, It, Selector, BatchSize) -> delete(DB, It, Selector, BatchSize) ->
delete(DB, It, Selector, BatchSize, 0). delete(DB, It, Selector, BatchSize, 0).
@ -562,9 +689,18 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
%% TODO: Remove once builtin-local supports preconditions + atomic batches.
BuiltinLocalTCs =
TCs --
[ [
{builtin_local, TCs}, t_09_atomic_store_batch,
{builtin_raft, TCs} t_11_batch_preconditions,
t_12_batch_precondition_conflicts
],
BuiltinRaftTCs = TCs,
[
{builtin_local, BuiltinLocalTCs},
{builtin_raft, BuiltinRaftTCs}
]. ].
init_per_group(builtin_local, Config) -> init_per_group(builtin_local, Config) ->