Define types in emqx_types, emqx_mqtt_types modules

This commit is contained in:
Feng Lee 2018-08-29 23:08:55 +08:00
parent 2dc8f9c4c5
commit 567aeb274f
35 changed files with 468 additions and 441 deletions

View File

@ -12,6 +12,9 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
-ifndef(EMQ_X_HRL).
-define(EMQ_X_HRL, true).
%%--------------------------------------------------------------------
%% Banner
%%--------------------------------------------------------------------
@ -24,14 +27,6 @@
-define(ERTS_MINIMUM_REQUIRED, "10.0").
%%--------------------------------------------------------------------
%% PubSub
%%--------------------------------------------------------------------
-type(pubsub() :: publish | subscribe).
-define(PS(I), (I =:= publish orelse I =:= subscribe)).
%%--------------------------------------------------------------------
%% Topics' prefix: $SYS | $queue | $share
%%--------------------------------------------------------------------
@ -46,121 +41,48 @@
-define(SHARE, <<"$share/">>).
%%--------------------------------------------------------------------
%% Topic, subscription and subscriber
%% Message and Delivery
%%--------------------------------------------------------------------
-type(topic() :: binary()).
-record(session, {sid, pid}).
-type(subid() :: binary() | atom()).
-type(subopts() :: #{qos => integer(),
share => binary(),
atom() => term()}).
-record(subscription, {
topic :: topic(),
subid :: subid(),
subopts :: subopts()
}).
-type(subscription() :: #subscription{}).
-type(subscriber() :: {pid(), subid()}).
-type(topic_table() :: [{topic(), subopts()}]).
%%--------------------------------------------------------------------
%% Zone, Credentials, Client and Session
%%--------------------------------------------------------------------
-type(zone() :: atom()).
-type(client_id() :: binary() | atom()).
-type(username() :: binary() | undefined).
-type(password() :: binary() | undefined).
-type(peername() :: {inet:ip_address(), inet:port_number()}).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(credentials() :: #{client_id := binary(),
username := binary(),
peername := peername(),
zone => zone(),
atom() => term()}).
-record(client, {
id :: client_id(),
pid :: pid(),
zone :: zone(),
peername :: peername(),
username :: username(),
protocol :: protocol(),
attributes :: map()
}).
-type(client() :: #client{}).
-record(session, {
sid :: client_id(),
pid :: pid()
}).
-type(session() :: #session{}).
%%--------------------------------------------------------------------
%% Payload, Message and Delivery
%%--------------------------------------------------------------------
-type(qos() :: integer()).
-type(payload() :: binary() | iodata()).
-type(message_flag() :: dup | sys | retain | atom()).
-record(subscription, {topic, subid, subopts}).
%% See 'Application Message' in MQTT Version 5.0
-record(message, {
%% Global unique message ID
id :: binary() | pos_integer(),
id :: binary(),
%% Message QoS
qos = 0 :: qos(),
qos = 0,
%% Message from
from :: atom() | client_id(),
from :: atom() | binary(),
%% Message flags
flags :: #{message_flag() => boolean()},
flags :: #{atom() => boolean()},
%% Message headers, or MQTT 5.0 Properties
headers = #{} :: map(),
headers = #{},
%% Topic that the message is published to
topic :: topic(),
topic :: binary(),
%% Message Payload
payload :: binary(),
%% Timestamp
timestamp :: erlang:timestamp()
}).
-type(message() :: #message{}).
-record(delivery, {
sender :: pid(), %% Sender of the delivery
message :: message(), %% The message delivered
flows :: list() %% The dispatch path of message
sender :: pid(), %% Sender of the delivery
message :: #message{}, %% The message delivered
results :: list() %% Dispatches of the message
}).
-type(delivery() :: #delivery{}).
%%--------------------------------------------------------------------
%% Route
%%--------------------------------------------------------------------
-record(route, {
topic :: topic(),
dest :: node() | {binary(), node()}
topic :: binary(),
dest :: node() | {binary(), node()}
}).
-type(route() :: #route{}).
%%--------------------------------------------------------------------
%% Trie
%%--------------------------------------------------------------------
@ -170,7 +92,7 @@
-record(trie_node, {
node_id :: trie_node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: topic() | undefined,
topic :: binary() | undefined,
flags :: list(atom())
}).
@ -196,8 +118,6 @@
timestamp :: erlang:timestamp()
}).
-type(alarm() :: #alarm{}).
%%--------------------------------------------------------------------
%% Plugin
%%--------------------------------------------------------------------
@ -212,8 +132,6 @@
info :: map()
}).
-type(plugin() :: #plugin{}).
%%--------------------------------------------------------------------
%% Command
%%--------------------------------------------------------------------
@ -227,5 +145,5 @@
descr :: string()
}).
-type(command() :: #command{}).
-endif.

View File

@ -12,6 +12,9 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
-ifndef(EMQ_X_MQTT_HRL).
-define(EMQ_X_MQTT_HRL, true).
%%--------------------------------------------------------------------
%% MQTT SockOpts
%%--------------------------------------------------------------------
@ -32,8 +35,6 @@
{?MQTT_PROTO_V4, <<"MQTT">>},
{?MQTT_PROTO_V5, <<"MQTT">>}]).
-type(mqtt_version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5).
%%--------------------------------------------------------------------
%% MQTT QoS Levels
%%--------------------------------------------------------------------
@ -48,12 +49,6 @@
-define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)).
-type(mqtt_qos() :: ?QOS0 | ?QOS1 | ?QOS2).
-type(mqtt_qos_name() :: qos0 | at_most_once |
qos1 | at_least_once |
qos2 | exactly_once).
-define(QOS_I(Name),
begin
(case Name of
@ -80,25 +75,6 @@
-define(MAX_CLIENTID_LEN, 65535).
%%--------------------------------------------------------------------
%% MQTT Client
%%--------------------------------------------------------------------
-record(mqtt_client, {
client_id :: binary() | undefined,
client_pid :: pid(),
username :: binary() | undefined,
peername :: {inet:ip_address(), inet:port_number()},
clean_start :: boolean(),
proto_ver :: mqtt_version(),
keepalive = 0 :: non_neg_integer(),
will_topic :: undefined | binary(),
mountpoint :: undefined | binary(),
connected_at :: erlang:timestamp(),
attributes :: map()
}).
-type(mqtt_client() :: #mqtt_client{}).
%%--------------------------------------------------------------------
%% MQTT Control Packet Types
%%--------------------------------------------------------------------
@ -137,8 +113,6 @@
'DISCONNECT',
'AUTH']).
-type(mqtt_packet_type() :: ?RESERVED..?AUTH).
%%--------------------------------------------------------------------
%% MQTT V3.1.1 Connect Return Codes
%%--------------------------------------------------------------------
@ -150,8 +124,6 @@
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
%%--------------------------------------------------------------------
%% MQTT V5.0 Reason Codes
%%--------------------------------------------------------------------
@ -220,108 +192,91 @@
%%--------------------------------------------------------------------
-record(mqtt_packet_header, {
type = ?RESERVED :: mqtt_packet_type(),
dup = false :: boolean(),
qos = ?QOS_0 :: mqtt_qos(),
retain = false :: boolean()
type = ?RESERVED,
dup = false,
qos = ?QOS_0,
retain = false
}).
%%--------------------------------------------------------------------
%% MQTT Packets
%%--------------------------------------------------------------------
-type(mqtt_topic() :: binary()).
-type(mqtt_client_id() :: binary()).
-type(mqtt_username() :: binary() | undefined).
-type(mqtt_packet_id() :: 1..16#FFFF | undefined).
-type(mqtt_reason_code() :: 0..16#FF | undefined).
-type(mqtt_properties() :: #{atom() => term()} | undefined).
-type(mqtt_subopts() :: #{atom() => term()}).
-define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling
rap => 0, %% Retain as Publish
nl => 0, %% No Local
qos => ?QOS_0,
rc => 0, %% Reason Code
subid => 0 %% Subscription-Identifier
-define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling
rap => 0, %% Retain as Publish
nl => 0, %% No Local
qos => 0, %% QoS
rc => 0 %% Reason Code
}).
-type(mqtt_topic_filters() :: [{mqtt_topic(), mqtt_subopts()}]).
-record(mqtt_packet_connect, {
proto_name = <<"MQTT">> :: binary(),
proto_ver = ?MQTT_PROTO_V4 :: mqtt_version(),
is_bridge = false :: boolean(),
clean_start = true :: boolean(),
will_flag = false :: boolean(),
will_qos = ?QOS_0 :: mqtt_qos(),
will_retain = false :: boolean(),
keepalive = 0 :: non_neg_integer(),
properties = undefined :: mqtt_properties(),
client_id = <<>> :: mqtt_client_id(),
will_props = undefined :: undefined | map(),
will_topic = undefined :: undefined | binary(),
will_payload = undefined :: undefined | binary(),
username = undefined :: undefined | binary(),
password = undefined :: undefined | binary()
proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V4,
is_bridge = false,
clean_start = true,
will_flag = false,
will_qos = ?QOS_0,
will_retain = false,
keepalive = 0,
properties = undefined,
client_id = <<>>,
will_props = undefined,
will_topic = undefined,
will_payload = undefined,
username = undefined,
password = undefined
}).
-record(mqtt_packet_connack, {
ack_flags :: 0 | 1,
reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
ack_flags,
reason_code,
properties
}).
-record(mqtt_packet_publish, {
topic_name :: mqtt_topic(),
packet_id :: mqtt_packet_id(),
properties :: mqtt_properties()
topic_name,
packet_id,
properties
}).
-record(mqtt_packet_puback, {
packet_id :: mqtt_packet_id(),
reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
packet_id,
reason_code,
properties
}).
-record(mqtt_packet_subscribe, {
packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
topic_filters :: mqtt_topic_filters()
packet_id,
properties,
topic_filters
}).
-record(mqtt_packet_suback, {
packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
reason_codes :: list(mqtt_reason_code())
packet_id,
properties,
reason_codes
}).
-record(mqtt_packet_unsubscribe, {
packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
topic_filters :: [mqtt_topic()]
packet_id,
properties,
topic_filters
}).
-record(mqtt_packet_unsuback, {
packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
reason_codes :: list(mqtt_reason_code())
packet_id,
properties,
reason_codes
}).
-record(mqtt_packet_disconnect, {
reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
reason_code,
properties
}).
-record(mqtt_packet_auth, {
reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
reason_code,
properties
}).
%%--------------------------------------------------------------------
@ -340,63 +295,70 @@
| #mqtt_packet_unsuback{}
| #mqtt_packet_disconnect{}
| #mqtt_packet_auth{}
| mqtt_packet_id()
| pos_integer()
| undefined,
payload :: binary() | undefined
}).
-type(mqtt_packet() :: #mqtt_packet{}).
%%--------------------------------------------------------------------
%% MQTT Packet Match
%%--------------------------------------------------------------------
-define(CONNECT_PACKET(Var),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}).
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
variable = Var}).
-define(CONNACK_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = 0,
reason_code = ReasonCode}}).
reason_code = ReasonCode}
}).
-define(CONNACK_PACKET(ReasonCode, SessPresent),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent,
reason_code = ReasonCode}}).
reason_code = ReasonCode}
}).
-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent,
reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(AUTH_PACKET(),
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
variable = #mqtt_packet_auth{reason_code = 0}}).
variable = #mqtt_packet_auth{reason_code = 0}
}).
-define(AUTH_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
variable = #mqtt_packet_auth{reason_code = ReasonCode}}).
variable = #mqtt_packet_auth{reason_code = ReasonCode}
}).
-define(AUTH_PACKET(ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
variable = #mqtt_packet_auth{reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(PUBLISH_PACKET(QoS),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = QoS}}).
-define(PUBLISH_PACKET(QoS, PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = QoS},
variable = #mqtt_packet_publish{packet_id = PacketId}}).
qos = QoS},
variable = #mqtt_packet_publish{packet_id = PacketId}
}).
-define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = QoS},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}).
payload = Payload
}).
-define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
@ -404,130 +366,166 @@
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
properties = Properties},
payload = Payload}).
payload = Payload
}).
-define(PUBACK_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
reason_code = 0}
}).
-define(PUBACK_PACKET(PacketId, ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode}}).
reason_code = ReasonCode}
}).
-define(PUBACK_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(PUBREC_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
reason_code = 0}
}).
-define(PUBREC_PACKET(PacketId, ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode}}).
reason_code = ReasonCode}
}).
-define(PUBREC_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
variable = #mqtt_packet_puback{packet_id = PacketId,
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(PUBREL_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
qos = ?QOS_1},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
reason_code = 0}
}).
-define(PUBREL_PACKET(PacketId, ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
qos = ?QOS_1},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode}}).
reason_code = ReasonCode}
}).
-define(PUBREL_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
variable = #mqtt_packet_puback{packet_id = PacketId,
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
qos = ?QOS_1},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(PUBCOMP_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
reason_code = 0}
}).
-define(PUBCOMP_PACKET(PacketId, ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode}}).
reason_code = ReasonCode}
}).
-define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(SUBSCRIBE_PACKET(PacketId, TopicFilters),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1},
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE,
qos = ?QOS_1},
variable = #mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}}).
topic_filters = TopicFilters}
}).
-define(SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1},
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE,
qos = ?QOS_1},
variable = #mqtt_packet_subscribe{packet_id = PacketId,
properties = Properties,
topic_filters = TopicFilters}}).
topic_filters = TopicFilters}
}).
-define(SUBACK_PACKET(PacketId, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
variable = #mqtt_packet_suback{packet_id = PacketId,
reason_codes = ReasonCodes}}).
reason_codes = ReasonCodes}
}).
-define(SUBACK_PACKET(PacketId, Properties, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
variable = #mqtt_packet_suback{packet_id = PacketId,
properties = Properties,
reason_codes = ReasonCodes}}).
reason_codes = ReasonCodes}
}).
-define(UNSUBSCRIBE_PACKET(PacketId, TopicFilters),
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1},
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE,
qos = ?QOS_1},
variable = #mqtt_packet_unsubscribe{packet_id = PacketId,
topic_filters = TopicFilters}}).
topic_filters = TopicFilters}
}).
-define(UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1},
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE,
qos = ?QOS_1},
variable = #mqtt_packet_unsubscribe{packet_id = PacketId,
properties = Properties,
topic_filters = TopicFilters}}).
topic_filters = TopicFilters}
}).
-define(UNSUBACK_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
variable = #mqtt_packet_unsuback{packet_id = PacketId}}).
variable = #mqtt_packet_unsuback{packet_id = PacketId}
}).
-define(UNSUBACK_PACKET(PacketId, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
variable = #mqtt_packet_unsuback{packet_id = PacketId,
reason_codes = ReasonCodes}}).
reason_codes = ReasonCodes}
}).
-define(UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
variable = #mqtt_packet_unsuback{packet_id = PacketId,
properties = Properties,
reason_codes = ReasonCodes}}).
reason_codes = ReasonCodes}
}).
-define(DISCONNECT_PACKET(),
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
variable = #mqtt_packet_disconnect{reason_code = 0}}).
variable = #mqtt_packet_disconnect{reason_code = 0}
}).
-define(DISCONNECT_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
variable = #mqtt_packet_disconnect{reason_code = ReasonCode}}).
variable = #mqtt_packet_disconnect{reason_code = ReasonCode}
}).
-define(DISCONNECT_PACKET(ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
variable = #mqtt_packet_disconnect{reason_code = ReasonCode,
properties = Properties}}).
properties = Properties}
}).
-define(PACKET(Type),
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
-define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}).
-endif.

View File

@ -66,32 +66,36 @@ is_running(Node) ->
%% PubSub API
%%--------------------------------------------------------------------
-spec(subscribe(topic() | string()) -> ok | {error, term()}).
-spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}).
subscribe(Topic) ->
emqx_broker:subscribe(iolist_to_binary(Topic)).
-spec(subscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string())
-> ok | {error, term()}).
subscribe(Topic, Sub) when is_list(Sub)->
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub));
subscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
{SubPid, SubId} = Subscriber,
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId).
-spec(subscribe(topic() | string(), subscriber() | string(), subopts()) -> ok | {error, term()}).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(),
emqx_topic:subopts()) -> ok | {error, term()}).
subscribe(Topic, Sub, Options) when is_list(Sub)->
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options);
subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)->
{SubPid, SubId} = Subscriber,
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options).
-spec(publish(message()) -> {ok, emqx_types:dispatches()}).
publish(Msg) -> emqx_broker:publish(Msg).
-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
publish(Msg) ->
emqx_broker:publish(Msg).
-spec(unsubscribe(topic() | string()) -> ok | {error, term()}).
-spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}).
unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string())
-> ok | {error, term()}).
unsubscribe(Topic, Sub) when is_list(Sub) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub));
unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
@ -102,26 +106,28 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
%% PubSub management API
%%--------------------------------------------------------------------
-spec(get_subopts(topic() | string(), subscriber()) -> subopts()).
-spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber())
-> emqx_types:subopts()).
get_subopts(Topic, Subscriber) ->
emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)).
-spec(set_subopts(topic() | string(), subscriber(), subopts()) -> ok).
-spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(),
emqx_types:subopts()) -> ok).
set_subopts(Topic, Subscriber, Options) when is_list(Options) ->
emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options).
-spec(topics() -> list(topic())).
-spec(topics() -> list(emqx_topic:topic())).
topics() -> emqx_router:topics().
-spec(subscribers(topic() | string()) -> list(subscriber())).
-spec(subscribers(emqx_topic:topic() | string()) -> list(emqx_types:subscriber())).
subscribers(Topic) ->
emqx_broker:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(subscriber()) -> [{topic(), subopts()}]).
-spec(subscriptions(emqx_types:subscriber()) -> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(Subscriber) ->
emqx_broker:subscriptions(Subscriber).
-spec(subscribed(topic() | string(), subscriber()) -> boolean()).
-spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()).
subscribed(Topic, Subscriber) ->
emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)).

View File

@ -55,7 +55,7 @@ register_default_acl() ->
File -> register_mod(acl, emqx_acl_internal, [File])
end.
-spec(authenticate(credentials(), password())
-spec(authenticate(emqx_types:credentials(), emqx_types:password())
-> ok | {ok, map()} | {continue, map()} | {error, term()}).
authenticate(Credentials, Password) ->
authenticate(Credentials, Password, lookup_mods(auth)).
@ -85,10 +85,9 @@ authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) ->
end.
%% @doc Check ACL
-spec(check_acl(credentials(), pubsub(), topic()) -> allow | deny).
check_acl(Credentials, PubSub, Topic) when ?PS(PubSub) ->
CacheEnabled = emqx_acl_cache:is_enabled(),
check_acl(Credentials, PubSub, Topic, lookup_mods(acl), CacheEnabled).
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny).
check_acl(Credentials, PubSub, Topic) when PubSub =:= publish; PubSub =:= subscribe ->
check_acl(Credentials, PubSub, Topic, lookup_mods(acl), emqx_acl_cache:is_enabled()).
check_acl(Credentials, PubSub, Topic, AclMods, false) ->
do_check_acl(Credentials, PubSub, Topic, AclMods);

View File

@ -24,9 +24,9 @@
-type(access() :: subscribe | publish | pubsub).
-type(rule() :: {allow, all} |
{allow, who(), access(), list(topic())} |
{allow, who(), access(), list(emqx_topic:topic())} |
{deny, all} |
{deny, who(), access(), list(topic())}).
{deny, who(), access(), list(emqx_topic:topic())}).
-export_type([rule/0]).
@ -81,7 +81,7 @@ bin(B) when is_binary(B) ->
B.
%% @doc Match access rule
-spec(match(credentials(), topic(), rule())
-spec(match(emqx_types:credentials(), emqx_types:topic(), rule())
-> {matched, allow} | {matched, deny} | nomatch).
match(_Credentials, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) ->
{matched, AllowDeny};

View File

@ -1,3 +1,17 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_acl_cache).
-include("emqx.hrl").
@ -27,8 +41,7 @@ is_enabled() ->
application:get_env(emqx, enable_acl_cache, true).
%% We'll cleanup the cache before repalcing an expired acl.
-spec(get_acl_cache(PubSub :: publish | subscribe, Topic :: topic())
-> (acl_result() | not_found)).
-spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)).
get_acl_cache(PubSub, Topic) ->
case erlang:get(cache_k(PubSub, Topic)) of
undefined -> not_found;
@ -44,8 +57,7 @@ get_acl_cache(PubSub, Topic) ->
%% If the cache get full, and also the latest one
%% is expired, then delete all the cache entries
-spec(put_acl_cache(PubSub :: publish | subscribe,
Topic :: topic(), AclResult :: acl_result()) -> ok).
-spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok).
put_acl_cache(PubSub, Topic, AclResult) ->
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
Size = get_cache_size(),

View File

@ -70,7 +70,8 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL
-spec(check_acl({credentials(), pubsub(), topic()}, #{}) -> allow | deny | ignore).
-spec(check_acl({emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic()}, #{})
-> allow | deny | ignore).
check_acl({Credentials, PubSub, Topic}, _State) ->
case match(Credentials, Topic, lookup(PubSub)) of
{matched, allow} -> allow;

View File

@ -48,7 +48,7 @@ alarm_fun(Bool) ->
(clear, _AlarmId) when Bool =:= false -> alarm_fun(false)
end.
-spec(set_alarm(alarm()) -> ok).
-spec(set_alarm(emqx_types:alarm()) -> ok).
set_alarm(Alarm) when is_record(Alarm, alarm) ->
gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}).
@ -56,7 +56,7 @@ set_alarm(Alarm) when is_record(Alarm, alarm) ->
clear_alarm(AlarmId) when is_binary(AlarmId) ->
gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}).
-spec(get_alarms() -> list(alarm())).
-spec(get_alarms() -> list(emqx_types:alarm())).
get_alarms() ->
gen_event:call(?ALARM_MGR, ?MODULE, get_alarms).

View File

@ -30,15 +30,15 @@
-export([add/1, del/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
-type(key() :: {client_id, client_id()} |
{ipaddr, inet:ip_address()} |
{username, username()}).
-type(key() :: {client_id, emqx_types:client_id()} |
{username, emqx_types:username() |
{ipaddr, inet:ip_address()}}).
-record(state, {expiry_timer}).
@ -63,12 +63,12 @@ mnesia(copy) ->
%%--------------------------------------------------------------------
%% @doc Start the banned server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(check(client()) -> boolean()).
check(#client{id = ClientId, username = Username, peername = {IPAddr, _}}) ->
-spec(check(emqx_types:credentials()) -> boolean()).
check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) ->
ets:member(?TAB, {client_id, ClientId})
orelse ets:member(?TAB, {username, Username})
orelse ets:member(?TAB, {ipaddr, IPAddr}).

View File

@ -36,7 +36,7 @@
ping_down_interval = ?PING_DOWN_INTERVAL,
status = up}).
-type(option() :: {qos, mqtt_qos()} |
-type(option() :: {qos, emqx_mqtt_types:qos()} |
{topic_suffix, binary()} |
{topic_prefix, binary()} |
{max_queue_len, pos_integer()} |

View File

@ -27,7 +27,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc List all bridges
-spec(bridges() -> [{node(), topic(), pid()}]).
-spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]).
bridges() ->
[{Name, emqx_bridge1:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)].

View File

@ -18,7 +18,8 @@
-export([start_link/3]).
-spec(start_link(node(), topic(), [emqx_bridge:option()]) -> {ok, pid()} | {error, term()}).
-spec(start_link(node(), emqx_topic:topic(), [emqx_bridge:option()])
-> {ok, pid()} | {error, term()}).
start_link(Node, Topic, Options) ->
MFA = {emqx_bridge, start_link, [Node, Topic, Options]},
emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA).

View File

@ -30,17 +30,17 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc List all bridges
-spec(bridges() -> [{node(), topic(), pid()}]).
-spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]).
bridges() ->
[{Node, Topic, Pid} || {?CHILD_ID(Node, Topic), Pid, supervisor, _}
<- supervisor:which_children(?MODULE)].
%% @doc Start a bridge
-spec(start_bridge(node(), topic()) -> {ok, pid()} | {error, term()}).
-spec(start_bridge(node(), emqx_topic:topic()) -> {ok, pid()} | {error, term()}).
start_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) ->
start_bridge(Node, Topic, []).
-spec(start_bridge(node(), topic(), [emqx_bridge:option()])
-spec(start_bridge(node(), emqx_topic:topic(), [emqx_bridge:option()])
-> {ok, pid()} | {error, term()}).
start_bridge(Node, _Topic, _Options) when Node =:= node() ->
{error, bridge_to_self};
@ -49,7 +49,7 @@ start_bridge(Node, Topic, Options) when is_atom(Node), is_binary(Topic) ->
supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
%% @doc Stop a bridge
-spec(stop_bridge(node(), topic()) -> ok | {error, term()}).
-spec(stop_bridge(node(), emqx_topic:topic()) -> ok | {error, term()}).
stop_bridge(Node, Topic) when is_atom(Node), is_binary(Topic) ->
ChildId = ?CHILD_ID(Node, Topic),
case supervisor:terminate_child(?MODULE, ChildId) of

View File

@ -57,17 +57,18 @@ start_link(Pool, Id) ->
%% Subscribe
%%------------------------------------------------------------------------------
-spec(subscribe(topic()) -> ok).
-spec(subscribe(emqx_topic:topic()) -> ok).
subscribe(Topic) when is_binary(Topic) ->
subscribe(Topic, self()).
-spec(subscribe(topic(), pid() | subid()) -> ok).
-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok).
subscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
subscribe(Topic, SubPid, undefined);
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
subscribe(Topic, self(), SubId).
-spec(subscribe(topic(), pid() | subid(), subid() | subopts()) -> ok).
-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(),
emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
subscribe(Topic, SubPid, SubId, #{});
subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) ->
@ -75,24 +76,24 @@ subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(
subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->
subscribe(Topic, self(), SubId, SubOpts).
-spec(subscribe(topic(), pid(), subid(), subopts()) -> ok).
-spec(subscribe(emqx_topic:topic(), pid(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubPid, SubId, SubOpts) when is_binary(Topic), is_pid(SubPid),
?is_subid(SubId), is_map(SubOpts) ->
Broker = pick(SubPid),
SubReq = #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts},
wait_for_reply(async_call(Broker, SubReq), ?TIMEOUT).
-spec(multi_subscribe(topic_table()) -> ok).
-spec(multi_subscribe(emqx_types:topic_table()) -> ok).
multi_subscribe(TopicTable) when is_list(TopicTable) ->
multi_subscribe(TopicTable, self()).
-spec(multi_subscribe(topic_table(), pid() | subid()) -> ok).
-spec(multi_subscribe(emqx_types:topic_table(), pid() | emqx_types:subid()) -> ok).
multi_subscribe(TopicTable, SubPid) when is_pid(SubPid) ->
multi_subscribe(TopicTable, SubPid, undefined);
multi_subscribe(TopicTable, SubId) when ?is_subid(SubId) ->
multi_subscribe(TopicTable, self(), SubId).
-spec(multi_subscribe(topic_table(), pid(), subid()) -> ok).
-spec(multi_subscribe(emqx_types:topic_table(), pid(), emqx_types:subid()) -> ok).
multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) ->
Broker = pick(SubPid),
SubReq = fun(Topic, SubOpts) ->
@ -105,33 +106,33 @@ multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId)
%% Unsubscribe
%%------------------------------------------------------------------------------
-spec(unsubscribe(topic()) -> ok).
-spec(unsubscribe(emqx_topic:topic()) -> ok).
unsubscribe(Topic) when is_binary(Topic) ->
unsubscribe(Topic, self()).
-spec(unsubscribe(topic(), pid() | subid()) -> ok).
-spec(unsubscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok).
unsubscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
unsubscribe(Topic, SubPid, undefined);
unsubscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
unsubscribe(Topic, self(), SubId).
-spec(unsubscribe(topic(), pid(), subid()) -> ok).
-spec(unsubscribe(emqx_topic:topic(), pid(), emqx_types:subid()) -> ok).
unsubscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
Broker = pick(SubPid),
UnsubReq = #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId},
wait_for_reply(async_call(Broker, UnsubReq), ?TIMEOUT).
-spec(multi_unsubscribe([topic()]) -> ok).
-spec(multi_unsubscribe([emqx_topic:topic()]) -> ok).
multi_unsubscribe(Topics) ->
multi_unsubscribe(Topics, self()).
-spec(multi_unsubscribe([topic()], pid() | subid()) -> ok).
-spec(multi_unsubscribe([emqx_topic:topic()], pid() | emqx_types:subid()) -> ok).
multi_unsubscribe(Topics, SubPid) when is_pid(SubPid) ->
multi_unsubscribe(Topics, SubPid, undefined);
multi_unsubscribe(Topics, SubId) when ?is_subid(SubId) ->
multi_unsubscribe(Topics, self(), SubId).
-spec(multi_unsubscribe([topic()], pid(), subid()) -> ok).
-spec(multi_unsubscribe([emqx_topic:topic()], pid(), emqx_types:subid()) -> ok).
multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) ->
Broker = pick(SubPid),
UnsubReq = fun(Topic) ->
@ -143,18 +144,19 @@ multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -
%% Publish
%%------------------------------------------------------------------------------
-spec(publish(message()) -> {ok, emqx_types:dispatches()}).
-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg),
{ok, case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.flows;
Delivery#delivery.results;
{stop, _} ->
emqx_logger:warning("Stop publishing: ~p", [Msg]), []
emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
[]
end}.
-spec(safe_publish(message()) -> ok).
-spec(safe_publish(emqx_types:message()) -> ok).
%% Called internally
safe_publish(Msg) when is_record(Msg, message) ->
try
@ -167,7 +169,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
end.
delivery(Msg) ->
#delivery{sender = self(), message = Msg, flows = []}.
#delivery{sender = self(), message = Msg, results = []}.
%%------------------------------------------------------------------------------
%% Route
@ -180,8 +182,8 @@ route([], Delivery = #delivery{message = Msg}) ->
route([{To, Node}], Delivery) when Node =:= node() ->
dispatch(To, Delivery);
route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) ->
forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]});
route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) ->
emqx_shared_sub:dispatch(Group, To, Delivery);
@ -213,20 +215,21 @@ forward(Node, To, Delivery) ->
Delivery1 -> Delivery1
end.
-spec(dispatch(topic(), delivery()) -> delivery()).
dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()).
dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
case subscribers(Topic) of
[] ->
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
inc_dropped_cnt(Topic), Delivery;
inc_dropped_cnt(Topic),
Delivery;
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]};
Delivery#delivery{results = [{dispatch, Topic, 1}|Results]};
Subscribers ->
Count = lists:foldl(fun(Sub, Acc) ->
dispatch(Sub, Topic, Msg), Acc + 1
end, 0, Subscribers),
Delivery#delivery{flows = [{dispatch, Topic, Count}|Flows]}
Delivery#delivery{results = [{dispatch, Topic, Count}|Results]}
end.
dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) ->
@ -239,11 +242,12 @@ inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
inc_dropped_cnt(_Topic) ->
emqx_metrics:inc('messages/dropped').
-spec(subscribers(topic()) -> [subscriber()]).
-spec(subscribers(emqx_topic:topic()) -> [emqx_types:subscriber()]).
subscribers(Topic) ->
try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end.
-spec(subscriptions(subscriber()) -> [{topic(), subopts()}]).
-spec(subscriptions(emqx_types:subscriber())
-> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(Subscriber) ->
lists:map(fun({_, {share, _Group, Topic}}) ->
subscription(Topic, Subscriber);
@ -254,7 +258,7 @@ subscriptions(Subscriber) ->
subscription(Topic, Subscriber) ->
{Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}.
-spec(subscribed(topic(), pid() | subid() | subscriber()) -> boolean()).
-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1;
subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
@ -262,13 +266,13 @@ subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
-spec(get_subopts(topic(), subscriber()) -> subopts()).
-spec(get_subopts(emqx_topic:topic(), emqx_types:subscriber()) -> emqx_types:subopts()).
get_subopts(Topic, Subscriber) when is_binary(Topic) ->
try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)
catch error:badarg -> []
end.
-spec(set_subopts(topic(), subscriber(), subopts()) -> boolean()).
-spec(set_subopts(emqx_topic:topic(), emqx_types:subscriber(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_map(Opts) ->
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
[{_, OldOpts}] ->
@ -298,7 +302,7 @@ wait_for_reply(Tag, Timeout) ->
pick(SubPid) when is_pid(SubPid) ->
gproc_pool:pick_worker(broker, SubPid).
-spec(topics() -> [topic()]).
-spec(topics() -> [emqx_topic:topic()]).
topics() -> emqx_router:topics().
%%------------------------------------------------------------------------------

View File

@ -87,7 +87,7 @@
clean_start :: boolean(),
username :: binary() | undefined,
password :: binary() | undefined,
proto_ver :: mqtt_version(),
proto_ver :: emqx_mqtt_types:version(),
proto_name :: iodata(),
keepalive :: non_neg_integer(),
keepalive_timer :: reference() | undefined,
@ -114,15 +114,15 @@
-type(client() :: pid() | atom()).
-type(topic() :: mqtt_topic()).
-type(topic() :: emqx_topic:topic()).
-type(payload() :: iodata()).
-type(packet_id() :: mqtt_packet_id()).
-type(packet_id() :: emqx_mqtt_types:packet_id()).
-type(properties() :: mqtt_properties()).
-type(properties() :: emqx_mqtt_types:properties()).
-type(qos() :: mqtt_qos_name() | mqtt_qos()).
-type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()).
-type(pubopt() :: {retain, boolean()} | {qos, qos()}).
@ -131,7 +131,7 @@
| {nl, boolean()}
| {qos, qos()}).
-type(reason_code() :: mqtt_reason_code()).
-type(reason_code() :: emqx_mqtt_types:reason_code()).
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).

View File

@ -43,12 +43,12 @@ start_link() ->
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
%% @doc Lookup a connection.
-spec(lookup_connection(client_id()) -> list({client_id(), pid()})).
-spec(lookup_connection(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
lookup_connection(ClientId) when is_binary(ClientId) ->
ets:lookup(?CONN_TAB, ClientId).
%% @doc Register a connection.
-spec(register_connection(client_id() | {client_id(), pid()}) -> ok).
-spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
register_connection(ClientId) when is_binary(ClientId) ->
register_connection({ClientId, self()});
@ -56,7 +56,7 @@ register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid
_ = ets:insert(?CONN_TAB, Conn),
notify({registered, ClientId, ConnPid}).
-spec(register_connection(client_id() | {client_id(), pid()}, list()) -> ok).
-spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok).
register_connection(ClientId, Attrs) when is_binary(ClientId) ->
register_connection({ClientId, self()}, Attrs);
register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
@ -64,7 +64,7 @@ register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId),
register_connection(Conn).
%% @doc Get conn attrs
-spec(get_conn_attrs({client_id(), pid()}) -> list()).
-spec(get_conn_attrs({emqx_types:client_id(), pid()}) -> list()).
get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
try
ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2)
@ -79,7 +79,7 @@ set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_p
ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
%% @doc Unregister a conn.
-spec(unregister_connection(client_id() | {client_id(), pid()}) -> ok).
-spec(unregister_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
unregister_connection(ClientId) when is_binary(ClientId) ->
unregister_connection({ClientId, self()});
@ -90,7 +90,7 @@ unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_p
notify({unregistered, ClientId, ConnPid}).
%% @doc Lookup connection pid
-spec(lookup_conn_pid(client_id()) -> pid() | undefined).
-spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
lookup_conn_pid(ClientId) when is_binary(ClientId) ->
case ets:lookup(?CONN_TAB, ClientId) of
[] -> undefined;
@ -98,7 +98,7 @@ lookup_conn_pid(ClientId) when is_binary(ClientId) ->
end.
%% @doc Get conn stats
-spec(get_conn_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
-spec(get_conn_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
try ets:lookup_element(?CONN_STATS_TAB, Conn, 2)
catch
@ -106,7 +106,7 @@ get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(Conn
end.
%% @doc Set conn stats.
-spec(set_conn_stats(client_id(), list(emqx_stats:stats())) -> boolean()).
-spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> boolean()).
set_conn_stats(ClientId, Stats) when is_binary(ClientId) ->
set_conn_stats({ClientId, self()}, Stats);

View File

@ -22,11 +22,11 @@
-export([serialize/1, serialize/2]).
-type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE,
version => mqtt_version()}).
version => emqx_mqtt_types:version()}).
-type(parse_state() :: {none, options()} | cont_fun(binary())).
-type(cont_fun(Bin) :: fun((Bin) -> {ok, mqtt_packet(), binary()}
-type(cont_fun(Bin) :: fun((Bin) -> {ok, emqx_mqtt_types:packet(), binary()}
| {more, cont_fun(Bin)})).
-export_type([options/0, parse_state/0]).
@ -53,7 +53,8 @@ merge_opts(Options) ->
%% Parse MQTT Frame
%%------------------------------------------------------------------------------
-spec(parse(binary(), parse_state()) -> {ok, mqtt_packet(), binary()} | {more, cont_fun(binary())}).
-spec(parse(binary(), parse_state()) -> {ok, emqx_mqtt_types:packet(), binary()} |
{more, cont_fun(binary())}).
parse(<<>>, {none, Options}) ->
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Options}) ->
@ -359,11 +360,11 @@ parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
%% Serialize MQTT Packet
%%------------------------------------------------------------------------------
-spec(serialize(mqtt_packet()) -> iodata()).
-spec(serialize(emqx_mqtt_types:packet()) -> iodata()).
serialize(Packet) ->
serialize(Packet, ?DEFAULT_OPTIONS).
-spec(serialize(mqtt_packet(), options()) -> iodata()).
-spec(serialize(emqx_mqtt_types:packet(), options()) -> iodata()).
serialize(#mqtt_packet{header = Header,
variable = Variable,
payload = Payload}, Options) when is_map(Options) ->

View File

@ -24,15 +24,19 @@
-export([get_header/2, get_header/3, set_header/3]).
-export([format/1]).
-spec(make(topic(), payload()) -> message()).
-type(flag() :: atom()).
-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(Topic, Payload) ->
make(undefined, Topic, Payload).
-spec(make(atom() | client_id(), topic(), payload()) -> message()).
-spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload())
-> emqx_types:message()).
make(From, Topic, Payload) ->
make(From, ?QOS0, Topic, Payload).
-spec(make(atom() | client_id(), qos(), topic(), payload()) -> message()).
-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(),
emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) ->
#message{id = msgid(QoS),
qos = QoS,
@ -55,19 +59,20 @@ get_flag(Flag, Msg) ->
get_flag(Flag, #message{flags = Flags}, Default) ->
maps:get(Flag, Flags, Default).
-spec(set_flag(message_flag(), message()) -> message()).
-spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()).
set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) ->
Msg#message{flags = #{Flag => true}};
set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) ->
Msg#message{flags = maps:put(Flag, true, Flags)}.
-spec(set_flag(message_flag(), boolean() | integer(), message()) -> message()).
-spec(set_flag(flag(), boolean() | integer(), emqx_types:message())
-> emqx_types:message()).
set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) ->
Msg#message{flags = #{Flag => Val}};
set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
Msg#message{flags = maps:put(Flag, Val, Flags)}.
-spec(unset_flag(message_flag(), message()) -> message()).
-spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()).
unset_flag(Flag, Msg = #message{flags = Flags}) ->
Msg#message{flags = maps:remove(Flag, Flags)}.

View File

@ -167,7 +167,7 @@ update_counter(Key, UpOp) ->
%%-----------------------------------------------------------------------------
%% @doc Count packets received.
-spec(received(mqtt_packet()) -> ok).
-spec(received(emqx_mqtt_types:packet()) -> ok).
received(Packet) ->
inc('packets/received'),
received1(Packet).
@ -205,7 +205,7 @@ qos_received(?QOS_2) ->
inc('messages/qos2/received').
%% @doc Count packets received. Will not count $SYS PUBLISH.
-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()).
-spec(sent(emqx_mqtt_types:packet()) -> ignore | non_neg_integer()).
sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
ignore;
sent(Packet) ->

View File

@ -25,7 +25,7 @@
max_clientid_len => integer(),
max_topic_alias => integer(),
max_topic_levels => integer(),
max_qos_allowed => mqtt_qos(),
max_qos_allowed => emqx_mqtt_types:qos(),
mqtt_retain_available => boolean(),
mqtt_shared_subscription => boolean(),
mqtt_wildcard_subscription => boolean()}).
@ -49,7 +49,7 @@
mqtt_shared_subscription,
mqtt_wildcard_subscription]).
-spec(check_pub(zone(), map()) -> ok | {error, mqtt_reason_code()}).
-spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_mqtt_types:reason_code()}).
check_pub(Zone, Props) when is_map(Props) ->
do_check_pub(Props, maps:to_list(get_caps(Zone, publish))).
@ -65,7 +65,8 @@ do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) ->
do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) ->
do_check_pub(Props, Caps).
-spec(check_sub(zone(), mqtt_topic_filters()) -> {ok | error, mqtt_topic_filters()}).
-spec(check_sub(emqx_types:zone(), emqx_mqtt_types:topic_filters())
-> {ok | error, emqx_mqtt_types:topic_filters()}).
check_sub(Zone, TopicFilters) ->
Caps = maps:to_list(get_caps(Zone, subscribe)),
lists:foldr(fun({Topic, Opts}, {Ok, Result}) ->

43
src/emqx_mqtt_types.erl Normal file
View File

@ -0,0 +1,43 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_mqtt_types).
-include("emqx_mqtt.hrl").
-export_type([version/0, qos/0, qos_name/0]).
-export_type([connack/0, reason_code/0]).
-export_type([properties/0, subopts/0]).
-export_type([topic_filters/0]).
-export_type([packet_id/0, packet_type/0, packet/0]).
-type(qos() :: ?QOS0 | ?QOS1 | ?QOS2).
-type(version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5).
-type(qos_name() :: qos0 | at_most_once |
qos1 | at_least_once |
qos2 | exactly_once).
-type(packet_type() :: ?RESERVED..?AUTH).
-type(connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
-type(reason_code() :: 0..16#FF).
-type(packet_id() :: 1..16#FFFF).
-type(properties() :: #{atom() => term()}).
-type(subopts() :: #{rh := 0 | 1,
rap := 0 | 1 | 2,
nl := 0 | 1,
qos := qos(),
rc => reason_code()
}).
-type(topic_filters() :: [{emqx_topic:topic(), subopts()}]).
-type(packet() :: #mqtt_packet{}).

View File

@ -125,7 +125,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped
end} | [{max_len, MaxLen}, {dropped, Dropped}]].
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> mqueue()).
-spec(in(emqx_types:message(), mqueue()) -> mqueue()).
in(#message{flags = #{qos := ?QOS_0}}, MQ = #mqueue{qos0 = false}) ->
MQ;
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->

View File

@ -25,7 +25,7 @@
-export([will_msg/1]).
%% @doc Protocol name of version
-spec(protocol_name(mqtt_version()) -> binary()).
-spec(protocol_name(emqx_mqtt_types:version()) -> binary()).
protocol_name(?MQTT_PROTO_V3) ->
<<"MQIsdp">>;
protocol_name(?MQTT_PROTO_V4) ->
@ -34,7 +34,7 @@ protocol_name(?MQTT_PROTO_V5) ->
<<"MQTT">>.
%% @doc Name of MQTT packet type
-spec(type_name(mqtt_packet_type()) -> atom()).
-spec(type_name(emqx_mqtt_types:packet_type()) -> atom()).
type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH ->
lists:nth(Type, ?TYPE_NAMES).
@ -82,7 +82,7 @@ validate_qos(QoS) when ?QOS0 =< QoS, QoS =< ?QOS2 ->
validate_qos(_) -> error(bad_qos).
%% @doc From Message to Packet
-spec(from_message(mqtt_packet_id(), message()) -> mqtt_packet()).
-spec(from_message(emqx_mqtt_types:packet_id(), emqx_types:message()) -> emqx_mqtt_types:packet()).
from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) ->
Dup = emqx_message:get_flag(dup, Msg, false),
Retain = emqx_message:get_flag(retain, Msg, false),
@ -97,7 +97,8 @@ from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payloa
variable = Publish, payload = Payload}.
%% @doc Message from Packet
-spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()).
-spec(to_message(emqx_types:credentials(), emqx_mqtt_types:packet())
-> emqx_types:message()).
to_message(#{client_id := ClientId, username := Username},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
@ -110,7 +111,7 @@ to_message(#{client_id := ClientId, username := Username},
Msg#message{flags = #{dup => Dup, retain => Retain},
headers = merge_props(#{username => Username}, Props)}.
-spec(will_msg(#mqtt_packet_connect{}) -> message()).
-spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()).
will_msg(#mqtt_packet_connect{will_flag = false}) ->
undefined;
will_msg(#mqtt_packet_connect{client_id = ClientId,
@ -130,7 +131,7 @@ merge_props(Headers, Props) ->
maps:merge(Headers, Props).
%% @doc Format packet
-spec(format(mqtt_packet()) -> iolist()).
-spec(format(emqx_mqtt_types:packet()) -> iolist()).
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
format_header(Header, format_variable(Variable, Payload)).

View File

@ -153,7 +153,7 @@ stop_plugins(Names) ->
[stop_app(App) || App <- Names].
%% @doc List all available plugins
-spec(list() -> [plugin()]).
-spec(list() -> [emqx_types:plugin()]).
list() ->
case emqx_config:get_env(plugins_etc_dir) of
undefined ->

View File

@ -28,6 +28,22 @@
-export([send/2]).
-export([shutdown/2]).
%%-record(mqtt_client, {
%% client_id :: binary() | undefined,
%% client_pid :: pid(),
%% username :: binary() | undefined,
%% peername :: {inet:ip_address(), inet:port_number()},
%% clean_start :: boolean(),
%% proto_ver :: emqx_mqtt_types:version(),
%% keepalive = 0 :: non_neg_integer(),
%% will_topic :: undefined | binary(),
%% mountpoint :: undefined | binary(),
%% connected_at :: erlang:timestamp(),
%% attributes :: map()
%% }).
-record(pstate, {
zone,
sendfun,
@ -172,7 +188,7 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
%% Packet Received
%%------------------------------------------------------------------------------
-spec(received(mqtt_packet(), state())
-spec(received(emqx_mqtt_types:packet(), state())
-> {ok, state()} | {error, term()} | {error, term(), state()}).
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
{error, proto_not_connected, PState};
@ -469,7 +485,7 @@ deliver({disconnect, _ReasonCode}, PState) ->
%%------------------------------------------------------------------------------
%% Send Packet to Client
-spec(send(mqtt_packet(), state()) -> {ok, state()} | {error, term()}).
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
trace(send, Packet, PState),
case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of

View File

@ -71,54 +71,54 @@ start_link(Pool, Id) ->
%% Route APIs
%%------------------------------------------------------------------------------
-spec(add_route(topic() | route()) -> ok).
-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok).
add_route(Topic) when is_binary(Topic) ->
add_route(#route{topic = Topic, dest = node()});
add_route(Route = #route{topic = Topic}) ->
cast(pick(Topic), {add_route, Route}).
-spec(add_route(topic(), destination()) -> ok).
-spec(add_route(emqx_topic:topic(), destination()) -> ok).
add_route(Topic, Dest) when is_binary(Topic) ->
add_route(#route{topic = Topic, dest = Dest}).
-spec(add_route({pid(), reference()}, topic(), destination()) -> ok).
-spec(add_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok).
add_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}).
-spec(get_routes(topic()) -> [route()]).
-spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]).
get_routes(Topic) ->
ets:lookup(?ROUTE, Topic).
-spec(del_route(topic() | route()) -> ok).
-spec(del_route(emqx_topic:topic() | emqx_types:route()) -> ok).
del_route(Topic) when is_binary(Topic) ->
del_route(#route{topic = Topic, dest = node()});
del_route(Route = #route{topic = Topic}) ->
cast(pick(Topic), {del_route, Route}).
-spec(del_route(topic(), destination()) -> ok).
-spec(del_route(emqx_topic:topic(), destination()) -> ok).
del_route(Topic, Dest) when is_binary(Topic) ->
del_route(#route{topic = Topic, dest = Dest}).
-spec(del_route({pid(), reference()}, topic(), destination()) -> ok).
-spec(del_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok).
del_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}).
-spec(has_routes(topic()) -> boolean()).
-spec(has_routes(emqx_topic:topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) ->
ets:member(?ROUTE, Topic).
-spec(topics() -> list(topic())).
-spec(topics() -> list(emqx_topic:topic())).
topics() -> mnesia:dirty_all_keys(?ROUTE).
%% @doc Match routes
%% Optimize: routing table will be replicated to all router nodes.
-spec(match_routes(topic()) -> [route()]).
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
match_routes(Topic) when is_binary(Topic) ->
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
lists:append([get_routes(To) || To <- [Topic | Matched]]).
%% @doc Print routes to a topic
-spec(print_routes(topic()) -> ok).
-spec(print_routes(emqx_topic:topic()) -> ok).
print_routes(Topic) ->
lists:foreach(fun(#route{topic = To, dest = Dest}) ->
io:format("~s -> ~s~n", [To, Dest])

View File

@ -80,7 +80,7 @@
old_conn_pid :: pid(),
%% Next packet id of the session
next_pkt_id = 1 :: mqtt_packet_id(),
next_pkt_id = 1 :: emqx_mqtt_types:packet_id(),
%% Max subscriptions
max_subscriptions :: non_neg_integer(),
@ -164,19 +164,20 @@ start_link(SessAttrs) ->
%% PubSub API
%%------------------------------------------------------------------------------
-spec(subscribe(pid(), list({topic(), map()}) |
{mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
-spec(subscribe(pid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok).
subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts))
|| {RawTopic, SubOpts} <- RawTopicFilters],
subscribe(SPid, undefined, #{}, TopicFilters).
%% for mqtt 5.0
-spec(subscribe(pid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
subscribe(SPid, PacketId, Properties, TopicFilters) ->
SubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {subscribe, self(), SubReq}).
-spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, emqx_types:dispatches()}).
-spec(publish(pid(), emqx_mqtt_types:packet_id(), emqx_types:message())
-> {ok, emqx_types:deliver_results()}).
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 message to broker directly
emqx_broker:publish(Msg);
@ -189,43 +190,44 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 message to session
gen_server:call(SPid, {publish, PacketId, Msg}, infinity).
-spec(puback(pid(), mqtt_packet_id()) -> ok).
-spec(puback(pid(), emqx_mqtt_types:packet_id()) -> ok).
puback(SPid, PacketId) ->
gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}).
puback(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
-spec(pubrec(pid(), mqtt_packet_id()) -> ok | {error, mqtt_reason_code()}).
-spec(pubrec(pid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrec(SPid, PacketId) ->
pubrec(SPid, PacketId, ?RC_SUCCESS).
-spec(pubrec(pid(), mqtt_packet_id(), mqtt_reason_code())
-> ok | {error, mqtt_reason_code()}).
-spec(pubrec(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code())
-> ok | {error, emqx_mqtt_types:reason_code()}).
pubrec(SPid, PacketId, ReasonCode) ->
gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity).
-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code())
-> ok | {error, mqtt_reason_code()}).
-spec(pubrel(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code())
-> ok | {error, emqx_mqtt_types:reason_code()}).
pubrel(SPid, PacketId, ReasonCode) ->
gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity).
-spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
-spec(pubcomp(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok).
pubcomp(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}).
-spec(unsubscribe(pid(), topic_table()) -> ok).
-spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)).
-spec(unsubscribe(pid(), mqtt_packet_id(), mqtt_properties(), topic_table()) -> ok).
-spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
UnsubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
-spec(resume(pid(), pid()) -> ok).
resume(SPid, ClientPid) ->
gen_server:cast(SPid, {resume, ClientPid}).
resume(SPid, ConnPid) ->
gen_server:cast(SPid, {resume, ConnPid}).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list(tuple())).
@ -292,7 +294,7 @@ stats(#state{max_subscriptions = MaxSubscriptions,
{enqueue_msg, EnqueueMsg}]).
%% @doc Discard the session
-spec(discard(pid(), client_id()) -> ok).
-spec(discard(pid(), emqx_types:client_id()) -> ok).
discard(SPid, ClientId) ->
gen_server:call(SPid, {discard, ClientId}, infinity).
@ -342,8 +344,8 @@ init_mqueue(Zone, ClientId) ->
max_len => get_env(Zone, max_mqueue_len),
store_qos0 => get_env(Zone, mqueue_store_qos0)}).
binding(ClientPid) ->
case node(ClientPid) =:= node() of true -> local; false -> remote end.
binding(ConnPid) ->
case node(ConnPid) =:= node() of true -> local; false -> remote end.
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ConnPid], State),

View File

@ -58,7 +58,7 @@ mnesia(copy) ->
%% API
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@ -81,11 +81,11 @@ record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
%% TODO: dispatch strategy, ensure the delivery...
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) ->
case pick(subscribers(Group, Topic)) of
false -> Delivery;
SubPid -> SubPid ! {dispatch, Topic, Msg},
Delivery#delivery{flows = [{dispatch, {Group, Topic}, 1} | Flows]}
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}
end.
pick([]) ->

View File

@ -70,7 +70,7 @@ open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid :=
emqx_sm_locker:trans(ClientId, ResumeStart).
%% @doc Discard all the sessions identified by the ClientId.
-spec(discard_session(client_id()) -> ok).
-spec(discard_session(emqx_types:client_id()) -> ok).
discard_session(ClientId) when is_binary(ClientId) ->
discard_session(ClientId, self()).
@ -84,7 +84,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
end, lookup_session(ClientId)).
%% @doc Try to resume a session.
-spec(resume_session(client_id()) -> {ok, pid()} | {error, term()}).
-spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}).
resume_session(ClientId) ->
resume_session(ClientId, self()).
@ -105,14 +105,14 @@ resume_session(ClientId, ConnPid) ->
end.
%% @doc Close a session.
-spec(close_session({client_id(), pid()} | pid()) -> ok).
-spec(close_session({emqx_types:client_id(), pid()} | pid()) -> ok).
close_session({_ClientId, SPid}) ->
emqx_session:close(SPid);
close_session(SPid) when is_pid(SPid) ->
emqx_session:close(SPid).
%% @doc Register a session with attributes.
-spec(register_session(client_id() | {client_id(), pid()},
-spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()},
list(emqx_session:attribute())) -> ok).
register_session(ClientId, Attrs) when is_binary(ClientId) ->
register_session({ClientId, self()}, Attrs);
@ -129,7 +129,7 @@ register_session(Session = {ClientId, SPid}, Attrs)
notify({registered, ClientId, SPid}).
%% @doc Get session attrs
-spec(get_session_attrs({client_id(), pid()}) -> list(emqx_session:attribute())).
-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())).
get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
@ -140,7 +140,7 @@ set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), i
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}).
%% @doc Unregister a session
-spec(unregister_session(client_id() | {client_id(), pid()}) -> ok).
-spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
unregister_session(ClientId) when is_binary(ClientId) ->
unregister_session({ClientId, self()});
@ -153,13 +153,13 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(
notify({unregistered, ClientId, SPid}).
%% @doc Get session stats
-spec(get_session_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
-spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
get_session_stats(Session = {ClientId, SPid})
when is_binary(ClientId), is_pid(SPid) ->
safe_lookup_element(?SESSION_STATS_TAB, Session, []).
%% @doc Set session stats
-spec(set_session_stats(client_id() | {client_id(), pid()},
-spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
emqx_stats:stats()) -> ok).
set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
set_session_stats({ClientId, self()}, Stats);
@ -169,7 +169,7 @@ set_session_stats(Session = {ClientId, SPid}, Stats)
ets:insert(?SESSION_STATS_TAB, {Session, Stats}).
%% @doc Lookup a session from registry
-spec(lookup_session(client_id()) -> list({client_id(), pid()})).
-spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
lookup_session(ClientId) ->
case emqx_sm_registry:is_enabled() of
true -> emqx_sm_registry:lookup_session(ClientId);
@ -177,17 +177,17 @@ lookup_session(ClientId) ->
end.
%% @doc Dispatch a message to the session.
-spec(dispatch(client_id(), topic(), message()) -> any()).
-spec(dispatch(emqx_types:client_id(), emqx_topic:topic(), emqx_types:message()) -> any()).
dispatch(ClientId, Topic, Msg) ->
case lookup_session_pid(ClientId) of
Pid when is_pid(Pid) ->
Pid ! {dispatch, Topic, Msg};
undefined ->
emqx_hooks:run('message.dropped', [ClientId, Msg])
emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg])
end.
%% @doc Lookup session pid.
-spec(lookup_session_pid(client_id()) -> pid() | undefined).
-spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined).
lookup_session_pid(ClientId) ->
safe_lookup_element(?SESSION_TAB, ClientId, undefined).

View File

@ -17,6 +17,7 @@
-include("emqx.hrl").
-export([start_link/0]).
-export([trans/2, trans/3]).
-export([lock/1, lock/2, unlock/1]).
@ -24,11 +25,12 @@
start_link() ->
ekka_locker:start_link(?MODULE).
-spec(trans(client_id(), fun(([node()]) -> any())) -> any()).
-spec(trans(emqx_types:client_id(), fun(([node()]) -> any())) -> any()).
trans(ClientId, Fun) ->
trans(ClientId, Fun, undefined).
-spec(trans(client_id() | undefined, fun(([node()]) -> any()), ekka_locker:piggyback()) -> any()).
-spec(trans(emqx_types:client_id() | undefined,
fun(([node()])-> any()), ekka_locker:piggyback()) -> any()).
trans(undefined, Fun, _Piggyback) ->
Fun([]);
trans(ClientId, Fun, Piggyback) ->
@ -39,15 +41,15 @@ trans(ClientId, Fun, Piggyback) ->
{error, client_id_unavailable}
end.
-spec(lock(client_id()) -> ekka_locker:lock_result()).
-spec(lock(emqx_types:client_id()) -> ekka_locker:lock_result()).
lock(ClientId) ->
ekka_locker:aquire(?MODULE, ClientId, strategy()).
-spec(lock(client_id(), ekka_locker:piggyback()) -> ekka_locker:lock_result()).
-spec(lock(emqx_types:client_id(), ekka_locker:piggyback()) -> ekka_locker:lock_result()).
lock(ClientId, Piggyback) ->
ekka_locker:aquire(?MODULE, ClientId, strategy(), Piggyback).
-spec(unlock(client_id()) -> {boolean(), [node()]}).
-spec(unlock(emqx_types:client_id()) -> {boolean(), [node()]}).
unlock(ClientId) ->
ekka_locker:release(?MODULE, ClientId, strategy()).

View File

@ -43,16 +43,17 @@ start_link() ->
is_enabled() ->
ets:info(?TAB, name) =/= undefined.
-spec(lookup_session(client_id()) -> list({client_id(), session_pid()})).
-spec(lookup_session(emqx_types:client_id())
-> list({emqx_types:client_id(), session_pid()})).
lookup_session(ClientId) ->
[{ClientId, SessionPid} || #global_session{pid = SessionPid}
<- mnesia:dirty_read(?TAB, ClientId)].
-spec(register_session({client_id(), session_pid()}) -> ok).
-spec(register_session({emqx_types:client_id(), session_pid()}) -> ok).
register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) ->
mnesia:dirty_write(?TAB, record(ClientId, SessionPid)).
-spec(unregister_session({client_id(), session_pid()}) -> ok).
-spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok).
unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) ->
mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)).

View File

@ -14,9 +14,6 @@
-module(emqx_topic).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([match/2]).
-export([validate/1, validate/2]).
-export([levels/1]).
@ -28,11 +25,12 @@
-export([systop/1]).
-export([parse/1, parse/2]).
-type(topic() :: binary()).
-type(word() :: '' | '+' | '#' | binary()).
-type(words() :: list(word())).
-type(triple() :: {root | binary(), word(), binary()}).
-export_type([word/0, triple/0]).
-export_type([topic/0, word/0, triple/0]).
-define(MAX_TOPIC_LEN, 4096).

View File

@ -58,7 +58,7 @@ mnesia(copy) ->
%%------------------------------------------------------------------------------
%% @doc Insert a topic into the trie
-spec(insert(Topic :: topic()) -> ok).
-spec(insert(emqx_topic:topic()) -> ok).
insert(Topic) when is_binary(Topic) ->
case mnesia:read(?TRIE_NODE, Topic) of
[#trie_node{topic = Topic}] ->
@ -73,7 +73,7 @@ insert(Topic) when is_binary(Topic) ->
end.
%% @doc Find trie nodes that match the topic
-spec(match(Topic :: topic()) -> list(MatchedTopic :: topic())).
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
match(Topic) when is_binary(Topic) ->
TrieNodes = match_node(root, emqx_topic:words(Topic)),
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
@ -84,7 +84,7 @@ lookup(NodeId) ->
mnesia:read(?TRIE_NODE, NodeId).
%% @doc Delete a topic from the trie
-spec(delete(Topic :: topic()) -> ok).
-spec(delete(emqx_topic:topic()) -> ok).
delete(Topic) when is_binary(Topic) ->
case mnesia:read(?TRIE_NODE, Topic) of
[#trie_node{edge_count = 0}] ->

View File

@ -14,17 +14,29 @@
-module(emqx_types).
%%-include("emqx.hrl").
-include("emqx.hrl").
-export_type([zone/0]).
-export_type([startlink_ret/0]).
-export_type([zone/0, client_id/0, username/0, password/0, peername/0,
protocol/0, credentials/0]).
-export_type([topic/0, payload/0, dispatches/0]).
%%-export_type([payload/0, message/0, delivery/0]).
-type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}).
-export_type([pubsub/0, topic/0, subid/0, subopts/0]).
-export_type([client_id/0, username/0, password/0, peername/0, protocol/0]).
-export_type([credentials/0, session/0]).
-export_type([subscription/0, subscriber/0, topic_table/0]).
-export_type([payload/0, message/0]).
-export_type([delivery/0, deliver_results/0]).
-export_type([route/0]).
-export_type([alarm/0, plugin/0, command/0]).
-type(zone() :: atom()).
-type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}).
-type(pubsub() :: publish | subscribe).
-type(topic() :: binary()).
-type(subid() :: binary() | atom()).
-type(subopts() :: #{qos := integer(),
share => binary(),
atom() => term()
}).
-type(session() :: #session{}).
-type(client_id() :: binary() | atom()).
-type(username() :: binary() | undefined).
-type(password() :: binary() | undefined).
@ -34,12 +46,18 @@
username := username(),
peername := peername(),
zone => zone(),
atom() => term()}).
-type(topic() :: binary()).
atom() => term()
}).
-type(subscription() :: #subscription{}).
-type(subscriber() :: {pid(), subid()}).
-type(topic_table() :: [{topic(), subopts()}]).
-type(payload() :: binary() | iodata()).
%-type(message() :: #message{}).
%-type(delivery() :: #delivery{}).
-type(dispatches() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]).
-type(message() :: #message{}).
-type(delivery() :: #delivery{}).
-type(deliver_results() :: [{route, node(), topic()} |
{dispatch, topic(), pos_integer()}]).
-type(route() :: #route{}).
-type(alarm() :: #alarm{}).
-type(plugin() :: #plugin{}).
-type(command() :: #command{}).

View File

@ -33,13 +33,13 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(get_env(zone() | undefined, atom()) -> undefined | term()).
-spec(get_env(emqx_types:zone() | undefined, atom()) -> undefined | term()).
get_env(undefined, Key) ->
emqx_config:get_env(Key);
get_env(Zone, Key) ->
get_env(Zone, Key, undefined).
-spec(get_env(zone() | undefined, atom(), term()) -> undefined | term()).
-spec(get_env(emqx_types:zone() | undefined, atom(), term()) -> undefined | term()).
get_env(undefined, Key, Def) ->
emqx_config:get_env(Key, Def);
get_env(Zone, Key, Def) ->
@ -48,7 +48,7 @@ get_env(Zone, Key, Def) ->
emqx_config:get_env(Key, Def)
end.
-spec(set_env(zone(), atom(), term()) -> ok).
-spec(set_env(emqx_types:zone(), atom(), term()) -> ok).
set_env(Zone, Key, Val) ->
gen_server:cast(?MODULE, {set_env, Zone, Key, Val}).