diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 8563a3d7e..d8dd1cff4 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -82,7 +82,6 @@ -export_type([ payload/0, message/0, - message/1, flag/0, flags/0, headers/0 @@ -239,8 +238,7 @@ -type subscription() :: #subscription{}. -type subscriber() :: {pid(), subid()}. -type payload() :: binary() | iodata(). --type message() :: #message{payload :: emqx_types:payload()}. --type message(Payload) :: #message{payload :: Payload}. +-type message() :: #message{}. -type flag() :: sys | dup | retain | atom(). -type flags() :: #{flag() := boolean()}. -type headers() :: #{ diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index e2377a9b0..fa9b844f8 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -21,4 +21,23 @@ preconditions = [] :: [emqx_ds:precondition()] }). +-record(message_matcher, { + %% Fields identifying the message: + %% Client identifier + from :: binary(), + %% Topic that the message is published to + topic :: emqx_types:topic(), + %% Timestamp (Unit: millisecond) + timestamp :: integer(), + + %% Fields the message is matched against: + %% Message Payload + payload, + %% Message headers + headers = #{} :: emqx_types:headers(), + %% Extra filters + %% Reserved for the forward compatibility purposes. + filters = #{} +}). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 1bf72dc59..69de92325 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -99,8 +99,8 @@ -type message() :: emqx_types:message(). -%% Message pattern. --type message(Pattern) :: emqx_types:message(Pattern). +%% Message matcher. +-type message_matcher(Payload) :: #message_matcher{payload :: Payload}. %% A batch of storage operations. -type batch() :: [operation()] | #dsbatch{}. @@ -110,7 +110,7 @@ message() %% Delete a message. %% Does nothing if the message does not exist. - | {delete, message(_)}. + | {delete, message_matcher('_')}. %% Precondition. %% Fails whole batch if the storage already has the matching message (`if_exists'), @@ -121,7 +121,7 @@ %% Note: backends may not support this, but if they do only DBs with `atomic_batches' %% enabled are expected to support preconditions in batches. -type precondition() :: - {if_exists | unless_exists, message(iodata() | '_')}. + {if_exists | unless_exists, message_matcher(iodata() | '_')}. -type rank_x() :: term(). diff --git a/apps/emqx_utils/include/emqx_message.hrl b/apps/emqx_utils/include/emqx_message.hrl index 5def5e5a1..4bbc367da 100644 --- a/apps/emqx_utils/include/emqx_message.hrl +++ b/apps/emqx_utils/include/emqx_message.hrl @@ -33,7 +33,7 @@ %% Topic that the message is published to topic :: emqx_types:topic(), %% Message Payload - payload, + payload :: emqx_types:payload(), %% Timestamp (Unit: millisecond) timestamp :: integer(), %% Miscellaneous extensions, currently used for OpenTelemetry context propagation