fix(emqx_types): spec refs `emqx_types.erl` instead directly.

This commit is contained in:
JimMoen 2021-09-17 17:15:32 +08:00
parent 9c95557bfc
commit 6edc9f4221
14 changed files with 58 additions and 59 deletions

View File

@ -119,17 +119,17 @@ is_running(Node) ->
%% PubSub API %% PubSub API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(subscribe(emqx_topic:topic() | string()) -> ok). -spec(subscribe(emqx_types:topic() | string()) -> ok).
subscribe(Topic) -> subscribe(Topic) ->
emqx_broker:subscribe(iolist_to_binary(Topic)). emqx_broker:subscribe(iolist_to_binary(Topic)).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok). -spec(subscribe(emqx_types:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)-> subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId); emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
subscribe(Topic, SubOpts) when is_map(SubOpts) -> subscribe(Topic, SubOpts) when is_map(SubOpts) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubOpts). emqx_broker:subscribe(iolist_to_binary(Topic), SubOpts).
-spec(subscribe(emqx_topic:topic() | string(), -spec(subscribe(emqx_types:topic() | string(),
emqx_types:subid() | pid(), emqx_types:subopts()) -> ok). emqx_types:subid() | pid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) -> subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts). emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
@ -138,7 +138,7 @@ subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)),
publish(Msg) -> publish(Msg) ->
emqx_broker:publish(Msg). emqx_broker:publish(Msg).
-spec(unsubscribe(emqx_topic:topic() | string()) -> ok). -spec(unsubscribe(emqx_types:topic() | string()) -> ok).
unsubscribe(Topic) -> unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)). emqx_broker:unsubscribe(iolist_to_binary(Topic)).
@ -146,18 +146,18 @@ unsubscribe(Topic) ->
%% PubSub management API %% PubSub management API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(topics() -> list(emqx_topic:topic())). -spec(topics() -> list(emqx_types:topic())).
topics() -> emqx_router:topics(). topics() -> emqx_router:topics().
-spec(subscribers(emqx_topic:topic() | string()) -> [pid()]). -spec(subscribers(emqx_types:topic() | string()) -> [pid()]).
subscribers(Topic) -> subscribers(Topic) ->
emqx_broker:subscribers(iolist_to_binary(Topic)). emqx_broker:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(pid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). -spec(subscriptions(pid()) -> [{emqx_types:topic(), emqx_types:subopts()}]).
subscriptions(SubPid) when is_pid(SubPid) -> subscriptions(SubPid) when is_pid(SubPid) ->
emqx_broker:subscriptions(SubPid). emqx_broker:subscriptions(SubPid).
-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic() | string()) -> boolean()). -spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic() | string()) -> boolean()).
subscribed(SubPid, Topic) when is_pid(SubPid) -> subscribed(SubPid, Topic) when is_pid(SubPid) ->
emqx_broker:subscribed(SubPid, iolist_to_binary(Topic)); emqx_broker:subscribed(SubPid, iolist_to_binary(Topic));
subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) -> subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->

View File

@ -68,7 +68,7 @@ list_authz_cache() ->
map_authz_cache(fun(Cache) -> Cache end). map_authz_cache(fun(Cache) -> Cache end).
%% We'll cleanup the cache before replacing an expired authz. %% We'll cleanup the cache before replacing an expired authz.
-spec get_authz_cache(emqx_types:pubsub(), emqx_topic:topic()) -> -spec get_authz_cache(emqx_types:pubsub(), emqx_types:topic()) ->
authz_result() | not_found. authz_result() | not_found.
get_authz_cache(PubSub, Topic) -> get_authz_cache(PubSub, Topic) ->
case erlang:get(cache_k(PubSub, Topic)) of case erlang:get(cache_k(PubSub, Topic)) of
@ -85,7 +85,7 @@ get_authz_cache(PubSub, Topic) ->
%% If the cache get full, and also the latest one %% If the cache get full, and also the latest one
%% is expired, then delete all the cache entries %% is expired, then delete all the cache entries
-spec put_authz_cache(emqx_types:pubsub(), emqx_topic:topic(), authz_result()) -spec put_authz_cache(emqx_types:pubsub(), emqx_types:topic(), authz_result())
-> ok. -> ok.
put_authz_cache(PubSub, Topic, AuthzResult) -> put_authz_cache(PubSub, Topic, AuthzResult) ->
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),

View File

@ -112,17 +112,17 @@ create_tabs() ->
%% Subscribe API %% Subscribe API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(subscribe(emqx_topic:topic()) -> ok). -spec(subscribe(emqx_types:topic()) -> ok).
subscribe(Topic) when is_binary(Topic) -> subscribe(Topic) when is_binary(Topic) ->
subscribe(Topic, undefined). subscribe(Topic, undefined).
-spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok). -spec(subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
subscribe(Topic, SubId, ?DEFAULT_SUBOPTS); subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
subscribe(Topic, undefined, SubOpts). subscribe(Topic, undefined, SubOpts).
-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok). -spec(subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) -> subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) ->
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
@ -165,7 +165,7 @@ do_subscribe(Group, Topic, SubPid, SubOpts) ->
%% Unsubscribe API %% Unsubscribe API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(unsubscribe(emqx_topic:topic()) -> ok). -spec(unsubscribe(emqx_types:topic()) -> ok).
unsubscribe(Topic) when is_binary(Topic) -> unsubscribe(Topic) when is_binary(Topic) ->
SubPid = self(), SubPid = self(),
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
@ -279,7 +279,7 @@ forward(Node, To, Delivery, sync) ->
emqx_metrics:inc('messages.forward'), Result emqx_metrics:inc('messages.forward'), Result
end. end.
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
dispatch(Topic, #delivery{message = Msg}) -> dispatch(Topic, #delivery{message = Msg}) ->
DispN = lists:foldl( DispN = lists:foldl(
fun(Sub, N) -> fun(Sub, N) ->
@ -316,7 +316,7 @@ inc_dropped_cnt(Msg) ->
end. end.
-compile({inline, [subscribers/1]}). -compile({inline, [subscribers/1]}).
-spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()}) -spec(subscribers(emqx_types:topic() | {shard, emqx_types:topic(), non_neg_integer()})
-> [pid()]). -> [pid()]).
subscribers(Topic) when is_binary(Topic) -> subscribers(Topic) when is_binary(Topic) ->
lookup_value(?SUBSCRIBER, Topic, []); lookup_value(?SUBSCRIBER, Topic, []);
@ -351,7 +351,7 @@ subscriber_down(SubPid) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(subscriptions(pid() | emqx_types:subid()) -spec(subscriptions(pid() | emqx_types:subid())
-> [{emqx_topic:topic(), emqx_types:subopts()}]). -> [{emqx_types:topic(), emqx_types:subopts()}]).
subscriptions(SubPid) when is_pid(SubPid) -> subscriptions(SubPid) when is_pid(SubPid) ->
[{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} [{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})}
|| Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])]; || Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])];
@ -362,14 +362,14 @@ subscriptions(SubId) ->
undefined -> [] undefined -> []
end. end.
-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()). -spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
subscribed(SubPid, Topic) when is_pid(SubPid) -> subscribed(SubPid, Topic) when is_pid(SubPid) ->
ets:member(?SUBOPTION, {SubPid, Topic}); ets:member(?SUBOPTION, {SubPid, Topic});
subscribed(SubId, Topic) when ?is_subid(SubId) -> subscribed(SubId, Topic) when ?is_subid(SubId) ->
SubPid = emqx_broker_helper:lookup_subpid(SubId), SubPid = emqx_broker_helper:lookup_subpid(SubId),
ets:member(?SUBOPTION, {SubPid, Topic}). ets:member(?SUBOPTION, {SubPid, Topic}).
-spec(get_subopts(pid(), emqx_topic:topic()) -> maybe(emqx_types:subopts())). -spec(get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts())).
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
lookup_value(?SUBOPTION, {SubPid, Topic}); lookup_value(?SUBOPTION, {SubPid, Topic});
get_subopts(SubId, Topic) when ?is_subid(SubId) -> get_subopts(SubId, Topic) when ?is_subid(SubId) ->
@ -379,7 +379,7 @@ get_subopts(SubId, Topic) when ?is_subid(SubId) ->
undefined -> undefined undefined -> undefined
end. end.
-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()). -spec(set_subopts(emqx_types:topic(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
set_subopts(self(), Topic, NewOpts). set_subopts(self(), Topic, NewOpts).
@ -392,7 +392,7 @@ set_subopts(SubPid, Topic, NewOpts) ->
[] -> false [] -> false
end. end.
-spec(topics() -> [emqx_topic:topic()]). -spec(topics() -> [emqx_types:topic()]).
topics() -> topics() ->
emqx_router:topics(). emqx_router:topics().

View File

@ -78,7 +78,7 @@ lookup_subid(SubPid) when is_pid(SubPid) ->
lookup_subpid(SubId) -> lookup_subpid(SubId) ->
emqx_tables:lookup_value(?SUBID, SubId). emqx_tables:lookup_value(?SUBID, SubId).
-spec(get_sub_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). -spec(get_sub_shard(pid(), emqx_types:topic()) -> non_neg_integer()).
get_sub_shard(SubPid, Topic) -> get_sub_shard(SubPid, Topic) ->
case create_seq(Topic) of case create_seq(Topic) of
Seq when Seq =< ?SHARD -> 0; Seq when Seq =< ?SHARD -> 0;
@ -90,11 +90,11 @@ shards_num() ->
%% Dynamic sharding later... %% Dynamic sharding later...
ets:lookup_element(?HELPER, shards, 2). ets:lookup_element(?HELPER, shards, 2).
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). -spec(create_seq(emqx_types:topic()) -> emqx_sequence:seqid()).
create_seq(Topic) -> create_seq(Topic) ->
emqx_sequence:nextval(?SUBSEQ, Topic). emqx_sequence:nextval(?SUBSEQ, Topic).
-spec(reclaim_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). -spec(reclaim_seq(emqx_types:topic()) -> emqx_sequence:seqid()).
reclaim_seq(Topic) -> reclaim_seq(Topic) ->
emqx_sequence:reclaim(?SUBSEQ, Topic). emqx_sequence:reclaim(?SUBSEQ, Topic).

View File

@ -86,19 +86,19 @@
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). -spec(make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message()).
make(Topic, Payload) -> make(Topic, Payload) ->
make(undefined, Topic, Payload). make(undefined, Topic, Payload).
-spec(make(emqx_types:clientid(), -spec(make(emqx_types:clientid(),
emqx_topic:topic(), emqx_types:topic(),
emqx_types:payload()) -> emqx_types:message()). emqx_types:payload()) -> emqx_types:message()).
make(From, Topic, Payload) -> make(From, Topic, Payload) ->
make(From, ?QOS_0, Topic, Payload). make(From, ?QOS_0, Topic, Payload).
-spec(make(emqx_types:clientid(), -spec(make(emqx_types:clientid(),
emqx_types:qos(), emqx_types:qos(),
emqx_topic:topic(), emqx_types:topic(),
emqx_types:payload()) -> emqx_types:message()). emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
@ -112,7 +112,7 @@ make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
-spec(make(emqx_types:clientid(), -spec(make(emqx_types:clientid(),
emqx_types:qos(), emqx_types:qos(),
emqx_topic:topic(), emqx_types:topic(),
emqx_types:payload(), emqx_types:payload(),
emqx_types:flags(), emqx_types:flags(),
emqx_types:headers()) -> emqx_types:message()). emqx_types:headers()) -> emqx_types:message()).
@ -133,7 +133,7 @@ make(From, QoS, Topic, Payload, Flags, Headers)
-spec(make(MsgId :: binary(), -spec(make(MsgId :: binary(),
emqx_types:clientid(), emqx_types:clientid(),
emqx_types:qos(), emqx_types:qos(),
emqx_topic:topic(), emqx_types:topic(),
emqx_types:payload(), emqx_types:payload(),
emqx_types:flags(), emqx_types:flags(),
emqx_types:headers()) -> emqx_types:message()). emqx_types:headers()) -> emqx_types:message()).

View File

@ -67,7 +67,7 @@
-spec(check_pub(emqx_types:zone(), -spec(check_pub(emqx_types:zone(),
#{qos := emqx_types:qos(), #{qos := emqx_types:qos(),
retain := boolean(), retain := boolean(),
topic := emqx_topic:topic()}) topic := emqx_types:topic()})
-> ok_or_error(emqx_types:reason_code())). -> ok_or_error(emqx_types:reason_code())).
check_pub(Zone, Flags) when is_map(Flags) -> check_pub(Zone, Flags) when is_map(Flags) ->
do_check_pub(case maps:take(topic, Flags) of do_check_pub(case maps:take(topic, Flags) of

View File

@ -71,7 +71,7 @@
-export_type([mqueue/0, options/0]). -export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()). -type(topic() :: emqx_types:topic()).
-type(priority() :: infinity | integer()). -type(priority() :: infinity | integer()).
-type(pq() :: emqx_pqueue:q()). -type(pq() :: emqx_pqueue:q()).
-type(count() :: non_neg_integer()). -type(count() :: non_neg_integer()).

View File

@ -98,19 +98,19 @@ start_link(Pool, Id) ->
%% Route APIs %% Route APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(add_route(emqx_topic:topic()) -> ok | {error, term()}). -spec(add_route(emqx_types:topic()) -> ok | {error, term()}).
add_route(Topic) when is_binary(Topic) -> add_route(Topic) when is_binary(Topic) ->
add_route(Topic, node()). add_route(Topic, node()).
-spec(add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). -spec(add_route(emqx_types:topic(), dest()) -> ok | {error, term()}).
add_route(Topic, Dest) when is_binary(Topic) -> add_route(Topic, Dest) when is_binary(Topic) ->
call(pick(Topic), {add_route, Topic, Dest}). call(pick(Topic), {add_route, Topic, Dest}).
-spec(do_add_route(emqx_topic:topic()) -> ok | {error, term()}). -spec(do_add_route(emqx_types:topic()) -> ok | {error, term()}).
do_add_route(Topic) when is_binary(Topic) -> do_add_route(Topic) when is_binary(Topic) ->
do_add_route(Topic, node()). do_add_route(Topic, node()).
-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). -spec(do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}).
do_add_route(Topic, Dest) when is_binary(Topic) -> do_add_route(Topic, Dest) when is_binary(Topic) ->
Route = #route{topic = Topic, dest = Dest}, Route = #route{topic = Topic, dest = Dest},
case lists:member(Route, lookup_routes(Topic)) of case lists:member(Route, lookup_routes(Topic)) of
@ -125,7 +125,7 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
end. end.
%% @doc Match routes %% @doc Match routes
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). -spec(match_routes(emqx_types:topic()) -> [emqx_types:route()]).
match_routes(Topic) when is_binary(Topic) -> match_routes(Topic) when is_binary(Topic) ->
case match_trie(Topic) of case match_trie(Topic) of
[] -> lookup_routes(Topic); [] -> lookup_routes(Topic);
@ -140,27 +140,27 @@ match_trie(Topic) ->
false -> emqx_trie:match(Topic) false -> emqx_trie:match(Topic)
end. end.
-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]). -spec(lookup_routes(emqx_types:topic()) -> [emqx_types:route()]).
lookup_routes(Topic) -> lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic). ets:lookup(?ROUTE_TAB, Topic).
-spec(has_routes(emqx_topic:topic()) -> boolean()). -spec(has_routes(emqx_types:topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) -> has_routes(Topic) when is_binary(Topic) ->
ets:member(?ROUTE_TAB, Topic). ets:member(?ROUTE_TAB, Topic).
-spec(delete_route(emqx_topic:topic()) -> ok | {error, term()}). -spec(delete_route(emqx_types:topic()) -> ok | {error, term()}).
delete_route(Topic) when is_binary(Topic) -> delete_route(Topic) when is_binary(Topic) ->
delete_route(Topic, node()). delete_route(Topic, node()).
-spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). -spec(delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}).
delete_route(Topic, Dest) when is_binary(Topic) -> delete_route(Topic, Dest) when is_binary(Topic) ->
call(pick(Topic), {delete_route, Topic, Dest}). call(pick(Topic), {delete_route, Topic, Dest}).
-spec(do_delete_route(emqx_topic:topic()) -> ok | {error, term()}). -spec(do_delete_route(emqx_types:topic()) -> ok | {error, term()}).
do_delete_route(Topic) when is_binary(Topic) -> do_delete_route(Topic) when is_binary(Topic) ->
do_delete_route(Topic, node()). do_delete_route(Topic, node()).
-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). -spec(do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}).
do_delete_route(Topic, Dest) -> do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest}, Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
@ -169,12 +169,12 @@ do_delete_route(Topic, Dest) ->
false -> delete_direct_route(Route) false -> delete_direct_route(Route)
end. end.
-spec(topics() -> list(emqx_topic:topic())). -spec(topics() -> list(emqx_types:topic())).
topics() -> topics() ->
mnesia:dirty_all_keys(?ROUTE_TAB). mnesia:dirty_all_keys(?ROUTE_TAB).
%% @doc Print routes to a topic %% @doc Print routes to a topic
-spec(print_routes(emqx_topic:topic()) -> ok). -spec(print_routes(emqx_types:topic()) -> ok).
print_routes(Topic) -> print_routes(Topic) ->
lists:foreach(fun(#route{topic = To, dest = Dest}) -> lists:foreach(fun(#route{topic = To, dest = Dest}) ->
io:format("~s -> ~s~n", [To, Dest]) io:format("~s -> ~s~n", [To, Dest])

View File

@ -103,18 +103,18 @@ mnesia(copy) ->
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(subscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok). -spec(subscribe(emqx_types:group(), emqx_types:topic(), pid()) -> ok).
subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}). gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}).
-spec(unsubscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok). -spec(unsubscribe(emqx_types:group(), emqx_types:topic(), pid()) -> ok).
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
record(Group, Topic, SubPid) -> record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) -spec(dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery())
-> emqx_types:deliver_result()). -> emqx_types:deliver_result()).
dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery) ->
dispatch(Group, Topic, Delivery, _FailedSubs = []). dispatch(Group, Topic, Delivery, _FailedSubs = []).

View File

@ -77,7 +77,7 @@ mnesia(copy) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Insert a topic filter into the trie. %% @doc Insert a topic filter into the trie.
-spec(insert(emqx_topic:topic()) -> ok). -spec(insert(emqx_types:topic()) -> ok).
insert(Topic) when is_binary(Topic) -> insert(Topic) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic), {TopicKey, PrefixKeys} = make_keys(Topic),
case mnesia:wread({?TRIE, TopicKey}) of case mnesia:wread({?TRIE, TopicKey}) of
@ -86,7 +86,7 @@ insert(Topic) when is_binary(Topic) ->
end. end.
%% @doc Delete a topic filter from the trie. %% @doc Delete a topic filter from the trie.
-spec(delete(emqx_topic:topic()) -> ok). -spec(delete(emqx_types:topic()) -> ok).
delete(Topic) when is_binary(Topic) -> delete(Topic) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic), {TopicKey, PrefixKeys} = make_keys(Topic),
case [] =/= mnesia:wread({?TRIE, TopicKey}) of case [] =/= mnesia:wread({?TRIE, TopicKey}) of
@ -95,7 +95,7 @@ delete(Topic) when is_binary(Topic) ->
end. end.
%% @doc Find trie nodes that matchs the topic name. %% @doc Find trie nodes that matchs the topic name.
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())). -spec(match(emqx_types:topic()) -> list(emqx_types:topic())).
match(Topic) when is_binary(Topic) -> match(Topic) when is_binary(Topic) ->
Words = emqx_topic:words(Topic), Words = emqx_topic:words(Topic),
case emqx_topic:wildcard(Words) of case emqx_topic:wildcard(Words) of

View File

@ -201,8 +201,8 @@
-type(publish_result() :: [{node(), topic(), deliver_result()} | -type(publish_result() :: [{node(), topic(), deliver_result()} |
{share, topic(), deliver_result()}]). {share, topic(), deliver_result()}]).
-type(route() :: #route{}). -type(route() :: #route{}).
-type(sub_group() :: tuple() | binary()). -type(group() :: emqx_topic:group()).
-type(route_entry() :: {topic(), node()} | {topic, sub_group()}). -type(route_entry() :: {topic(), node()} | {topic, group()}).
-type(plugin() :: #plugin{}). -type(plugin() :: #plugin{}).
-type(command() :: #command{}). -type(command() :: #command{}).
@ -215,4 +215,3 @@
max_heap_size => non_neg_integer(), max_heap_size => non_neg_integer(),
enable => boolean() enable => boolean()
}). }).

View File

@ -14,7 +14,7 @@
-type(permission() :: allow | deny). -type(permission() :: allow | deny).
-type(rule() :: {permission(), who(), action(), list(emqx_topic:topic())}). -type(rule() :: {permission(), who(), action(), list(emqx_types:topic())}).
-type(rules() :: [rule()]). -type(rules() :: [rule()]).
-type(sources() :: [map()]). -type(sources() :: [map()]).

View File

@ -326,7 +326,7 @@ init_source(#{enable := false} = Source) ->Source.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Check AuthZ %% @doc Check AuthZ
-spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_topic:topic(), allow | deny, sources()) -spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_types:topic(), allow | deny, sources())
-> {stop, allow} | {ok, deny}). -> {stop, allow} | {ok, deny}).
authorize(#{username := Username, authorize(#{username := Username,
peerhost := IpAddress peerhost := IpAddress

View File

@ -108,7 +108,7 @@
-type config() :: map(). -type config() :: map().
-type batch() :: [emqx_connector_mqtt_msg:exp_msg()]. -type batch() :: [emqx_connector_mqtt_msg:exp_msg()].
-type ack_ref() :: term(). -type ack_ref() :: term().
-type topic() :: emqx_topic:topic(). -type topic() :: emqx_types:topic().
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
@ -176,7 +176,7 @@ ping(Name) ->
get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)). get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)).
%% @doc Return all subscriptions (subscription over mqtt connection to remote broker). %% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. -spec get_subscriptions(id()) -> [{emqx_types:topic(), qos()}].
get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions). get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions).
callback_mode() -> [state_functions]. callback_mode() -> [state_functions].