feat(ds): allow isolated batches with preconditions

Namely, single message deletions and preconditions that can be used to
build complex "compare-and-swap"-style operations. Also allow user to
declare that atomic batches support is needed for a DB.
This commit is contained in:
Andrew Mayorov 2024-07-11 19:59:15 +02:00
parent 02e1007a16
commit 14022aded1
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 52 additions and 14 deletions

View File

@ -82,6 +82,7 @@
-export_type([ -export_type([
payload/0, payload/0,
message/0, message/0,
message/1,
flag/0, flag/0,
flags/0, flags/0,
headers/0 headers/0
@ -238,7 +239,8 @@
-type subscription() :: #subscription{}. -type subscription() :: #subscription{}.
-type subscriber() :: {pid(), subid()}. -type subscriber() :: {pid(), subid()}.
-type payload() :: binary() | iodata(). -type payload() :: binary() | iodata().
-type message() :: #message{}. -type message() :: #message{payload :: emqx_types:payload()}.
-type message(Payload) :: #message{payload :: Payload}.
-type flag() :: sys | dup | retain | atom(). -type flag() :: sys | dup | retain | atom().
-type flags() :: #{flag() := boolean()}. -type flags() :: #{flag() := boolean()}.
-type headers() :: #{ -type headers() :: #{

View File

@ -16,4 +16,9 @@
-ifndef(EMQX_DS_HRL). -ifndef(EMQX_DS_HRL).
-define(EMQX_DS_HRL, true). -define(EMQX_DS_HRL, true).
-record(dsbatch, {
operations :: [emqx_ds:operation()],
preconditions = [] :: [emqx_ds:precondition()]
}).
-endif. -endif.

View File

@ -19,6 +19,8 @@
%% It takes care of forwarding calls to the underlying DBMS. %% It takes care of forwarding calls to the underlying DBMS.
-module(emqx_ds). -module(emqx_ds).
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
%% Management API: %% Management API:
-export([ -export([
register_backend/2, register_backend/2,
@ -52,6 +54,9 @@
time/0, time/0,
topic_filter/0, topic_filter/0,
topic/0, topic/0,
batch/0,
operation/0,
precondition/0,
stream/0, stream/0,
delete_stream/0, delete_stream/0,
delete_selector/0, delete_selector/0,
@ -84,8 +89,6 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-define(APP, emqx_durable_storage).
-type db() :: atom(). -type db() :: atom().
%% Parsed topic. %% Parsed topic.
@ -94,6 +97,31 @@
%% Parsed topic filter. %% Parsed topic filter.
-type topic_filter() :: list(binary() | '+' | '#' | ''). -type topic_filter() :: list(binary() | '+' | '#' | '').
-type message() :: emqx_types:message().
%% Message pattern.
-type message(Pattern) :: emqx_types:message(Pattern).
%% A batch of storage operations.
-type batch() :: [operation()] | #dsbatch{}.
-type operation() ::
%% Store a message.
message()
%% Delete a message.
%% Does nothing if the message does not exist.
| {delete, message(_)}.
%% Precondition.
%% Fails whole batch if the message in question does not match (`while'), or
%% does match (`unless'). Here "match" means that it either just exists (when
%% pattern is '_') or has exactly the same payload, rest of the message fields are
%% irrelevant. Useful to construct batches with "compare-and-set" semantics.
%% Note: backends may not support this, but if they do only DBs with `atomic_batches'
%% enabled are expected to support preconditions in batches.
-type precondition() ::
{while | unless, message(iodata() | '_')}.
-type rank_x() :: term(). -type rank_x() :: term().
-type rank_y() :: integer(). -type rank_y() :: integer().
@ -157,13 +185,7 @@
%% Whether to wait until the message storage has been acknowledged to return from %% Whether to wait until the message storage has been acknowledged to return from
%% `store_batch'. %% `store_batch'.
%% Default: `true'. %% Default: `true'.
sync => boolean(), sync => boolean()
%% Whether the whole batch given to `store_batch' should be inserted atomically as
%% a unit. Note: the whole batch must be crafted so that it belongs to a single
%% shard (if applicable to the backend), as the batch will be split accordingly
%% even if this flag is `true'.
%% Default: `false'.
atomic => boolean()
}. }.
-type generic_db_opts() :: -type generic_db_opts() ::
@ -176,6 +198,12 @@
%% If `false' then message timestamps are respected; timestamp, topic and %% If `false' then message timestamps are respected; timestamp, topic and
%% serialization key uniquely identify a message. %% serialization key uniquely identify a message.
force_monotonic_timestamps => boolean(), force_monotonic_timestamps => boolean(),
%% Whether the whole batch given to `store_batch' should be processed and
%% inserted atomically as a unit, in isolation from other batches.
%% Default: `false'.
%% The whole batch must be crafted so that it belongs to a single shard (if
%% applicable to the backend).
atomic_batches => boolean(),
serialize_by => clientid | topic, serialize_by => clientid | topic,
_ => _ _ => _
}. }.
@ -313,11 +341,11 @@ drop_db(DB) ->
Module:drop_db(DB) Module:drop_db(DB)
end. end.
-spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). -spec store_batch(db(), batch(), message_store_opts()) -> store_batch_result().
store_batch(DB, Msgs, Opts) -> store_batch(DB, Msgs, Opts) ->
?module(DB):store_batch(DB, Msgs, Opts). ?module(DB):store_batch(DB, Msgs, Opts).
-spec store_batch(db(), [emqx_types:message()]) -> store_batch_result(). -spec store_batch(db(), batch()) -> store_batch_result().
store_batch(DB, Msgs) -> store_batch(DB, Msgs) ->
store_batch(DB, Msgs, #{}). store_batch(DB, Msgs, #{}).
@ -418,7 +446,10 @@ timestamp_us() ->
%%================================================================================ %%================================================================================
set_db_defaults(Opts) -> set_db_defaults(Opts) ->
Defaults = #{force_monotonic_timestamps => true}, Defaults = #{
force_monotonic_timestamps => true,
atomic_batches => false
},
maps:merge(Defaults, Opts). maps:merge(Defaults, Opts).
call_if_implemented(Mod, Fun, Args, Default) -> call_if_implemented(Mod, Fun, Args, Default) ->

View File

@ -33,7 +33,7 @@
%% Topic that the message is published to %% Topic that the message is published to
topic :: emqx_types:topic(), topic :: emqx_types:topic(),
%% Message Payload %% Message Payload
payload :: emqx_types:payload(), payload,
%% Timestamp (Unit: millisecond) %% Timestamp (Unit: millisecond)
timestamp :: integer(), timestamp :: integer(),
%% Miscellaneous extensions, currently used for OpenTelemetry context propagation %% Miscellaneous extensions, currently used for OpenTelemetry context propagation