From ea81a924f1702ab8da58a51aeb31396dea8c6238 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 1 Jun 2023 14:34:25 +0800 Subject: [PATCH] refactor: move topic() types def in `emqx_types.erl` --- apps/emqx/include/emqx_mqtt.hrl | 7 +++++++ apps/emqx/src/emqx_broker.erl | 4 ++-- apps/emqx/src/emqx_frame.erl | 2 ++ apps/emqx/src/emqx_mqueue.erl | 3 +-- apps/emqx/src/emqx_session_router.erl | 12 +++++------ apps/emqx/src/emqx_shared_sub.erl | 4 ++-- apps/emqx/src/emqx_topic.erl | 21 +++++++------------ apps/emqx/src/emqx_trie.erl | 6 +++--- apps/emqx/src/emqx_types.erl | 15 ++++++++++--- apps/emqx_authz/src/emqx_authz_mnesia.erl | 2 +- .../src/emqx_bridge_mqtt_egress.erl | 2 +- .../src/emqx_bridge_mqtt_ingress.erl | 2 +- .../emqx_retainer/src/emqx_retainer_index.erl | 14 ++++++------- 13 files changed, 52 insertions(+), 42 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index e4f9111da..8a40f614f 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -48,6 +48,13 @@ {?MQTT_PROTO_V5, <<"MQTT">>} ]). +%%-------------------------------------------------------------------- +%% MQTT Topic and TopitFilter byte length +%%-------------------------------------------------------------------- + +%% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3] +-define(MAX_TOPIC_LEN, 65535). + %%-------------------------------------------------------------------- %% MQTT QoS Levels %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 390b732b9..1bbe4dc82 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -103,10 +103,10 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubOption: {Topic, SubPid} -> SubOption + %% SubOption: {TopicFilter, SubPid} -> SubOption ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]), - %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... + %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ... %% duplicate_bag: o(1) insert ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 5999e93f3..8620f834f 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -300,6 +300,7 @@ parse_connect2( ConnPacket = #mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, + %% For bridge mode, non-standard implementation is_bridge = (BridgeTag =:= 8), clean_start = bool(CleanStart), will_flag = bool(WillFlag), @@ -762,6 +763,7 @@ serialize_variable( #mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, + %% For bridge mode, non-standard implementation is_bridge = IsBridge, clean_start = CleanStart, will_flag = WillFlag, diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index fb43941ee..0ef7d56e5 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -75,11 +75,10 @@ -export_type([mqueue/0, options/0]). --type topic() :: emqx_types:topic(). -type priority() :: infinity | integer(). -type pq() :: emqx_pqueue:q(). -type count() :: non_neg_integer(). --type p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}. +-type p_table() :: ?NO_PRIORITY_TABLE | #{emqx_types:topic() := priority()}. -type options() :: #{ max_len := count(), priorities => p_table(), diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 0435ddca3..47e4bdf74 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -57,9 +57,7 @@ code_change/3 ]). --type group() :: binary(). - --type dest() :: node() | {group(), node()}. +-type dest() :: node() | {emqx_types:group(), node()}. -define(ROUTE_RAM_TAB, emqx_session_route_ram). -define(ROUTE_DISC_TAB, emqx_session_route_disc). @@ -114,7 +112,7 @@ start_link(Pool, Id) -> %% Route APIs %%-------------------------------------------------------------------- --spec do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}. +-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, SessionID) when is_binary(Topic) -> Route = #route{topic = Topic, dest = SessionID}, case lists:member(Route, lookup_routes(Topic)) of @@ -135,7 +133,7 @@ do_add_route(Topic, SessionID) when is_binary(Topic) -> end. %% @doc Match routes --spec match_routes(emqx_topic:topic()) -> [emqx_types:route()]. +-spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> case match_trie(Topic) of [] -> lookup_routes(Topic); @@ -153,7 +151,7 @@ match_trie(Topic) -> delete_routes(SessionID, Subscriptions) -> cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}). --spec do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}. +-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, SessionID) -> Route = #route{topic = Topic, dest = SessionID}, case emqx_topic:wildcard(Topic) of @@ -165,7 +163,7 @@ do_delete_route(Topic, SessionID) -> end. %% @doc Print routes to a topic --spec print_routes(emqx_topic:topic()) -> ok. +-spec print_routes(emqx_types:topic()) -> ok. print_routes(Topic) -> lists:foreach( fun(#route{topic = To, dest = SessionID}) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index bc13a06c6..3a370ddba 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -97,7 +97,7 @@ -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). -define(SUBSCRIBER_DOWN, noproc). --type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()). +-type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()). -record(state, {pmon}). @@ -156,7 +156,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> end end. --spec strategy(emqx_topic:group()) -> strategy(). +-spec strategy(emqx_types:group()) -> strategy(). strategy(Group) -> try emqx:get_config([ diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 6434154ed..f667455b4 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -16,6 +16,8 @@ -module(emqx_topic). +-include("emqx_mqtt.hrl"). + %% APIs -export([ match/2, @@ -33,18 +35,9 @@ parse/2 ]). --export_type([ - group/0, - topic/0, - word/0 -]). - --type group() :: binary(). --type topic() :: binary(). --type word() :: '' | '+' | '#' | binary(). --type words() :: list(word()). - --define(MAX_TOPIC_LEN, 65535). +-type topic() :: emqx_types:topic(). +-type word() :: emqx_types:word(). +-type words() :: emqx_types:words(). %%-------------------------------------------------------------------- %% APIs @@ -142,6 +135,7 @@ prepend(Parent0, W) -> _ -> <> end. +-spec bin(word()) -> binary(). bin('') -> <<>>; bin('+') -> <<"+">>; bin('#') -> <<"#">>; @@ -163,6 +157,7 @@ tokens(Topic) -> words(Topic) when is_binary(Topic) -> [word(W) || W <- tokens(Topic)]. +-spec word(binary()) -> word(). word(<<>>) -> ''; word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; @@ -185,7 +180,7 @@ feed_var(Var, Val, [Var | Words], Acc) -> feed_var(Var, Val, [W | Words], Acc) -> feed_var(Var, Val, Words, [W | Acc]). --spec join(list(binary())) -> binary(). +-spec join(list(word())) -> binary(). join([]) -> <<>>; join([W]) -> diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index b4ff49bb3..229a0e3f4 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -114,7 +114,7 @@ create_session_trie(Type) -> insert(Topic) when is_binary(Topic) -> insert(Topic, ?TRIE). --spec insert_session(emqx_topic:topic()) -> ok. +-spec insert_session(emqx_types:topic()) -> ok. insert_session(Topic) when is_binary(Topic) -> insert(Topic, session_trie()). @@ -132,7 +132,7 @@ delete(Topic) when is_binary(Topic) -> delete(Topic, ?TRIE). %% @doc Delete a topic filter from the trie. --spec delete_session(emqx_topic:topic()) -> ok. +-spec delete_session(emqx_types:topic()) -> ok. delete_session(Topic) when is_binary(Topic) -> delete(Topic, session_trie()). @@ -148,7 +148,7 @@ delete(Topic, Trie) when is_binary(Topic) -> match(Topic) when is_binary(Topic) -> match(Topic, ?TRIE). --spec match_session(emqx_topic:topic()) -> list(emqx_topic:topic()). +-spec match_session(emqx_types:topic()) -> list(emqx_types:topic()). match_session(Topic) when is_binary(Topic) -> match(Topic, session_trie()). diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index a7ec0cec4..c6a567318 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -29,10 +29,16 @@ -export_type([ zone/0, pubsub/0, - topic/0, subid/0 ]). +-export_type([ + group/0, + topic/0, + word/0, + words/0 +]). + -export_type([ socktype/0, sockstate/0, @@ -122,9 +128,13 @@ -type zone() :: atom(). -type pubsub() :: publish | subscribe. --type topic() :: emqx_topic:topic(). -type subid() :: binary() | atom(). +-type group() :: binary() | undefined. +-type topic() :: binary(). +-type word() :: '' | '+' | '#' | binary(). +-type words() :: list(word()). + -type socktype() :: tcp | udp | ssl | proxy | atom(). -type sockstate() :: idle | running | blocked | closed. -type conninfo() :: #{ @@ -230,7 +240,6 @@ | {share, topic(), deliver_result()} ]. -type route() :: #route{}. --type group() :: emqx_topic:group(). -type route_entry() :: {topic(), node()} | {topic, group()}. -type command() :: #command{}. diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl index 6b9c367d1..58df08653 100644 --- a/apps/emqx_authz/src/emqx_authz_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -33,7 +33,7 @@ -type clientid() :: {clientid, binary()}. -type who() :: username() | clientid() | all. --type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_topic:topic()}. +-type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_types:topic()}. -type rules() :: [rule()]. -record(emqx_acl, { diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index 673a30726..2d50d92ce 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -49,7 +49,7 @@ -type egress() :: #{ local => #{ - topic => emqx_topic:topic() + topic => emqx_types:topic() }, remote := emqx_bridge_mqtt_msg:msgvars() }. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 91ec27e74..c0b2da908 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -43,7 +43,7 @@ -type ingress() :: #{ server := string(), remote := #{ - topic := emqx_topic:topic(), + topic := emqx_types:topic(), qos => emqx_types:qos() }, local := emqx_bridge_mqtt_msg:msgvars(), diff --git a/apps/emqx_retainer/src/emqx_retainer_index.erl b/apps/emqx_retainer/src/emqx_retainer_index.erl index 55fe7ce3f..d1e5425ed 100644 --- a/apps/emqx_retainer/src/emqx_retainer_index.erl +++ b/apps/emqx_retainer/src/emqx_retainer_index.erl @@ -31,7 +31,7 @@ -type index() :: list(pos_integer()). %% @doc Index key is a term that can be effectively searched in the index table. --type index_key() :: {index(), {emqx_topic:words(), emqx_topic:words()}}. +-type index_key() :: {index(), {emqx_types:words(), emqx_types:words()}}. -type match_pattern_part() :: term(). @@ -42,7 +42,7 @@ %% @doc Given words of a concrete topic (`Tokens') and a list of `Indices', %% constructs index keys for the topic and each of the indices. %% `Fun' is called with each of these keys. --spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_topic:words()) -> ok. +-spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_types:words()) -> ok. foreach_index_key(_Fun, [], _Tokens) -> ok; foreach_index_key(Fun, [Index | Indices], Tokens) -> @@ -59,7 +59,7 @@ foreach_index_key(Fun, [Index | Indices], Tokens) -> %% returns `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' term. %% %% @see foreach_index_key/3 --spec to_index_key(index(), emqx_topic:words()) -> index_key(). +-spec to_index_key(index(), emqx_types:words()) -> index_key(). to_index_key(Index, Tokens) -> {Index, split_index_tokens(Index, Tokens, 1, [], [])}. @@ -73,7 +73,7 @@ to_index_key(Index, Tokens) -> %% %% @see foreach_index_key/3 %% @see to_index_key/2 --spec index_score(index(), emqx_topic:words()) -> non_neg_integer(). +-spec index_score(index(), emqx_types:words()) -> non_neg_integer(). index_score(Index, Tokens) -> index_score(Index, Tokens, 1, 0). @@ -92,7 +92,7 @@ select_index(Tokens, Indices) -> %% %% E.g. for `[2, 3]' index and ['+', <<"b">>, '+', <<"d">>] wildcard topic %% returns {[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}} pattern. --spec condition(index(), emqx_topic:words()) -> match_pattern_part(). +-spec condition(index(), emqx_types:words()) -> match_pattern_part(). condition(Index, Tokens) -> {Index, condition(Index, Tokens, 1, [], [])}. @@ -100,7 +100,7 @@ condition(Index, Tokens) -> %% %% E.g. for ['+', <<"b">>, '+', <<"d">>, '#'] wildcard topic %% returns ['_', <<"b">>, '_', <<"d">> | '_'] pattern. --spec condition(emqx_topic:words()) -> match_pattern_part(). +-spec condition(emqx_types:words()) -> match_pattern_part(). condition(Tokens) -> Tokens1 = [ case W =:= '+' of @@ -118,7 +118,7 @@ condition(Tokens) -> %% %% E.g given `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' index key %% returns `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic. --spec restore_topic(index_key()) -> emqx_topic:words(). +-spec restore_topic(index_key()) -> emqx_types:words(). restore_topic({Index, {IndexTokens, OtherTokens}}) -> restore_topic(Index, IndexTokens, OtherTokens, 1, []).