diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index d8dd1cff4..8563a3d7e 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -82,6 +82,7 @@ -export_type([ payload/0, message/0, + message/1, flag/0, flags/0, headers/0 @@ -238,7 +239,8 @@ -type subscription() :: #subscription{}. -type subscriber() :: {pid(), subid()}. -type payload() :: binary() | iodata(). --type message() :: #message{}. +-type message() :: #message{payload :: emqx_types:payload()}. +-type message(Payload) :: #message{payload :: Payload}. -type flag() :: sys | dup | retain | atom(). -type flags() :: #{flag() := boolean()}. -type headers() :: #{ diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index cc7a7431f..e2377a9b0 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -16,4 +16,9 @@ -ifndef(EMQX_DS_HRL). -define(EMQX_DS_HRL, true). +-record(dsbatch, { + operations :: [emqx_ds:operation()], + preconditions = [] :: [emqx_ds:precondition()] +}). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index ff157d01b..d745c8c74 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -19,6 +19,8 @@ %% It takes care of forwarding calls to the underlying DBMS. -module(emqx_ds). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). + %% Management API: -export([ register_backend/2, @@ -52,6 +54,9 @@ time/0, topic_filter/0, topic/0, + batch/0, + operation/0, + precondition/0, stream/0, delete_stream/0, delete_selector/0, @@ -84,8 +89,6 @@ %% Type declarations %%================================================================================ --define(APP, emqx_durable_storage). - -type db() :: atom(). %% Parsed topic. @@ -94,6 +97,31 @@ %% Parsed topic filter. -type topic_filter() :: list(binary() | '+' | '#' | ''). +-type message() :: emqx_types:message(). + +%% Message pattern. +-type message(Pattern) :: emqx_types:message(Pattern). + +%% A batch of storage operations. +-type batch() :: [operation()] | #dsbatch{}. + +-type operation() :: + %% Store a message. + message() + %% Delete a message. + %% Does nothing if the message does not exist. + | {delete, message(_)}. + +%% Precondition. +%% Fails whole batch if the message in question does not match (`while'), or +%% does match (`unless'). Here "match" means that it either just exists (when +%% pattern is '_') or has exactly the same payload, rest of the message fields are +%% irrelevant. Useful to construct batches with "compare-and-set" semantics. +%% Note: backends may not support this, but if they do only DBs with `atomic_batches' +%% enabled are expected to support preconditions in batches. +-type precondition() :: + {while | unless, message(iodata() | '_')}. + -type rank_x() :: term(). -type rank_y() :: integer(). @@ -157,13 +185,7 @@ %% Whether to wait until the message storage has been acknowledged to return from %% `store_batch'. %% Default: `true'. - sync => boolean(), - %% Whether the whole batch given to `store_batch' should be inserted atomically as - %% a unit. Note: the whole batch must be crafted so that it belongs to a single - %% shard (if applicable to the backend), as the batch will be split accordingly - %% even if this flag is `true'. - %% Default: `false'. - atomic => boolean() + sync => boolean() }. -type generic_db_opts() :: @@ -176,6 +198,12 @@ %% If `false' then message timestamps are respected; timestamp, topic and %% serialization key uniquely identify a message. force_monotonic_timestamps => boolean(), + %% Whether the whole batch given to `store_batch' should be processed and + %% inserted atomically as a unit, in isolation from other batches. + %% Default: `false'. + %% The whole batch must be crafted so that it belongs to a single shard (if + %% applicable to the backend). + atomic_batches => boolean(), serialize_by => clientid | topic, _ => _ }. @@ -313,11 +341,11 @@ drop_db(DB) -> Module:drop_db(DB) end. --spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). +-spec store_batch(db(), batch(), message_store_opts()) -> store_batch_result(). store_batch(DB, Msgs, Opts) -> ?module(DB):store_batch(DB, Msgs, Opts). --spec store_batch(db(), [emqx_types:message()]) -> store_batch_result(). +-spec store_batch(db(), batch()) -> store_batch_result(). store_batch(DB, Msgs) -> store_batch(DB, Msgs, #{}). @@ -418,7 +446,10 @@ timestamp_us() -> %%================================================================================ set_db_defaults(Opts) -> - Defaults = #{force_monotonic_timestamps => true}, + Defaults = #{ + force_monotonic_timestamps => true, + atomic_batches => false + }, maps:merge(Defaults, Opts). call_if_implemented(Mod, Fun, Args, Default) -> diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index 4bbc367da..5def5e5a1 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -33,7 +33,7 @@ %% Topic that the message is published to topic :: emqx_types:topic(), %% Message Payload - payload :: emqx_types:payload(), + payload, %% Timestamp (Unit: millisecond) timestamp :: integer(), %% Miscellaneous extensions, currently used for OpenTelemetry context propagation