fix(ds): refine `topic()` type to describe parsed topics

And separate it from `topic_filter()` type, which describes parsed
topic filters.
This commit is contained in:
Andrew Mayorov 2023-09-19 18:35:00 +04:00
parent 9362ef6f73
commit c1583f7f9d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 17 additions and 14 deletions

View File

@ -51,6 +51,7 @@
shard/0, shard/0,
shard_id/0, shard_id/0,
topic/0, topic/0,
topic_filter/0,
time/0 time/0
]). ]).
@ -69,7 +70,7 @@
props := map() props := map()
}. }.
-type iterators() :: #{topic() => iterator()}. -type iterators() :: #{topic_filter() => iterator()}.
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
%% an atom, in theory (?). %% an atom, in theory (?).
@ -79,17 +80,18 @@
-type iterator_id() :: binary(). -type iterator_id() :: binary().
%%-type session() :: #session{}.
-type message_store_opts() :: #{}. -type message_store_opts() :: #{}.
-type message_stats() :: #{}. -type message_stats() :: #{}.
-type message_id() :: binary(). -type message_id() :: binary().
%% Parsed topic: %% Parsed topic.
-type topic() :: list(binary()). -type topic() :: list(binary()).
%% Parsed topic filter.
-type topic_filter() :: list(binary() | '+' | '#' | '').
-type keyspace() :: atom(). -type keyspace() :: atom().
-type shard_id() :: binary(). -type shard_id() :: binary().
-type shard() :: {keyspace(), shard_id()}. -type shard() :: {keyspace(), shard_id()}.
@ -103,7 +105,7 @@
-type replay_id() :: binary(). -type replay_id() :: binary().
-type replay() :: { -type replay() :: {
_TopicFilter :: topic(), _TopicFilter :: topic_filter(),
_StartTime :: time() _StartTime :: time()
}. }.
@ -197,7 +199,7 @@ session_suspend(_SessionId) ->
ok. ok.
%% @doc Called when a client subscribes to a topic. Idempotent. %% @doc Called when a client subscribes to a topic. Idempotent.
-spec session_add_iterator(session_id(), topic(), _Props :: map()) -> -spec session_add_iterator(session_id(), topic_filter(), _Props :: map()) ->
{ok, iterator(), _IsNew :: boolean()}. {ok, iterator(), _IsNew :: boolean()}.
session_add_iterator(DSSessionId, TopicFilter, Props) -> session_add_iterator(DSSessionId, TopicFilter, Props) ->
IteratorRefId = {DSSessionId, TopicFilter}, IteratorRefId = {DSSessionId, TopicFilter},
@ -238,7 +240,7 @@ session_update_iterator(IteratorRef, Props) ->
ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write),
NIteratorRef. NIteratorRef.
-spec session_get_iterator_id(session_id(), topic()) -> -spec session_get_iterator_id(session_id(), topic_filter()) ->
{ok, iterator_id()} | {error, not_found}. {ok, iterator_id()} | {error, not_found}.
session_get_iterator_id(DSSessionId, TopicFilter) -> session_get_iterator_id(DSSessionId, TopicFilter) ->
IteratorRefId = {DSSessionId, TopicFilter}, IteratorRefId = {DSSessionId, TopicFilter},
@ -250,7 +252,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) ->
end. end.
%% @doc Called when a client unsubscribes from a topic. %% @doc Called when a client unsubscribes from a topic.
-spec session_del_iterator(session_id(), topic()) -> ok. -spec session_del_iterator(session_id(), topic_filter()) -> ok.
session_del_iterator(DSSessionId, TopicFilter) -> session_del_iterator(DSSessionId, TopicFilter) ->
IteratorRefId = {DSSessionId, TopicFilter}, IteratorRefId = {DSSessionId, TopicFilter},
transaction(fun() -> transaction(fun() ->

View File

@ -31,7 +31,7 @@
}). }).
-record(iterator_ref, { -record(iterator_ref, {
ref_id :: {emqx_ds:session_id(), emqx_topic:words()}, ref_id :: {emqx_ds:session_id(), emqx_ds:topic_filter()},
it_id :: emqx_ds:iterator_id(), it_id :: emqx_ds:iterator_id(),
start_time :: emqx_ds:time(), start_time :: emqx_ds:time(),
props = #{} :: map() props = #{} :: map()

View File

@ -132,6 +132,7 @@
%%================================================================================ %%================================================================================
-type topic() :: emqx_ds:topic(). -type topic() :: emqx_ds:topic().
-type topic_filter() :: emqx_ds:topic_filter().
-type time() :: emqx_ds:time(). -type time() :: emqx_ds:time().
%% Number of bits %% Number of bits
@ -191,7 +192,7 @@
-record(filter, { -record(filter, {
keymapper :: keymapper(), keymapper :: keymapper(),
topic_filter :: emqx_topic:words(), topic_filter :: topic_filter(),
start_time :: integer(), start_time :: integer(),
hash_bitfilter :: integer(), hash_bitfilter :: integer(),
hash_bitmask :: integer(), hash_bitmask :: integer(),
@ -412,11 +413,11 @@ extract(Key, #keymapper{bitsize = Size}) ->
<<Bitstring:Size/integer, _MessageID/binary>> = Key, <<Bitstring:Size/integer, _MessageID/binary>> = Key,
Bitstring. Bitstring.
-spec compute_bitstring(topic(), time(), keymapper()) -> integer(). -spec compute_bitstring(topic_filter(), time(), keymapper()) -> integer().
compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) -> compute_bitstring(TopicFilter, Timestamp, #keymapper{source = Source}) ->
compute_bitstring(Topic, Timestamp, Source, 0). compute_bitstring(TopicFilter, Timestamp, Source, 0).
-spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer(). -spec compute_topic_bitmask(topic_filter(), keymapper()) -> integer().
compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
compute_topic_bitmask(TopicFilter, Source, 0). compute_topic_bitmask(TopicFilter, Source, 0).