diff --git a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl index 1fc1594fc..e70d2b682 100644 --- a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl +++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include("../../emqx/include/emqx.hrl"). +-include("../../emqx_durable_storage/include/emqx_ds.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -include("../../emqx/include/asserts.hrl"). @@ -145,7 +146,7 @@ t_06_smoke_add_generation(Config) -> ?assertMatch(ok, emqx_ds:add_generation(DB)), [ {Gen1, #{created_at := Created1, since := Since1, until := Until1}}, - {Gen2, #{created_at := Created2, since := Since2, until := undefined}} + {_Gen2, #{created_at := Created2, since := Since2, until := undefined}} ] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)), %% Check units of the return values (+/- 10s from test begin time): ?give_or_take(BeginTime, 10_000, Created1), @@ -234,8 +235,8 @@ t_09_atomic_store_batch(Config) -> DB = ?FUNCTION_NAME, ?check_trace( begin - application:set_env(emqx_durable_storage, egress_batch_size, 1), - ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + DBOpts = (opts(Config))#{atomic_batches => true}, + ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)), Msgs = [ message(<<"1">>, <<"1">>, 0), message(<<"2">>, <<"2">>, 1), @@ -243,13 +244,8 @@ t_09_atomic_store_batch(Config) -> ], ?assertEqual( ok, - emqx_ds:store_batch(DB, Msgs, #{ - atomic => true, - sync => true - }) - ), - {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}), - ?assertMatch(#{batch := [_, _, _]}, Flush) + emqx_ds:store_batch(DB, Msgs, #{sync => true}) + ) end, [] ), @@ -289,6 +285,124 @@ t_10_non_atomic_store_batch(Config) -> ), ok. +t_11_batch_preconditions(Config) -> + DB = ?FUNCTION_NAME, + ?check_trace( + begin + DBOpts = (opts(Config))#{ + atomic_batches => true, + force_monotonic_timestamps => false + }, + ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)), + + %% Conditional delete + TS = 42, + Batch1 = #dsbatch{ + preconditions = [{if_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}], + operations = [{delete, matcher(<<"c1">>, <<"t/a">>, '_', TS)}] + }, + %% Conditional insert + M1 = message(<<"c1">>, <<"t/a">>, <<"M1">>, TS), + Batch2 = #dsbatch{ + preconditions = [{unless_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}], + operations = [M1] + }, + + %% No such message yet, precondition fails: + ?assertEqual( + {error, unrecoverable, {precondition_failed, not_found}}, + emqx_ds:store_batch(DB, Batch1) + ), + %% No such message yet, `unless` precondition holds: + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Batch2) + ), + %% Now there's such message, `unless` precondition now fails: + ?assertEqual( + {error, unrecoverable, {precondition_failed, M1}}, + emqx_ds:store_batch(DB, Batch2) + ), + %% On the other hand, `if` precondition now holds: + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Batch1) + ) + end, + fun(_Trace) -> + %% Wait at least until current epoch ends. + ct:sleep(1000), + %% There's no messages in the DB. + ?assertEqual( + [], + emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)) + ) + end + ). + +t_12_batch_precondition_conflicts(Config) -> + DB = ?FUNCTION_NAME, + NBatches = 50, + NMessages = 10, + ?check_trace( + begin + DBOpts = (opts(Config))#{ + atomic_batches => true, + force_monotonic_timestamps => false + }, + ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)), + + ConflictBatches = [ + #dsbatch{ + %% If the slot is free... + preconditions = [{if_exists, matcher(<<"c1">>, <<"t/slot">>, _Free = <<>>, 0)}], + %% Take it and write NMessages extra messages, so that batches take longer to + %% process and have higher chances to conflict with each other. + operations = + [ + message(<<"c1">>, <<"t/slot">>, integer_to_binary(I), _TS = 0) + | [ + message(<<"c1">>, {"t/owner/~p/~p", [I, J]}, <<>>, I * 100 + J) + || J <- lists:seq(1, NMessages) + ] + ] + } + || I <- lists:seq(1, NBatches) + ], + + %% Run those batches concurrently. + ok = emqx_ds:store_batch(DB, [message(<<"c1">>, <<"t/slot">>, <<>>, 0)]), + Results = emqx_utils:pmap( + fun(B) -> emqx_ds:store_batch(DB, B) end, + ConflictBatches, + infinity + ), + + %% Only one should have succeeded. + ?assertEqual([ok], [Ok || Ok = ok <- Results]), + + %% While other failed with an identical `precondition_failed`. + Failures = lists:usort([PreconditionFailed || {error, _, PreconditionFailed} <- Results]), + ?assertMatch( + [{precondition_failed, #message{topic = <<"t/slot">>, payload = <<_/bytes>>}}], + Failures + ), + + %% Wait at least until current epoch ends. + ct:sleep(1000), + %% Storage should contain single batch's messages. + [{precondition_failed, #message{payload = IOwner}}] = Failures, + WinnerBatch = lists:nth(binary_to_integer(IOwner), ConflictBatches), + BatchMessages = lists:sort(WinnerBatch#dsbatch.operations), + DBMessages = emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)), + ?assertEqual( + BatchMessages, + DBMessages + ) + end, + [] + ). + t_smoke_delete_next(Config) -> DB = ?FUNCTION_NAME, ?check_trace( @@ -534,12 +648,25 @@ message(ClientId, Topic, Payload, PublishedAt) -> message(Topic, Payload, PublishedAt) -> #message{ - topic = Topic, - payload = Payload, + topic = try_format(Topic), + payload = try_format(Payload), timestamp = PublishedAt, id = emqx_guid:gen() }. +matcher(ClientID, Topic, Payload, Timestamp) -> + #message_matcher{ + from = ClientID, + topic = try_format(Topic), + timestamp = Timestamp, + payload = Payload + }. + +try_format({Fmt, Args}) -> + emqx_utils:format(Fmt, Args); +try_format(String) -> + String. + delete(DB, It, Selector, BatchSize) -> delete(DB, It, Selector, BatchSize, 0). @@ -562,9 +689,18 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), + %% TODO: Remove once builtin-local supports preconditions + atomic batches. + BuiltinLocalTCs = + TCs -- + [ + t_09_atomic_store_batch, + t_11_batch_preconditions, + t_12_batch_precondition_conflicts + ], + BuiltinRaftTCs = TCs, [ - {builtin_local, TCs}, - {builtin_raft, TCs} + {builtin_local, BuiltinLocalTCs}, + {builtin_raft, BuiltinRaftTCs} ]. init_per_group(builtin_local, Config) ->