From ae3812da8511267fd0b0144b1bf04606de350589 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Jul 2024 19:57:25 +0200 Subject: [PATCH 1/5] feat(ds): allow to turn monotonic timestamps off for DB That tells implementation how to assign timestamps to messages. Current implicit default is now `force_monotonic_timestamps => true`. --- .../src/emqx_ds_builtin_local_meta.erl | 1 - apps/emqx_durable_storage/src/emqx_ds.erl | 7 +++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl index dbc68cd2c..0f13ac4b3 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl @@ -151,7 +151,6 @@ ensure_monotonic_timestamp(ShardId) -> %%================================================================================ -record(s, {}). --define(timer_update, timer_update). init([]) -> process_flag(trap_exit, true), diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 7f6996bd7..57d8c9234 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -169,6 +169,13 @@ -type generic_db_opts() :: #{ backend := atom(), + %% Force strictly monotonic message timestamps. + %% Default: `true'. + %% Messages are assigned unique, strictly monotonically increasing timestamps. + %% Those timestamps form a total order per each serialization key. + %% If `false' then message timestamps are respected; timestamp, topic and + %% serialization key uniquely identify a message. + force_monotonic_timestamps => boolean(), serialize_by => clientid | topic, _ => _ }. From 02e1007a161d7c7dc7082cbe33aa616529ae99c1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Jul 2024 19:56:41 +0200 Subject: [PATCH 2/5] feat(dslocal): implement `force_monotonic_timestamps => false` --- .../src/emqx_ds_builtin_local.erl | 42 ++++++++++++------- apps/emqx_durable_storage/src/emqx_ds.erl | 8 +++- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index 28e307832..a7cc795b6 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -202,7 +202,7 @@ store_batch(DB, Messages, Opts) -> {error, recoverable, Reason} end. --record(bs, {options :: term()}). +-record(bs, {options :: emqx_ds:create_db_opts()}). -type buffer_state() :: #bs{}. -spec init_buffer(emqx_ds:db(), shard(), _Options) -> {ok, buffer_state()}. @@ -220,24 +220,36 @@ init_buffer(DB, Shard, Options) -> -spec flush_buffer(emqx_ds:db(), shard(), [emqx_types:message()], buffer_state()) -> {buffer_state(), emqx_ds:store_batch_result()}. flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) -> - {Latest, Batch} = assign_timestamps(current_timestamp({DB, Shard}), Messages), - Result = emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options), - emqx_ds_builtin_local_meta:set_current_timestamp({DB, Shard}, Latest), + ShardId = {DB, Shard}, + ForceMonotonic = maps:get(force_monotonic_timestamps, Options), + {Latest, Batch} = make_batch(ForceMonotonic, current_timestamp(ShardId), Messages), + Result = emqx_ds_storage_layer:store_batch(ShardId, Batch, _Options = #{}), + emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest), {S0, Result}. -assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, []). +make_batch(_ForceMonotonic = true, Latest, Messages) -> + assign_monotonic_timestamps(Latest, Messages, []); +make_batch(false, Latest, Messages) -> + assign_message_timestamps(Latest, Messages, []). -assign_timestamps(Latest, [MessageIn | Rest], Acc) -> - case emqx_message:timestamp(MessageIn, microsecond) of - TimestampUs when TimestampUs > Latest -> - Message = assign_timestamp(TimestampUs, MessageIn), - assign_timestamps(TimestampUs, Rest, [Message | Acc]); +assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) -> + case emqx_message:timestamp(Message, microsecond) of + TimestampUs when TimestampUs > Latest0 -> + Latest = TimestampUs; _Earlier -> - Message = assign_timestamp(Latest + 1, MessageIn), - assign_timestamps(Latest + 1, Rest, [Message | Acc]) - end; -assign_timestamps(Latest, [], Acc) -> + Latest = Latest0 + 1 + end, + Acc = [assign_timestamp(Latest, Message) | Acc0], + assign_monotonic_timestamps(Latest, Rest, Acc); +assign_monotonic_timestamps(Latest, [], Acc) -> + {Latest, lists:reverse(Acc)}. + +assign_message_timestamps(Latest0, [Message | Rest], Acc0) -> + TimestampUs = emqx_message:timestamp(Message, microsecond), + Latest = max(TimestampUs, Latest0), + Acc = [assign_timestamp(TimestampUs, Message) | Acc0], + assign_message_timestamps(Latest, Rest, Acc); +assign_message_timestamps(Latest, [], Acc) -> {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 57d8c9234..ff157d01b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -268,7 +268,7 @@ open_db(DB, Opts = #{backend := Backend}) -> Module -> persistent_term:put(?persistent_term(DB), Module), emqx_ds_sup:register_db(DB, Backend), - ?module(DB):open_db(DB, Opts) + ?module(DB):open_db(DB, set_db_defaults(Opts)) end. -spec close_db(db()) -> ok. @@ -286,7 +286,7 @@ add_generation(DB) -> -spec update_db_config(db(), create_db_opts()) -> ok. update_db_config(DB, Opts) -> - ?module(DB):update_db_config(DB, Opts). + ?module(DB):update_db_config(DB, set_db_defaults(Opts)). -spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}. list_generations_with_lifetimes(DB) -> @@ -417,6 +417,10 @@ timestamp_us() -> %% Internal functions %%================================================================================ +set_db_defaults(Opts) -> + Defaults = #{force_monotonic_timestamps => true}, + maps:merge(Defaults, Opts). + call_if_implemented(Mod, Fun, Args, Default) -> case erlang:function_exported(Mod, Fun, length(Args)) of true -> From 14022aded13a009103344e23e7ba92c5f1ef2461 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Jul 2024 19:59:15 +0200 Subject: [PATCH 3/5] feat(ds): allow isolated batches with preconditions Namely, single message deletions and preconditions that can be used to build complex "compare-and-swap"-style operations. Also allow user to declare that atomic batches support is needed for a DB. --- apps/emqx/src/emqx_types.erl | 4 +- apps/emqx_durable_storage/include/emqx_ds.hrl | 5 ++ apps/emqx_durable_storage/src/emqx_ds.erl | 55 +++++++++++++++---- apps/emqx_utils/include/emqx_message.hrl | 2 +- 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index d8dd1cff4..8563a3d7e 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -82,6 +82,7 @@ -export_type([ payload/0, message/0, + message/1, flag/0, flags/0, headers/0 @@ -238,7 +239,8 @@ -type subscription() :: #subscription{}. -type subscriber() :: {pid(), subid()}. -type payload() :: binary() | iodata(). --type message() :: #message{}. +-type message() :: #message{payload :: emqx_types:payload()}. +-type message(Payload) :: #message{payload :: Payload}. -type flag() :: sys | dup | retain | atom(). -type flags() :: #{flag() := boolean()}. -type headers() :: #{ diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index cc7a7431f..e2377a9b0 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -16,4 +16,9 @@ -ifndef(EMQX_DS_HRL). -define(EMQX_DS_HRL, true). +-record(dsbatch, { + operations :: [emqx_ds:operation()], + preconditions = [] :: [emqx_ds:precondition()] +}). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index ff157d01b..d745c8c74 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -19,6 +19,8 @@ %% It takes care of forwarding calls to the underlying DBMS. -module(emqx_ds). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). + %% Management API: -export([ register_backend/2, @@ -52,6 +54,9 @@ time/0, topic_filter/0, topic/0, + batch/0, + operation/0, + precondition/0, stream/0, delete_stream/0, delete_selector/0, @@ -84,8 +89,6 @@ %% Type declarations %%================================================================================ --define(APP, emqx_durable_storage). - -type db() :: atom(). %% Parsed topic. @@ -94,6 +97,31 @@ %% Parsed topic filter. -type topic_filter() :: list(binary() | '+' | '#' | ''). +-type message() :: emqx_types:message(). + +%% Message pattern. +-type message(Pattern) :: emqx_types:message(Pattern). + +%% A batch of storage operations. +-type batch() :: [operation()] | #dsbatch{}. + +-type operation() :: + %% Store a message. + message() + %% Delete a message. + %% Does nothing if the message does not exist. + | {delete, message(_)}. + +%% Precondition. +%% Fails whole batch if the message in question does not match (`while'), or +%% does match (`unless'). Here "match" means that it either just exists (when +%% pattern is '_') or has exactly the same payload, rest of the message fields are +%% irrelevant. Useful to construct batches with "compare-and-set" semantics. +%% Note: backends may not support this, but if they do only DBs with `atomic_batches' +%% enabled are expected to support preconditions in batches. +-type precondition() :: + {while | unless, message(iodata() | '_')}. + -type rank_x() :: term(). -type rank_y() :: integer(). @@ -157,13 +185,7 @@ %% Whether to wait until the message storage has been acknowledged to return from %% `store_batch'. %% Default: `true'. - sync => boolean(), - %% Whether the whole batch given to `store_batch' should be inserted atomically as - %% a unit. Note: the whole batch must be crafted so that it belongs to a single - %% shard (if applicable to the backend), as the batch will be split accordingly - %% even if this flag is `true'. - %% Default: `false'. - atomic => boolean() + sync => boolean() }. -type generic_db_opts() :: @@ -176,6 +198,12 @@ %% If `false' then message timestamps are respected; timestamp, topic and %% serialization key uniquely identify a message. force_monotonic_timestamps => boolean(), + %% Whether the whole batch given to `store_batch' should be processed and + %% inserted atomically as a unit, in isolation from other batches. + %% Default: `false'. + %% The whole batch must be crafted so that it belongs to a single shard (if + %% applicable to the backend). + atomic_batches => boolean(), serialize_by => clientid | topic, _ => _ }. @@ -313,11 +341,11 @@ drop_db(DB) -> Module:drop_db(DB) end. --spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). +-spec store_batch(db(), batch(), message_store_opts()) -> store_batch_result(). store_batch(DB, Msgs, Opts) -> ?module(DB):store_batch(DB, Msgs, Opts). --spec store_batch(db(), [emqx_types:message()]) -> store_batch_result(). +-spec store_batch(db(), batch()) -> store_batch_result(). store_batch(DB, Msgs) -> store_batch(DB, Msgs, #{}). @@ -418,7 +446,10 @@ timestamp_us() -> %%================================================================================ set_db_defaults(Opts) -> - Defaults = #{force_monotonic_timestamps => true}, + Defaults = #{ + force_monotonic_timestamps => true, + atomic_batches => false + }, maps:merge(Defaults, Opts). call_if_implemented(Mod, Fun, Args, Default) -> diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index 4bbc367da..5def5e5a1 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -33,7 +33,7 @@ %% Topic that the message is published to topic :: emqx_types:topic(), %% Message Payload - payload :: emqx_types:payload(), + payload, %% Timestamp (Unit: millisecond) timestamp :: integer(), %% Miscellaneous extensions, currently used for OpenTelemetry context propagation From 0c05b3f01981b8ae29bb06cc235bfa405fde6373 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 17 Jul 2024 16:21:33 +0200 Subject: [PATCH 4/5] fix(ds): make conditionals less confusing --- apps/emqx_durable_storage/src/emqx_ds.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index d745c8c74..1bf72dc59 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -113,14 +113,15 @@ | {delete, message(_)}. %% Precondition. -%% Fails whole batch if the message in question does not match (`while'), or -%% does match (`unless'). Here "match" means that it either just exists (when -%% pattern is '_') or has exactly the same payload, rest of the message fields are -%% irrelevant. Useful to construct batches with "compare-and-set" semantics. +%% Fails whole batch if the storage already has the matching message (`if_exists'), +%% or does not yet have (`unless_exists'). Here "matching" means that it either +%% just exists (when pattern is '_') or has exactly the same payload, rest of the +%% message fields are irrelevant. +%% Useful to construct batches with "compare-and-set" semantics. %% Note: backends may not support this, but if they do only DBs with `atomic_batches' %% enabled are expected to support preconditions in batches. -type precondition() :: - {while | unless, message(iodata() | '_')}. + {if_exists | unless_exists, message(iodata() | '_')}. -type rank_x() :: term(). From 0e545ffcec3531469b120b772ecace3469b0fe19 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 17 Jul 2024 21:21:51 +0200 Subject: [PATCH 5/5] feat(ds): add dedicated `#message_matcher{}` for preconditions --- apps/emqx/src/emqx_types.erl | 4 +--- apps/emqx_durable_storage/include/emqx_ds.hrl | 19 +++++++++++++++++++ apps/emqx_durable_storage/src/emqx_ds.erl | 8 ++++---- apps/emqx_utils/include/emqx_message.hrl | 2 +- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 8563a3d7e..d8dd1cff4 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -82,7 +82,6 @@ -export_type([ payload/0, message/0, - message/1, flag/0, flags/0, headers/0 @@ -239,8 +238,7 @@ -type subscription() :: #subscription{}. -type subscriber() :: {pid(), subid()}. -type payload() :: binary() | iodata(). --type message() :: #message{payload :: emqx_types:payload()}. --type message(Payload) :: #message{payload :: Payload}. +-type message() :: #message{}. -type flag() :: sys | dup | retain | atom(). -type flags() :: #{flag() := boolean()}. -type headers() :: #{ diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index e2377a9b0..fa9b844f8 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -21,4 +21,23 @@ preconditions = [] :: [emqx_ds:precondition()] }). +-record(message_matcher, { + %% Fields identifying the message: + %% Client identifier + from :: binary(), + %% Topic that the message is published to + topic :: emqx_types:topic(), + %% Timestamp (Unit: millisecond) + timestamp :: integer(), + + %% Fields the message is matched against: + %% Message Payload + payload, + %% Message headers + headers = #{} :: emqx_types:headers(), + %% Extra filters + %% Reserved for the forward compatibility purposes. + filters = #{} +}). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 1bf72dc59..69de92325 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -99,8 +99,8 @@ -type message() :: emqx_types:message(). -%% Message pattern. --type message(Pattern) :: emqx_types:message(Pattern). +%% Message matcher. +-type message_matcher(Payload) :: #message_matcher{payload :: Payload}. %% A batch of storage operations. -type batch() :: [operation()] | #dsbatch{}. @@ -110,7 +110,7 @@ message() %% Delete a message. %% Does nothing if the message does not exist. - | {delete, message(_)}. + | {delete, message_matcher('_')}. %% Precondition. %% Fails whole batch if the storage already has the matching message (`if_exists'), @@ -121,7 +121,7 @@ %% Note: backends may not support this, but if they do only DBs with `atomic_batches' %% enabled are expected to support preconditions in batches. -type precondition() :: - {if_exists | unless_exists, message(iodata() | '_')}. + {if_exists | unless_exists, message_matcher(iodata() | '_')}. -type rank_x() :: term(). diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index 5def5e5a1..4bbc367da 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -33,7 +33,7 @@ %% Topic that the message is published to topic :: emqx_types:topic(), %% Message Payload - payload, + payload :: emqx_types:payload(), %% Timestamp (Unit: millisecond) timestamp :: integer(), %% Miscellaneous extensions, currently used for OpenTelemetry context propagation