From 567aeb274f372d11ea2f8ffbf88d62c6818fb07c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Aug 2018 23:08:55 +0800 Subject: [PATCH] Define types in emqx_types, emqx_mqtt_types modules --- include/emqx.hrl | 120 +++------------ include/emqx_mqtt.hrl | 294 ++++++++++++++++++------------------ src/emqx.erl | 32 ++-- src/emqx_access_control.erl | 9 +- src/emqx_access_rule.erl | 6 +- src/emqx_acl_cache.erl | 20 ++- src/emqx_acl_internal.erl | 3 +- src/emqx_alarm_mgr.erl | 4 +- src/emqx_banned.erl | 16 +- src/emqx_bridge.erl | 2 +- src/emqx_bridge1_sup.erl | 4 +- src/emqx_bridge_sup.erl | 3 +- src/emqx_bridge_sup_sup.erl | 8 +- src/emqx_broker.erl | 66 ++++---- src/emqx_client.erl | 12 +- src/emqx_cm.erl | 16 +- src/emqx_frame.erl | 11 +- src/emqx_message.erl | 17 ++- src/emqx_metrics.erl | 4 +- src/emqx_mqtt_caps.erl | 7 +- src/emqx_mqtt_types.erl | 43 ++++++ src/emqx_mqueue.erl | 2 +- src/emqx_packet.erl | 13 +- src/emqx_plugins.erl | 2 +- src/emqx_protocol.erl | 20 ++- src/emqx_router.erl | 22 +-- src/emqx_session.erl | 40 ++--- src/emqx_shared_sub.erl | 6 +- src/emqx_sm.erl | 24 +-- src/emqx_sm_locker.erl | 12 +- src/emqx_sm_registry.erl | 7 +- src/emqx_topic.erl | 6 +- src/emqx_trie.erl | 6 +- src/emqx_types.erl | 46 ++++-- src/emqx_zone.erl | 6 +- 35 files changed, 468 insertions(+), 441 deletions(-) create mode 100644 src/emqx_mqtt_types.erl diff --git a/include/emqx.hrl b/include/emqx.hrl index 5734da794..10190a3f6 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -12,6 +12,9 @@ %% See the License for the specific language governing permissions and %% limitations under the License. +-ifndef(EMQ_X_HRL). +-define(EMQ_X_HRL, true). + %%-------------------------------------------------------------------- %% Banner %%-------------------------------------------------------------------- @@ -24,14 +27,6 @@ -define(ERTS_MINIMUM_REQUIRED, "10.0"). -%%-------------------------------------------------------------------- -%% PubSub -%%-------------------------------------------------------------------- - --type(pubsub() :: publish | subscribe). - --define(PS(I), (I =:= publish orelse I =:= subscribe)). - %%-------------------------------------------------------------------- %% Topics' prefix: $SYS | $queue | $share %%-------------------------------------------------------------------- @@ -46,121 +41,48 @@ -define(SHARE, <<"$share/">>). %%-------------------------------------------------------------------- -%% Topic, subscription and subscriber +%% Message and Delivery %%-------------------------------------------------------------------- --type(topic() :: binary()). +-record(session, {sid, pid}). --type(subid() :: binary() | atom()). - --type(subopts() :: #{qos => integer(), - share => binary(), - atom() => term()}). - --record(subscription, { - topic :: topic(), - subid :: subid(), - subopts :: subopts() - }). - --type(subscription() :: #subscription{}). - --type(subscriber() :: {pid(), subid()}). - --type(topic_table() :: [{topic(), subopts()}]). - -%%-------------------------------------------------------------------- -%% Zone, Credentials, Client and Session -%%-------------------------------------------------------------------- - --type(zone() :: atom()). - --type(client_id() :: binary() | atom()). - --type(username() :: binary() | undefined). - --type(password() :: binary() | undefined). - --type(peername() :: {inet:ip_address(), inet:port_number()}). - --type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). - --type(credentials() :: #{client_id := binary(), - username := binary(), - peername := peername(), - zone => zone(), - atom() => term()}). - --record(client, { - id :: client_id(), - pid :: pid(), - zone :: zone(), - peername :: peername(), - username :: username(), - protocol :: protocol(), - attributes :: map() - }). - --type(client() :: #client{}). - --record(session, { - sid :: client_id(), - pid :: pid() - }). - --type(session() :: #session{}). - -%%-------------------------------------------------------------------- -%% Payload, Message and Delivery -%%-------------------------------------------------------------------- - --type(qos() :: integer()). - --type(payload() :: binary() | iodata()). - --type(message_flag() :: dup | sys | retain | atom()). +-record(subscription, {topic, subid, subopts}). %% See 'Application Message' in MQTT Version 5.0 -record(message, { %% Global unique message ID - id :: binary() | pos_integer(), + id :: binary(), %% Message QoS - qos = 0 :: qos(), + qos = 0, %% Message from - from :: atom() | client_id(), + from :: atom() | binary(), %% Message flags - flags :: #{message_flag() => boolean()}, + flags :: #{atom() => boolean()}, %% Message headers, or MQTT 5.0 Properties - headers = #{} :: map(), + headers = #{}, %% Topic that the message is published to - topic :: topic(), + topic :: binary(), %% Message Payload payload :: binary(), %% Timestamp timestamp :: erlang:timestamp() }). --type(message() :: #message{}). - -record(delivery, { - sender :: pid(), %% Sender of the delivery - message :: message(), %% The message delivered - flows :: list() %% The dispatch path of message + sender :: pid(), %% Sender of the delivery + message :: #message{}, %% The message delivered + results :: list() %% Dispatches of the message }). --type(delivery() :: #delivery{}). - %%-------------------------------------------------------------------- %% Route %%-------------------------------------------------------------------- -record(route, { - topic :: topic(), - dest :: node() | {binary(), node()} + topic :: binary(), + dest :: node() | {binary(), node()} }). --type(route() :: #route{}). - %%-------------------------------------------------------------------- %% Trie %%-------------------------------------------------------------------- @@ -170,7 +92,7 @@ -record(trie_node, { node_id :: trie_node_id(), edge_count = 0 :: non_neg_integer(), - topic :: topic() | undefined, + topic :: binary() | undefined, flags :: list(atom()) }). @@ -196,8 +118,6 @@ timestamp :: erlang:timestamp() }). --type(alarm() :: #alarm{}). - %%-------------------------------------------------------------------- %% Plugin %%-------------------------------------------------------------------- @@ -212,8 +132,6 @@ info :: map() }). --type(plugin() :: #plugin{}). - %%-------------------------------------------------------------------- %% Command %%-------------------------------------------------------------------- @@ -227,5 +145,5 @@ descr :: string() }). --type(command() :: #command{}). +-endif. diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 1d69ac365..74bfc1120 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -12,6 +12,9 @@ %% See the License for the specific language governing permissions and %% limitations under the License. +-ifndef(EMQ_X_MQTT_HRL). +-define(EMQ_X_MQTT_HRL, true). + %%-------------------------------------------------------------------- %% MQTT SockOpts %%-------------------------------------------------------------------- @@ -32,8 +35,6 @@ {?MQTT_PROTO_V4, <<"MQTT">>}, {?MQTT_PROTO_V5, <<"MQTT">>}]). --type(mqtt_version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). - %%-------------------------------------------------------------------- %% MQTT QoS Levels %%-------------------------------------------------------------------- @@ -48,12 +49,6 @@ -define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)). --type(mqtt_qos() :: ?QOS0 | ?QOS1 | ?QOS2). - --type(mqtt_qos_name() :: qos0 | at_most_once | - qos1 | at_least_once | - qos2 | exactly_once). - -define(QOS_I(Name), begin (case Name of @@ -80,25 +75,6 @@ -define(MAX_CLIENTID_LEN, 65535). -%%-------------------------------------------------------------------- -%% MQTT Client -%%-------------------------------------------------------------------- --record(mqtt_client, { - client_id :: binary() | undefined, - client_pid :: pid(), - username :: binary() | undefined, - peername :: {inet:ip_address(), inet:port_number()}, - clean_start :: boolean(), - proto_ver :: mqtt_version(), - keepalive = 0 :: non_neg_integer(), - will_topic :: undefined | binary(), - mountpoint :: undefined | binary(), - connected_at :: erlang:timestamp(), - attributes :: map() - }). - --type(mqtt_client() :: #mqtt_client{}). - %%-------------------------------------------------------------------- %% MQTT Control Packet Types %%-------------------------------------------------------------------- @@ -137,8 +113,6 @@ 'DISCONNECT', 'AUTH']). --type(mqtt_packet_type() :: ?RESERVED..?AUTH). - %%-------------------------------------------------------------------- %% MQTT V3.1.1 Connect Return Codes %%-------------------------------------------------------------------- @@ -150,8 +124,6 @@ -define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed -define(CONNACK_AUTH, 5). %% Client is not authorized to connect --type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH). - %%-------------------------------------------------------------------- %% MQTT V5.0 Reason Codes %%-------------------------------------------------------------------- @@ -220,108 +192,91 @@ %%-------------------------------------------------------------------- -record(mqtt_packet_header, { - type = ?RESERVED :: mqtt_packet_type(), - dup = false :: boolean(), - qos = ?QOS_0 :: mqtt_qos(), - retain = false :: boolean() + type = ?RESERVED, + dup = false, + qos = ?QOS_0, + retain = false }). %%-------------------------------------------------------------------- %% MQTT Packets %%-------------------------------------------------------------------- --type(mqtt_topic() :: binary()). - --type(mqtt_client_id() :: binary()). - --type(mqtt_username() :: binary() | undefined). - --type(mqtt_packet_id() :: 1..16#FFFF | undefined). - --type(mqtt_reason_code() :: 0..16#FF | undefined). - --type(mqtt_properties() :: #{atom() => term()} | undefined). - --type(mqtt_subopts() :: #{atom() => term()}). - --define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling - rap => 0, %% Retain as Publish - nl => 0, %% No Local - qos => ?QOS_0, - rc => 0, %% Reason Code - subid => 0 %% Subscription-Identifier +-define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling + rap => 0, %% Retain as Publish + nl => 0, %% No Local + qos => 0, %% QoS + rc => 0 %% Reason Code }). --type(mqtt_topic_filters() :: [{mqtt_topic(), mqtt_subopts()}]). - -record(mqtt_packet_connect, { - proto_name = <<"MQTT">> :: binary(), - proto_ver = ?MQTT_PROTO_V4 :: mqtt_version(), - is_bridge = false :: boolean(), - clean_start = true :: boolean(), - will_flag = false :: boolean(), - will_qos = ?QOS_0 :: mqtt_qos(), - will_retain = false :: boolean(), - keepalive = 0 :: non_neg_integer(), - properties = undefined :: mqtt_properties(), - client_id = <<>> :: mqtt_client_id(), - will_props = undefined :: undefined | map(), - will_topic = undefined :: undefined | binary(), - will_payload = undefined :: undefined | binary(), - username = undefined :: undefined | binary(), - password = undefined :: undefined | binary() + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V4, + is_bridge = false, + clean_start = true, + will_flag = false, + will_qos = ?QOS_0, + will_retain = false, + keepalive = 0, + properties = undefined, + client_id = <<>>, + will_props = undefined, + will_topic = undefined, + will_payload = undefined, + username = undefined, + password = undefined }). -record(mqtt_packet_connack, { - ack_flags :: 0 | 1, - reason_code :: mqtt_reason_code(), - properties :: mqtt_properties() + ack_flags, + reason_code, + properties }). -record(mqtt_packet_publish, { - topic_name :: mqtt_topic(), - packet_id :: mqtt_packet_id(), - properties :: mqtt_properties() + topic_name, + packet_id, + properties }). -record(mqtt_packet_puback, { - packet_id :: mqtt_packet_id(), - reason_code :: mqtt_reason_code(), - properties :: mqtt_properties() + packet_id, + reason_code, + properties }). -record(mqtt_packet_subscribe, { - packet_id :: mqtt_packet_id(), - properties :: mqtt_properties(), - topic_filters :: mqtt_topic_filters() + packet_id, + properties, + topic_filters }). -record(mqtt_packet_suback, { - packet_id :: mqtt_packet_id(), - properties :: mqtt_properties(), - reason_codes :: list(mqtt_reason_code()) + packet_id, + properties, + reason_codes }). -record(mqtt_packet_unsubscribe, { - packet_id :: mqtt_packet_id(), - properties :: mqtt_properties(), - topic_filters :: [mqtt_topic()] + packet_id, + properties, + topic_filters }). -record(mqtt_packet_unsuback, { - packet_id :: mqtt_packet_id(), - properties :: mqtt_properties(), - reason_codes :: list(mqtt_reason_code()) + packet_id, + properties, + reason_codes }). -record(mqtt_packet_disconnect, { - reason_code :: mqtt_reason_code(), - properties :: mqtt_properties() + reason_code, + properties }). -record(mqtt_packet_auth, { - reason_code :: mqtt_reason_code(), - properties :: mqtt_properties() + reason_code, + properties }). %%-------------------------------------------------------------------- @@ -340,63 +295,70 @@ | #mqtt_packet_unsuback{} | #mqtt_packet_disconnect{} | #mqtt_packet_auth{} - | mqtt_packet_id() + | pos_integer() | undefined, payload :: binary() | undefined }). --type(mqtt_packet() :: #mqtt_packet{}). - %%-------------------------------------------------------------------- %% MQTT Packet Match %%-------------------------------------------------------------------- -define(CONNECT_PACKET(Var), - #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}). + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, + variable = Var}). -define(CONNACK_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = 0, - reason_code = ReasonCode}}). + reason_code = ReasonCode} + }). -define(CONNACK_PACKET(ReasonCode, SessPresent), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = SessPresent, - reason_code = ReasonCode}}). + reason_code = ReasonCode} + }). -define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = SessPresent, reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). -define(AUTH_PACKET(), #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, - variable = #mqtt_packet_auth{reason_code = 0}}). + variable = #mqtt_packet_auth{reason_code = 0} + }). -define(AUTH_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, - variable = #mqtt_packet_auth{reason_code = ReasonCode}}). + variable = #mqtt_packet_auth{reason_code = ReasonCode} + }). -define(AUTH_PACKET(ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, variable = #mqtt_packet_auth{reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). -define(PUBLISH_PACKET(QoS), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = QoS}}). -define(PUBLISH_PACKET(QoS, PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = QoS}, - variable = #mqtt_packet_publish{packet_id = PacketId}}). + qos = QoS}, + variable = #mqtt_packet_publish{packet_id = PacketId} + }). -define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = QoS}, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId}, - payload = Payload}). + payload = Payload + }). -define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, @@ -404,130 +366,166 @@ variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId, properties = Properties}, - payload = Payload}). + payload = Payload + }). -define(PUBACK_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0}}). + reason_code = 0} + }). -define(PUBACK_PACKET(PacketId, ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode}}). + reason_code = ReasonCode} + }). -define(PUBACK_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). -define(PUBREC_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0}}). + reason_code = 0} + }). -define(PUBREC_PACKET(PacketId, ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode}}). + reason_code = ReasonCode} + }). -define(PUBREC_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, - variable = #mqtt_packet_puback{packet_id = PacketId, + variable = #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). -define(PUBREL_PACKET(PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, + qos = ?QOS_1}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0}}). + reason_code = 0} + }). + -define(PUBREL_PACKET(PacketId, ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, + qos = ?QOS_1}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode}}). + reason_code = ReasonCode} + }). -define(PUBREL_PACKET(PacketId, ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, - variable = #mqtt_packet_puback{packet_id = PacketId, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, + qos = ?QOS_1}, + variable = #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). -define(PUBCOMP_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0}}). + reason_code = 0} + }). + -define(PUBCOMP_PACKET(PacketId, ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode}}). + reason_code = ReasonCode} + }). -define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, variable = #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). -define(SUBSCRIBE_PACKET(PacketId, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1}, + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, + qos = ?QOS_1}, variable = #mqtt_packet_subscribe{packet_id = PacketId, - topic_filters = TopicFilters}}). + topic_filters = TopicFilters} + }). -define(SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1}, + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, + qos = ?QOS_1}, variable = #mqtt_packet_subscribe{packet_id = PacketId, properties = Properties, - topic_filters = TopicFilters}}). + topic_filters = TopicFilters} + }). -define(SUBACK_PACKET(PacketId, ReasonCodes), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, variable = #mqtt_packet_suback{packet_id = PacketId, - reason_codes = ReasonCodes}}). + reason_codes = ReasonCodes} + }). -define(SUBACK_PACKET(PacketId, Properties, ReasonCodes), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, variable = #mqtt_packet_suback{packet_id = PacketId, properties = Properties, - reason_codes = ReasonCodes}}). + reason_codes = ReasonCodes} + }). + -define(UNSUBSCRIBE_PACKET(PacketId, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1}, + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, + qos = ?QOS_1}, variable = #mqtt_packet_unsubscribe{packet_id = PacketId, - topic_filters = TopicFilters}}). + topic_filters = TopicFilters} + }). -define(UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1}, + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, + qos = ?QOS_1}, variable = #mqtt_packet_unsubscribe{packet_id = PacketId, properties = Properties, - topic_filters = TopicFilters}}). + topic_filters = TopicFilters} + }). -define(UNSUBACK_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, - variable = #mqtt_packet_unsuback{packet_id = PacketId}}). + variable = #mqtt_packet_unsuback{packet_id = PacketId} + }). -define(UNSUBACK_PACKET(PacketId, ReasonCodes), #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, variable = #mqtt_packet_unsuback{packet_id = PacketId, - reason_codes = ReasonCodes}}). + reason_codes = ReasonCodes} + }). -define(UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, variable = #mqtt_packet_unsuback{packet_id = PacketId, properties = Properties, - reason_codes = ReasonCodes}}). + reason_codes = ReasonCodes} + }). -define(DISCONNECT_PACKET(), #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, - variable = #mqtt_packet_disconnect{reason_code = 0}}). + variable = #mqtt_packet_disconnect{reason_code = 0} + }). -define(DISCONNECT_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, - variable = #mqtt_packet_disconnect{reason_code = ReasonCode}}). + variable = #mqtt_packet_disconnect{reason_code = ReasonCode} + }). -define(DISCONNECT_PACKET(ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, variable = #mqtt_packet_disconnect{reason_code = ReasonCode, - properties = Properties}}). + properties = Properties} + }). --define(PACKET(Type), - #mqtt_packet{header = #mqtt_packet_header{type = Type}}). +-define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). + +-endif. diff --git a/src/emqx.erl b/src/emqx.erl index 5ea884e9a..8e1f10168 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -66,32 +66,36 @@ is_running(Node) -> %% PubSub API %%-------------------------------------------------------------------- --spec(subscribe(topic() | string()) -> ok | {error, term()}). +-spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). subscribe(Topic) -> emqx_broker:subscribe(iolist_to_binary(Topic)). --spec(subscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) + -> ok | {error, term()}). subscribe(Topic, Sub) when is_list(Sub)-> emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub)); subscribe(Topic, Subscriber) when is_tuple(Subscriber) -> {SubPid, SubId} = Subscriber, emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId). --spec(subscribe(topic() | string(), subscriber() | string(), subopts()) -> ok | {error, term()}). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(), + emqx_topic:subopts()) -> ok | {error, term()}). subscribe(Topic, Sub, Options) when is_list(Sub)-> emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options); subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)-> {SubPid, SubId} = Subscriber, emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options). --spec(publish(message()) -> {ok, emqx_types:dispatches()}). -publish(Msg) -> emqx_broker:publish(Msg). +-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). +publish(Msg) -> + emqx_broker:publish(Msg). --spec(unsubscribe(topic() | string()) -> ok | {error, term()}). +-spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}). +-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) + -> ok | {error, term()}). unsubscribe(Topic, Sub) when is_list(Sub) -> emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub)); unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> @@ -102,26 +106,28 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> %% PubSub management API %%-------------------------------------------------------------------- --spec(get_subopts(topic() | string(), subscriber()) -> subopts()). +-spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) + -> emqx_types:subopts()). get_subopts(Topic, Subscriber) -> emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)). --spec(set_subopts(topic() | string(), subscriber(), subopts()) -> ok). +-spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(), + emqx_types:subopts()) -> ok). set_subopts(Topic, Subscriber, Options) when is_list(Options) -> emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options). --spec(topics() -> list(topic())). +-spec(topics() -> list(emqx_topic:topic())). topics() -> emqx_router:topics(). --spec(subscribers(topic() | string()) -> list(subscriber())). +-spec(subscribers(emqx_topic:topic() | string()) -> list(emqx_types:subscriber())). subscribers(Topic) -> emqx_broker:subscribers(iolist_to_binary(Topic)). --spec(subscriptions(subscriber()) -> [{topic(), subopts()}]). +-spec(subscriptions(emqx_types:subscriber()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). subscriptions(Subscriber) -> emqx_broker:subscriptions(Subscriber). --spec(subscribed(topic() | string(), subscriber()) -> boolean()). +-spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()). subscribed(Topic, Subscriber) -> emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 7c6bcfb60..46ed9ed53 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -55,7 +55,7 @@ register_default_acl() -> File -> register_mod(acl, emqx_acl_internal, [File]) end. --spec(authenticate(credentials(), password()) +-spec(authenticate(emqx_types:credentials(), emqx_types:password()) -> ok | {ok, map()} | {continue, map()} | {error, term()}). authenticate(Credentials, Password) -> authenticate(Credentials, Password, lookup_mods(auth)). @@ -85,10 +85,9 @@ authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) -> end. %% @doc Check ACL --spec(check_acl(credentials(), pubsub(), topic()) -> allow | deny). -check_acl(Credentials, PubSub, Topic) when ?PS(PubSub) -> - CacheEnabled = emqx_acl_cache:is_enabled(), - check_acl(Credentials, PubSub, Topic, lookup_mods(acl), CacheEnabled). +-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny). +check_acl(Credentials, PubSub, Topic) when PubSub =:= publish; PubSub =:= subscribe -> + check_acl(Credentials, PubSub, Topic, lookup_mods(acl), emqx_acl_cache:is_enabled()). check_acl(Credentials, PubSub, Topic, AclMods, false) -> do_check_acl(Credentials, PubSub, Topic, AclMods); diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 2f5d190a9..3fb6dd7ef 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -24,9 +24,9 @@ -type(access() :: subscribe | publish | pubsub). -type(rule() :: {allow, all} | - {allow, who(), access(), list(topic())} | + {allow, who(), access(), list(emqx_topic:topic())} | {deny, all} | - {deny, who(), access(), list(topic())}). + {deny, who(), access(), list(emqx_topic:topic())}). -export_type([rule/0]). @@ -81,7 +81,7 @@ bin(B) when is_binary(B) -> B. %% @doc Match access rule --spec(match(credentials(), topic(), rule()) +-spec(match(emqx_types:credentials(), emqx_types:topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). match(_Credentials, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) -> {matched, AllowDeny}; diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl index 65e1e3305..5be92814d 100644 --- a/src/emqx_acl_cache.erl +++ b/src/emqx_acl_cache.erl @@ -1,3 +1,17 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + -module(emqx_acl_cache). -include("emqx.hrl"). @@ -27,8 +41,7 @@ is_enabled() -> application:get_env(emqx, enable_acl_cache, true). %% We'll cleanup the cache before repalcing an expired acl. --spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic()) - -> (acl_result() | not_found)). +-spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)). get_acl_cache(PubSub, Topic) -> case erlang:get(cache_k(PubSub, Topic)) of undefined -> not_found; @@ -44,8 +57,7 @@ get_acl_cache(PubSub, Topic) -> %% If the cache get full, and also the latest one %% is expired, then delete all the cache entries --spec(put_acl_cache(PubSub :: publish | subscribe, - Topic :: topic(), AclResult :: acl_result()) -> ok). +-spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok). put_acl_cache(PubSub, Topic, AclResult) -> MaxSize = get_cache_max_size(), true = (MaxSize =/= 0), Size = get_cache_size(), diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 54f944416..0f25e6808 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -70,7 +70,8 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. %% @doc Check ACL --spec(check_acl({credentials(), pubsub(), topic()}, #{}) -> allow | deny | ignore). +-spec(check_acl({emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic()}, #{}) + -> allow | deny | ignore). check_acl({Credentials, PubSub, Topic}, _State) -> case match(Credentials, Topic, lookup(PubSub)) of {matched, allow} -> allow; diff --git a/src/emqx_alarm_mgr.erl b/src/emqx_alarm_mgr.erl index 9839ba44e..bb734c8e6 100644 --- a/src/emqx_alarm_mgr.erl +++ b/src/emqx_alarm_mgr.erl @@ -48,7 +48,7 @@ alarm_fun(Bool) -> (clear, _AlarmId) when Bool =:= false -> alarm_fun(false) end. --spec(set_alarm(alarm()) -> ok). +-spec(set_alarm(emqx_types:alarm()) -> ok). set_alarm(Alarm) when is_record(Alarm, alarm) -> gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}). @@ -56,7 +56,7 @@ set_alarm(Alarm) when is_record(Alarm, alarm) -> clear_alarm(AlarmId) when is_binary(AlarmId) -> gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}). --spec(get_alarms() -> list(alarm())). +-spec(get_alarms() -> list(emqx_types:alarm())). get_alarms() -> gen_event:call(?ALARM_MGR, ?MODULE, get_alarms). diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index e2f03643e..908c8b5d5 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -30,15 +30,15 @@ -export([add/1, del/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). --type(key() :: {client_id, client_id()} | - {ipaddr, inet:ip_address()} | - {username, username()}). +-type(key() :: {client_id, emqx_types:client_id()} | + {username, emqx_types:username() | + {ipaddr, inet:ip_address()}}). -record(state, {expiry_timer}). @@ -63,12 +63,12 @@ mnesia(copy) -> %%-------------------------------------------------------------------- %% @doc Start the banned server --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(check(client()) -> boolean()). -check(#client{id = ClientId, username = Username, peername = {IPAddr, _}}) -> +-spec(check(emqx_types:credentials()) -> boolean()). +check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -> ets:member(?TAB, {client_id, ClientId}) orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {ipaddr, IPAddr}). diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index eef5d249b..d4ebab041 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -36,7 +36,7 @@ ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). --type(option() :: {qos, mqtt_qos()} | +-type(option() :: {qos, emqx_mqtt_types:qos()} | {topic_suffix, binary()} | {topic_prefix, binary()} | {max_queue_len, pos_integer()} | diff --git a/src/emqx_bridge1_sup.erl b/src/emqx_bridge1_sup.erl index 444c7cfb5..f4e8c8f01 100644 --- a/src/emqx_bridge1_sup.erl +++ b/src/emqx_bridge1_sup.erl @@ -27,7 +27,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc List all bridges --spec(bridges() -> [{node(), topic(), pid()}]). +-spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). bridges() -> [{Name, emqx_bridge1:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. @@ -42,4 +42,4 @@ spec({Id, Options})-> restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_bridge1]}. \ No newline at end of file + modules => [emqx_bridge1]}. diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index e7ba21310..1735e3b99 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -18,7 +18,8 @@ -export([start_link/3]). --spec(start_link(node(), topic(), [emqx_bridge:option()]) -> {ok, pid()} | {error, term()}). +-spec(start_link(node(), emqx_topic:topic(), [emqx_bridge:option()]) + -> {ok, pid()} | {error, term()}). start_link(Node, Topic, Options) -> MFA = {emqx_bridge, start_link, [Node, Topic, Options]}, emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA). diff --git a/src/emqx_bridge_sup_sup.erl b/src/emqx_bridge_sup_sup.erl index 4b34bedd9..2ef05df8c 100644 --- a/src/emqx_bridge_sup_sup.erl +++ b/src/emqx_bridge_sup_sup.erl @@ -30,17 +30,17 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc List all bridges --spec(bridges() -> [{node(), topic(), pid()}]). +-spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). bridges() -> [{Node, Topic, Pid} || {?CHILD_ID(Node, Topic), Pid, supervisor, _} <- supervisor:which_children(?MODULE)]. %% @doc Start a bridge --spec(start_bridge(node(), topic()) -> {ok, pid()} | {error, term()}). +-spec(start_bridge(node(), emqx_topic:topic()) -> {ok, pid()} | {error, term()}). start_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) -> start_bridge(Node, Topic, []). --spec(start_bridge(node(), topic(), [emqx_bridge:option()]) +-spec(start_bridge(node(), emqx_topic:topic(), [emqx_bridge:option()]) -> {ok, pid()} | {error, term()}). start_bridge(Node, _Topic, _Options) when Node =:= node() -> {error, bridge_to_self}; @@ -49,7 +49,7 @@ start_bridge(Node, Topic, Options) when is_atom(Node), is_binary(Topic) -> supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). %% @doc Stop a bridge --spec(stop_bridge(node(), topic()) -> ok | {error, term()}). +-spec(stop_bridge(node(), emqx_topic:topic()) -> ok | {error, term()}). stop_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) -> ChildId = ?CHILD_ID(Node, Topic), case supervisor:terminate_child(?MODULE, ChildId) of diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index dd6a4c1ba..623290961 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -57,17 +57,18 @@ start_link(Pool, Id) -> %% Subscribe %%------------------------------------------------------------------------------ --spec(subscribe(topic()) -> ok). +-spec(subscribe(emqx_topic:topic()) -> ok). subscribe(Topic) when is_binary(Topic) -> subscribe(Topic, self()). --spec(subscribe(topic(), pid() | subid()) -> ok). +-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok). subscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> subscribe(Topic, SubPid, undefined); subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> subscribe(Topic, self(), SubId). --spec(subscribe(topic(), pid() | subid(), subid() | subopts()) -> ok). +-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(), + emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> subscribe(Topic, SubPid, SubId, #{}); subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> @@ -75,24 +76,24 @@ subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map( subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) -> subscribe(Topic, self(), SubId, SubOpts). --spec(subscribe(topic(), pid(), subid(), subopts()) -> ok). +-spec(subscribe(emqx_topic:topic(), pid(), emqx_types:subid(), emqx_types:subopts()) -> ok). subscribe(Topic, SubPid, SubId, SubOpts) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId), is_map(SubOpts) -> Broker = pick(SubPid), SubReq = #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}, wait_for_reply(async_call(Broker, SubReq), ?TIMEOUT). --spec(multi_subscribe(topic_table()) -> ok). +-spec(multi_subscribe(emqx_types:topic_table()) -> ok). multi_subscribe(TopicTable) when is_list(TopicTable) -> multi_subscribe(TopicTable, self()). --spec(multi_subscribe(topic_table(), pid() | subid()) -> ok). +-spec(multi_subscribe(emqx_types:topic_table(), pid() | emqx_types:subid()) -> ok). multi_subscribe(TopicTable, SubPid) when is_pid(SubPid) -> multi_subscribe(TopicTable, SubPid, undefined); multi_subscribe(TopicTable, SubId) when ?is_subid(SubId) -> multi_subscribe(TopicTable, self(), SubId). --spec(multi_subscribe(topic_table(), pid(), subid()) -> ok). +-spec(multi_subscribe(emqx_types:topic_table(), pid(), emqx_types:subid()) -> ok). multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -> Broker = pick(SubPid), SubReq = fun(Topic, SubOpts) -> @@ -105,33 +106,33 @@ multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) %% Unsubscribe %%------------------------------------------------------------------------------ --spec(unsubscribe(topic()) -> ok). +-spec(unsubscribe(emqx_topic:topic()) -> ok). unsubscribe(Topic) when is_binary(Topic) -> unsubscribe(Topic, self()). --spec(unsubscribe(topic(), pid() | subid()) -> ok). +-spec(unsubscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok). unsubscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> unsubscribe(Topic, SubPid, undefined); unsubscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> unsubscribe(Topic, self(), SubId). --spec(unsubscribe(topic(), pid(), subid()) -> ok). +-spec(unsubscribe(emqx_topic:topic(), pid(), emqx_types:subid()) -> ok). unsubscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> Broker = pick(SubPid), UnsubReq = #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}, wait_for_reply(async_call(Broker, UnsubReq), ?TIMEOUT). --spec(multi_unsubscribe([topic()]) -> ok). +-spec(multi_unsubscribe([emqx_topic:topic()]) -> ok). multi_unsubscribe(Topics) -> multi_unsubscribe(Topics, self()). --spec(multi_unsubscribe([topic()], pid() | subid()) -> ok). +-spec(multi_unsubscribe([emqx_topic:topic()], pid() | emqx_types:subid()) -> ok). multi_unsubscribe(Topics, SubPid) when is_pid(SubPid) -> multi_unsubscribe(Topics, SubPid, undefined); multi_unsubscribe(Topics, SubId) when ?is_subid(SubId) -> multi_unsubscribe(Topics, self(), SubId). --spec(multi_unsubscribe([topic()], pid(), subid()) -> ok). +-spec(multi_unsubscribe([emqx_topic:topic()], pid(), emqx_types:subid()) -> ok). multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -> Broker = pick(SubPid), UnsubReq = fun(Topic) -> @@ -143,18 +144,19 @@ multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) - %% Publish %%------------------------------------------------------------------------------ --spec(publish(message()) -> {ok, emqx_types:dispatches()}). +-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), {ok, case emqx_hooks:run('message.publish', [], Msg) of {ok, Msg1 = #message{topic = Topic}} -> Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), - Delivery#delivery.flows; + Delivery#delivery.results; {stop, _} -> - emqx_logger:warning("Stop publishing: ~p", [Msg]), [] + emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]), + [] end}. --spec(safe_publish(message()) -> ok). +-spec(safe_publish(emqx_types:message()) -> ok). %% Called internally safe_publish(Msg) when is_record(Msg, message) -> try @@ -167,7 +169,7 @@ safe_publish(Msg) when is_record(Msg, message) -> end. delivery(Msg) -> - #delivery{sender = self(), message = Msg, flows = []}. + #delivery{sender = self(), message = Msg, results = []}. %%------------------------------------------------------------------------------ %% Route @@ -180,8 +182,8 @@ route([], Delivery = #delivery{message = Msg}) -> route([{To, Node}], Delivery) when Node =:= node() -> dispatch(To, Delivery); -route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) -> - forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]}); +route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) -> + forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]}); route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) -> emqx_shared_sub:dispatch(Group, To, Delivery); @@ -213,20 +215,21 @@ forward(Node, To, Delivery) -> Delivery1 -> Delivery1 end. --spec(dispatch(topic(), delivery()) -> delivery()). -dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> +-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()). +dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> case subscribers(Topic) of [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Topic), Delivery; + inc_dropped_cnt(Topic), + Delivery; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg), - Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]}; + Delivery#delivery{results = [{dispatch, Topic, 1}|Results]}; Subscribers -> Count = lists:foldl(fun(Sub, Acc) -> dispatch(Sub, Topic, Msg), Acc + 1 end, 0, Subscribers), - Delivery#delivery{flows = [{dispatch, Topic, Count}|Flows]} + Delivery#delivery{results = [{dispatch, Topic, Count}|Results]} end. dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) -> @@ -239,11 +242,12 @@ inc_dropped_cnt(<<"$SYS/", _/binary>>) -> inc_dropped_cnt(_Topic) -> emqx_metrics:inc('messages/dropped'). --spec(subscribers(topic()) -> [subscriber()]). +-spec(subscribers(emqx_topic:topic()) -> [emqx_types:subscriber()]). subscribers(Topic) -> try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end. --spec(subscriptions(subscriber()) -> [{topic(), subopts()}]). +-spec(subscriptions(emqx_types:subscriber()) + -> [{emqx_topic:topic(), emqx_types:subopts()}]). subscriptions(Subscriber) -> lists:map(fun({_, {share, _Group, Topic}}) -> subscription(Topic, Subscriber); @@ -254,7 +258,7 @@ subscriptions(Subscriber) -> subscription(Topic, Subscriber) -> {Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}. --spec(subscribed(topic(), pid() | subid() | subscriber()) -> boolean()). +-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1; subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> @@ -262,13 +266,13 @@ subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). --spec(get_subopts(topic(), subscriber()) -> subopts()). +-spec(get_subopts(emqx_topic:topic(), emqx_types:subscriber()) -> emqx_types:subopts()). get_subopts(Topic, Subscriber) when is_binary(Topic) -> try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) catch error:badarg -> [] end. --spec(set_subopts(topic(), subscriber(), subopts()) -> boolean()). +-spec(set_subopts(emqx_topic:topic(), emqx_types:subscriber(), emqx_types:subopts()) -> boolean()). set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_map(Opts) -> case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of [{_, OldOpts}] -> @@ -298,7 +302,7 @@ wait_for_reply(Tag, Timeout) -> pick(SubPid) when is_pid(SubPid) -> gproc_pool:pick_worker(broker, SubPid). --spec(topics() -> [topic()]). +-spec(topics() -> [emqx_topic:topic()]). topics() -> emqx_router:topics(). %%------------------------------------------------------------------------------ diff --git a/src/emqx_client.erl b/src/emqx_client.erl index ab653b302..e6aac5d43 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -87,7 +87,7 @@ clean_start :: boolean(), username :: binary() | undefined, password :: binary() | undefined, - proto_ver :: mqtt_version(), + proto_ver :: emqx_mqtt_types:version(), proto_name :: iodata(), keepalive :: non_neg_integer(), keepalive_timer :: reference() | undefined, @@ -114,15 +114,15 @@ -type(client() :: pid() | atom()). --type(topic() :: mqtt_topic()). +-type(topic() :: emqx_topic:topic()). -type(payload() :: iodata()). --type(packet_id() :: mqtt_packet_id()). +-type(packet_id() :: emqx_mqtt_types:packet_id()). --type(properties() :: mqtt_properties()). +-type(properties() :: emqx_mqtt_types:properties()). --type(qos() :: mqtt_qos_name() | mqtt_qos()). +-type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()). -type(pubopt() :: {retain, boolean()} | {qos, qos()}). @@ -131,7 +131,7 @@ | {nl, boolean()} | {qos, qos()}). --type(reason_code() :: mqtt_reason_code()). +-type(reason_code() :: emqx_mqtt_types:reason_code()). -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 5e921a1c0..3e9958939 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -43,12 +43,12 @@ start_link() -> gen_server:start_link({local, ?CM}, ?MODULE, [], []). %% @doc Lookup a connection. --spec(lookup_connection(client_id()) -> list({client_id(), pid()})). +-spec(lookup_connection(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})). lookup_connection(ClientId) when is_binary(ClientId) -> ets:lookup(?CONN_TAB, ClientId). %% @doc Register a connection. --spec(register_connection(client_id() | {client_id(), pid()}) -> ok). +-spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). register_connection(ClientId) when is_binary(ClientId) -> register_connection({ClientId, self()}); @@ -56,7 +56,7 @@ register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid _ = ets:insert(?CONN_TAB, Conn), notify({registered, ClientId, ConnPid}). --spec(register_connection(client_id() | {client_id(), pid()}, list()) -> ok). +-spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok). register_connection(ClientId, Attrs) when is_binary(ClientId) -> register_connection({ClientId, self()}, Attrs); register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> @@ -64,7 +64,7 @@ register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), register_connection(Conn). %% @doc Get conn attrs --spec(get_conn_attrs({client_id(), pid()}) -> list()). +-spec(get_conn_attrs({emqx_types:client_id(), pid()}) -> list()). get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> try ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2) @@ -79,7 +79,7 @@ set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_p ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}). %% @doc Unregister a conn. --spec(unregister_connection(client_id() | {client_id(), pid()}) -> ok). +-spec(unregister_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). unregister_connection(ClientId) when is_binary(ClientId) -> unregister_connection({ClientId, self()}); @@ -90,7 +90,7 @@ unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_p notify({unregistered, ClientId, ConnPid}). %% @doc Lookup connection pid --spec(lookup_conn_pid(client_id()) -> pid() | undefined). +-spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined). lookup_conn_pid(ClientId) when is_binary(ClientId) -> case ets:lookup(?CONN_TAB, ClientId) of [] -> undefined; @@ -98,7 +98,7 @@ lookup_conn_pid(ClientId) when is_binary(ClientId) -> end. %% @doc Get conn stats --spec(get_conn_stats({client_id(), pid()}) -> list(emqx_stats:stats())). +-spec(get_conn_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> try ets:lookup_element(?CONN_STATS_TAB, Conn, 2) catch @@ -106,7 +106,7 @@ get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(Conn end. %% @doc Set conn stats. --spec(set_conn_stats(client_id(), list(emqx_stats:stats())) -> boolean()). +-spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> boolean()). set_conn_stats(ClientId, Stats) when is_binary(ClientId) -> set_conn_stats({ClientId, self()}, Stats); diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index c1cc34d59..3ec935020 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -22,11 +22,11 @@ -export([serialize/1, serialize/2]). -type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE, - version => mqtt_version()}). + version => emqx_mqtt_types:version()}). -type(parse_state() :: {none, options()} | cont_fun(binary())). --type(cont_fun(Bin) :: fun((Bin) -> {ok, mqtt_packet(), binary()} +-type(cont_fun(Bin) :: fun((Bin) -> {ok, emqx_mqtt_types:packet(), binary()} | {more, cont_fun(Bin)})). -export_type([options/0, parse_state/0]). @@ -53,7 +53,8 @@ merge_opts(Options) -> %% Parse MQTT Frame %%------------------------------------------------------------------------------ --spec(parse(binary(), parse_state()) -> {ok, mqtt_packet(), binary()} | {more, cont_fun(binary())}). +-spec(parse(binary(), parse_state()) -> {ok, emqx_mqtt_types:packet(), binary()} | + {more, cont_fun(binary())}). parse(<<>>, {none, Options}) -> {more, fun(Bin) -> parse(Bin, {none, Options}) end}; parse(<>, {none, Options}) -> @@ -359,11 +360,11 @@ parse_binary_data(<>) -> %% Serialize MQTT Packet %%------------------------------------------------------------------------------ --spec(serialize(mqtt_packet()) -> iodata()). +-spec(serialize(emqx_mqtt_types:packet()) -> iodata()). serialize(Packet) -> serialize(Packet, ?DEFAULT_OPTIONS). --spec(serialize(mqtt_packet(), options()) -> iodata()). +-spec(serialize(emqx_mqtt_types:packet(), options()) -> iodata()). serialize(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Options) when is_map(Options) -> diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 86f6a825f..8b49d44d6 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -24,15 +24,19 @@ -export([get_header/2, get_header/3, set_header/3]). -export([format/1]). --spec(make(topic(), payload()) -> message()). +-type(flag() :: atom()). + +-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(Topic, Payload) -> make(undefined, Topic, Payload). --spec(make(atom() | client_id(), topic(), payload()) -> message()). +-spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload()) + -> emqx_types:message()). make(From, Topic, Payload) -> make(From, ?QOS0, Topic, Payload). --spec(make(atom() | client_id(), qos(), topic(), payload()) -> message()). +-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(), + emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(From, QoS, Topic, Payload) -> #message{id = msgid(QoS), qos = QoS, @@ -55,19 +59,20 @@ get_flag(Flag, Msg) -> get_flag(Flag, #message{flags = Flags}, Default) -> maps:get(Flag, Flags, Default). --spec(set_flag(message_flag(), message()) -> message()). +-spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()). set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) -> Msg#message{flags = #{Flag => true}}; set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, true, Flags)}. --spec(set_flag(message_flag(), boolean() | integer(), message()) -> message()). +-spec(set_flag(flag(), boolean() | integer(), emqx_types:message()) + -> emqx_types:message()). set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) -> Msg#message{flags = #{Flag => Val}}; set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, Val, Flags)}. --spec(unset_flag(message_flag(), message()) -> message()). +-spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()). unset_flag(Flag, Msg = #message{flags = Flags}) -> Msg#message{flags = maps:remove(Flag, Flags)}. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 506ff2c0d..15db3b420 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -167,7 +167,7 @@ update_counter(Key, UpOp) -> %%----------------------------------------------------------------------------- %% @doc Count packets received. --spec(received(mqtt_packet()) -> ok). +-spec(received(emqx_mqtt_types:packet()) -> ok). received(Packet) -> inc('packets/received'), received1(Packet). @@ -205,7 +205,7 @@ qos_received(?QOS_2) -> inc('messages/qos2/received'). %% @doc Count packets received. Will not count $SYS PUBLISH. --spec(sent(mqtt_packet()) -> ignore | non_neg_integer()). +-spec(sent(emqx_mqtt_types:packet()) -> ignore | non_neg_integer()). sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) -> ignore; sent(Packet) -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 27b8ad7bc..fdc29fae8 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -25,7 +25,7 @@ max_clientid_len => integer(), max_topic_alias => integer(), max_topic_levels => integer(), - max_qos_allowed => mqtt_qos(), + max_qos_allowed => emqx_mqtt_types:qos(), mqtt_retain_available => boolean(), mqtt_shared_subscription => boolean(), mqtt_wildcard_subscription => boolean()}). @@ -49,7 +49,7 @@ mqtt_shared_subscription, mqtt_wildcard_subscription]). --spec(check_pub(zone(), map()) -> ok | {error, mqtt_reason_code()}). +-spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_mqtt_types:reason_code()}). check_pub(Zone, Props) when is_map(Props) -> do_check_pub(Props, maps:to_list(get_caps(Zone, publish))). @@ -65,7 +65,8 @@ do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) -> do_check_pub(Props, Caps). --spec(check_sub(zone(), mqtt_topic_filters()) -> {ok | error, mqtt_topic_filters()}). +-spec(check_sub(emqx_types:zone(), emqx_mqtt_types:topic_filters()) + -> {ok | error, emqx_mqtt_types:topic_filters()}). check_sub(Zone, TopicFilters) -> Caps = maps:to_list(get_caps(Zone, subscribe)), lists:foldr(fun({Topic, Opts}, {Ok, Result}) -> diff --git a/src/emqx_mqtt_types.erl b/src/emqx_mqtt_types.erl new file mode 100644 index 000000000..0b231fc88 --- /dev/null +++ b/src/emqx_mqtt_types.erl @@ -0,0 +1,43 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_mqtt_types). + +-include("emqx_mqtt.hrl"). + +-export_type([version/0, qos/0, qos_name/0]). +-export_type([connack/0, reason_code/0]). +-export_type([properties/0, subopts/0]). +-export_type([topic_filters/0]). +-export_type([packet_id/0, packet_type/0, packet/0]). + +-type(qos() :: ?QOS0 | ?QOS1 | ?QOS2). +-type(version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). +-type(qos_name() :: qos0 | at_most_once | + qos1 | at_least_once | + qos2 | exactly_once). +-type(packet_type() :: ?RESERVED..?AUTH). +-type(connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH). +-type(reason_code() :: 0..16#FF). +-type(packet_id() :: 1..16#FFFF). +-type(properties() :: #{atom() => term()}). +-type(subopts() :: #{rh := 0 | 1, + rap := 0 | 1 | 2, + nl := 0 | 1, + qos := qos(), + rc => reason_code() + }). +-type(topic_filters() :: [{emqx_topic:topic(), subopts()}]). +-type(packet() :: #mqtt_packet{}). + diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 458c301fc..5127a5b7e 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -125,7 +125,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped end} | [{max_len, MaxLen}, {dropped, Dropped}]]. %% @doc Enqueue a message. --spec(in(message(), mqueue()) -> mqueue()). +-spec(in(emqx_types:message(), mqueue()) -> mqueue()). in(#message{flags = #{qos := ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> MQ; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index c6ab19c3c..384d62444 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -25,7 +25,7 @@ -export([will_msg/1]). %% @doc Protocol name of version --spec(protocol_name(mqtt_version()) -> binary()). +-spec(protocol_name(emqx_mqtt_types:version()) -> binary()). protocol_name(?MQTT_PROTO_V3) -> <<"MQIsdp">>; protocol_name(?MQTT_PROTO_V4) -> @@ -34,7 +34,7 @@ protocol_name(?MQTT_PROTO_V5) -> <<"MQTT">>. %% @doc Name of MQTT packet type --spec(type_name(mqtt_packet_type()) -> atom()). +-spec(type_name(emqx_mqtt_types:packet_type()) -> atom()). type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH -> lists:nth(Type, ?TYPE_NAMES). @@ -82,7 +82,7 @@ validate_qos(QoS) when ?QOS0 =< QoS, QoS =< ?QOS2 -> validate_qos(_) -> error(bad_qos). %% @doc From Message to Packet --spec(from_message(mqtt_packet_id(), message()) -> mqtt_packet()). +-spec(from_message(emqx_mqtt_types:packet_id(), emqx_types:message()) -> emqx_mqtt_types:packet()). from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) -> Dup = emqx_message:get_flag(dup, Msg, false), Retain = emqx_message:get_flag(retain, Msg, false), @@ -97,7 +97,8 @@ from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payloa variable = Publish, payload = Payload}. %% @doc Message from Packet --spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()). +-spec(to_message(emqx_types:credentials(), emqx_mqtt_types:packet()) + -> emqx_types:message()). to_message(#{client_id := ClientId, username := Username}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, retain = Retain, @@ -110,7 +111,7 @@ to_message(#{client_id := ClientId, username := Username}, Msg#message{flags = #{dup => Dup, retain => Retain}, headers = merge_props(#{username => Username}, Props)}. --spec(will_msg(#mqtt_packet_connect{}) -> message()). +-spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()). will_msg(#mqtt_packet_connect{will_flag = false}) -> undefined; will_msg(#mqtt_packet_connect{client_id = ClientId, @@ -130,7 +131,7 @@ merge_props(Headers, Props) -> maps:merge(Headers, Props). %% @doc Format packet --spec(format(mqtt_packet()) -> iolist()). +-spec(format(emqx_mqtt_types:packet()) -> iolist()). format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> format_header(Header, format_variable(Variable, Payload)). diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 0c03e827e..a6a04458f 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -153,7 +153,7 @@ stop_plugins(Names) -> [stop_app(App) || App <- Names]. %% @doc List all available plugins --spec(list() -> [plugin()]). +-spec(list() -> [emqx_types:plugin()]). list() -> case emqx_config:get_env(plugins_etc_dir) of undefined -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1ee80fbbf..cc64f5c7f 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -28,6 +28,22 @@ -export([send/2]). -export([shutdown/2]). + +%%-record(mqtt_client, { +%% client_id :: binary() | undefined, +%% client_pid :: pid(), +%% username :: binary() | undefined, +%% peername :: {inet:ip_address(), inet:port_number()}, +%% clean_start :: boolean(), +%% proto_ver :: emqx_mqtt_types:version(), +%% keepalive = 0 :: non_neg_integer(), +%% will_topic :: undefined | binary(), +%% mountpoint :: undefined | binary(), +%% connected_at :: erlang:timestamp(), +%% attributes :: map() +%% }). + + -record(pstate, { zone, sendfun, @@ -172,7 +188,7 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) -> %% Packet Received %%------------------------------------------------------------------------------ --spec(received(mqtt_packet(), state()) +-spec(received(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()} | {error, term(), state()}). received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; @@ -469,7 +485,7 @@ deliver({disconnect, _ReasonCode}, PState) -> %%------------------------------------------------------------------------------ %% Send Packet to Client --spec(send(mqtt_packet(), state()) -> {ok, state()} | {error, term()}). +-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of diff --git a/src/emqx_router.erl b/src/emqx_router.erl index df2d2e018..b1f2e783a 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -71,54 +71,54 @@ start_link(Pool, Id) -> %% Route APIs %%------------------------------------------------------------------------------ --spec(add_route(topic() | route()) -> ok). +-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok). add_route(Topic) when is_binary(Topic) -> add_route(#route{topic = Topic, dest = node()}); add_route(Route = #route{topic = Topic}) -> cast(pick(Topic), {add_route, Route}). --spec(add_route(topic(), destination()) -> ok). +-spec(add_route(emqx_topic:topic(), destination()) -> ok). add_route(Topic, Dest) when is_binary(Topic) -> add_route(#route{topic = Topic, dest = Dest}). --spec(add_route({pid(), reference()}, topic(), destination()) -> ok). +-spec(add_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok). add_route(From, Topic, Dest) when is_binary(Topic) -> cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}). --spec(get_routes(topic()) -> [route()]). +-spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]). get_routes(Topic) -> ets:lookup(?ROUTE, Topic). --spec(del_route(topic() | route()) -> ok). +-spec(del_route(emqx_topic:topic() | emqx_types:route()) -> ok). del_route(Topic) when is_binary(Topic) -> del_route(#route{topic = Topic, dest = node()}); del_route(Route = #route{topic = Topic}) -> cast(pick(Topic), {del_route, Route}). --spec(del_route(topic(), destination()) -> ok). +-spec(del_route(emqx_topic:topic(), destination()) -> ok). del_route(Topic, Dest) when is_binary(Topic) -> del_route(#route{topic = Topic, dest = Dest}). --spec(del_route({pid(), reference()}, topic(), destination()) -> ok). +-spec(del_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok). del_route(From, Topic, Dest) when is_binary(Topic) -> cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}). --spec(has_routes(topic()) -> boolean()). +-spec(has_routes(emqx_topic:topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> ets:member(?ROUTE, Topic). --spec(topics() -> list(topic())). +-spec(topics() -> list(emqx_topic:topic())). topics() -> mnesia:dirty_all_keys(?ROUTE). %% @doc Match routes %% Optimize: routing table will be replicated to all router nodes. --spec(match_routes(topic()) -> [route()]). +-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). match_routes(Topic) when is_binary(Topic) -> Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), lists:append([get_routes(To) || To <- [Topic | Matched]]). %% @doc Print routes to a topic --spec(print_routes(topic()) -> ok). +-spec(print_routes(emqx_topic:topic()) -> ok). print_routes(Topic) -> lists:foreach(fun(#route{topic = To, dest = Dest}) -> io:format("~s -> ~s~n", [To, Dest]) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 33661e2c3..c5fd2fc6f 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -80,7 +80,7 @@ old_conn_pid :: pid(), %% Next packet id of the session - next_pkt_id = 1 :: mqtt_packet_id(), + next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), %% Max subscriptions max_subscriptions :: non_neg_integer(), @@ -164,19 +164,20 @@ start_link(SessAttrs) -> %% PubSub API %%------------------------------------------------------------------------------ --spec(subscribe(pid(), list({topic(), map()}) | - {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). +-spec(subscribe(pid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) || {RawTopic, SubOpts} <- RawTopicFilters], subscribe(SPid, undefined, #{}, TopicFilters). -%% for mqtt 5.0 +-spec(subscribe(pid(), emqx_mqtt_types:packet_id(), + emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). subscribe(SPid, PacketId, Properties, TopicFilters) -> SubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {subscribe, self(), SubReq}). --spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, emqx_types:dispatches()}). +-spec(publish(pid(), emqx_mqtt_types:packet_id(), emqx_types:message()) + -> {ok, emqx_types:deliver_results()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message to broker directly emqx_broker:publish(Msg); @@ -189,43 +190,44 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 message to session gen_server:call(SPid, {publish, PacketId, Msg}, infinity). --spec(puback(pid(), mqtt_packet_id()) -> ok). +-spec(puback(pid(), emqx_mqtt_types:packet_id()) -> ok). puback(SPid, PacketId) -> gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}). puback(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {puback, PacketId, ReasonCode}). --spec(pubrec(pid(), mqtt_packet_id()) -> ok | {error, mqtt_reason_code()}). +-spec(pubrec(pid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrec(SPid, PacketId) -> pubrec(SPid, PacketId, ?RC_SUCCESS). --spec(pubrec(pid(), mqtt_packet_id(), mqtt_reason_code()) - -> ok | {error, mqtt_reason_code()}). +-spec(pubrec(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) + -> ok | {error, emqx_mqtt_types:reason_code()}). pubrec(SPid, PacketId, ReasonCode) -> gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). --spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) - -> ok | {error, mqtt_reason_code()}). +-spec(pubrel(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) + -> ok | {error, emqx_mqtt_types:reason_code()}). pubrel(SPid, PacketId, ReasonCode) -> gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). --spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok). +-spec(pubcomp(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). pubcomp(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). --spec(unsubscribe(pid(), topic_table()) -> ok). +-spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)). --spec(unsubscribe(pid(), mqtt_packet_id(), mqtt_properties(), topic_table()) -> ok). +-spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), + emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). -spec(resume(pid(), pid()) -> ok). -resume(SPid, ClientPid) -> - gen_server:cast(SPid, {resume, ClientPid}). +resume(SPid, ConnPid) -> + gen_server:cast(SPid, {resume, ConnPid}). %% @doc Get session info -spec(info(pid() | #state{}) -> list(tuple())). @@ -292,7 +294,7 @@ stats(#state{max_subscriptions = MaxSubscriptions, {enqueue_msg, EnqueueMsg}]). %% @doc Discard the session --spec(discard(pid(), client_id()) -> ok). +-spec(discard(pid(), emqx_types:client_id()) -> ok). discard(SPid, ClientId) -> gen_server:call(SPid, {discard, ClientId}, infinity). @@ -342,8 +344,8 @@ init_mqueue(Zone, ClientId) -> max_len => get_env(Zone, max_mqueue_len), store_qos0 => get_env(Zone, mqueue_store_qos0)}). -binding(ClientPid) -> - case node(ClientPid) =:= node() of true -> local; false -> remote end. +binding(ConnPid) -> + case node(ConnPid) =:= node() of true -> local; false -> remote end. handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> ?LOG(warning, "Discarded by ~p", [ConnPid], State), diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index ce21a1bf8..0cbfab60a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -58,7 +58,7 @@ mnesia(copy) -> %% API %%------------------------------------------------------------------------------ --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -81,11 +81,11 @@ record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. %% TODO: dispatch strategy, ensure the delivery... -dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> +dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) -> case pick(subscribers(Group, Topic)) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, - Delivery#delivery{flows = [{dispatch, {Group, Topic}, 1} | Flows]} + Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]} end. pick([]) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 577927b02..0b188f986 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -70,7 +70,7 @@ open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := emqx_sm_locker:trans(ClientId, ResumeStart). %% @doc Discard all the sessions identified by the ClientId. --spec(discard_session(client_id()) -> ok). +-spec(discard_session(emqx_types:client_id()) -> ok). discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, self()). @@ -84,7 +84,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> end, lookup_session(ClientId)). %% @doc Try to resume a session. --spec(resume_session(client_id()) -> {ok, pid()} | {error, term()}). +-spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}). resume_session(ClientId) -> resume_session(ClientId, self()). @@ -105,14 +105,14 @@ resume_session(ClientId, ConnPid) -> end. %% @doc Close a session. --spec(close_session({client_id(), pid()} | pid()) -> ok). +-spec(close_session({emqx_types:client_id(), pid()} | pid()) -> ok). close_session({_ClientId, SPid}) -> emqx_session:close(SPid); close_session(SPid) when is_pid(SPid) -> emqx_session:close(SPid). %% @doc Register a session with attributes. --spec(register_session(client_id() | {client_id(), pid()}, +-spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list(emqx_session:attribute())) -> ok). register_session(ClientId, Attrs) when is_binary(ClientId) -> register_session({ClientId, self()}, Attrs); @@ -129,7 +129,7 @@ register_session(Session = {ClientId, SPid}, Attrs) notify({registered, ClientId, SPid}). %% @doc Get session attrs --spec(get_session_attrs({client_id(), pid()}) -> list(emqx_session:attribute())). +-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())). get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> safe_lookup_element(?SESSION_ATTRS_TAB, Session, []). @@ -140,7 +140,7 @@ set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), i ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}). %% @doc Unregister a session --spec(unregister_session(client_id() | {client_id(), pid()}) -> ok). +-spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). unregister_session(ClientId) when is_binary(ClientId) -> unregister_session({ClientId, self()}); @@ -153,13 +153,13 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid( notify({unregistered, ClientId, SPid}). %% @doc Get session stats --spec(get_session_stats({client_id(), pid()}) -> list(emqx_stats:stats())). +-spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> safe_lookup_element(?SESSION_STATS_TAB, Session, []). %% @doc Set session stats --spec(set_session_stats(client_id() | {client_id(), pid()}, +-spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()}, emqx_stats:stats()) -> ok). set_session_stats(ClientId, Stats) when is_binary(ClientId) -> set_session_stats({ClientId, self()}, Stats); @@ -169,7 +169,7 @@ set_session_stats(Session = {ClientId, SPid}, Stats) ets:insert(?SESSION_STATS_TAB, {Session, Stats}). %% @doc Lookup a session from registry --spec(lookup_session(client_id()) -> list({client_id(), pid()})). +-spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})). lookup_session(ClientId) -> case emqx_sm_registry:is_enabled() of true -> emqx_sm_registry:lookup_session(ClientId); @@ -177,17 +177,17 @@ lookup_session(ClientId) -> end. %% @doc Dispatch a message to the session. --spec(dispatch(client_id(), topic(), message()) -> any()). +-spec(dispatch(emqx_types:client_id(), emqx_topic:topic(), emqx_types:message()) -> any()). dispatch(ClientId, Topic, Msg) -> case lookup_session_pid(ClientId) of Pid when is_pid(Pid) -> Pid ! {dispatch, Topic, Msg}; undefined -> - emqx_hooks:run('message.dropped', [ClientId, Msg]) + emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) end. %% @doc Lookup session pid. --spec(lookup_session_pid(client_id()) -> pid() | undefined). +-spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined). lookup_session_pid(ClientId) -> safe_lookup_element(?SESSION_TAB, ClientId, undefined). diff --git a/src/emqx_sm_locker.erl b/src/emqx_sm_locker.erl index 050e6276c..d50d16ccc 100644 --- a/src/emqx_sm_locker.erl +++ b/src/emqx_sm_locker.erl @@ -17,6 +17,7 @@ -include("emqx.hrl"). -export([start_link/0]). + -export([trans/2, trans/3]). -export([lock/1, lock/2, unlock/1]). @@ -24,11 +25,12 @@ start_link() -> ekka_locker:start_link(?MODULE). --spec(trans(client_id(), fun(([node()]) -> any())) -> any()). +-spec(trans(emqx_types:client_id(), fun(([node()]) -> any())) -> any()). trans(ClientId, Fun) -> trans(ClientId, Fun, undefined). --spec(trans(client_id() | undefined, fun(([node()]) -> any()), ekka_locker:piggyback()) -> any()). +-spec(trans(emqx_types:client_id() | undefined, + fun(([node()])-> any()), ekka_locker:piggyback()) -> any()). trans(undefined, Fun, _Piggyback) -> Fun([]); trans(ClientId, Fun, Piggyback) -> @@ -39,15 +41,15 @@ trans(ClientId, Fun, Piggyback) -> {error, client_id_unavailable} end. --spec(lock(client_id()) -> ekka_locker:lock_result()). +-spec(lock(emqx_types:client_id()) -> ekka_locker:lock_result()). lock(ClientId) -> ekka_locker:aquire(?MODULE, ClientId, strategy()). --spec(lock(client_id(), ekka_locker:piggyback()) -> ekka_locker:lock_result()). +-spec(lock(emqx_types:client_id(), ekka_locker:piggyback()) -> ekka_locker:lock_result()). lock(ClientId, Piggyback) -> ekka_locker:aquire(?MODULE, ClientId, strategy(), Piggyback). --spec(unlock(client_id()) -> {boolean(), [node()]}). +-spec(unlock(emqx_types:client_id()) -> {boolean(), [node()]}). unlock(ClientId) -> ekka_locker:release(?MODULE, ClientId, strategy()). diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index 701b9ae4e..74690b4b1 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -43,16 +43,17 @@ start_link() -> is_enabled() -> ets:info(?TAB, name) =/= undefined. --spec(lookup_session(client_id()) -> list({client_id(), session_pid()})). +-spec(lookup_session(emqx_types:client_id()) + -> list({emqx_types:client_id(), session_pid()})). lookup_session(ClientId) -> [{ClientId, SessionPid} || #global_session{pid = SessionPid} <- mnesia:dirty_read(?TAB, ClientId)]. --spec(register_session({client_id(), session_pid()}) -> ok). +-spec(register_session({emqx_types:client_id(), session_pid()}) -> ok). register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> mnesia:dirty_write(?TAB, record(ClientId, SessionPid)). --spec(unregister_session({client_id(), session_pid()}) -> ok). +-spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok). unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)). diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index b122c114b..c244a40b3 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -14,9 +14,6 @@ -module(emqx_topic). --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - -export([match/2]). -export([validate/1, validate/2]). -export([levels/1]). @@ -28,11 +25,12 @@ -export([systop/1]). -export([parse/1, parse/2]). +-type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). -type(triple() :: {root | binary(), word(), binary()}). --export_type([word/0, triple/0]). +-export_type([topic/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 4096). diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 6b75256d0..f5dfa93fe 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -58,7 +58,7 @@ mnesia(copy) -> %%------------------------------------------------------------------------------ %% @doc Insert a topic into the trie --spec(insert(Topic :: topic()) -> ok). +-spec(insert(emqx_topic:topic()) -> ok). insert(Topic) when is_binary(Topic) -> case mnesia:read(?TRIE_NODE, Topic) of [#trie_node{topic = Topic}] -> @@ -73,7 +73,7 @@ insert(Topic) when is_binary(Topic) -> end. %% @doc Find trie nodes that match the topic --spec(match(Topic :: topic()) -> list(MatchedTopic :: topic())). +-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())). match(Topic) when is_binary(Topic) -> TrieNodes = match_node(root, emqx_topic:words(Topic)), [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined]. @@ -84,7 +84,7 @@ lookup(NodeId) -> mnesia:read(?TRIE_NODE, NodeId). %% @doc Delete a topic from the trie --spec(delete(Topic :: topic()) -> ok). +-spec(delete(emqx_topic:topic()) -> ok). delete(Topic) when is_binary(Topic) -> case mnesia:read(?TRIE_NODE, Topic) of [#trie_node{edge_count = 0}] -> diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 6fb0647c8..d31f37303 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -14,17 +14,29 @@ -module(emqx_types). -%%-include("emqx.hrl"). +-include("emqx.hrl"). +-export_type([zone/0]). -export_type([startlink_ret/0]). --export_type([zone/0, client_id/0, username/0, password/0, peername/0, - protocol/0, credentials/0]). --export_type([topic/0, payload/0, dispatches/0]). -%%-export_type([payload/0, message/0, delivery/0]). - --type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). +-export_type([pubsub/0, topic/0, subid/0, subopts/0]). +-export_type([client_id/0, username/0, password/0, peername/0, protocol/0]). +-export_type([credentials/0, session/0]). +-export_type([subscription/0, subscriber/0, topic_table/0]). +-export_type([payload/0, message/0]). +-export_type([delivery/0, deliver_results/0]). +-export_type([route/0]). +-export_type([alarm/0, plugin/0, command/0]). -type(zone() :: atom()). +-type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). +-type(pubsub() :: publish | subscribe). +-type(topic() :: binary()). +-type(subid() :: binary() | atom()). +-type(subopts() :: #{qos := integer(), + share => binary(), + atom() => term() + }). +-type(session() :: #session{}). -type(client_id() :: binary() | atom()). -type(username() :: binary() | undefined). -type(password() :: binary() | undefined). @@ -34,12 +46,18 @@ username := username(), peername := peername(), zone => zone(), - atom() => term()}). - --type(topic() :: binary()). + atom() => term() + }). +-type(subscription() :: #subscription{}). +-type(subscriber() :: {pid(), subid()}). +-type(topic_table() :: [{topic(), subopts()}]). -type(payload() :: binary() | iodata()). -%-type(message() :: #message{}). -%-type(delivery() :: #delivery{}). - --type(dispatches() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]). +-type(message() :: #message{}). +-type(delivery() :: #delivery{}). +-type(deliver_results() :: [{route, node(), topic()} | + {dispatch, topic(), pos_integer()}]). +-type(route() :: #route{}). +-type(alarm() :: #alarm{}). +-type(plugin() :: #plugin{}). +-type(command() :: #command{}). diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index fdcfc37a5..209f0323c 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -33,13 +33,13 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec(get_env(zone() | undefined, atom()) -> undefined | term()). +-spec(get_env(emqx_types:zone() | undefined, atom()) -> undefined | term()). get_env(undefined, Key) -> emqx_config:get_env(Key); get_env(Zone, Key) -> get_env(Zone, Key, undefined). --spec(get_env(zone() | undefined, atom(), term()) -> undefined | term()). +-spec(get_env(emqx_types:zone() | undefined, atom(), term()) -> undefined | term()). get_env(undefined, Key, Def) -> emqx_config:get_env(Key, Def); get_env(Zone, Key, Def) -> @@ -48,7 +48,7 @@ get_env(Zone, Key, Def) -> emqx_config:get_env(Key, Def) end. --spec(set_env(zone(), atom(), term()) -> ok). +-spec(set_env(emqx_types:zone(), atom(), term()) -> ok). set_env(Zone, Key, Val) -> gen_server:cast(?MODULE, {set_env, Zone, Key, Val}).