From a849e6df4c4700ccdf9613cff68cda0ee101f140 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 8 Aug 2024 16:13:03 -0300 Subject: [PATCH] feat(ds builtin local): add basic support for atomic batches + preconditions --- .../test/emqx_ds_backends_SUITE.erl | 13 +- .../src/emqx_ds_builtin_local.erl | 49 ++++++- ...emqx_ds_builtin_local_batch_serializer.erl | 122 ++++++++++++++++++ .../src/emqx_ds_builtin_local_db_sup.erl | 12 +- apps/emqx_durable_storage/src/emqx_ds.erl | 5 +- 5 files changed, 184 insertions(+), 17 deletions(-) create mode 100644 apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_batch_serializer.erl diff --git a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl index 95ebfe99c..ad0f5b400 100644 --- a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl +++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl @@ -689,18 +689,9 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - %% TODO: Remove once builtin-local supports preconditions + atomic batches. - BuiltinLocalTCs = - TCs -- - [ - t_09_atomic_store_batch, - t_11_batch_preconditions, - t_12_batch_precondition_conflicts - ], - BuiltinRaftTCs = TCs, [ - {builtin_local, BuiltinLocalTCs}, - {builtin_raft, BuiltinRaftTCs} + {builtin_local, TCs}, + {builtin_raft, TCs} ]. init_per_group(builtin_local, Config) -> diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index f002c26de..9010c4e70 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -49,7 +49,9 @@ %% Internal exports: -export([ do_next/3, - do_delete_next/4 + do_delete_next/4, + %% Used by batch serializer + make_batch/3 ]). -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]). @@ -88,7 +90,10 @@ #{ backend := builtin_local, storage := emqx_ds_storage_layer:prototype(), - n_shards := pos_integer() + n_shards := pos_integer(), + %% Inherited from `emqx_ds:generic_db_opts()`. + force_monotonic_timestamps => boolean(), + atomic_batches => boolean() }. -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}. @@ -193,9 +198,17 @@ drop_db(DB) -> ), emqx_ds_builtin_local_meta:drop_db(DB). --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(DB, Messages, Opts) -> +store_batch(DB, Batch, Opts) -> + case emqx_ds_builtin_local_meta:db_config(DB) of + #{atomic_batches := true} -> + store_batch_atomic(DB, Batch, Opts); + _ -> + store_batch_buffered(DB, Batch, Opts) + end. + +store_batch_buffered(DB, Messages, Opts) -> try emqx_ds_buffer:store_batch(DB, Messages, Opts) catch @@ -203,6 +216,34 @@ store_batch(DB, Messages, Opts) -> {error, recoverable, Reason} end. +store_batch_atomic(DB, Batch, Opts) -> + Shards = shards_of_batch(DB, Batch), + case Shards of + [Shard] -> + emqx_ds_builtin_local_batch_serializer:store_batch_atomic(DB, Shard, Batch, Opts); + [] -> + ok; + [_ | _] -> + {error, unrecoverable, atomic_batch_spans_multiple_shards} + end. + +shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) -> + shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, [])); +shards_of_batch(DB, Operations) -> + shards_of_batch(DB, Operations, []). + +shards_of_batch(DB, [Operation | Rest], Acc) -> + case shard_of_operation(DB, Operation, clientid, #{}) of + Shard when Shard =:= hd(Acc) -> + shards_of_batch(DB, Rest, Acc); + Shard when Acc =:= [] -> + shards_of_batch(DB, Rest, [Shard]); + ShardAnother -> + [ShardAnother | Acc] + end; +shards_of_batch(_DB, [], Acc) -> + Acc. + -record(bs, {options :: emqx_ds:create_db_opts()}). -type buffer_state() :: #bs{}. diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_batch_serializer.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_batch_serializer.erl new file mode 100644 index 000000000..c70ce1f0f --- /dev/null +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_batch_serializer.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_builtin_local_batch_serializer). + +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). + +%% API +-export([ + start_link/3, + + store_batch_atomic/4 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}). +-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}). + +-record(store_batch_atomic, {batch :: emqx_ds:batch(), opts :: emqx_ds:message_store_opts()}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link(DB, Shard, _Opts) -> + gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). + +store_batch_atomic(DB, Shard, Batch, Opts) -> + gen_server:call(?via(DB, Shard), #store_batch_atomic{batch = Batch, opts = Opts}, infinity). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init([DB, Shard]) -> + process_flag(message_queue_data, off_heap), + State = #{ + db => DB, + shard => Shard + }, + {ok, State}. + +handle_call(#store_batch_atomic{batch = Batch, opts = StoreOpts}, _From, State) -> + ShardId = shard_id(State), + DBOpts = db_config(State), + Result = do_store_batch_atomic(ShardId, Batch, DBOpts, StoreOpts), + {reply, Result, State}; +handle_call(Call, _From, State) -> + {reply, {error, {unknown_call, Call}}, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +shard_id(#{db := DB, shard := Shard}) -> + {DB, Shard}. + +db_config(#{db := DB}) -> + emqx_ds_builtin_local_meta:db_config(DB). + +-spec do_store_batch_atomic( + emqx_ds_storage_layer:shard_id(), + emqx_ds:dsbatch(), + emqx_ds_builtin_local:db_opts(), + emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). +do_store_batch_atomic(ShardId, #dsbatch{} = Batch, DBOpts, StoreOpts) -> + #dsbatch{ + operations = Operations0, + preconditions = Preconditions + } = Batch, + case emqx_ds_precondition:verify(emqx_ds_storage_layer, ShardId, Preconditions) of + ok -> + do_store_operations(ShardId, Operations0, DBOpts, StoreOpts); + {precondition_failed, _} = PreconditionFailed -> + {error, unrecoverable, PreconditionFailed}; + Error -> + Error + end; +do_store_batch_atomic(ShardId, Operations, DBOpts, StoreOpts) -> + do_store_operations(ShardId, Operations, DBOpts, StoreOpts). + +do_store_operations(ShardId, Operations0, DBOpts, _StoreOpts) -> + ForceMonotonic = maps:get(force_monotonic_timestamps, DBOpts), + {Latest, Operations} = + emqx_ds_builtin_local:make_batch( + ForceMonotonic, + current_timestamp(ShardId), + Operations0 + ), + Result = emqx_ds_storage_layer:store_batch(ShardId, Operations, _Options = #{}), + emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest), + Result. + +current_timestamp(ShardId) -> + emqx_ds_builtin_local_meta:current_timestamp(ShardId). diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl index 8776416e0..edc73c2a3 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl @@ -158,7 +158,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) -> Opts = emqx_ds_builtin_local_meta:db_config(DB), Children = [ shard_storage_spec(DB, Shard, Opts), - shard_buffer_spec(DB, Shard, Opts) + shard_buffer_spec(DB, Shard, Opts), + shard_batch_serializer_spec(DB, Shard, Opts) ], {ok, {SupFlags, Children}}. @@ -208,6 +209,15 @@ shard_buffer_spec(DB, Shard, Options) -> type => worker }. +shard_batch_serializer_spec(DB, Shard, Opts) -> + #{ + id => {Shard, batch_serializer}, + start => {emqx_ds_builtin_local_batch_serializer, start_link, [DB, Shard, Opts]}, + shutdown => 5_000, + restart => permanent, + type => worker + }. + ensure_started(Res) -> case Res of {ok, _Pid} -> diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 0e4336a26..191a9a787 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -55,6 +55,7 @@ topic_filter/0, topic/0, batch/0, + dsbatch/0, operation/0, deletion/0, precondition/0, @@ -104,7 +105,9 @@ -type message_matcher(Payload) :: #message_matcher{payload :: Payload}. %% A batch of storage operations. --type batch() :: [operation()] | #dsbatch{}. +-type batch() :: [operation()] | dsbatch(). + +-type dsbatch() :: #dsbatch{}. -type operation() :: %% Store a message.