merge emqtt to emqttd app
This commit is contained in:
parent
d255a98c09
commit
470ac34a6d
|
@ -1,70 +0,0 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
|
||||||
%%%
|
|
||||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%%% in the Software without restriction, including without limitation the rights
|
|
||||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%%% furnished to do so, subject to the following conditions:
|
|
||||||
%%%
|
|
||||||
%%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%%% copies or substantial portions of the Software.
|
|
||||||
%%%
|
|
||||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%%% SOFTWARE.
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%%% @doc
|
|
||||||
%%% MQTT Common Header.
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Protocol Version and Levels
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(MQTT_PROTO_V31, 3).
|
|
||||||
-define(MQTT_PROTO_V311, 4).
|
|
||||||
|
|
||||||
-define(PROTOCOL_NAMES, [
|
|
||||||
{?MQTT_PROTO_V31, <<"MQIsdp">>},
|
|
||||||
{?MQTT_PROTO_V311, <<"MQTT">>}]).
|
|
||||||
|
|
||||||
-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% QoS Levels
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(QOS_0, 0).
|
|
||||||
-define(QOS_1, 1).
|
|
||||||
-define(QOS_2, 2).
|
|
||||||
|
|
||||||
-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)).
|
|
||||||
|
|
||||||
-type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Message
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-type mqtt_msgid() :: undefined | 1..16#ffff.
|
|
||||||
|
|
||||||
-record(mqtt_message, {
|
|
||||||
topic :: binary(), %% topic published to
|
|
||||||
from :: binary() | atom(), %% from clientid
|
|
||||||
qos = ?QOS_0 :: mqtt_qos(),
|
|
||||||
retain = false :: boolean(),
|
|
||||||
dup = false :: boolean(),
|
|
||||||
sys = false :: boolean(), %% $SYS flag
|
|
||||||
msgid :: mqtt_msgid(),
|
|
||||||
payload :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type mqtt_message() :: #mqtt_message{}.
|
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
{application, emqtt,
|
|
||||||
[
|
|
||||||
{description, "Erlang MQTT Common Library"},
|
|
||||||
{vsn, git},
|
|
||||||
{modules, []},
|
|
||||||
{registered, []},
|
|
||||||
{applications, [
|
|
||||||
kernel,
|
|
||||||
stdlib
|
|
||||||
]},
|
|
||||||
{env, []}
|
|
||||||
]}.
|
|
|
@ -36,6 +36,12 @@
|
||||||
|
|
||||||
-define(ERTS_MINIMUM, "6.0").
|
-define(ERTS_MINIMUM, "6.0").
|
||||||
|
|
||||||
|
%% System Topics.
|
||||||
|
-define(SYSTOP, <<"$SYS">>).
|
||||||
|
|
||||||
|
%% Queue Topics.
|
||||||
|
-define(QTop, <<"$Q">>).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% PubSub
|
%% PubSub
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -56,8 +62,8 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_subscriber, {
|
-record(mqtt_subscriber, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
qos = 0 :: 0 | 1 | 2,
|
subpid :: pid(),
|
||||||
pid :: pid()
|
qos = 0 :: 0 | 1 | 2
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type mqtt_subscriber() :: #mqtt_subscriber{}.
|
-type mqtt_subscriber() :: #mqtt_subscriber{}.
|
||||||
|
@ -94,13 +100,29 @@
|
||||||
-record(mqtt_session, {
|
-record(mqtt_session, {
|
||||||
clientid,
|
clientid,
|
||||||
session_pid,
|
session_pid,
|
||||||
subscriptions = [],
|
subscriptions = []
|
||||||
awaiting_ack,
|
|
||||||
awaiting_rel
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type mqtt_session() :: #mqtt_session{}.
|
-type mqtt_session() :: #mqtt_session{}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Message
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-type mqtt_msgid() :: undefined | 1..16#ffff.
|
||||||
|
|
||||||
|
-record(mqtt_message, {
|
||||||
|
topic :: binary(), %% The topic published to
|
||||||
|
from :: binary() | atom(), %% ClientId of publisher
|
||||||
|
qos = 0 :: 0 | 1 | 2, %% Message QoS
|
||||||
|
retain = false :: boolean(), %% Retain flag
|
||||||
|
dup = false :: boolean(), %% Dup flag
|
||||||
|
sys = false :: boolean(), %% $SYS flag
|
||||||
|
msgid :: mqtt_msgid(), %% Message ID
|
||||||
|
payload :: binary() %% Payload
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type mqtt_message() :: #mqtt_message{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Plugin
|
%% MQTT Plugin
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -20,11 +20,35 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% MQTT Packet Header.
|
%%% MQTT Protocol Header.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Protocol Version and Levels
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-define(MQTT_PROTO_V31, 3).
|
||||||
|
-define(MQTT_PROTO_V311, 4).
|
||||||
|
|
||||||
|
-define(PROTOCOL_NAMES, [
|
||||||
|
{?MQTT_PROTO_V31, <<"MQIsdp">>},
|
||||||
|
{?MQTT_PROTO_V311, <<"MQTT">>}]).
|
||||||
|
|
||||||
|
-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT QoS
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(QOS_0, 0). %% At most once
|
||||||
|
-define(QOS_1, 1). %% At least once
|
||||||
|
-define(QOS_2, 2). %% Exactly once
|
||||||
|
|
||||||
|
-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)).
|
||||||
|
|
||||||
|
-type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Max ClientId Length. Why 1024? NiDongDe!
|
%% Max ClientId Length. Why 1024? NiDongDe!
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -199,4 +223,3 @@
|
||||||
-define(PACKET(Type),
|
-define(PACKET(Type),
|
||||||
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
|
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
|
||||||
|
|
||||||
|
|
|
@ -1,106 +0,0 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
|
||||||
%%%
|
|
||||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%%% in the Software without restriction, including without limitation the rights
|
|
||||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%%% furnished to do so, subject to the following conditions:
|
|
||||||
%%%
|
|
||||||
%%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%%% copies or substantial portions of the Software.
|
|
||||||
%%%
|
|
||||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%%% SOFTWARE.
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%%% @doc
|
|
||||||
%%% eMQTT System Topics.
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(SYSTOP, <<"$SYS">>).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% $SYS Topics of Broker
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_BROKERS, [
|
|
||||||
version, % Broker version
|
|
||||||
uptime, % Broker uptime
|
|
||||||
datetime, % Broker local datetime
|
|
||||||
sysdescr % Broker description
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% $SYS Topics for Clients
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_CLIENTS, [
|
|
||||||
'clients/count', % clients connected current
|
|
||||||
'clients/max' % max clients connected
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% $SYS Topics for Sessions
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_SESSIONS, [
|
|
||||||
'sessions/count',
|
|
||||||
'sessions/max'
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% $SYS Topics for Subscribers
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_PUBSUB, [
|
|
||||||
'topics/count', % ...
|
|
||||||
'topics/max', % ...
|
|
||||||
'subscribers/count', % ...
|
|
||||||
'subscribers/max', % ...
|
|
||||||
'queues/count', % ...
|
|
||||||
'queues/max' % ...
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Bytes sent and received of Broker
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_BYTES, [
|
|
||||||
{counter, 'bytes/received'}, % Total bytes received
|
|
||||||
{counter, 'bytes/sent'} % Total bytes sent
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Packets sent and received of Broker
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_PACKETS, [
|
|
||||||
{counter, 'packets/received'}, % All Packets received
|
|
||||||
{counter, 'packets/sent'}, % All Packets sent
|
|
||||||
{counter, 'packets/connect'}, % CONNECT Packets received
|
|
||||||
{counter, 'packets/connack'}, % CONNACK Packets sent
|
|
||||||
{counter, 'packets/publish/received'}, % PUBLISH packets received
|
|
||||||
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
|
|
||||||
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
|
|
||||||
{counter, 'packets/suback'}, % SUBACK packets sent
|
|
||||||
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
|
|
||||||
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
|
|
||||||
{counter, 'packets/pingreq'}, % PINGREQ packets received
|
|
||||||
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
|
|
||||||
{counter, 'packets/disconnect'} % DISCONNECT Packets received
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Messages sent and received of broker
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(SYSTOP_MESSAGES, [
|
|
||||||
{counter, 'messages/received'}, % Messages received
|
|
||||||
{counter, 'messages/sent'}, % Messages sent
|
|
||||||
{gauge, 'messages/retained/count'},% Messagea retained
|
|
||||||
{gauge, 'messages/stored/count'}, % Messages stored
|
|
||||||
{counter, 'messages/dropped'} % Messages dropped
|
|
||||||
]).
|
|
||||||
|
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
|
||||||
%%
|
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%% in the Software without restriction, including without limitation the rights
|
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%% furnished to do so, subject to the following conditions:
|
|
||||||
%%
|
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%% copies or substantial portions of the Software.
|
|
||||||
%%
|
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%% SOFTWARE.
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%%% @doc
|
|
||||||
%%% emqtt topic header.
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Topic
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(topic, {
|
|
||||||
name :: binary(),
|
|
||||||
node :: node()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type topic() :: #topic{}.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Topic Subscriber
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(topic_subscriber, {
|
|
||||||
topic :: binary(),
|
|
||||||
qos = 0 :: 0 | 1 | 2,
|
|
||||||
subpid :: pid()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type topic_subscriber() :: #topic_subscriber{}.
|
|
||||||
|
|
|
@ -73,9 +73,9 @@ compile(who, {user, Username}) ->
|
||||||
{user, bin(Username)};
|
{user, bin(Username)};
|
||||||
|
|
||||||
compile(topic, {eq, Topic}) ->
|
compile(topic, {eq, Topic}) ->
|
||||||
{eq, emqtt_topic:words(bin(Topic))};
|
{eq, emqttd_topic:words(bin(Topic))};
|
||||||
compile(topic, Topic) ->
|
compile(topic, Topic) ->
|
||||||
Words = emqtt_topic:words(bin(Topic)),
|
Words = emqttd_topic:words(bin(Topic)),
|
||||||
case 'pattern?'(Words) of
|
case 'pattern?'(Words) of
|
||||||
true -> {pattern, Words};
|
true -> {pattern, Words};
|
||||||
false -> Words
|
false -> Words
|
||||||
|
@ -126,12 +126,12 @@ match_topics(_Client, _Topic, []) ->
|
||||||
false;
|
false;
|
||||||
match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) ->
|
match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) ->
|
||||||
TopicFilter = feed_var(Client, PatternFilter),
|
TopicFilter = feed_var(Client, PatternFilter),
|
||||||
case match_topic(emqtt_topic:words(Topic), TopicFilter) of
|
case match_topic(emqttd_topic:words(Topic), TopicFilter) of
|
||||||
true -> true;
|
true -> true;
|
||||||
false -> match_topics(Client, Topic, Filters)
|
false -> match_topics(Client, Topic, Filters)
|
||||||
end;
|
end;
|
||||||
match_topics(Client, Topic, [TopicFilter|Filters]) ->
|
match_topics(Client, Topic, [TopicFilter|Filters]) ->
|
||||||
case match_topic(emqtt_topic:words(Topic), TopicFilter) of
|
case match_topic(emqttd_topic:words(Topic), TopicFilter) of
|
||||||
true -> true;
|
true -> true;
|
||||||
false -> match_topics(Client, Topic, Filters)
|
false -> match_topics(Client, Topic, Filters)
|
||||||
end.
|
end.
|
||||||
|
@ -139,7 +139,7 @@ match_topics(Client, Topic, [TopicFilter|Filters]) ->
|
||||||
match_topic(Topic, {eq, TopicFilter}) ->
|
match_topic(Topic, {eq, TopicFilter}) ->
|
||||||
Topic =:= TopicFilter;
|
Topic =:= TopicFilter;
|
||||||
match_topic(Topic, TopicFilter) ->
|
match_topic(Topic, TopicFilter) ->
|
||||||
emqtt_topic:match(Topic, TopicFilter).
|
emqttd_topic:match(Topic, TopicFilter).
|
||||||
|
|
||||||
feed_var(Client, Pattern) ->
|
feed_var(Client, Pattern) ->
|
||||||
feed_var(Client, Pattern, []).
|
feed_var(Client, Pattern, []).
|
||||||
|
|
|
@ -30,13 +30,13 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
|
@ -28,13 +28,7 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqttd_systop.hrl").
|
-include_lib("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
@ -54,6 +48,10 @@
|
||||||
%% Tick API
|
%% Tick API
|
||||||
-export([start_tick/1, stop_tick/1]).
|
-export([start_tick/1, stop_tick/1]).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
@ -62,6 +60,14 @@
|
||||||
|
|
||||||
-record(state, {started_at, sys_interval, tick_tref}).
|
-record(state, {started_at, sys_interval, tick_tref}).
|
||||||
|
|
||||||
|
%% $SYS Topics of Broker
|
||||||
|
-define(SYSTOP_BROKERS, [
|
||||||
|
version, % Broker version
|
||||||
|
uptime, % Broker uptime
|
||||||
|
datetime, % Broker local datetime
|
||||||
|
sysdescr % Broker description
|
||||||
|
]).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
@ -276,7 +282,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
create_topic(Topic) ->
|
create_topic(Topic) ->
|
||||||
emqttd_pubsub:create(emqtt_topic:systop(Topic)).
|
emqttd_pubsub:create(emqttd_topic:systop(Topic)).
|
||||||
|
|
||||||
retain(brokers) ->
|
retain(brokers) ->
|
||||||
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
|
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
|
||||||
|
@ -288,12 +294,12 @@ retain(brokers) ->
|
||||||
retain(Topic, Payload) when is_binary(Payload) ->
|
retain(Topic, Payload) when is_binary(Payload) ->
|
||||||
publish(#mqtt_message{from = broker,
|
publish(#mqtt_message{from = broker,
|
||||||
retain = true,
|
retain = true,
|
||||||
topic = emqtt_topic:systop(Topic),
|
topic = emqttd_topic:systop(Topic),
|
||||||
payload = Payload}).
|
payload = Payload}).
|
||||||
|
|
||||||
publish(Topic, Payload) when is_binary(Payload) ->
|
publish(Topic, Payload) when is_binary(Payload) ->
|
||||||
publish( #mqtt_message{from = broker,
|
publish( #mqtt_message{from = broker,
|
||||||
topic = emqtt_topic:systop(Topic),
|
topic = emqttd_topic:systop(Topic),
|
||||||
payload = Payload}).
|
payload = Payload}).
|
||||||
|
|
||||||
publish(Msg) ->
|
publish(Msg) ->
|
||||||
|
|
|
@ -28,9 +28,9 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/2, info/1]).
|
-export([start_link/2, info/1]).
|
||||||
|
@ -68,7 +68,7 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
|
||||||
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
||||||
lager:info("Connect from ~s", [ConnStr]),
|
lager:info("Connect from ~s", [ConnStr]),
|
||||||
SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
|
SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
|
||||||
ParserState = emqtt_parser:init(PacketOpts),
|
ParserState = emqttd_parser:init(PacketOpts),
|
||||||
ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts),
|
ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts),
|
||||||
State = control_throttle(#state{transport = Transport,
|
State = control_throttle(#state{transport = Transport,
|
||||||
socket = NewSock,
|
socket = NewSock,
|
||||||
|
@ -177,7 +177,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
|
||||||
parse_state = ParseState,
|
parse_state = ParseState,
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
conn_name = ConnStr}) ->
|
conn_name = ConnStr}) ->
|
||||||
case emqtt_parser:parse(Bytes, ParseState) of
|
case emqttd_parser:parse(Bytes, ParseState) of
|
||||||
{more, ParseState1} ->
|
{more, ParseState1} ->
|
||||||
{noreply,
|
{noreply,
|
||||||
control_throttle(State #state{parse_state = ParseState1}),
|
control_throttle(State #state{parse_state = ParseState1}),
|
||||||
|
@ -186,7 +186,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
|
||||||
received_stats(Packet),
|
received_stats(Packet),
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(PacketOpts),
|
process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts),
|
||||||
proto_state = ProtoState1});
|
proto_state = ProtoState1});
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||||
|
|
|
@ -29,8 +29,7 @@
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
|
||||||
|
|
||||||
-import(proplists, [get_value/2, get_value/3]).
|
-import(proplists, [get_value/2, get_value/3]).
|
||||||
|
|
||||||
|
@ -122,7 +121,7 @@ validate(qos, Qos) ->
|
||||||
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
|
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
|
||||||
|
|
||||||
validate(topic, Topic) ->
|
validate(topic, Topic) ->
|
||||||
emqtt_topic:validate({name, Topic}).
|
emqttd_topic:validate({name, Topic}).
|
||||||
|
|
||||||
int(S) -> list_to_integer(S).
|
int(S) -> list_to_integer(S).
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-export([new/2, is_full/1, len/1, in/2, ack/2]).
|
-export([new/2, is_full/1, len/1, in/2, ack/2]).
|
||||||
|
|
||||||
|
|
|
@ -24,13 +24,13 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_message).
|
-module(emqttd_message).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-export([from_packet/1, from_packet/2, to_packet/1]).
|
-export([from_packet/1, from_packet/2, to_packet/1]).
|
||||||
|
|
|
@ -28,9 +28,7 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqttd_systop.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
@ -48,9 +46,41 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-record(state, {tick_tref}).
|
||||||
|
|
||||||
-define(METRIC_TAB, mqtt_metric).
|
-define(METRIC_TAB, mqtt_metric).
|
||||||
|
|
||||||
-record(state, {tick_tref}).
|
%% Bytes sent and received of Broker
|
||||||
|
-define(SYSTOP_BYTES, [
|
||||||
|
{counter, 'bytes/received'}, % Total bytes received
|
||||||
|
{counter, 'bytes/sent'} % Total bytes sent
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Packets sent and received of Broker
|
||||||
|
-define(SYSTOP_PACKETS, [
|
||||||
|
{counter, 'packets/received'}, % All Packets received
|
||||||
|
{counter, 'packets/sent'}, % All Packets sent
|
||||||
|
{counter, 'packets/connect'}, % CONNECT Packets received
|
||||||
|
{counter, 'packets/connack'}, % CONNACK Packets sent
|
||||||
|
{counter, 'packets/publish/received'}, % PUBLISH packets received
|
||||||
|
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
|
||||||
|
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
|
||||||
|
{counter, 'packets/suback'}, % SUBACK packets sent
|
||||||
|
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
|
||||||
|
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
|
||||||
|
{counter, 'packets/pingreq'}, % PINGREQ packets received
|
||||||
|
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
|
||||||
|
{counter, 'packets/disconnect'} % DISCONNECT Packets received
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Messages sent and received of broker
|
||||||
|
-define(SYSTOP_MESSAGES, [
|
||||||
|
{counter, 'messages/received'}, % Messages received
|
||||||
|
{counter, 'messages/sent'}, % Messages sent
|
||||||
|
{gauge, 'messages/retained/count'},% Messagea retained
|
||||||
|
{gauge, 'messages/stored/count'}, % Messages stored
|
||||||
|
{counter, 'messages/dropped'} % Messages dropped
|
||||||
|
]).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -204,6 +234,6 @@ create_metric({counter, Name}) ->
|
||||||
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
||||||
|
|
||||||
metric_topic(Metric) ->
|
metric_topic(Metric) ->
|
||||||
emqtt_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))).
|
emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -29,12 +29,10 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-behaviour(emqttd_gen_mod).
|
-behaviour(emqttd_gen_mod).
|
||||||
|
|
||||||
-export([load/1, client_connected/3, unload/1]).
|
-export([load/1, client_connected/3, unload/1]).
|
||||||
|
@ -48,7 +46,7 @@ load(Opts) ->
|
||||||
{ok, #state{topics = Topics}}.
|
{ok, #state{topics = Topics}}.
|
||||||
|
|
||||||
client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) ->
|
client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) ->
|
||||||
F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end,
|
F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
|
||||||
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};
|
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};
|
||||||
|
|
||||||
client_connected(_ConnAck, _Client, _Topics) ->
|
client_connected(_ConnAck, _Client, _Topics) ->
|
||||||
|
|
|
@ -28,8 +28,6 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-export([load/1, unload/1]).
|
-export([load/1, unload/1]).
|
||||||
|
@ -75,9 +73,9 @@ unload(_Opts) ->
|
||||||
|
|
||||||
|
|
||||||
topic(connected, ClientId) ->
|
topic(connected, ClientId) ->
|
||||||
emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
|
emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
|
||||||
topic(disconnected, ClientId) ->
|
topic(disconnected, ClientId) ->
|
||||||
emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])).
|
emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])).
|
||||||
|
|
||||||
reason(Reason) when is_atom(Reason) -> Reason;
|
reason(Reason) when is_atom(Reason) -> Reason;
|
||||||
reason({Error, _}) when is_atom(Error) -> Error;
|
reason({Error, _}) when is_atom(Error) -> Error;
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-behaviour(emqttd_gen_mod).
|
-behaviour(emqttd_gen_mod).
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ compile(Sections) ->
|
||||||
match_topic(Topic, []) ->
|
match_topic(Topic, []) ->
|
||||||
Topic;
|
Topic;
|
||||||
match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
|
match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
|
||||||
case emqtt_topic:match(Topic, Filter) of
|
case emqttd_topic:match(Topic, Filter) of
|
||||||
true ->
|
true ->
|
||||||
match_rule(Topic, Rules);
|
match_rule(Topic, Rules);
|
||||||
false ->
|
false ->
|
||||||
|
|
|
@ -52,7 +52,8 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-export([new/2, name/1,
|
-export([new/2, name/1,
|
||||||
is_empty/1, is_full/1,
|
is_empty/1, is_full/1,
|
||||||
|
@ -62,8 +63,6 @@
|
||||||
|
|
||||||
-define(HIGH_WM, 0.6).
|
-define(HIGH_WM, 0.6).
|
||||||
|
|
||||||
-define(MAX_LEN, 1000).
|
|
||||||
|
|
||||||
-record(mqueue, {name,
|
-record(mqueue, {name,
|
||||||
q = queue:new(), %% pending queue
|
q = queue:new(), %% pending queue
|
||||||
len = 0, %% current queue len
|
len = 0, %% current queue len
|
||||||
|
@ -89,7 +88,7 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec new(binary(), list(mqueue_option())) -> mqueue().
|
-spec new(binary(), list(mqueue_option())) -> mqueue().
|
||||||
new(Name, Opts) ->
|
new(Name, Opts) ->
|
||||||
MaxLen = emqttd_opts:g(max_length, Opts, ?MAX_LEN),
|
MaxLen = emqttd_opts:g(max_length, Opts, 1000),
|
||||||
#mqueue{name = Name,
|
#mqueue{name = Name,
|
||||||
max_len = MaxLen,
|
max_len = MaxLen,
|
||||||
low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)),
|
low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)),
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
%% Mnesia callbacks
|
%% Mnesia callbacks
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
@ -74,7 +74,7 @@ retain(Msg = #mqtt_message{topic = Topic,
|
||||||
TabSize = mnesia:table_info(message, size),
|
TabSize = mnesia:table_info(message, size),
|
||||||
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
lager:debug("Retained ~s", [emqtt_message:format(Msg)]),
|
lager:debug("Retained ~s", [emqttd_message:format(Msg)]),
|
||||||
mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]),
|
mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]),
|
||||||
emqttd_metrics:set('messages/retained/count',
|
emqttd_metrics:set('messages/retained/count',
|
||||||
mnesia:table_info(message, size));
|
mnesia:table_info(message, size));
|
||||||
|
@ -106,12 +106,12 @@ env() ->
|
||||||
Topic :: binary(),
|
Topic :: binary(),
|
||||||
CPid :: pid().
|
CPid :: pid().
|
||||||
redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
|
redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
|
||||||
case emqtt_topic:wildcard(Topic) of
|
case emqttd_topic:wildcard(Topic) of
|
||||||
false ->
|
false ->
|
||||||
dispatch(CPid, mnesia:dirty_read(message, Topic));
|
dispatch(CPid, mnesia:dirty_read(message, Topic));
|
||||||
true ->
|
true ->
|
||||||
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
|
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
|
||||||
case emqtt_topic:match(Name, Topic) of
|
case emqttd_topic:match(Name, Topic) of
|
||||||
true -> [Msg|Acc];
|
true -> [Msg|Acc];
|
||||||
false -> Acc
|
false -> Acc
|
||||||
end
|
end
|
||||||
|
|
|
@ -24,13 +24,13 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_packet).
|
-module(emqttd_packet).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([protocol_name/1, type_name/1, connack_name/1]).
|
-export([protocol_name/1, type_name/1, connack_name/1]).
|
|
@ -24,13 +24,13 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_parser).
|
-module(emqttd_parser).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([init/1, parse/2]).
|
-export([init/1, parse/2]).
|
|
@ -28,12 +28,10 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([init/3, info/1, clientid/1, client/1]).
|
-export([init/3, info/1, clientid/1, client/1]).
|
||||||
|
|
||||||
|
@ -259,18 +257,18 @@ handle(?PACKET(?DISCONNECT), State) ->
|
||||||
{stop, normal, State#proto_state{will_msg = undefined}}.
|
{stop, normal, State#proto_state{will_msg = undefined}}.
|
||||||
|
|
||||||
do_publish(Session, ClientId, Packet) ->
|
do_publish(Session, ClientId, Packet) ->
|
||||||
Msg = emqtt_message:from_packet(ClientId, Packet),
|
Msg = emqttd_message:from_packet(ClientId, Packet),
|
||||||
Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
|
Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
|
||||||
emqttd_session:publish(Session, Msg1).
|
emqttd_session:publish(Session, Msg1).
|
||||||
|
|
||||||
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
||||||
send(Msg, State) when is_record(Msg, mqtt_message) ->
|
send(Msg, State) when is_record(Msg, mqtt_message) ->
|
||||||
send(emqtt_message:to_packet(Msg), State);
|
send(emqttd_message:to_packet(Msg), State);
|
||||||
|
|
||||||
send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->
|
send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->
|
||||||
trace(send, Packet, State),
|
trace(send, Packet, State),
|
||||||
sent_stats(Packet),
|
sent_stats(Packet),
|
||||||
Data = emqtt_serialiser:serialise(Packet),
|
Data = emqttd_serialiser:serialise(Packet),
|
||||||
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
||||||
emqttd_metrics:inc('bytes/sent', size(Data)),
|
emqttd_metrics:inc('bytes/sent', size(Data)),
|
||||||
SendFun(Data),
|
SendFun(Data),
|
||||||
|
@ -278,11 +276,11 @@ send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when
|
||||||
|
|
||||||
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
||||||
lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
|
lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
|
||||||
[ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]);
|
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]);
|
||||||
|
|
||||||
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
||||||
lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
|
lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
|
||||||
[ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]).
|
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]).
|
||||||
|
|
||||||
%% @doc redeliver PUBREL PacketId
|
%% @doc redeliver PUBREL PacketId
|
||||||
redeliver({?PUBREL, PacketId}, State) ->
|
redeliver({?PUBREL, PacketId}, State) ->
|
||||||
|
@ -310,7 +308,7 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg
|
||||||
emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]).
|
emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]).
|
||||||
|
|
||||||
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
|
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
|
||||||
emqtt_message:from_packet(Packet).
|
emqttd_message:from_packet(Packet).
|
||||||
|
|
||||||
%% generate a clientId
|
%% generate a clientId
|
||||||
clientid(undefined, State) ->
|
clientid(undefined, State) ->
|
||||||
|
@ -366,7 +364,7 @@ validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess,
|
||||||
|
|
||||||
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
||||||
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
|
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
|
||||||
case emqtt_topic:validate({name, Topic}) of
|
case emqttd_topic:validate({name, Topic}) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
||||||
end;
|
end;
|
||||||
|
@ -390,7 +388,7 @@ validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
|
||||||
|
|
||||||
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
||||||
ErrTopics = [Topic || {Topic, Qos} <- Topics,
|
ErrTopics = [Topic || {Topic, Qos} <- Topics,
|
||||||
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
||||||
case ErrTopics of
|
case ErrTopics of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
_ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
|
_ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
|
||||||
|
|
|
@ -31,9 +31,7 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
|
||||||
|
|
||||||
%% Mnesia Callbacks
|
%% Mnesia Callbacks
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
@ -165,7 +163,7 @@ publish(From, #mqtt_message{topic=Topic} = Msg) ->
|
||||||
case emqttd_msg_store:retain(Msg) of
|
case emqttd_msg_store:retain(Msg) of
|
||||||
ok ->
|
ok ->
|
||||||
%TODO: why unset 'retain' flag?
|
%TODO: why unset 'retain' flag?
|
||||||
publish(From, Topic, emqtt_message:unset_flag(Msg));
|
publish(From, Topic, emqttd_message:unset_flag(Msg));
|
||||||
ignore ->
|
ignore ->
|
||||||
publish(From, Topic, Msg)
|
publish(From, Topic, Msg)
|
||||||
end.
|
end.
|
||||||
|
@ -197,7 +195,7 @@ dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) ->
|
||||||
Subscribers = mnesia:dirty_read(subscriber, Topic),
|
Subscribers = mnesia:dirty_read(subscriber, Topic),
|
||||||
setstats(dropped, Subscribers =:= []), %%TODO:...
|
setstats(dropped, Subscribers =:= []), %%TODO:...
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(#mqtt_subscriber{qos = SubQos, pid=SubPid}) ->
|
fun(#mqtt_subscriber{subpid=SubPid, qos = SubQos}) ->
|
||||||
Msg1 = if
|
Msg1 = if
|
||||||
Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
|
Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
|
||||||
true -> Msg
|
true -> Msg
|
||||||
|
@ -226,7 +224,7 @@ handle_call({subscribe, SubPid, Topics}, _From, State) ->
|
||||||
#mqtt_queue{name = Queue, subpid = SubPid, qos = Qos};
|
#mqtt_queue{name = Queue, subpid = SubPid, qos = Qos};
|
||||||
({Topic, Qos}) ->
|
({Topic, Qos}) ->
|
||||||
{#mqtt_topic{topic = Topic, node = node()},
|
{#mqtt_topic{topic = Topic, node = node()},
|
||||||
#mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}
|
#mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}}
|
||||||
end, Topics),
|
end, Topics),
|
||||||
F = fun() ->
|
F = fun() ->
|
||||||
lists:map(fun(QueueR) when is_record(QueueR, mqtt_queue) ->
|
lists:map(fun(QueueR) when is_record(QueueR, mqtt_queue) ->
|
||||||
|
@ -261,7 +259,7 @@ handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State)
|
||||||
|
|
||||||
handle_call({subscribe, SubPid, Topic, Qos}, _From, State) ->
|
handle_call({subscribe, SubPid, Topic, Qos}, _From, State) ->
|
||||||
TopicR = #mqtt_topic{topic = Topic, node = node()},
|
TopicR = #mqtt_topic{topic = Topic, node = node()},
|
||||||
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid},
|
Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos},
|
||||||
case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of
|
case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of
|
||||||
{atomic, ok} ->
|
{atomic, ok} ->
|
||||||
setstats(all),
|
setstats(all),
|
||||||
|
@ -280,7 +278,7 @@ handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
|
||||||
#mqtt_queue{name = Queue, subpid = SubPid};
|
#mqtt_queue{name = Queue, subpid = SubPid};
|
||||||
(Topic) ->
|
(Topic) ->
|
||||||
{#mqtt_topic{topic = Topic, node = node()},
|
{#mqtt_topic{topic = Topic, node = node()},
|
||||||
#mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}}
|
#mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}}
|
||||||
end, Topics),
|
end, Topics),
|
||||||
F = fun() ->
|
F = fun() ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
@ -309,7 +307,7 @@ handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) ->
|
||||||
|
|
||||||
handle_cast({unsubscribe, SubPid, Topic}, State) ->
|
handle_cast({unsubscribe, SubPid, Topic}, State) ->
|
||||||
TopicR = #mqtt_topic{topic = Topic, node = node()},
|
TopicR = #mqtt_topic{topic = Topic, node = node()},
|
||||||
Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid},
|
Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'},
|
||||||
case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of
|
case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of
|
||||||
{atomic, _} -> ok;
|
{atomic, _} -> ok;
|
||||||
{aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error])
|
{aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error])
|
||||||
|
@ -333,7 +331,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
|
||||||
end, Queues),
|
end, Queues),
|
||||||
|
|
||||||
%% remove subscribers...
|
%% remove subscribers...
|
||||||
Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid),
|
Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid),
|
||||||
lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) ->
|
lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) ->
|
||||||
mnesia:delete_object(subscriber, Sub, write),
|
mnesia:delete_object(subscriber, Sub, write),
|
||||||
try_remove_topic(#mqtt_topic{topic = Topic, node = Node})
|
try_remove_topic(#mqtt_topic{topic = Topic, node = Node})
|
||||||
|
@ -387,12 +385,12 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Fix issue #53 - Remove Overlapping Subscriptions
|
%% Fix issue #53 - Remove Overlapping Subscriptions
|
||||||
add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}})
|
add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}})
|
||||||
when is_record(TopicR, mqtt_topic) ->
|
when is_record(TopicR, mqtt_topic) ->
|
||||||
case add_topic(TopicR) of
|
case add_topic(TopicR) of
|
||||||
ok ->
|
ok ->
|
||||||
OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos}
|
OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos}
|
||||||
<- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid),
|
<- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.subpid),
|
||||||
SubTopic =:= Topic, SubQos =/= Qos],
|
SubTopic =:= Topic, SubQos =/= Qos],
|
||||||
|
|
||||||
%% remove overlapping subscribers
|
%% remove overlapping subscribers
|
||||||
|
|
|
@ -24,13 +24,13 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_serialiser).
|
-module(emqttd_serialiser).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([serialise/1]).
|
-export([serialise/1]).
|
|
@ -49,9 +49,7 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
|
||||||
|
|
||||||
%% Session API
|
%% Session API
|
||||||
-export([start_link/3, resume/3, destroy/2]).
|
-export([start_link/3, resume/3, destroy/2]).
|
||||||
|
|
|
@ -28,16 +28,14 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqttd_systop.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-export([start_link/0]).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
%% statistics API.
|
%% statistics API.
|
||||||
-export([statsfun/1, statsfun/2,
|
-export([statsfun/1, statsfun/2,
|
||||||
getstats/0, getstat/1,
|
getstats/0, getstat/1,
|
||||||
|
@ -47,9 +45,31 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-record(state, {tick_tref}).
|
||||||
|
|
||||||
-define(STATS_TAB, mqtt_stats).
|
-define(STATS_TAB, mqtt_stats).
|
||||||
|
|
||||||
-record(state, {tick_tref}).
|
%% $SYS Topics for Clients
|
||||||
|
-define(SYSTOP_CLIENTS, [
|
||||||
|
'clients/count', % clients connected current
|
||||||
|
'clients/max' % max clients connected
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% $SYS Topics for Sessions
|
||||||
|
-define(SYSTOP_SESSIONS, [
|
||||||
|
'sessions/count',
|
||||||
|
'sessions/max'
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% $SYS Topics for Subscribers
|
||||||
|
-define(SYSTOP_PUBSUB, [
|
||||||
|
'topics/count', % ...
|
||||||
|
'topics/max', % ...
|
||||||
|
'subscribers/count', % ...
|
||||||
|
'subscribers/max', % ...
|
||||||
|
'queues/count', % ...
|
||||||
|
'queues/max' % ...
|
||||||
|
]).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -159,5 +179,5 @@ publish(Stat, Val) ->
|
||||||
payload = emqttd_util:integer_to_binary(Val)}).
|
payload = emqttd_util:integer_to_binary(Val)}).
|
||||||
|
|
||||||
stats_topic(Stat) ->
|
stats_topic(Stat) ->
|
||||||
emqtt_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
|
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_topic).
|
-module(emqttd_topic).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
|
@ -102,7 +102,7 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
mnesia:write(TrieNode#trie_node{topic=Topic});
|
mnesia:write(TrieNode#trie_node{topic=Topic});
|
||||||
[] ->
|
[] ->
|
||||||
%add trie path
|
%add trie path
|
||||||
[add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],
|
[add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
|
||||||
%add last node
|
%add last node
|
||||||
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
|
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
|
||||||
end.
|
end.
|
||||||
|
@ -113,7 +113,7 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()).
|
-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()).
|
||||||
find(Topic) when is_binary(Topic) ->
|
find(Topic) when is_binary(Topic) ->
|
||||||
TrieNodes = match_node(root, emqtt_topic:words(Topic), []),
|
TrieNodes = match_node(root, emqttd_topic:words(Topic), []),
|
||||||
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
|
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -125,7 +125,7 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
case mnesia:read(trie_node, Topic) of
|
case mnesia:read(trie_node, Topic) of
|
||||||
[#trie_node{edge_count=0}] ->
|
[#trie_node{edge_count=0}] ->
|
||||||
mnesia:delete({trie_node, Topic}),
|
mnesia:delete({trie_node, Topic}),
|
||||||
delete_path(lists:reverse(emqtt_topic:triples(Topic)));
|
delete_path(lists:reverse(emqttd_topic:triples(Topic)));
|
||||||
[TrieNode] ->
|
[TrieNode] ->
|
||||||
mnesia:write(TrieNode#trie_node{topic=Topic});
|
mnesia:write(TrieNode#trie_node{topic=Topic});
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -29,30 +29,24 @@
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/1, ws_loop/3]).
|
-export([start_link/1, ws_loop/3]).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
%% WebSocket loop state
|
%% WebSocket Loop State
|
||||||
-record(wsocket_state, {request,
|
-record(wsocket_state, {request, client_pid, packet_opts, parser_state}).
|
||||||
client_pid,
|
|
||||||
packet_opts,
|
|
||||||
parser_state}).
|
|
||||||
|
|
||||||
%% Client state
|
%% Client State
|
||||||
-record(client_state, {ws_pid,
|
-record(client_state, {ws_pid, request, proto_state, keepalive}).
|
||||||
request,
|
|
||||||
proto_state,
|
|
||||||
keepalive}).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Start WebSocket client.
|
%% @doc Start WebSocket client.
|
||||||
|
@ -65,7 +59,7 @@ start_link(Req) ->
|
||||||
ReentryWs(#wsocket_state{request = Req,
|
ReentryWs(#wsocket_state{request = Req,
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid,
|
||||||
packet_opts = PktOpts,
|
packet_opts = PktOpts,
|
||||||
parser_state = emqtt_parser:init(PktOpts)}).
|
parser_state = emqttd_parser:init(PktOpts)}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -88,7 +82,7 @@ ws_loop(Data, State = #wsocket_state{request = Req,
|
||||||
parser_state = ParserState}, ReplyChannel) ->
|
parser_state = ParserState}, ReplyChannel) ->
|
||||||
Peer = Req:get(peer),
|
Peer = Req:get(peer),
|
||||||
lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]),
|
lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]),
|
||||||
case emqtt_parser:parse(iolist_to_binary(Data), ParserState) of
|
case emqttd_parser:parse(iolist_to_binary(Data), ParserState) of
|
||||||
{more, ParserState1} ->
|
{more, ParserState1} ->
|
||||||
State#wsocket_state{parser_state = ParserState1};
|
State#wsocket_state{parser_state = ParserState1};
|
||||||
{ok, Packet, Rest} ->
|
{ok, Packet, Rest} ->
|
||||||
|
@ -100,7 +94,7 @@ ws_loop(Data, State = #wsocket_state{request = Req,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
|
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
|
||||||
State#wsocket_state{parser_state = emqtt_parser:init(PktOpts)}.
|
State#wsocket_state{parser_state = emqttd_parser:init(PktOpts)}.
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_fsm callbacks
|
%%% gen_fsm callbacks
|
||||||
|
|
|
@ -20,21 +20,20 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqtt_parser tests.
|
%%% emqttd_parser tests.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_parser_tests).
|
-module(emqttd_parser_tests).
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
-include("emqtt_packet.hrl").
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
parse_connect_test() ->
|
parse_connect_test() ->
|
||||||
State = emqtt_parser:init([]),
|
State = emqttd_parser:init([]),
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||||
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -46,7 +45,7 @@ parse_connect_test() ->
|
||||||
proto_name = <<"MQIsdp">>,
|
proto_name = <<"MQIsdp">>,
|
||||||
clientid = <<"mosqpub/10451-iMac.loca">>,
|
clientid = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60}}, <<>>}, emqtt_parser:parse(V31ConnBin, State)),
|
keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)),
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||||
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -58,7 +57,7 @@ parse_connect_test() ->
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
clientid = <<"mosqpub/10451-iMac.loca">>,
|
clientid = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnBin, State)),
|
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)),
|
||||||
|
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
||||||
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||||
|
@ -71,7 +70,7 @@ parse_connect_test() ->
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
clientid = <<>>,
|
clientid = <<>>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnWithoutClientId, State)),
|
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)),
|
||||||
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
||||||
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -91,11 +90,11 @@ parse_connect_test() ->
|
||||||
will_msg = <<"willmsg">> ,
|
will_msg = <<"willmsg">> ,
|
||||||
username = <<"test">>,
|
username = <<"test">>,
|
||||||
password = <<"public">>}},
|
password = <<"public">>}},
|
||||||
<<>> }, emqtt_parser:parse(ConnBinWithWill, State)),
|
<<>> }, emqttd_parser:parse(ConnBinWithWill, State)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
parse_publish_test() ->
|
parse_publish_test() ->
|
||||||
State = emqtt_parser:init([]),
|
State = emqttd_parser:init([]),
|
||||||
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
||||||
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -105,7 +104,7 @@ parse_publish_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
||||||
packet_id = 1},
|
packet_id = 1},
|
||||||
payload = <<"hahah">> }, <<>>}, emqtt_parser:parse(PubBin, State)),
|
payload = <<"hahah">> }, <<>>}, emqttd_parser:parse(PubBin, State)),
|
||||||
|
|
||||||
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
||||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||||
|
@ -117,13 +116,13 @@ parse_publish_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
||||||
packet_id = undefined},
|
packet_id = undefined},
|
||||||
payload = <<"hello">> }, <<224,0>>}, emqtt_parser:parse(PubBin1, State)),
|
payload = <<"hello">> }, <<224,0>>}, emqttd_parser:parse(PubBin1, State)),
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
header = #mqtt_packet_header{type = ?DISCONNECT,
|
header = #mqtt_packet_header{type = ?DISCONNECT,
|
||||||
dup = false,
|
dup = false,
|
||||||
qos = 0,
|
qos = 0,
|
||||||
retain = false}
|
retain = false}
|
||||||
}, <<>>}, emqtt_parser:parse(<<224, 0>>, State)).
|
}, <<>>}, emqttd_parser:parse(<<224, 0>>, State)).
|
||||||
|
|
||||||
parse_puback_test() ->
|
parse_puback_test() ->
|
||||||
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
||||||
|
@ -133,7 +132,7 @@ parse_puback_test() ->
|
||||||
dup = false,
|
dup = false,
|
||||||
qos = 0,
|
qos = 0,
|
||||||
retain = false }
|
retain = false }
|
||||||
}, <<>>}, emqtt_parser:parse(PubAckBin, emqtt_parser:init([]))),
|
}, <<>>}, emqttd_parser:parse(PubAckBin, emqttd_parser:init([]))),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
parse_subscribe_test() ->
|
parse_subscribe_test() ->
|
||||||
|
@ -150,7 +149,7 @@ parse_disconnect_test() ->
|
||||||
dup = false,
|
dup = false,
|
||||||
qos = 0,
|
qos = 0,
|
||||||
retain = false}
|
retain = false}
|
||||||
}, <<>>}, emqtt_parser:parse(Bin, emqtt_parser:init([]))).
|
}, <<>>}, emqttd_parser:parse(Bin, emqttd_parser:init([]))).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -20,58 +20,59 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqtt_serialiser tests.
|
%%% emqttd_serialiser tests.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_serialiser_tests).
|
-module(emqttd_serialiser_tests).
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
|
||||||
-include("emqtt_packet.hrl").
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-import(emqttd_serialiser, [serialise/1]).
|
||||||
|
|
||||||
serialise_connect_test() ->
|
serialise_connect_test() ->
|
||||||
emqtt_serialiser:serialise(?CONNECT_PACKET(#mqtt_packet_connect{})).
|
serialise(?CONNECT_PACKET(#mqtt_packet_connect{})).
|
||||||
|
|
||||||
serialise_connack_test() ->
|
serialise_connack_test() ->
|
||||||
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||||
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
|
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
|
||||||
?assertEqual(<<32,2,0,0>>, emqtt_serialiser:serialise(ConnAck)).
|
?assertEqual(<<32,2,0,0>>, serialise(ConnAck)).
|
||||||
|
|
||||||
serialise_publish_test() ->
|
serialise_publish_test() ->
|
||||||
emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
|
serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
|
||||||
emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)).
|
serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)).
|
||||||
|
|
||||||
serialise_puback_test() ->
|
serialise_puback_test() ->
|
||||||
emqtt_serialiser:serialise(?PUBACK_PACKET(?PUBACK, 10384)).
|
serialise(?PUBACK_PACKET(?PUBACK, 10384)).
|
||||||
|
|
||||||
serialise_pubrel_test() ->
|
serialise_pubrel_test() ->
|
||||||
emqtt_serialiser:serialise(?PUBREL_PACKET(10384)).
|
serialise(?PUBREL_PACKET(10384)).
|
||||||
|
|
||||||
serialise_subscribe_test() ->
|
serialise_subscribe_test() ->
|
||||||
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
|
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
|
||||||
emqtt_serialiser:serialise(?SUBSCRIBE_PACKET(10, TopicTable)).
|
serialise(?SUBSCRIBE_PACKET(10, TopicTable)).
|
||||||
|
|
||||||
serialise_suback_test() ->
|
serialise_suback_test() ->
|
||||||
emqtt_serialiser:serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
|
serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
|
||||||
|
|
||||||
serialise_unsubscribe_test() ->
|
serialise_unsubscribe_test() ->
|
||||||
emqtt_serialiser:serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
|
serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
|
||||||
|
|
||||||
serialise_unsuback_test() ->
|
serialise_unsuback_test() ->
|
||||||
emqtt_serialiser:serialise(?UNSUBACK_PACKET(10)).
|
serialise(?UNSUBACK_PACKET(10)).
|
||||||
|
|
||||||
serialise_pingreq_test() ->
|
serialise_pingreq_test() ->
|
||||||
emqtt_serialiser:serialise(?PACKET(?PINGREQ)).
|
serialise(?PACKET(?PINGREQ)).
|
||||||
|
|
||||||
serialise_pingresp_test() ->
|
serialise_pingresp_test() ->
|
||||||
emqtt_serialiser:serialise(?PACKET(?PINGRESP)).
|
serialise(?PACKET(?PINGRESP)).
|
||||||
|
|
||||||
serialise_disconnect_test() ->
|
serialise_disconnect_test() ->
|
||||||
emqtt_serialiser:serialise(?PACKET(?DISCONNECT)).
|
serialise(?PACKET(?DISCONNECT)).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -19,14 +19,14 @@
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-module(emqtt_topic_tests).
|
-module(emqttd_topic_tests).
|
||||||
|
|
||||||
-import(emqtt_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-import(emqttd_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]).
|
||||||
|
|
||||||
-define(N, 100000).
|
-define(N, 100000).
|
||||||
|
|
||||||
validate_test() ->
|
validate_test() ->
|
||||||
|
@ -116,11 +116,11 @@ words_test() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
feed_var_test() ->
|
feed_var_test() ->
|
||||||
?assertEqual(<<"$Q/client/clientId">>, emqtt_topic:feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)).
|
?assertEqual(<<"$Q/client/clientId">>, emqttd_topic:feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)).
|
||||||
|
|
||||||
join_test() ->
|
join_test() ->
|
||||||
?assertEqual(<<"/ab/cd/ef/">>, emqtt_topic:join(words(<<"/ab/cd/ef/">>))),
|
?assertEqual(<<"/ab/cd/ef/">>, emqttd_topic:join(words(<<"/ab/cd/ef/">>))),
|
||||||
?assertEqual(<<"ab/+/#">>, emqtt_topic:join(words(<<"ab/+/#">>))).
|
?assertEqual(<<"ab/+/#">>, emqttd_topic:join(words(<<"ab/+/#">>))).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -25,7 +25,6 @@
|
||||||
gproc,
|
gproc,
|
||||||
esockd,
|
esockd,
|
||||||
mochiweb,
|
mochiweb,
|
||||||
{emqtt, load},
|
|
||||||
emqttd
|
emqttd
|
||||||
]},
|
]},
|
||||||
{rel, "start_clean", "",
|
{rel, "start_clean", "",
|
||||||
|
@ -62,7 +61,6 @@
|
||||||
{app, gproc, [{incl_cond, include}]},
|
{app, gproc, [{incl_cond, include}]},
|
||||||
{app, esockd, [{mod_cond, app}, {incl_cond, include}]},
|
{app, esockd, [{mod_cond, app}, {incl_cond, include}]},
|
||||||
{app, mochiweb, [{mod_cond, app}, {incl_cond, include}]},
|
{app, mochiweb, [{mod_cond, app}, {incl_cond, include}]},
|
||||||
{app, emqtt, [{mod_cond, app}, {incl_cond, include}]},
|
|
||||||
{app, emqttd, [{mod_cond, app}, {incl_cond, include}]}
|
{app, emqttd, [{mod_cond, app}, {incl_cond, include}]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue