diff --git a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl index 1fc1594fc..95ebfe99c 100644 --- a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl +++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include("../../emqx/include/emqx.hrl"). +-include("../../emqx_durable_storage/include/emqx_ds.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -include("../../emqx/include/asserts.hrl"). @@ -145,7 +146,7 @@ t_06_smoke_add_generation(Config) -> ?assertMatch(ok, emqx_ds:add_generation(DB)), [ {Gen1, #{created_at := Created1, since := Since1, until := Until1}}, - {Gen2, #{created_at := Created2, since := Since2, until := undefined}} + {_Gen2, #{created_at := Created2, since := Since2, until := undefined}} ] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)), %% Check units of the return values (+/- 10s from test begin time): ?give_or_take(BeginTime, 10_000, Created1), @@ -234,8 +235,8 @@ t_09_atomic_store_batch(Config) -> DB = ?FUNCTION_NAME, ?check_trace( begin - application:set_env(emqx_durable_storage, egress_batch_size, 1), - ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))), + DBOpts = (opts(Config))#{atomic_batches => true}, + ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)), Msgs = [ message(<<"1">>, <<"1">>, 0), message(<<"2">>, <<"2">>, 1), @@ -243,13 +244,8 @@ t_09_atomic_store_batch(Config) -> ], ?assertEqual( ok, - emqx_ds:store_batch(DB, Msgs, #{ - atomic => true, - sync => true - }) - ), - {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}), - ?assertMatch(#{batch := [_, _, _]}, Flush) + emqx_ds:store_batch(DB, Msgs, #{sync => true}) + ) end, [] ), @@ -289,6 +285,124 @@ t_10_non_atomic_store_batch(Config) -> ), ok. +t_11_batch_preconditions(Config) -> + DB = ?FUNCTION_NAME, + ?check_trace( + begin + DBOpts = (opts(Config))#{ + atomic_batches => true, + force_monotonic_timestamps => false + }, + ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)), + + %% Conditional delete + TS = 42, + Batch1 = #dsbatch{ + preconditions = [{if_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}], + operations = [{delete, matcher(<<"c1">>, <<"t/a">>, '_', TS)}] + }, + %% Conditional insert + M1 = message(<<"c1">>, <<"t/a">>, <<"M1">>, TS), + Batch2 = #dsbatch{ + preconditions = [{unless_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}], + operations = [M1] + }, + + %% No such message yet, precondition fails: + ?assertEqual( + {error, unrecoverable, {precondition_failed, not_found}}, + emqx_ds:store_batch(DB, Batch1) + ), + %% No such message yet, `unless` precondition holds: + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Batch2) + ), + %% Now there's such message, `unless` precondition now fails: + ?assertEqual( + {error, unrecoverable, {precondition_failed, M1}}, + emqx_ds:store_batch(DB, Batch2) + ), + %% On the other hand, `if` precondition now holds: + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Batch1) + ), + + %% Wait at least until current epoch ends. + ct:sleep(1000), + %% There's no messages in the DB. + ?assertEqual( + [], + emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)) + ) + end, + [] + ). + +t_12_batch_precondition_conflicts(Config) -> + DB = ?FUNCTION_NAME, + NBatches = 50, + NMessages = 10, + ?check_trace( + begin + DBOpts = (opts(Config))#{ + atomic_batches => true, + force_monotonic_timestamps => false + }, + ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)), + + ConflictBatches = [ + #dsbatch{ + %% If the slot is free... + preconditions = [{if_exists, matcher(<<"c1">>, <<"t/slot">>, _Free = <<>>, 0)}], + %% Take it and write NMessages extra messages, so that batches take longer to + %% process and have higher chances to conflict with each other. + operations = + [ + message(<<"c1">>, <<"t/slot">>, integer_to_binary(I), _TS = 0) + | [ + message(<<"c1">>, {"t/owner/~p/~p", [I, J]}, <<>>, I * 100 + J) + || J <- lists:seq(1, NMessages) + ] + ] + } + || I <- lists:seq(1, NBatches) + ], + + %% Run those batches concurrently. + ok = emqx_ds:store_batch(DB, [message(<<"c1">>, <<"t/slot">>, <<>>, 0)]), + Results = emqx_utils:pmap( + fun(B) -> emqx_ds:store_batch(DB, B) end, + ConflictBatches, + infinity + ), + + %% Only one should have succeeded. + ?assertEqual([ok], [Ok || Ok = ok <- Results]), + + %% While other failed with an identical `precondition_failed`. + Failures = lists:usort([PreconditionFailed || {error, _, PreconditionFailed} <- Results]), + ?assertMatch( + [{precondition_failed, #message{topic = <<"t/slot">>, payload = <<_/bytes>>}}], + Failures + ), + + %% Wait at least until current epoch ends. + ct:sleep(1000), + %% Storage should contain single batch's messages. + [{precondition_failed, #message{payload = IOwner}}] = Failures, + WinnerBatch = lists:nth(binary_to_integer(IOwner), ConflictBatches), + BatchMessages = lists:sort(WinnerBatch#dsbatch.operations), + DBMessages = emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)), + ?assertEqual( + BatchMessages, + DBMessages + ) + end, + [] + ). + t_smoke_delete_next(Config) -> DB = ?FUNCTION_NAME, ?check_trace( @@ -534,12 +648,25 @@ message(ClientId, Topic, Payload, PublishedAt) -> message(Topic, Payload, PublishedAt) -> #message{ - topic = Topic, - payload = Payload, + topic = try_format(Topic), + payload = try_format(Payload), timestamp = PublishedAt, id = emqx_guid:gen() }. +matcher(ClientID, Topic, Payload, Timestamp) -> + #message_matcher{ + from = ClientID, + topic = try_format(Topic), + timestamp = Timestamp, + payload = Payload + }. + +try_format({Fmt, Args}) -> + emqx_utils:format(Fmt, Args); +try_format(String) -> + String. + delete(DB, It, Selector, BatchSize) -> delete(DB, It, Selector, BatchSize, 0). @@ -562,9 +689,18 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), + %% TODO: Remove once builtin-local supports preconditions + atomic batches. + BuiltinLocalTCs = + TCs -- + [ + t_09_atomic_store_batch, + t_11_batch_preconditions, + t_12_batch_precondition_conflicts + ], + BuiltinRaftTCs = TCs, [ - {builtin_local, TCs}, - {builtin_raft, TCs} + {builtin_local, BuiltinLocalTCs}, + {builtin_raft, BuiltinRaftTCs} ]. init_per_group(builtin_local, Config) -> diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index a7cc795b6..f002c26de 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -43,7 +43,7 @@ %% `emqx_ds_buffer': init_buffer/3, flush_buffer/4, - shard_of_message/4 + shard_of_operation/4 ]). %% Internal exports: @@ -55,6 +55,7 @@ -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). %%================================================================================ %% Type declarations @@ -230,9 +231,9 @@ flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) -> make_batch(_ForceMonotonic = true, Latest, Messages) -> assign_monotonic_timestamps(Latest, Messages, []); make_batch(false, Latest, Messages) -> - assign_message_timestamps(Latest, Messages, []). + assign_operation_timestamps(Latest, Messages, []). -assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) -> +assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) -> case emqx_message:timestamp(Message, microsecond) of TimestampUs when TimestampUs > Latest0 -> Latest = TimestampUs; @@ -241,28 +242,43 @@ assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) -> end, Acc = [assign_timestamp(Latest, Message) | Acc0], assign_monotonic_timestamps(Latest, Rest, Acc); +assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) -> + Acc = [Operation | Acc0], + assign_monotonic_timestamps(Latest, Rest, Acc); assign_monotonic_timestamps(Latest, [], Acc) -> {Latest, lists:reverse(Acc)}. -assign_message_timestamps(Latest0, [Message | Rest], Acc0) -> - TimestampUs = emqx_message:timestamp(Message, microsecond), +assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) -> + TimestampUs = emqx_message:timestamp(Message), Latest = max(TimestampUs, Latest0), Acc = [assign_timestamp(TimestampUs, Message) | Acc0], - assign_message_timestamps(Latest, Rest, Acc); -assign_message_timestamps(Latest, [], Acc) -> + assign_operation_timestamps(Latest, Rest, Acc); +assign_operation_timestamps(Latest, [Operation | Rest], Acc0) -> + Acc = [Operation | Acc0], + assign_operation_timestamps(Latest, Rest, Acc); +assign_operation_timestamps(Latest, [], Acc) -> {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard(). -shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> +-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard(). +shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key); +shard_of_operation(DB, {_, #message_matcher{from = From, topic = Topic}}, SerializeBy, _Options) -> + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key). + +shard_of_key(DB, Key) -> N = emqx_ds_builtin_local_meta:n_shards(DB), - Hash = - case SerializeBy of - clientid -> erlang:phash2(From, N); - topic -> erlang:phash2(Topic, N) - end, + Hash = erlang:phash2(Key, N), integer_to_binary(Hash). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -288,7 +304,7 @@ get_streams(DB, TopicFilter, StartTime) -> -spec make_iterator( emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time() ) -> - emqx_ds:make_iterator_result(emqx_ds:ds_specific_iterator()). + emqx_ds:make_iterator_result(iterator()). make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) -> ShardId = {DB, Shard}, case @@ -302,7 +318,7 @@ make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) -> Error end. --spec update_iterator(emqx_ds:db(), emqx_ds:ds_specific_iterator(), emqx_ds:message_key()) -> +-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) -> emqx_ds:make_iterator_result(iterator()). update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) -> case emqx_ds_storage_layer:update_iterator({DB, Shard}, StorageIter0, Key) of @@ -380,7 +396,7 @@ do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) -> end. -spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> - emqx_ds:delete_next_result(emqx_ds:delete_iterator()). + emqx_ds:delete_next_result(delete_iterator()). do_delete_next( DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N ) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 669abdbf1..11c809dbd 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -29,7 +29,7 @@ current_timestamp/2, - shard_of_message/4, + shard_of_operation/4, flush_buffer/4, init_buffer/3 ]). @@ -83,6 +83,7 @@ ra_state/0 ]). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -include("emqx_ds_replication_layer.hrl"). @@ -100,7 +101,10 @@ n_shards => pos_integer(), n_sites => pos_integer(), replication_factor => pos_integer(), - replication_options => _TODO :: #{} + replication_options => _TODO :: #{}, + %% Inherited from `emqx_ds:generic_db_opts()`. + force_monotonic_timestamps => boolean(), + atomic_batches => boolean() }. %% This enapsulates the stream entity from the replication level. @@ -135,11 +139,12 @@ ?enc := emqx_ds_storage_layer:delete_iterator() }. -%% TODO: this type is obsolete and is kept only for compatibility with -%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6) +%% Write batch. +%% Instances of this type currently form the majority of the Raft log. -type batch() :: #{ ?tag := ?BATCH, - ?batch_messages := [emqx_types:message()] + ?batch_operations := [emqx_ds:operation()], + ?batch_preconditions => [emqx_ds:precondition()] }. -type generation_rank() :: {shard_id(), term()}. @@ -240,16 +245,45 @@ drop_db(DB) -> _ = emqx_ds_proto_v4:drop_db(list_nodes(), DB), emqx_ds_replication_layer_meta:drop_db(DB). --spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(DB, Messages, Opts) -> +store_batch(DB, Batch = #dsbatch{preconditions = [_ | _]}, Opts) -> + %% NOTE: Atomic batch is implied, will not check with DB config. + store_batch_atomic(DB, Batch, Opts); +store_batch(DB, Batch, Opts) -> + case emqx_ds_replication_layer_meta:db_config(DB) of + #{atomic_batches := true} -> + store_batch_atomic(DB, Batch, Opts); + #{} -> + store_batch_buffered(DB, Batch, Opts) + end. + +store_batch_buffered(DB, #dsbatch{operations = Operations}, Opts) -> + store_batch_buffered(DB, Operations, Opts); +store_batch_buffered(DB, Batch, Opts) -> try - emqx_ds_buffer:store_batch(DB, Messages, Opts) + emqx_ds_buffer:store_batch(DB, Batch, Opts) catch error:{Reason, _Call} when Reason == timeout; Reason == noproc -> {error, recoverable, Reason} end. +store_batch_atomic(DB, Batch, _Opts) -> + Shards = shards_of_batch(DB, Batch), + case Shards of + [Shard] -> + case ra_store_batch(DB, Shard, Batch) of + {timeout, ServerId} -> + {error, recoverable, {timeout, ServerId}}; + Result -> + Result + end; + [] -> + ok; + [_ | _] -> + {error, unrecoverable, atomic_batch_spans_multiple_shards} + end. + -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. get_streams(DB, TopicFilter, StartTime) -> @@ -392,17 +426,49 @@ flush_buffer(DB, Shard, Messages, State) -> end, {State, Result}. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> +-spec shard_of_operation( + emqx_ds:db(), + emqx_ds:operation() | emqx_ds:precondition(), + clientid | topic, + _Options +) -> emqx_ds_replication_layer:shard_id(). -shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> +shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key); +shard_of_operation(DB, {_OpName, Matcher}, SerializeBy, _Options) -> + #message_matcher{from = From, topic = Topic} = Matcher, + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key). + +shard_of_key(DB, Key) -> N = emqx_ds_replication_shard_allocator:n_shards(DB), - Hash = - case SerializeBy of - clientid -> erlang:phash2(From, N); - topic -> erlang:phash2(Topic, N) - end, + Hash = erlang:phash2(Key, N), integer_to_binary(Hash). +shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) -> + shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, [])); +shards_of_batch(DB, Operations) -> + shards_of_batch(DB, Operations, []). + +shards_of_batch(DB, [Operation | Rest], Acc) -> + case shard_of_operation(DB, Operation, clientid, #{}) of + Shard when Shard =:= hd(Acc) -> + shards_of_batch(DB, Rest, Acc); + Shard when Acc =:= [] -> + shards_of_batch(DB, Rest, [Shard]); + ShardAnother -> + [ShardAnother | Acc] + end; +shards_of_batch(_DB, [], Acc) -> + Acc. + %%================================================================================ %% Internal exports (RPC targets) %%================================================================================ @@ -612,7 +678,7 @@ list_nodes() -> -define(SHARD_RPC(DB, SHARD, NODE, BODY), case emqx_ds_replication_layer_shard:servers( - DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred) + DB, SHARD, application:get_env(emqx_ds_builtin_raft, reads, leader_preferred) ) of [{_, NODE} | _] -> @@ -624,13 +690,22 @@ list_nodes() -> end ). --spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> - ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}. -ra_store_batch(DB, Shard, Messages) -> - Command = #{ - ?tag => ?BATCH, - ?batch_messages => Messages - }, +-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:batch()) -> + ok | {timeout, _} | emqx_ds:error(_). +ra_store_batch(DB, Shard, Batch) -> + case Batch of + #dsbatch{operations = Operations, preconditions = Preconditions} -> + Command = #{ + ?tag => ?BATCH, + ?batch_operations => Operations, + ?batch_preconditions => Preconditions + }; + Operations -> + Command = #{ + ?tag => ?BATCH, + ?batch_operations => Operations + } + end, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> @@ -767,6 +842,7 @@ ra_drop_shard(DB, Shard) -> -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). -define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release'). +-define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic'). -spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> @@ -776,18 +852,30 @@ init(#{db := DB, shard := Shard}) -> {ra_state(), _Reply, _Effects}. apply( RaftMeta, - #{ + Command = #{ ?tag := ?BATCH, - ?batch_messages := MessagesIn + ?batch_operations := OperationsIn }, #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> - ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), - {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn), - Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), - State = State0#{latest := Latest}, - set_ts(DBShard, Latest), - Effects = try_release_log(Stats, RaftMeta, State), + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}), + Preconditions = maps:get(?batch_preconditions, Command, []), + {Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn), + %% FIXME + case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of + ok -> + Result = emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}), + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), + Effects = try_release_log(Stats, RaftMeta, State); + PreconditionFailed = {precondition_failed, _} -> + Result = {error, unrecoverable, PreconditionFailed}, + State = State0, + Effects = []; + Result -> + State = State0, + Effects = [] + end, Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), {State, Result, Effects}; apply( @@ -862,6 +950,21 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. +assign_timestamps(DB, Latest, Messages) -> + ForceMonotonic = force_monotonic_timestamps(DB), + assign_timestamps(ForceMonotonic, Latest, Messages, [], 0, 0). + +force_monotonic_timestamps(DB) -> + case erlang:get(?pd_ra_force_monotonic) of + undefined -> + DBConfig = emqx_ds_replication_layer_meta:db_config(DB), + Flag = maps:get(force_monotonic_timestamps, DBConfig), + erlang:put(?pd_ra_force_monotonic, Flag); + Flag -> + ok + end, + Flag. + try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) -> %% NOTE %% Because cursor release means storage flush (see @@ -924,10 +1027,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}), handle_custom_event(DBShard, Timestamp, tick). -assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, [], 0, 0). - -assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> +assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) -> case emqx_message:timestamp(Message0, microsecond) of TimestampUs when TimestampUs > Latest0 -> Latest = TimestampUs, @@ -936,8 +1036,17 @@ assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> Latest = Latest0 + 1, Message = assign_timestamp(Latest, Message0) end, - assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0)); -assign_timestamps(Latest, [], Acc, N, Size) -> + MSize = approx_message_size(Message0), + assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize); +assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) -> + TimestampUs = emqx_message:timestamp(Message0), + Latest = max(Latest0, TimestampUs), + Message = assign_timestamp(TimestampUs, Message0), + MSize = approx_message_size(Message0), + assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize); +assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) -> + assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz); +assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) -> {{N, Size}, Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl index f33090c46..c87e9bcba 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl @@ -19,7 +19,8 @@ -define(enc, 3). %% ?BATCH --define(batch_messages, 2). +-define(batch_operations, 2). +-define(batch_preconditions, 4). -define(timestamp, 3). %% add_generation / update_config diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 69de92325..0e4336a26 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -56,6 +56,7 @@ topic/0, batch/0, operation/0, + deletion/0, precondition/0, stream/0, delete_stream/0, @@ -110,7 +111,9 @@ message() %% Delete a message. %% Does nothing if the message does not exist. - | {delete, message_matcher('_')}. + | deletion(). + +-type deletion() :: {delete, message_matcher('_')}. %% Precondition. %% Fails whole batch if the storage already has the matching message (`if_exists'), diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index dec9eea80..cf83c8f2e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -21,7 +21,7 @@ -behaviour(gen_server). %% API: --export([start_link/4, store_batch/3, shard_of_message/3]). +-export([start_link/4, store_batch/3, shard_of_operation/3]). -export([ls/0]). %% behavior callbacks: @@ -46,19 +46,18 @@ -define(cbm(DB), {?MODULE, DB}). -record(enqueue_req, { - messages :: [emqx_types:message()], + operations :: [emqx_ds:operation()], sync :: boolean(), - atomic :: boolean(), - n_messages :: non_neg_integer(), + n_operations :: non_neg_integer(), payload_bytes :: non_neg_integer() }). -callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}. --callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) -> +-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_ds:operation()], State) -> {State, ok | {error, recoverable | unrecoverable, _}}. --callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) -> +-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) -> _Shard. %%================================================================================ @@ -77,39 +76,33 @@ start_link(CallbackModule, CallbackOptions, DB, Shard) -> ?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], [] ). --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), [emqx_ds:operation()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(DB, Messages, Opts) -> +store_batch(DB, Operations, Opts) -> Sync = maps:get(sync, Opts, true), - Atomic = maps:get(atomic, Opts, false), %% Usually we expect all messages in the batch to go into the %% single shard, so this function is optimized for the happy case. - case shards_of_batch(DB, Messages) of - [{Shard, {NMsgs, NBytes}}] -> + case shards_of_batch(DB, Operations) of + [{Shard, {NOps, NBytes}}] -> %% Happy case: enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ - messages = Messages, + operations = Operations, sync = Sync, - atomic = Atomic, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = NBytes } ); - [_, _ | _] when Atomic -> - %% It's impossible to commit a batch to multiple shards - %% atomically - {error, unrecoverable, atomic_commit_to_multiple_shards}; _Shards -> %% Use a slower implementation for the unlikely case: - repackage_messages(DB, Messages, Sync) + repackage_messages(DB, Operations, Sync) end. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard. -shard_of_message(DB, Message, ShardBy) -> +-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic) -> _Shard. +shard_of_operation(DB, Operation, ShardBy) -> {CBM, Options} = persistent_term:get(?cbm(DB)), - CBM:shard_of_message(DB, Message, ShardBy, Options). + CBM:shard_of_operation(DB, Operation, ShardBy, Options). %%================================================================================ %% behavior callbacks @@ -129,7 +122,7 @@ shard_of_message(DB, Message, ShardBy) -> n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), tref :: undefined | reference(), - queue :: queue:queue(emqx_types:message()), + queue :: queue:queue(emqx_ds:operation()), pending_replies = [] :: [gen_server:from()] }). @@ -168,31 +161,29 @@ format_status(Status) -> handle_call( #enqueue_req{ - messages = Msgs, + operations = Operations, sync = Sync, - atomic = Atomic, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = NBytes }, From, S0 = #s{pending_replies = Replies0} ) -> S = S0#s{pending_replies = [From | Replies0]}, - {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; + {noreply, enqueue(Sync, Operations, NOps, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. handle_cast( #enqueue_req{ - messages = Msgs, + operations = Operations, sync = Sync, - atomic = Atomic, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = NBytes }, S ) -> - {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; + {noreply, enqueue(Sync, Operations, NOps, NBytes, S)}; handle_cast(_Cast, S) -> {noreply, S}. @@ -215,11 +206,10 @@ terminate(_Reason, #s{db = DB}) -> enqueue( Sync, - Atomic, - Msgs, + Ops, BatchSize, BatchBytes, - S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} + S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0} ) -> %% At this point we don't split the batches, even when they aren't %% atomic. It wouldn't win us anything in terms of memory, and @@ -227,18 +217,18 @@ enqueue( %% granularity should be fine enough. NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), - NMsgs = NMsgs0 + BatchSize, + NMsgs = NOps0 + BatchSize, NBytes = NBytes0 + BatchBytes, - case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of + case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of true -> %% Adding this batch would cause buffer to overflow. Flush %% it now, and retry: S1 = flush(S0), - enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); + enqueue(Sync, Ops, BatchSize, BatchBytes, S1); false -> %% The buffer is empty, we enqueue the atomic batch in its %% entirety: - Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), + Q1 = lists:foldl(fun queue:in/2, Q0, Ops), S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, case NMsgs >= NMax orelse NBytes >= NBytesMax of true -> @@ -336,18 +326,18 @@ do_flush( } end. --spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> +-spec shards_of_batch(emqx_ds:db(), [emqx_ds:operation()]) -> [{_ShardId, {NMessages, NBytes}}] when NMessages :: non_neg_integer(), NBytes :: non_neg_integer(). -shards_of_batch(DB, Messages) -> +shards_of_batch(DB, Batch) -> maps:to_list( lists:foldl( - fun(Message, Acc) -> + fun(Operation, Acc) -> %% TODO: sharding strategy must be part of the DS DB schema: - Shard = shard_of_message(DB, Message, clientid), - Size = payload_size(Message), + Shard = shard_of_operation(DB, Operation, clientid), + Size = payload_size(Operation), maps:update_with( Shard, fun({N, S}) -> @@ -358,36 +348,35 @@ shards_of_batch(DB, Messages) -> ) end, #{}, - Messages + Batch ) ). -repackage_messages(DB, Messages, Sync) -> +repackage_messages(DB, Batch, Sync) -> Batches = lists:foldl( - fun(Message, Acc) -> - Shard = shard_of_message(DB, Message, clientid), - Size = payload_size(Message), + fun(Operation, Acc) -> + Shard = shard_of_operation(DB, Operation, clientid), + Size = payload_size(Operation), maps:update_with( Shard, fun({N, S, Msgs}) -> - {N + 1, S + Size, [Message | Msgs]} + {N + 1, S + Size, [Operation | Msgs]} end, - {1, Size, [Message]}, + {1, Size, [Operation]}, Acc ) end, #{}, - Messages + Batch ), maps:fold( - fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + fun(Shard, {NOps, ByteSize, RevOperations}, ErrAcc) -> Err = enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ - messages = lists:reverse(RevMessages), + operations = lists:reverse(RevOperations), sync = Sync, - atomic = false, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = ByteSize } ), @@ -427,4 +416,6 @@ cancel_timer(S = #s{tref = TRef}) -> %% @doc Return approximate size of the MQTT message (it doesn't take %% all things into account, for example headers and extras) payload_size(#message{payload = P, topic = T}) -> - size(P) + size(T). + size(P) + size(T); +payload_size({_OpName, _}) -> + 0. diff --git a/apps/emqx_durable_storage/src/emqx_ds_precondition.erl b/apps/emqx_durable_storage/src/emqx_ds_precondition.erl new file mode 100644 index 000000000..3002fcd08 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_precondition.erl @@ -0,0 +1,184 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_precondition). +-include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). + +-export([verify/3]). +-export([matches/2]). + +-export_type([matcher/0, mismatch/0]). + +-type matcher() :: #message_matcher{}. +-type mismatch() :: emqx_types:message() | not_found. + +-callback lookup_message(_Ctx, matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). + +%% + +-spec verify(module(), _Ctx, [emqx_ds:precondition()]) -> + ok | {precondition_failed, mismatch()} | emqx_ds:error(_). +verify(Mod, Ctx, [_Precondition = {Cond, Msg} | Rest]) -> + case verify_precondition(Mod, Ctx, Cond, Msg) of + ok -> + verify(Mod, Ctx, Rest); + Failed -> + Failed + end; +verify(_Mod, _Ctx, []) -> + ok. + +verify_precondition(Mod, Ctx, if_exists, Matcher) -> + case Mod:lookup_message(Ctx, Matcher) of + Msg = #message{} -> + verify_match(Msg, Matcher); + not_found -> + {precondition_failed, not_found}; + Error = {error, _, _} -> + Error + end; +verify_precondition(Mod, Ctx, unless_exists, Matcher) -> + case Mod:lookup_message(Ctx, Matcher) of + Msg = #message{} -> + verify_nomatch(Msg, Matcher); + not_found -> + ok; + Error = {error, _, _} -> + Error + end. + +verify_match(Msg, Matcher) -> + case matches(Msg, Matcher) of + true -> ok; + false -> {precondition_failed, Msg} + end. + +verify_nomatch(Msg, Matcher) -> + case matches(Msg, Matcher) of + false -> ok; + true -> {precondition_failed, Msg} + end. + +-spec matches(emqx_types:message(), matcher()) -> boolean(). +matches( + Message, + #message_matcher{from = From, topic = Topic, payload = Pat, headers = Headers} +) -> + case Message of + #message{from = From, topic = Topic} when Pat =:= '_' -> + matches_headers(Message, Headers); + #message{from = From, topic = Topic, payload = Pat} -> + matches_headers(Message, Headers); + _ -> + false + end. + +matches_headers(_Message, MatchHeaders) when map_size(MatchHeaders) =:= 0 -> + true; +matches_headers(#message{headers = Headers}, MatchHeaders) -> + maps:intersect(MatchHeaders, Headers) =:= MatchHeaders. + +%% Basic tests + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). + +conjunction_test() -> + %% Contradictory preconditions, always false. + Preconditions = [ + {if_exists, matcher(<<"c1">>, <<"t/1">>, 0, '_')}, + {unless_exists, matcher(<<"c1">>, <<"t/1">>, 0, '_')} + ], + ?assertEqual( + {precondition_failed, not_found}, + verify(?MODULE, [], Preconditions) + ), + %% Check that the order does not matter. + ?assertEqual( + {precondition_failed, not_found}, + verify(?MODULE, [], lists:reverse(Preconditions)) + ), + ?assertEqual( + {precondition_failed, message(<<"c1">>, <<"t/1">>, 0, <<>>)}, + verify( + ?MODULE, + [message(<<"c1">>, <<"t/1">>, 0, <<>>)], + Preconditions + ) + ). + +matches_test() -> + ?assert( + matches( + message(<<"mtest1">>, <<"t/same">>, 12345, <>), + matcher(<<"mtest1">>, <<"t/same">>, 12345, '_') + ) + ). + +matches_headers_test() -> + ?assert( + matches( + message(<<"mtest2">>, <<"t/same">>, 23456, <>, #{h1 => 42, h2 => <<>>}), + matcher(<<"mtest2">>, <<"t/same">>, 23456, '_', #{h2 => <<>>}) + ) + ). + +mismatches_headers_test() -> + ?assertNot( + matches( + message(<<"mtest3">>, <<"t/same">>, 23456, <>, #{h1 => 42, h2 => <<>>}), + matcher(<<"mtest3">>, <<"t/same">>, 23456, '_', #{h2 => <<>>, h3 => <<"required">>}) + ) + ). + +matcher(ClientID, Topic, TS, Payload) -> + matcher(ClientID, Topic, TS, Payload, #{}). + +matcher(ClientID, Topic, TS, Payload, Headers) -> + #message_matcher{ + from = ClientID, + topic = Topic, + timestamp = TS, + payload = Payload, + headers = Headers + }. + +message(ClientID, Topic, TS, Payload) -> + message(ClientID, Topic, TS, Payload, #{}). + +message(ClientID, Topic, TS, Payload, Headers) -> + #message{ + id = <<>>, + qos = 0, + from = ClientID, + topic = Topic, + timestamp = TS, + payload = Payload, + headers = Headers + }. + +lookup_message(Messages, Matcher) -> + case lists:search(fun(M) -> matches(M, Matcher) end, Messages) of + {value, Message} -> + Message; + false -> + not_found + end. + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index fb831318e..28a4d54c2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -37,6 +37,7 @@ update_iterator/4, next/6, delete_next/7, + lookup_message/3, handle_event/4 ]). @@ -46,6 +47,7 @@ -export_type([options/0]). +-include("emqx_ds.hrl"). -include("emqx_ds_metrics.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -68,10 +70,13 @@ -define(start_time, 3). -define(storage_key, 4). -define(last_seen_key, 5). --define(cooked_payloads, 6). +-define(cooked_msg_ops, 6). -define(cooked_lts_ops, 7). -define(cooked_ts, 8). +%% atoms: +-define(delete, 100). + -type options() :: #{ bits_per_wildcard_level => pos_integer(), @@ -110,7 +115,7 @@ -type cooked_batch() :: #{ - ?cooked_payloads := [{binary(), binary()}], + ?cooked_msg_ops := [{binary(), binary() | ?delete}], ?cooked_lts_ops := [{binary(), binary()}], ?cooked_ts := integer() }. @@ -271,24 +276,28 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}, ...], + emqx_ds_storage_layer:batch(), emqx_ds_storage_layer:batch_store_opts() ) -> {ok, cooked_batch()}. -prepare_batch(_ShardId, S, Messages, _Options) -> +prepare_batch(_ShardId, S, Batch, _Options) -> _ = erase(?lts_persist_ops), - {Payloads, MaxTs} = + {Operations, MaxTs} = lists:mapfoldl( - fun({Timestamp, Msg}, Acc) -> - {Key, _} = make_key(S, Timestamp, Msg), - Payload = {Key, message_to_value_v1(Msg)}, - {Payload, max(Acc, Timestamp)} + fun + ({Timestamp, Msg = #message{topic = Topic}}, Acc) -> + {Key, _} = make_key(S, Timestamp, Topic), + Op = {Key, message_to_value_v1(Msg)}, + {Op, max(Acc, Timestamp)}; + ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}, Acc) -> + {Key, _} = make_key(S, Timestamp, Topic), + {_Op = {Key, ?delete}, Acc} end, 0, - Messages + Batch ), {ok, #{ - ?cooked_payloads => Payloads, + ?cooked_msg_ops => Operations, ?cooked_lts_ops => pop_lts_persist_ops(), ?cooked_ts => MaxTs }}. @@ -302,7 +311,7 @@ prepare_batch(_ShardId, S, Messages, _Options) -> commit_batch( _ShardId, _Data, - #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, + #{?cooked_msg_ops := [], ?cooked_lts_ops := LTS}, _Options ) -> %% Assert: @@ -311,7 +320,7 @@ commit_batch( commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, + #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations, ?cooked_ts := MaxTs}, Options ) -> {ok, Batch} = rocksdb:batch(), @@ -326,10 +335,13 @@ commit_batch( _ = emqx_ds_lts:trie_update(Trie, LtsOps), %% Commit payloads: lists:foreach( - fun({Key, Val}) -> - ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)) + fun + ({Key, Val}) when is_tuple(Val) -> + ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)); + ({Key, ?delete}) -> + ok = rocksdb:batch_delete(Batch, DataCF, Key) end, - Payloads + Operations ), Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), @@ -556,6 +568,23 @@ delete_next_until( rocksdb:iterator_close(ITHandle) end. +-spec lookup_message(emqx_ds_storage_layer:shard_id(), s(), emqx_ds_precondition:matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). +lookup_message( + _ShardId, + S = #s{db = DB, data = CF}, + #message_matcher{topic = Topic, timestamp = Timestamp} +) -> + {Key, _} = make_key(S, Timestamp, Topic), + case rocksdb:get(DB, CF, Key, _ReadOpts = []) of + {ok, Blob} -> + deserialize(Blob); + not_found -> + not_found; + Error -> + {error, unrecoverable, {rocksdb, Error}} + end. + handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> %% If the last message was published more than one epoch ago, and %% the shard remains idle, we need to advance safety cutoff @@ -811,9 +840,9 @@ format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])). --spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}. -make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) -> - Tokens = emqx_topic:words(TopicBin), +-spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}. +make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) -> + Tokens = emqx_topic:words(Topic), {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), VaryingHashes = [hash_topic_level(I) || I <- Varying], KeyMapper = array:get(length(Varying), KeyMappers), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d6250254d..3afdad01a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -37,6 +37,9 @@ next/4, delete_next/5, + %% Preconditions + lookup_message/2, + %% Generations update_config/3, add_generation/2, @@ -61,6 +64,7 @@ -export_type([ gen_id/0, generation/0, + batch/0, cf_refs/0, stream/0, delete_stream/0, @@ -74,6 +78,7 @@ batch_store_opts/0 ]). +-include("emqx_ds.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). @@ -115,6 +120,11 @@ -type gen_id() :: 0..16#ffff. +-type batch() :: [ + {emqx_ds:time(), emqx_types:message()} + | emqx_ds:deletion() +]. + %% Options affecting how batches should be stored. %% See also: `emqx_ds:message_store_opts()'. -type batch_store_opts() :: @@ -294,6 +304,10 @@ | {ok, end_of_stream} | emqx_ds:error(_). +%% Lookup a single message, for preconditions to work. +-callback lookup_message(shard_id(), generation_data(), emqx_ds_precondition:matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). + -callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. @@ -317,14 +331,10 @@ drop_shard(Shard) -> %% @doc This is a convenicence wrapper that combines `prepare' and %% `commit' operations. --spec store_batch( - shard_id(), - [{emqx_ds:time(), emqx_types:message()}], - batch_store_opts() -) -> +-spec store_batch(shard_id(), batch(), batch_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(Shard, Messages, Options) -> - case prepare_batch(Shard, Messages, #{}) of +store_batch(Shard, Batch, Options) -> + case prepare_batch(Shard, Batch, #{}) of {ok, CookedBatch} -> commit_batch(Shard, CookedBatch, Options); ignore -> @@ -342,23 +352,21 @@ store_batch(Shard, Messages, Options) -> %% %% The underlying storage layout MAY use timestamp as a unique message %% ID. --spec prepare_batch( - shard_id(), - [{emqx_ds:time(), emqx_types:message()}], - batch_prepare_opts() -) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> +-spec prepare_batch(shard_id(), batch(), batch_prepare_opts()) -> + {ok, cooked_batch()} | ignore | emqx_ds:error(_). +prepare_batch(Shard, Batch, Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. ?tp(emqx_ds_storage_layer_prepare_batch, #{ - shard => Shard, messages => Messages, options => Options + shard => Shard, batch => Batch, options => Options }), %% FIXME: always store messages in the current generation - case generation_at(Shard, Time) of + Time = batch_starts_at(Batch), + case is_integer(Time) andalso generation_at(Shard, Time) of {GenId, #{module := Mod, data := GenData}} -> T0 = erlang:monotonic_time(microsecond), Result = - case Mod:prepare_batch(Shard, GenData, Messages, Options) of + case Mod:prepare_batch(Shard, GenData, Batch, Options) of {ok, CookedBatch} -> {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> @@ -368,11 +376,21 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% TODO store->prepare emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result; + false -> + %% No write operations in this batch. + ignore; not_found -> + %% Generation is likely already GCed. ignore - end; -prepare_batch(_Shard, [], _Options) -> - ignore. + end. + +-spec batch_starts_at(batch()) -> emqx_ds:time() | undefined. +batch_starts_at([{Time, _Message} | _]) when is_integer(Time) -> + Time; +batch_starts_at([{delete, #message_matcher{timestamp = Time}} | _]) -> + Time; +batch_starts_at([]) -> + undefined. %% @doc Commit cooked batch to the storage. %% @@ -559,6 +577,16 @@ update_config(ShardId, Since, Options) -> add_generation(ShardId, Since) -> gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity). +-spec lookup_message(shard_id(), emqx_ds_precondition:matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). +lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) -> + case generation_at(ShardId, Time) of + {_GenId, #{module := Mod, data := GenData}} -> + Mod:lookup_message(ShardId, GenData, Matcher); + not_found -> + not_found + end. + -spec list_generations_with_lifetimes(shard_id()) -> #{ gen_id() => #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index cfd6f30ac..869602fcd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -21,6 +21,8 @@ %% used for testing. -module(emqx_ds_storage_reference). +-include("emqx_ds.hrl"). + -behaviour(emqx_ds_storage_layer). %% API: @@ -39,7 +41,8 @@ make_delete_iterator/5, update_iterator/4, next/6, - delete_next/7 + delete_next/7, + lookup_message/3 ]). %% internal exports: @@ -49,6 +52,8 @@ -include_lib("emqx_utils/include/emqx_message.hrl"). +-define(DB_KEY(TIMESTAMP), <>). + %%================================================================================ %% Type declarations %%================================================================================ @@ -102,23 +107,22 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -prepare_batch(_ShardId, _Data, Messages, _Options) -> - {ok, Messages}. +prepare_batch(_ShardId, _Data, Batch, _Options) -> + {ok, Batch}. -commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) -> - {ok, Batch} = rocksdb:batch(), - lists:foreach( - fun({TS, Msg}) -> - Key = <>, - Val = term_to_binary(Msg), - rocksdb:batch_put(Batch, CF, Key, Val) - end, - Messages - ), - Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), - rocksdb:release_batch(Batch), +commit_batch(_ShardId, S = #s{db = DB}, Batch, Options) -> + {ok, BatchHandle} = rocksdb:batch(), + lists:foreach(fun(Op) -> process_batch_operation(S, Op, BatchHandle) end, Batch), + Res = rocksdb:write_batch(DB, BatchHandle, write_batch_opts(Options)), + rocksdb:release_batch(BatchHandle), Res. +process_batch_operation(S, {TS, Msg = #message{}}, BatchHandle) -> + Val = encode_message(Msg), + rocksdb:batch_put(BatchHandle, S#s.cf, ?DB_KEY(TS), Val); +process_batch_operation(S, {delete, #message_matcher{timestamp = TS}}, BatchHandle) -> + rocksdb:batch_delete(BatchHandle, S#s.cf, ?DB_KEY(TS)). + get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> [#stream{}]. @@ -205,6 +209,16 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurr {ok, It, NumDeleted, NumIterated} end. +lookup_message(_ShardId, #s{db = DB, cf = CF}, #message_matcher{timestamp = TS}) -> + case rocksdb:get(DB, CF, ?DB_KEY(TS), _ReadOpts = []) of + {ok, Val} -> + decode_message(Val); + not_found -> + not_found; + {error, Reason} -> + {error, unrecoverable, Reason} + end. + %%================================================================================ %% Internal functions %%================================================================================ @@ -214,7 +228,7 @@ do_next(_, _, _, _, 0, Key, Acc) -> do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> case rocksdb:iterator_move(IT, Action) of {ok, Key = <>, Blob} -> - Msg = #message{topic = Topic} = binary_to_term(Blob), + Msg = #message{topic = Topic} = decode_message(Blob), TopicWords = emqx_topic:words(Topic), case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of true -> @@ -234,7 +248,7 @@ do_delete_next( ) -> case rocksdb:iterator_move(IT, Action) of {ok, Key, Blob} -> - Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), + Msg = #message{topic = Topic, timestamp = TS} = decode_message(Blob), TopicWords = emqx_topic:words(Topic), case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of true -> @@ -285,6 +299,12 @@ do_delete_next( {Key0, {AccDel, AccIter}} end. +encode_message(Msg) -> + term_to_binary(Msg). + +decode_message(Val) -> + binary_to_term(Val). + %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl index cb87b8a6f..b466983b7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl @@ -33,7 +33,8 @@ make_delete_iterator/5, update_iterator/4, next/6, - delete_next/7 + delete_next/7, + lookup_message/3 ]). %% internal exports: @@ -43,6 +44,7 @@ -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). +-include("emqx_ds.hrl"). -include("emqx_ds_metrics.hrl"). -ifdef(TEST). @@ -56,11 +58,12 @@ %%================================================================================ %% TLOG entry -%% keys: --define(cooked_payloads, 6). +%% Keys: +-define(cooked_msg_ops, 6). -define(cooked_lts_ops, 7). %% Payload: --define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE), +-define(cooked_delete, 100). +-define(cooked_msg_op(TIMESTAMP, STATIC, VARYING, VALUE), {TIMESTAMP, STATIC, VARYING, VALUE} ). @@ -176,25 +179,39 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF, ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok. -prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) -> +prepare_batch( + _ShardId, + S = #s{trie = Trie}, + Operations, + _Options +) -> _ = erase(?lts_persist_ops), - Payloads = [ - begin - Tokens = words(Topic), - {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), - ?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg)) - end - || {Timestamp, Msg = #message{topic = Topic}} <- Messages - ], + OperationsCooked = emqx_utils:flattermap( + fun + ({Timestamp, Msg = #message{topic = Topic}}) -> + Tokens = words(Topic), + {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), + ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg)); + ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) -> + case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of + {ok, {Static, Varying}} -> + ?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete); + undefined -> + %% Topic is unknown, nothing to delete. + [] + end + end, + Operations + ), {ok, #{ - ?cooked_payloads => Payloads, + ?cooked_msg_ops => OperationsCooked, ?cooked_lts_ops => pop_lts_persist_ops() }}. commit_batch( _ShardId, #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads}, + #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations}, Options ) -> {ok, Batch} = rocksdb:batch(), @@ -210,12 +227,17 @@ commit_batch( _ = emqx_ds_lts:trie_update(Trie, LtsOps), %% Commit payloads: lists:foreach( - fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) -> - MasterKey = mk_key(Static, 0, <<>>, Timestamp), - ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob), - mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp) + fun + (?cooked_msg_op(Timestamp, Static, Varying, ValBlob = <<_/bytes>>)) -> + MasterKey = mk_key(Static, 0, <<>>, Timestamp), + ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob), + mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp); + (?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete)) -> + MasterKey = mk_key(Static, 0, <<>>, Timestamp), + ok = rocksdb:batch_delete(Batch, DataCF, MasterKey), + delete_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp) end, - Payloads + Operations ), Result = rocksdb:write_batch(DB, Batch, [ {disable_wal, not maps:get(durable, Options, true)} @@ -285,6 +307,28 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) -> Ret end. +lookup_message( + Shard, + S = #s{db = DB, data_cf = CF, trie = Trie}, + #message_matcher{topic = Topic, timestamp = Timestamp} +) -> + case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of + {ok, {StaticIdx, _Varying}} -> + DSKey = mk_key(StaticIdx, 0, <<>>, Timestamp), + case rocksdb:get(DB, CF, DSKey, _ReadOpts = []) of + {ok, Val} -> + {ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx), + Msg = deserialize(S, Val), + enrich(Shard, S, TopicStructure, DSKey, Msg); + not_found -> + not_found; + {error, Reason} -> + {error, unrecoverable, Reason} + end; + undefined -> + not_found + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -330,12 +374,18 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg }, emqx_ds_msg_serializer:serialize(SSchema, Msg). +enrich(#ctx{shard = Shard, s = S, topic_structure = TopicStructure}, DSKey, Msg0) -> + enrich(Shard, S, TopicStructure, DSKey, Msg0). + enrich( - #ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}}, + Shard, + #s{with_guid = WithGuid}, + TopicStructure, DSKey, Msg0 ) -> - Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))), + Tokens = words(Msg0#message.topic), + Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(TopicStructure, Tokens)), Msg0#message{ topic = Topic, id = @@ -584,6 +634,16 @@ mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) -> mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) -> ok. +delete_index(Batch, CF, HashBytes, Static, Varying, Timestamp) -> + delete_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying). + +delete_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) -> + Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), + ok = rocksdb:batch_delete(Batch, CF, Key), + delete_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying); +delete_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) -> + ok. + %%%%%%%% Keys %%%%%%%%%% get_key_range(StaticIdx, WildcardIdx, Hash) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl index e0531dad0..e0fea7875 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl @@ -18,11 +18,14 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_ds.hrl"). -include("../../emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/assert.hrl"). +-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))). + -define(FUTURE, (1 bsl 64 - 1)). -define(SHARD, shard(?FUNCTION_NAME)). @@ -66,6 +69,30 @@ t_store(_Config) -> }, ?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])). +%% Smoke test of applying batch operations +t_operations(db_config, _Config) -> + #{force_monotonic_timestamps => false}. + +t_operations(_Config) -> + Batch1 = [ + make_message(100, <<"t/1">>, <<"M1">>), + make_message(200, <<"t/2">>, <<"M2">>), + make_message(300, <<"t/3">>, <<"M3">>) + ], + Batch2 = [ + make_deletion(200, <<"t/2">>, <<"M2">>), + make_deletion(300, <<"t/3">>, '_'), + make_deletion(400, <<"t/4">>, '_') + ], + ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch1)), + ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch2)), + ?assertMatch( + [ + #message{timestamp = 100, topic = <<"t/1">>, payload = <<"M1">>} + ], + dump_messages(?SHARD, <<"t/#">>, 0) + ). + %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> %% Prepare data: @@ -124,8 +151,6 @@ t_delete(_Config) -> ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}), ?assertEqual(20, length(Messages)). --define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))). - %% Smoke test that verifies that concrete topics are mapped to %% individual streams, unless there's too many of them. t_get_streams(Config) -> @@ -417,79 +442,26 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> %% || Topic <- Topics, PublishedAt <- Timestamps %% ]. -%% t_iterate_multigen(_Config) -> -%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), -%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), -%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), -%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], -%% Timestamps = lists:seq(1, 100), -%% _ = [ -%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) -%% || Topic <- Topics, PublishedAt <- Timestamps -%% ], -%% ?assertEqual( -%% lists:sort([ -%% {Topic, PublishedAt} -%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps -%% ]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) -%% ), -%% ?assertEqual( -%% lists:sort([ -%% {Topic, PublishedAt} -%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) -%% ]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)]) -%% ). - -%% t_iterate_multigen_preserve_restore(_Config) -> -%% ReplayID = atom_to_binary(?FUNCTION_NAME), -%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), -%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), -%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), -%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"], -%% Timestamps = lists:seq(1, 100), -%% TopicFilter = "foo/#", -%% TopicsMatching = ["foo/bar", "foo/bar/baz"], -%% _ = [ -%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS})) -%% || Topic <- Topics, TS <- Timestamps -%% ], -%% It0 = iterator(?SHARD, TopicFilter, 0), -%% {It1, Res10} = iterate(It0, 10), -%% % preserve mid-generation -%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID), -%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), -%% {It3, Res100} = iterate(It2, 88), -%% % preserve on the generation boundary -%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), -%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), -%% {It5, Res200} = iterate(It4, 1000), -%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)), -%% ?assertEqual( -%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), -%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) -%% ), -%% ?assertEqual( -%% ok, -%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID) -%% ), -%% ?assertEqual( -%% {error, not_found}, -%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) -%% ). - make_message(PublishedAt, Topic, Payload) when is_list(Topic) -> make_message(PublishedAt, list_to_binary(Topic), Payload); make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> ID = emqx_guid:gen(), #message{ id = ID, + from = <>, topic = Topic, timestamp = PublishedAt, payload = Payload }. +make_deletion(Timestamp, Topic, Payload) -> + {delete, #message_matcher{ + from = <>, + topic = Topic, + timestamp = Timestamp, + payload = Payload + }}. + make_topic(Tokens = [_ | _]) -> emqx_topic:join([bin(T) || T <- Tokens]). @@ -535,13 +507,23 @@ end_per_suite(Config) -> ok. init_per_testcase(TC, Config) -> - ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)), + ok = emqx_ds:open_db(TC, db_config(TC, Config)), Config. end_per_testcase(TC, _Config) -> emqx_ds:drop_db(TC), ok. +db_config(TC, Config) -> + ConfigBase = ?DB_CONFIG(Config), + SpecificConfig = + try + ?MODULE:TC(?FUNCTION_NAME, Config) + catch + error:undef -> #{} + end, + maps:merge(ConfigBase, SpecificConfig). + shard(TC) -> {TC, <<"0">>}. diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 08c08e0c5..cbf38bb62 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -377,7 +377,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) -> shard_of_clientid(DB, Node, ClientId) -> ?ON( Node, - emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid) + emqx_ds_buffer:shard_of_operation(DB, #message{from = ClientId}, clientid) ). %% Consume eagerly: