feat(ds builtin local): add basic support for atomic batches + preconditions

This commit is contained in:
Thales Macedo Garitezi 2024-08-08 16:13:03 -03:00
parent c00b178b57
commit a849e6df4c
5 changed files with 184 additions and 17 deletions

View File

@ -689,18 +689,9 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
%% TODO: Remove once builtin-local supports preconditions + atomic batches.
BuiltinLocalTCs =
TCs --
[ [
t_09_atomic_store_batch, {builtin_local, TCs},
t_11_batch_preconditions, {builtin_raft, TCs}
t_12_batch_precondition_conflicts
],
BuiltinRaftTCs = TCs,
[
{builtin_local, BuiltinLocalTCs},
{builtin_raft, BuiltinRaftTCs}
]. ].
init_per_group(builtin_local, Config) -> init_per_group(builtin_local, Config) ->

View File

@ -49,7 +49,9 @@
%% Internal exports: %% Internal exports:
-export([ -export([
do_next/3, do_next/3,
do_delete_next/4 do_delete_next/4,
%% Used by batch serializer
make_batch/3
]). ]).
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]). -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
@ -88,7 +90,10 @@
#{ #{
backend := builtin_local, backend := builtin_local,
storage := emqx_ds_storage_layer:prototype(), storage := emqx_ds_storage_layer:prototype(),
n_shards := pos_integer() n_shards := pos_integer(),
%% Inherited from `emqx_ds:generic_db_opts()`.
force_monotonic_timestamps => boolean(),
atomic_batches => boolean()
}. }.
-type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}. -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
@ -193,9 +198,17 @@ drop_db(DB) ->
), ),
emqx_ds_builtin_local_meta:drop_db(DB). emqx_ds_builtin_local_meta:drop_db(DB).
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) -> store_batch(DB, Batch, Opts) ->
case emqx_ds_builtin_local_meta:db_config(DB) of
#{atomic_batches := true} ->
store_batch_atomic(DB, Batch, Opts);
_ ->
store_batch_buffered(DB, Batch, Opts)
end.
store_batch_buffered(DB, Messages, Opts) ->
try try
emqx_ds_buffer:store_batch(DB, Messages, Opts) emqx_ds_buffer:store_batch(DB, Messages, Opts)
catch catch
@ -203,6 +216,34 @@ store_batch(DB, Messages, Opts) ->
{error, recoverable, Reason} {error, recoverable, Reason}
end. end.
store_batch_atomic(DB, Batch, Opts) ->
Shards = shards_of_batch(DB, Batch),
case Shards of
[Shard] ->
emqx_ds_builtin_local_batch_serializer:store_batch_atomic(DB, Shard, Batch, Opts);
[] ->
ok;
[_ | _] ->
{error, unrecoverable, atomic_batch_spans_multiple_shards}
end.
shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
shards_of_batch(DB, Operations) ->
shards_of_batch(DB, Operations, []).
shards_of_batch(DB, [Operation | Rest], Acc) ->
case shard_of_operation(DB, Operation, clientid, #{}) of
Shard when Shard =:= hd(Acc) ->
shards_of_batch(DB, Rest, Acc);
Shard when Acc =:= [] ->
shards_of_batch(DB, Rest, [Shard]);
ShardAnother ->
[ShardAnother | Acc]
end;
shards_of_batch(_DB, [], Acc) ->
Acc.
-record(bs, {options :: emqx_ds:create_db_opts()}). -record(bs, {options :: emqx_ds:create_db_opts()}).
-type buffer_state() :: #bs{}. -type buffer_state() :: #bs{}.

View File

@ -0,0 +1,122 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_builtin_local_batch_serializer).
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
%% API
-export([
start_link/3,
store_batch_atomic/4
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}).
-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}).
-record(store_batch_atomic, {batch :: emqx_ds:batch(), opts :: emqx_ds:message_store_opts()}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link(DB, Shard, _Opts) ->
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
store_batch_atomic(DB, Shard, Batch, Opts) ->
gen_server:call(?via(DB, Shard), #store_batch_atomic{batch = Batch, opts = Opts}, infinity).
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------
init([DB, Shard]) ->
process_flag(message_queue_data, off_heap),
State = #{
db => DB,
shard => Shard
},
{ok, State}.
handle_call(#store_batch_atomic{batch = Batch, opts = StoreOpts}, _From, State) ->
ShardId = shard_id(State),
DBOpts = db_config(State),
Result = do_store_batch_atomic(ShardId, Batch, DBOpts, StoreOpts),
{reply, Result, State};
handle_call(Call, _From, State) ->
{reply, {error, {unknown_call, Call}}, State}.
handle_cast(_Cast, State) ->
{noreply, State}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
shard_id(#{db := DB, shard := Shard}) ->
{DB, Shard}.
db_config(#{db := DB}) ->
emqx_ds_builtin_local_meta:db_config(DB).
-spec do_store_batch_atomic(
emqx_ds_storage_layer:shard_id(),
emqx_ds:dsbatch(),
emqx_ds_builtin_local:db_opts(),
emqx_ds:message_store_opts()
) ->
emqx_ds:store_batch_result().
do_store_batch_atomic(ShardId, #dsbatch{} = Batch, DBOpts, StoreOpts) ->
#dsbatch{
operations = Operations0,
preconditions = Preconditions
} = Batch,
case emqx_ds_precondition:verify(emqx_ds_storage_layer, ShardId, Preconditions) of
ok ->
do_store_operations(ShardId, Operations0, DBOpts, StoreOpts);
{precondition_failed, _} = PreconditionFailed ->
{error, unrecoverable, PreconditionFailed};
Error ->
Error
end;
do_store_batch_atomic(ShardId, Operations, DBOpts, StoreOpts) ->
do_store_operations(ShardId, Operations, DBOpts, StoreOpts).
do_store_operations(ShardId, Operations0, DBOpts, _StoreOpts) ->
ForceMonotonic = maps:get(force_monotonic_timestamps, DBOpts),
{Latest, Operations} =
emqx_ds_builtin_local:make_batch(
ForceMonotonic,
current_timestamp(ShardId),
Operations0
),
Result = emqx_ds_storage_layer:store_batch(ShardId, Operations, _Options = #{}),
emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
Result.
current_timestamp(ShardId) ->
emqx_ds_builtin_local_meta:current_timestamp(ShardId).

View File

@ -158,7 +158,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
Opts = emqx_ds_builtin_local_meta:db_config(DB), Opts = emqx_ds_builtin_local_meta:db_config(DB),
Children = [ Children = [
shard_storage_spec(DB, Shard, Opts), shard_storage_spec(DB, Shard, Opts),
shard_buffer_spec(DB, Shard, Opts) shard_buffer_spec(DB, Shard, Opts),
shard_batch_serializer_spec(DB, Shard, Opts)
], ],
{ok, {SupFlags, Children}}. {ok, {SupFlags, Children}}.
@ -208,6 +209,15 @@ shard_buffer_spec(DB, Shard, Options) ->
type => worker type => worker
}. }.
shard_batch_serializer_spec(DB, Shard, Opts) ->
#{
id => {Shard, batch_serializer},
start => {emqx_ds_builtin_local_batch_serializer, start_link, [DB, Shard, Opts]},
shutdown => 5_000,
restart => permanent,
type => worker
}.
ensure_started(Res) -> ensure_started(Res) ->
case Res of case Res of
{ok, _Pid} -> {ok, _Pid} ->

View File

@ -55,6 +55,7 @@
topic_filter/0, topic_filter/0,
topic/0, topic/0,
batch/0, batch/0,
dsbatch/0,
operation/0, operation/0,
deletion/0, deletion/0,
precondition/0, precondition/0,
@ -104,7 +105,9 @@
-type message_matcher(Payload) :: #message_matcher{payload :: Payload}. -type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
%% A batch of storage operations. %% A batch of storage operations.
-type batch() :: [operation()] | #dsbatch{}. -type batch() :: [operation()] | dsbatch().
-type dsbatch() :: #dsbatch{}.
-type operation() :: -type operation() ::
%% Store a message. %% Store a message.