diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 62d6369ea..e7890a3a1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -51,6 +51,7 @@ shard/0, shard_id/0, topic/0, + topic_filter/0, time/0 ]). @@ -69,7 +70,7 @@ props := map() }. --type iterators() :: #{topic() => iterator()}. +-type iterators() :: #{topic_filter() => iterator()}. %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). @@ -79,17 +80,18 @@ -type iterator_id() :: binary(). -%%-type session() :: #session{}. - -type message_store_opts() :: #{}. -type message_stats() :: #{}. -type message_id() :: binary(). -%% Parsed topic: +%% Parsed topic. -type topic() :: list(binary()). +%% Parsed topic filter. +-type topic_filter() :: list(binary() | '+' | '#' | ''). + -type keyspace() :: atom(). -type shard_id() :: binary(). -type shard() :: {keyspace(), shard_id()}. @@ -103,7 +105,7 @@ -type replay_id() :: binary(). -type replay() :: { - _TopicFilter :: topic(), + _TopicFilter :: topic_filter(), _StartTime :: time() }. @@ -197,7 +199,7 @@ session_suspend(_SessionId) -> ok. %% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_iterator(session_id(), topic(), _Props :: map()) -> +-spec session_add_iterator(session_id(), topic_filter(), _Props :: map()) -> {ok, iterator(), _IsNew :: boolean()}. session_add_iterator(DSSessionId, TopicFilter, Props) -> IteratorRefId = {DSSessionId, TopicFilter}, @@ -238,7 +240,7 @@ session_update_iterator(IteratorRef, Props) -> ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), NIteratorRef. --spec session_get_iterator_id(session_id(), topic()) -> +-spec session_get_iterator_id(session_id(), topic_filter()) -> {ok, iterator_id()} | {error, not_found}. session_get_iterator_id(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, @@ -250,7 +252,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) -> end. %% @doc Called when a client unsubscribes from a topic. --spec session_del_iterator(session_id(), topic()) -> ok. +-spec session_del_iterator(session_id(), topic_filter()) -> ok. session_del_iterator(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, transaction(fun() -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index bca0088b5..162d14b83 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -31,7 +31,7 @@ }). -record(iterator_ref, { - ref_id :: {emqx_ds:session_id(), emqx_topic:words()}, + ref_id :: {emqx_ds:session_id(), emqx_ds:topic_filter()}, it_id :: emqx_ds:iterator_id(), start_time :: emqx_ds:time(), props = #{} :: map() diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 437cc5b06..7b141b202 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -132,6 +132,7 @@ %%================================================================================ -type topic() :: emqx_ds:topic(). +-type topic_filter() :: emqx_ds:topic_filter(). -type time() :: emqx_ds:time(). %% Number of bits @@ -191,7 +192,7 @@ -record(filter, { keymapper :: keymapper(), - topic_filter :: emqx_topic:words(), + topic_filter :: topic_filter(), start_time :: integer(), hash_bitfilter :: integer(), hash_bitmask :: integer(), @@ -412,11 +413,11 @@ extract(Key, #keymapper{bitsize = Size}) -> <> = Key, Bitstring. --spec compute_bitstring(topic(), time(), keymapper()) -> integer(). -compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) -> - compute_bitstring(Topic, Timestamp, Source, 0). +-spec compute_bitstring(topic_filter(), time(), keymapper()) -> integer(). +compute_bitstring(TopicFilter, Timestamp, #keymapper{source = Source}) -> + compute_bitstring(TopicFilter, Timestamp, Source, 0). --spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer(). +-spec compute_topic_bitmask(topic_filter(), keymapper()) -> integer(). compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> compute_topic_bitmask(TopicFilter, Source, 0).