diff --git a/apps/emqtt/include/emqtt.hrl b/apps/emqtt/include/emqtt.hrl deleted file mode 100644 index e0a2aab63..000000000 --- a/apps/emqtt/include/emqtt.hrl +++ /dev/null @@ -1,70 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Common Header. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - -%%------------------------------------------------------------------------------ -%% MQTT Protocol Version and Levels -%%------------------------------------------------------------------------------ --define(MQTT_PROTO_V31, 3). --define(MQTT_PROTO_V311, 4). - --define(PROTOCOL_NAMES, [ - {?MQTT_PROTO_V31, <<"MQIsdp">>}, - {?MQTT_PROTO_V311, <<"MQTT">>}]). - --type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311. - -%%------------------------------------------------------------------------------ -%% QoS Levels -%%------------------------------------------------------------------------------ - --define(QOS_0, 0). --define(QOS_1, 1). --define(QOS_2, 2). - --define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). - --type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2. - -%%------------------------------------------------------------------------------ -%% MQTT Message -%%------------------------------------------------------------------------------ - --type mqtt_msgid() :: undefined | 1..16#ffff. - --record(mqtt_message, { - topic :: binary(), %% topic published to - from :: binary() | atom(), %% from clientid - qos = ?QOS_0 :: mqtt_qos(), - retain = false :: boolean(), - dup = false :: boolean(), - sys = false :: boolean(), %% $SYS flag - msgid :: mqtt_msgid(), - payload :: binary() -}). - --type mqtt_message() :: #mqtt_message{}. - diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src deleted file mode 100644 index c84d855f6..000000000 --- a/apps/emqtt/src/emqtt.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqtt, - [ - {description, "Erlang MQTT Common Library"}, - {vsn, git}, - {modules, []}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {env, []} - ]}. diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 63ca9a047..34aa3b4bd 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -36,6 +36,12 @@ -define(ERTS_MINIMUM, "6.0"). +%% System Topics. +-define(SYSTOP, <<"$SYS">>). + +%% Queue Topics. +-define(QTop, <<"$Q">>). + %%------------------------------------------------------------------------------ %% PubSub %%------------------------------------------------------------------------------ @@ -56,8 +62,8 @@ %%------------------------------------------------------------------------------ -record(mqtt_subscriber, { topic :: binary(), - qos = 0 :: 0 | 1 | 2, - pid :: pid() + subpid :: pid(), + qos = 0 :: 0 | 1 | 2 }). -type mqtt_subscriber() :: #mqtt_subscriber{}. @@ -94,13 +100,29 @@ -record(mqtt_session, { clientid, session_pid, - subscriptions = [], - awaiting_ack, - awaiting_rel + subscriptions = [] }). -type mqtt_session() :: #mqtt_session{}. +%%------------------------------------------------------------------------------ +%% MQTT Message +%%------------------------------------------------------------------------------ +-type mqtt_msgid() :: undefined | 1..16#ffff. + +-record(mqtt_message, { + topic :: binary(), %% The topic published to + from :: binary() | atom(), %% ClientId of publisher + qos = 0 :: 0 | 1 | 2, %% Message QoS + retain = false :: boolean(), %% Retain flag + dup = false :: boolean(), %% Dup flag + sys = false :: boolean(), %% $SYS flag + msgid :: mqtt_msgid(), %% Message ID + payload :: binary() %% Payload +}). + +-type mqtt_message() :: #mqtt_message{}. + %%------------------------------------------------------------------------------ %% MQTT Plugin %%------------------------------------------------------------------------------ diff --git a/apps/emqtt/include/emqtt_packet.hrl b/apps/emqttd/include/emqttd_protocol.hrl similarity index 91% rename from apps/emqtt/include/emqtt_packet.hrl rename to apps/emqttd/include/emqttd_protocol.hrl index 5a965e98c..5266fae54 100644 --- a/apps/emqtt/include/emqtt_packet.hrl +++ b/apps/emqttd/include/emqttd_protocol.hrl @@ -20,11 +20,35 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% MQTT Packet Header. +%%% MQTT Protocol Header. %%% %%% @end %%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% MQTT Protocol Version and Levels +%%------------------------------------------------------------------------------ +-define(MQTT_PROTO_V31, 3). +-define(MQTT_PROTO_V311, 4). + +-define(PROTOCOL_NAMES, [ + {?MQTT_PROTO_V31, <<"MQIsdp">>}, + {?MQTT_PROTO_V311, <<"MQTT">>}]). + +-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311. + +%%------------------------------------------------------------------------------ +%% MQTT QoS +%%------------------------------------------------------------------------------ + +-define(QOS_0, 0). %% At most once +-define(QOS_1, 1). %% At least once +-define(QOS_2, 2). %% Exactly once + +-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). + +-type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2. + %%------------------------------------------------------------------------------ %% Max ClientId Length. Why 1024? NiDongDe! %%------------------------------------------------------------------------------ @@ -199,4 +223,3 @@ -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). - diff --git a/apps/emqttd/include/emqttd_systop.hrl b/apps/emqttd/include/emqttd_systop.hrl deleted file mode 100644 index 9308ba682..000000000 --- a/apps/emqttd/include/emqttd_systop.hrl +++ /dev/null @@ -1,106 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% eMQTT System Topics. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - --define(SYSTOP, <<"$SYS">>). - -%%------------------------------------------------------------------------------ -%% $SYS Topics of Broker -%%------------------------------------------------------------------------------ --define(SYSTOP_BROKERS, [ - version, % Broker version - uptime, % Broker uptime - datetime, % Broker local datetime - sysdescr % Broker description -]). - -%%------------------------------------------------------------------------------ -%% $SYS Topics for Clients -%%------------------------------------------------------------------------------ --define(SYSTOP_CLIENTS, [ - 'clients/count', % clients connected current - 'clients/max' % max clients connected -]). - -%%------------------------------------------------------------------------------ -%% $SYS Topics for Sessions -%%------------------------------------------------------------------------------ --define(SYSTOP_SESSIONS, [ - 'sessions/count', - 'sessions/max' -]). - -%%------------------------------------------------------------------------------ -%% $SYS Topics for Subscribers -%%------------------------------------------------------------------------------ --define(SYSTOP_PUBSUB, [ - 'topics/count', % ... - 'topics/max', % ... - 'subscribers/count', % ... - 'subscribers/max', % ... - 'queues/count', % ... - 'queues/max' % ... -]). - -%%------------------------------------------------------------------------------ -%% Bytes sent and received of Broker -%%------------------------------------------------------------------------------ --define(SYSTOP_BYTES, [ - {counter, 'bytes/received'}, % Total bytes received - {counter, 'bytes/sent'} % Total bytes sent -]). - -%%------------------------------------------------------------------------------ -%% Packets sent and received of Broker -%%------------------------------------------------------------------------------ --define(SYSTOP_PACKETS, [ - {counter, 'packets/received'}, % All Packets received - {counter, 'packets/sent'}, % All Packets sent - {counter, 'packets/connect'}, % CONNECT Packets received - {counter, 'packets/connack'}, % CONNACK Packets sent - {counter, 'packets/publish/received'}, % PUBLISH packets received - {counter, 'packets/publish/sent'}, % PUBLISH packets sent - {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received - {counter, 'packets/suback'}, % SUBACK packets sent - {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received - {counter, 'packets/unsuback'}, % UNSUBACK Packets sent - {counter, 'packets/pingreq'}, % PINGREQ packets received - {counter, 'packets/pingresp'}, % PINGRESP Packets sent - {counter, 'packets/disconnect'} % DISCONNECT Packets received -]). - -%%------------------------------------------------------------------------------ -%% Messages sent and received of broker -%%------------------------------------------------------------------------------ --define(SYSTOP_MESSAGES, [ - {counter, 'messages/received'}, % Messages received - {counter, 'messages/sent'}, % Messages sent - {gauge, 'messages/retained/count'},% Messagea retained - {gauge, 'messages/stored/count'}, % Messages stored - {counter, 'messages/dropped'} % Messages dropped -]). - - diff --git a/apps/emqttd/include/emqttd_topic.hrl b/apps/emqttd/include/emqttd_topic.hrl deleted file mode 100644 index a98dc791a..000000000 --- a/apps/emqttd/include/emqttd_topic.hrl +++ /dev/null @@ -1,48 +0,0 @@ -%%------------------------------------------------------------------------------ -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ -%%% @doc -%%% emqtt topic header. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - -%%------------------------------------------------------------------------------ -%% MQTT Topic -%%------------------------------------------------------------------------------ --record(topic, { - name :: binary(), - node :: node() -}). - --type topic() :: #topic{}. - -%%------------------------------------------------------------------------------ -%% MQTT Topic Subscriber -%%------------------------------------------------------------------------------ --record(topic_subscriber, { - topic :: binary(), - qos = 0 :: 0 | 1 | 2, - subpid :: pid() -}). - --type topic_subscriber() :: #topic_subscriber{}. - diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index a492136cf..0b0fdc6ba 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -73,9 +73,9 @@ compile(who, {user, Username}) -> {user, bin(Username)}; compile(topic, {eq, Topic}) -> - {eq, emqtt_topic:words(bin(Topic))}; + {eq, emqttd_topic:words(bin(Topic))}; compile(topic, Topic) -> - Words = emqtt_topic:words(bin(Topic)), + Words = emqttd_topic:words(bin(Topic)), case 'pattern?'(Words) of true -> {pattern, Words}; false -> Words @@ -126,12 +126,12 @@ match_topics(_Client, _Topic, []) -> false; match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) -> TopicFilter = feed_var(Client, PatternFilter), - case match_topic(emqtt_topic:words(Topic), TopicFilter) of + case match_topic(emqttd_topic:words(Topic), TopicFilter) of true -> true; false -> match_topics(Client, Topic, Filters) end; match_topics(Client, Topic, [TopicFilter|Filters]) -> - case match_topic(emqtt_topic:words(Topic), TopicFilter) of + case match_topic(emqttd_topic:words(Topic), TopicFilter) of true -> true; false -> match_topics(Client, Topic, Filters) end. @@ -139,7 +139,7 @@ match_topics(Client, Topic, [TopicFilter|Filters]) -> match_topic(Topic, {eq, TopicFilter}) -> Topic =:= TopicFilter; match_topic(Topic, TopicFilter) -> - emqtt_topic:match(Topic, TopicFilter). + emqttd_topic:match(Topic, TopicFilter). feed_var(Client, Pattern) -> feed_var(Client, Pattern, []). diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 7f2d8345d..2129a6bdb 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -30,13 +30,13 @@ -include("emqttd.hrl"). --include_lib("emqtt/include/emqtt.hrl"). - --behaviour(gen_server). +-include("emqttd_protocol.hrl"). %% API Function Exports -export([start_link/3]). +-behaviour(gen_server). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 20bd27b45..e4deb6414 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -28,13 +28,7 @@ -author("Feng Lee "). --include("emqttd_systop.hrl"). - --include_lib("emqtt/include/emqtt.hrl"). - --behaviour(gen_server). - --define(SERVER, ?MODULE). +-include_lib("emqttd.hrl"). %% API Function Exports -export([start_link/0]). @@ -54,6 +48,10 @@ %% Tick API -export([start_tick/1, stop_tick/1]). +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -62,6 +60,14 @@ -record(state, {started_at, sys_interval, tick_tref}). +%% $SYS Topics of Broker +-define(SYSTOP_BROKERS, [ + version, % Broker version + uptime, % Broker uptime + datetime, % Broker local datetime + sysdescr % Broker description +]). + %%%============================================================================= %%% API %%%============================================================================= @@ -276,7 +282,7 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= create_topic(Topic) -> - emqttd_pubsub:create(emqtt_topic:systop(Topic)). + emqttd_pubsub:create(emqttd_topic:systop(Topic)). retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), @@ -288,12 +294,12 @@ retain(brokers) -> retain(Topic, Payload) when is_binary(Payload) -> publish(#mqtt_message{from = broker, retain = true, - topic = emqtt_topic:systop(Topic), + topic = emqttd_topic:systop(Topic), payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> publish( #mqtt_message{from = broker, - topic = emqtt_topic:systop(Topic), + topic = emqttd_topic:systop(Topic), payload = Payload}). publish(Msg) -> diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index cbcfffb4b..bd9f16414 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -28,9 +28,9 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). --include_lib("emqtt/include/emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). %% API Function Exports -export([start_link/2, info/1]). @@ -68,7 +68,7 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), SendFun = fun(Data) -> Transport:send(NewSock, Data) end, - ParserState = emqtt_parser:init(PacketOpts), + ParserState = emqttd_parser:init(PacketOpts), ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts), State = control_throttle(#state{transport = Transport, socket = NewSock, @@ -177,7 +177,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, parse_state = ParseState, proto_state = ProtoState, conn_name = ConnStr}) -> - case emqtt_parser:parse(Bytes, ParseState) of + case emqttd_parser:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, control_throttle(State #state{parse_state = ParseState1}), @@ -186,7 +186,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, received_stats(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(PacketOpts), + process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts), proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 2c5c1a24f..8b7afe54e 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -29,8 +29,7 @@ -author("Feng Lee "). -include("emqttd.hrl"). - --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd_protocol.hrl"). -import(proplists, [get_value/2, get_value/3]). @@ -122,7 +121,7 @@ validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); validate(topic, Topic) -> - emqtt_topic:validate({name, Topic}). + emqttd_topic:validate({name, Topic}). int(S) -> list_to_integer(S). diff --git a/apps/emqttd/src/emqttd_inflight.erl b/apps/emqttd/src/emqttd_inflight.erl index 1eb69de8f..8f3df986c 100644 --- a/apps/emqttd/src/emqttd_inflight.erl +++ b/apps/emqttd/src/emqttd_inflight.erl @@ -29,7 +29,7 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). -export([new/2, is_full/1, len/1, in/2, ack/2]). diff --git a/apps/emqtt/src/emqtt_message.erl b/apps/emqttd/src/emqttd_message.erl similarity index 98% rename from apps/emqtt/src/emqtt_message.erl rename to apps/emqttd/src/emqttd_message.erl index 61da1011e..d96e5d922 100644 --- a/apps/emqtt/src/emqtt_message.erl +++ b/apps/emqttd/src/emqttd_message.erl @@ -24,13 +24,13 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_message). +-module(emqttd_message). -author("Feng Lee "). --include("emqtt.hrl"). +-include("emqttd.hrl"). --include("emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). -export([from_packet/1, from_packet/2, to_packet/1]). diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 84350f939..f0d45ec02 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -28,9 +28,7 @@ -author("Feng Lee "). --include("emqttd_systop.hrl"). - --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). -behaviour(gen_server). @@ -48,9 +46,41 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {tick_tref}). + -define(METRIC_TAB, mqtt_metric). --record(state, {tick_tref}). +%% Bytes sent and received of Broker +-define(SYSTOP_BYTES, [ + {counter, 'bytes/received'}, % Total bytes received + {counter, 'bytes/sent'} % Total bytes sent +]). + +%% Packets sent and received of Broker +-define(SYSTOP_PACKETS, [ + {counter, 'packets/received'}, % All Packets received + {counter, 'packets/sent'}, % All Packets sent + {counter, 'packets/connect'}, % CONNECT Packets received + {counter, 'packets/connack'}, % CONNACK Packets sent + {counter, 'packets/publish/received'}, % PUBLISH packets received + {counter, 'packets/publish/sent'}, % PUBLISH packets sent + {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received + {counter, 'packets/suback'}, % SUBACK packets sent + {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received + {counter, 'packets/unsuback'}, % UNSUBACK Packets sent + {counter, 'packets/pingreq'}, % PINGREQ packets received + {counter, 'packets/pingresp'}, % PINGRESP Packets sent + {counter, 'packets/disconnect'} % DISCONNECT Packets received +]). + +%% Messages sent and received of broker +-define(SYSTOP_MESSAGES, [ + {counter, 'messages/received'}, % Messages received + {counter, 'messages/sent'}, % Messages sent + {gauge, 'messages/retained/count'},% Messagea retained + {gauge, 'messages/stored/count'}, % Messages stored + {counter, 'messages/dropped'} % Messages dropped +]). %%%============================================================================= %%% API @@ -204,6 +234,6 @@ create_metric({counter, Name}) -> [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. metric_topic(Metric) -> - emqtt_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). + emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl index 8f7af60ac..c5a1e136e 100644 --- a/apps/emqttd/src/emqttd_mod_autosub.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -29,12 +29,10 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). - --include_lib("emqtt/include/emqtt_packet.hrl"). - -include("emqttd.hrl"). +-include("emqttd_protocol.hrl"). + -behaviour(emqttd_gen_mod). -export([load/1, client_connected/3, unload/1]). @@ -48,7 +46,7 @@ load(Opts) -> {ok, #state{topics = Topics}}. client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> - F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, + F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; client_connected(_ConnAck, _Client, _Topics) -> diff --git a/apps/emqttd/src/emqttd_mod_presence.erl b/apps/emqttd/src/emqttd_mod_presence.erl index 74c94a1c9..fe5135c1e 100644 --- a/apps/emqttd/src/emqttd_mod_presence.erl +++ b/apps/emqttd/src/emqttd_mod_presence.erl @@ -28,8 +28,6 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). - -include("emqttd.hrl"). -export([load/1, unload/1]). @@ -75,9 +73,9 @@ unload(_Opts) -> topic(connected, ClientId) -> - emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); + emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); topic(disconnected, ClientId) -> - emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). + emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index fc038d95f..eaaf287ae 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -29,7 +29,7 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). -behaviour(emqttd_gen_mod). @@ -104,7 +104,7 @@ compile(Sections) -> match_topic(Topic, []) -> Topic; match_topic(Topic, [{topic, Filter, Rules} | Sections]) -> - case emqtt_topic:match(Topic, Filter) of + case emqttd_topic:match(Topic, Filter) of true -> match_rule(Topic, Rules); false -> diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index 6a37a707e..c9c7527be 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -52,7 +52,8 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). +-include("emqttd_protocol.hrl"). -export([new/2, name/1, is_empty/1, is_full/1, @@ -62,8 +63,6 @@ -define(HIGH_WM, 0.6). --define(MAX_LEN, 1000). - -record(mqueue, {name, q = queue:new(), %% pending queue len = 0, %% current queue len @@ -89,7 +88,7 @@ %%------------------------------------------------------------------------------ -spec new(binary(), list(mqueue_option())) -> mqueue(). new(Name, Opts) -> - MaxLen = emqttd_opts:g(max_length, Opts, ?MAX_LEN), + MaxLen = emqttd_opts:g(max_length, Opts, 1000), #mqueue{name = Name, max_len = MaxLen, low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)), diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index c0b37cbc8..f304d0aad 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -28,7 +28,7 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). %% Mnesia callbacks -export([mnesia/1]). @@ -74,7 +74,7 @@ retain(Msg = #mqtt_message{topic = Topic, TabSize = mnesia:table_info(message, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> - lager:debug("Retained ~s", [emqtt_message:format(Msg)]), + lager:debug("Retained ~s", [emqttd_message:format(Msg)]), mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]), emqttd_metrics:set('messages/retained/count', mnesia:table_info(message, size)); @@ -106,12 +106,12 @@ env() -> Topic :: binary(), CPid :: pid(). redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> - case emqtt_topic:wildcard(Topic) of + case emqttd_topic:wildcard(Topic) of false -> dispatch(CPid, mnesia:dirty_read(message, Topic)); true -> Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> - case emqtt_topic:match(Name, Topic) of + case emqttd_topic:match(Name, Topic) of true -> [Msg|Acc]; false -> Acc end diff --git a/apps/emqtt/src/emqtt_packet.erl b/apps/emqttd/src/emqttd_packet.erl similarity index 98% rename from apps/emqtt/src/emqtt_packet.erl rename to apps/emqttd/src/emqttd_packet.erl index f2b001236..6cb6b415d 100644 --- a/apps/emqtt/src/emqtt_packet.erl +++ b/apps/emqttd/src/emqttd_packet.erl @@ -24,13 +24,13 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_packet). +-module(emqttd_packet). -author("Feng Lee "). --include("emqtt.hrl"). +-include("emqttd.hrl"). --include("emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). %% API -export([protocol_name/1, type_name/1, connack_name/1]). diff --git a/apps/emqtt/src/emqtt_parser.erl b/apps/emqttd/src/emqttd_parser.erl similarity index 99% rename from apps/emqtt/src/emqtt_parser.erl rename to apps/emqttd/src/emqttd_parser.erl index 387f041d4..bcc9fa1b4 100644 --- a/apps/emqtt/src/emqtt_parser.erl +++ b/apps/emqttd/src/emqttd_parser.erl @@ -24,13 +24,13 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_parser). +-module(emqttd_parser). -author("Feng Lee "). --include("emqtt.hrl"). +-include("emqttd.hrl"). --include("emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). %% API -export([init/1, parse/2]). diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 086bd6be3..2c4f9568c 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -28,12 +28,10 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). - --include_lib("emqtt/include/emqtt_packet.hrl"). - -include("emqttd.hrl"). +-include("emqttd_protocol.hrl"). + %% API -export([init/3, info/1, clientid/1, client/1]). @@ -259,18 +257,18 @@ handle(?PACKET(?DISCONNECT), State) -> {stop, normal, State#proto_state{will_msg = undefined}}. do_publish(Session, ClientId, Packet) -> - Msg = emqtt_message:from_packet(ClientId, Packet), + Msg = emqttd_message:from_packet(ClientId, Packet), Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg), emqttd_session:publish(Session, Msg1). -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. send(Msg, State) when is_record(Msg, mqtt_message) -> - send(emqtt_message:to_packet(Msg), State); + send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), sent_stats(Packet), - Data = emqtt_serialiser:serialise(Packet), + Data = emqttd_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/sent', size(Data)), SendFun(Data), @@ -278,11 +276,11 @@ send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> lager:info([{client, ClientId}], "RECV from ~s@~s: ~s", - [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); + [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]); trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> lager:info([{client, ClientId}], "SEND to ~s@~s: ~s", - [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). + [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]). %% @doc redeliver PUBREL PacketId redeliver({?PUBREL, PacketId}, State) -> @@ -310,7 +308,7 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]). willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> - emqtt_message:from_packet(Packet). + emqttd_message:from_packet(Packet). %% generate a clientId clientid(undefined, State) -> @@ -366,7 +364,7 @@ validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, variable = #mqtt_packet_publish{topic_name = Topic}}) -> - case emqtt_topic:validate({name, Topic}) of + case emqttd_topic:validate({name, Topic}) of true -> ok; false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} end; @@ -390,7 +388,7 @@ validate_topics(Type, []) when Type =:= name orelse Type =:= filter -> validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter -> ErrTopics = [Topic || {Topic, Qos} <- Topics, - not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))], + not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))], case ErrTopics of [] -> ok; _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic} diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 51fcc3c1e..1e1304d1a 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -31,9 +31,7 @@ -include("emqttd.hrl"). --include_lib("emqtt/include/emqtt.hrl"). - --include_lib("emqtt/include/emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). %% Mnesia Callbacks -export([mnesia/1]). @@ -165,7 +163,7 @@ publish(From, #mqtt_message{topic=Topic} = Msg) -> case emqttd_msg_store:retain(Msg) of ok -> %TODO: why unset 'retain' flag? - publish(From, Topic, emqtt_message:unset_flag(Msg)); + publish(From, Topic, emqttd_message:unset_flag(Msg)); ignore -> publish(From, Topic, Msg) end. @@ -197,7 +195,7 @@ dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) -> Subscribers = mnesia:dirty_read(subscriber, Topic), setstats(dropped, Subscribers =:= []), %%TODO:... lists:foreach( - fun(#mqtt_subscriber{qos = SubQos, pid=SubPid}) -> + fun(#mqtt_subscriber{subpid=SubPid, qos = SubQos}) -> Msg1 = if Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; true -> Msg @@ -226,7 +224,7 @@ handle_call({subscribe, SubPid, Topics}, _From, State) -> #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos}; ({Topic, Qos}) -> {#mqtt_topic{topic = Topic, node = node()}, - #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}} + #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}} end, Topics), F = fun() -> lists:map(fun(QueueR) when is_record(QueueR, mqtt_queue) -> @@ -261,7 +259,7 @@ handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State) handle_call({subscribe, SubPid, Topic, Qos}, _From, State) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, - Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}, + Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}, case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of {atomic, ok} -> setstats(all), @@ -280,7 +278,7 @@ handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> #mqtt_queue{name = Queue, subpid = SubPid}; (Topic) -> {#mqtt_topic{topic = Topic, node = node()}, - #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}} + #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}} end, Topics), F = fun() -> lists:foreach( @@ -309,7 +307,7 @@ handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) -> handle_cast({unsubscribe, SubPid, Topic}, State) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, - Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}, + Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}, case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of {atomic, _} -> ok; {aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error]) @@ -333,7 +331,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa end, Queues), %% remove subscribers... - Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid), + Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid), lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> mnesia:delete_object(subscriber, Sub, write), try_remove_topic(#mqtt_topic{topic = Topic, node = Node}) @@ -387,12 +385,12 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> end. %% Fix issue #53 - Remove Overlapping Subscriptions -add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}) +add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}}) when is_record(TopicR, mqtt_topic) -> case add_topic(TopicR) of ok -> OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos} - <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid), + <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.subpid), SubTopic =:= Topic, SubQos =/= Qos], %% remove overlapping subscribers diff --git a/apps/emqtt/src/emqtt_serialiser.erl b/apps/emqttd/src/emqttd_serialiser.erl similarity index 98% rename from apps/emqtt/src/emqtt_serialiser.erl rename to apps/emqttd/src/emqttd_serialiser.erl index 60d1f6dfa..e109c19a3 100644 --- a/apps/emqtt/src/emqtt_serialiser.erl +++ b/apps/emqttd/src/emqttd_serialiser.erl @@ -24,13 +24,13 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_serialiser). +-module(emqttd_serialiser). -author("Feng Lee "). --include("emqtt.hrl"). +-include("emqttd.hrl"). --include("emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). %% API -export([serialise/1]). diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 4cd480165..d4e8f081d 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -49,9 +49,7 @@ -include("emqttd.hrl"). --include_lib("emqtt/include/emqtt.hrl"). - --include_lib("emqtt/include/emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). %% Session API -export([start_link/3, resume/3, destroy/2]). diff --git a/apps/emqttd/src/emqttd_stats.erl b/apps/emqttd/src/emqttd_stats.erl index 45cec4ffe..1f005369e 100644 --- a/apps/emqttd/src/emqttd_stats.erl +++ b/apps/emqttd/src/emqttd_stats.erl @@ -28,16 +28,14 @@ -author("Feng Lee "). --include("emqttd_systop.hrl"). +-include("emqttd.hrl"). --include_lib("emqtt/include/emqtt.hrl"). +-export([start_link/0]). -behaviour(gen_server). -define(SERVER, ?MODULE). --export([start_link/0]). - %% statistics API. -export([statsfun/1, statsfun/2, getstats/0, getstat/1, @@ -47,9 +45,31 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {tick_tref}). + -define(STATS_TAB, mqtt_stats). --record(state, {tick_tref}). +%% $SYS Topics for Clients +-define(SYSTOP_CLIENTS, [ + 'clients/count', % clients connected current + 'clients/max' % max clients connected +]). + +%% $SYS Topics for Sessions +-define(SYSTOP_SESSIONS, [ + 'sessions/count', + 'sessions/max' +]). + +%% $SYS Topics for Subscribers +-define(SYSTOP_PUBSUB, [ + 'topics/count', % ... + 'topics/max', % ... + 'subscribers/count', % ... + 'subscribers/max', % ... + 'queues/count', % ... + 'queues/max' % ... +]). %%%============================================================================= %%% API @@ -159,5 +179,5 @@ publish(Stat, Val) -> payload = emqttd_util:integer_to_binary(Val)}). stats_topic(Stat) -> - emqtt_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). + emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqttd/src/emqttd_topic.erl similarity index 99% rename from apps/emqtt/src/emqtt_topic.erl rename to apps/emqttd/src/emqttd_topic.erl index 0fa183614..13e9d55a3 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqttd/src/emqttd_topic.erl @@ -24,7 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_topic). +-module(emqttd_topic). -author("Feng Lee "). diff --git a/apps/emqttd/src/emqttd_trie.erl b/apps/emqttd/src/emqttd_trie.erl index 9beaa339a..30e6d69d2 100644 --- a/apps/emqttd/src/emqttd_trie.erl +++ b/apps/emqttd/src/emqttd_trie.erl @@ -102,7 +102,7 @@ insert(Topic) when is_binary(Topic) -> mnesia:write(TrieNode#trie_node{topic=Topic}); [] -> %add trie path - [add_path(Triple) || Triple <- emqtt_topic:triples(Topic)], + [add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], %add last node mnesia:write(#trie_node{node_id=Topic, topic=Topic}) end. @@ -113,7 +113,7 @@ insert(Topic) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec find(Topic :: binary()) -> list(MatchedTopic :: binary()). find(Topic) when is_binary(Topic) -> - TrieNodes = match_node(root, emqtt_topic:words(Topic), []), + TrieNodes = match_node(root, emqttd_topic:words(Topic), []), [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined]. %%------------------------------------------------------------------------------ @@ -125,7 +125,7 @@ delete(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of [#trie_node{edge_count=0}] -> mnesia:delete({trie_node, Topic}), - delete_path(lists:reverse(emqtt_topic:triples(Topic))); + delete_path(lists:reverse(emqttd_topic:triples(Topic))); [TrieNode] -> mnesia:write(TrieNode#trie_node{topic=Topic}); [] -> diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl index 25dc334c7..0270d7d38 100644 --- a/apps/emqttd/src/emqttd_ws_client.erl +++ b/apps/emqttd/src/emqttd_ws_client.erl @@ -29,30 +29,24 @@ -author("Feng Lee "). --include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). --include_lib("emqtt/include/emqtt_packet.hrl"). - --behaviour(gen_server). +-include("emqttd_protocol.hrl"). %% API Exports -export([start_link/1, ws_loop/3]). +-behaviour(gen_server). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% WebSocket loop state --record(wsocket_state, {request, - client_pid, - packet_opts, - parser_state}). +%% WebSocket Loop State +-record(wsocket_state, {request, client_pid, packet_opts, parser_state}). -%% Client state --record(client_state, {ws_pid, - request, - proto_state, - keepalive}). +%% Client State +-record(client_state, {ws_pid, request, proto_state, keepalive}). %%------------------------------------------------------------------------------ %% @doc Start WebSocket client. @@ -65,7 +59,7 @@ start_link(Req) -> ReentryWs(#wsocket_state{request = Req, client_pid = ClientPid, packet_opts = PktOpts, - parser_state = emqtt_parser:init(PktOpts)}). + parser_state = emqttd_parser:init(PktOpts)}). %%------------------------------------------------------------------------------ %% @private @@ -88,7 +82,7 @@ ws_loop(Data, State = #wsocket_state{request = Req, parser_state = ParserState}, ReplyChannel) -> Peer = Req:get(peer), lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), - case emqtt_parser:parse(iolist_to_binary(Data), ParserState) of + case emqttd_parser:parse(iolist_to_binary(Data), ParserState) of {more, ParserState1} -> State#wsocket_state{parser_state = ParserState1}; {ok, Packet, Rest} -> @@ -100,7 +94,7 @@ ws_loop(Data, State = #wsocket_state{request = Req, end. reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser_state = emqtt_parser:init(PktOpts)}. + State#wsocket_state{parser_state = emqttd_parser:init(PktOpts)}. %%%============================================================================= %%% gen_fsm callbacks diff --git a/apps/emqtt/test/emqtt_parser_tests.erl b/apps/emqttd/test/emqttd_parser_tests.erl similarity index 92% rename from apps/emqtt/test/emqtt_parser_tests.erl rename to apps/emqttd/test/emqttd_parser_tests.erl index 6cb3c9286..fb15019eb 100644 --- a/apps/emqtt/test/emqtt_parser_tests.erl +++ b/apps/emqttd/test/emqttd_parser_tests.erl @@ -20,21 +20,20 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt_parser tests. +%%% emqttd_parser tests. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_parser_tests). +-module(emqttd_parser_tests). --include("emqtt.hrl"). --include("emqtt_packet.hrl"). +-include("emqttd_protocol.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). parse_connect_test() -> - State = emqtt_parser:init([]), + State = emqttd_parser:init([]), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, ?assertMatch({ok, #mqtt_packet{ @@ -46,7 +45,7 @@ parse_connect_test() -> proto_name = <<"MQIsdp">>, clientid = <<"mosqpub/10451-iMac.loca">>, clean_sess = true, - keep_alive = 60}}, <<>>}, emqtt_parser:parse(V31ConnBin, State)), + keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, ?assertMatch({ok, #mqtt_packet{ @@ -58,7 +57,7 @@ parse_connect_test() -> proto_name = <<"MQTT">>, clientid = <<"mosqpub/10451-iMac.loca">>, clean_sess = true, - keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnBin, State)), + keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60) V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, @@ -71,7 +70,7 @@ parse_connect_test() -> proto_name = <<"MQTT">>, clientid = <<>>, clean_sess = true, - keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnWithoutClientId, State)), + keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)), %%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg)) ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>, ?assertMatch({ok, #mqtt_packet{ @@ -91,11 +90,11 @@ parse_connect_test() -> will_msg = <<"willmsg">> , username = <<"test">>, password = <<"public">>}}, - <<>> }, emqtt_parser:parse(ConnBinWithWill, State)), + <<>> }, emqttd_parser:parse(ConnBinWithWill, State)), ok. parse_publish_test() -> - State = emqtt_parser:init([]), + State = emqttd_parser:init([]), %%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>) PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>, ?assertMatch({ok, #mqtt_packet{ @@ -105,7 +104,7 @@ parse_publish_test() -> retain = false}, variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>, packet_id = 1}, - payload = <<"hahah">> }, <<>>}, emqtt_parser:parse(PubBin, State)), + payload = <<"hahah">> }, <<>>}, emqttd_parser:parse(PubBin, State)), %PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>) %DISCONNECT(Qos=0, Retain=false, Dup=false) @@ -117,13 +116,13 @@ parse_publish_test() -> retain = false}, variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>, packet_id = undefined}, - payload = <<"hello">> }, <<224,0>>}, emqtt_parser:parse(PubBin1, State)), + payload = <<"hello">> }, <<224,0>>}, emqttd_parser:parse(PubBin1, State)), ?assertMatch({ok, #mqtt_packet{ header = #mqtt_packet_header{type = ?DISCONNECT, dup = false, qos = 0, retain = false} - }, <<>>}, emqtt_parser:parse(<<224, 0>>, State)). + }, <<>>}, emqttd_parser:parse(<<224, 0>>, State)). parse_puback_test() -> %%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1) @@ -133,7 +132,7 @@ parse_puback_test() -> dup = false, qos = 0, retain = false } - }, <<>>}, emqtt_parser:parse(PubAckBin, emqtt_parser:init([]))), + }, <<>>}, emqttd_parser:parse(PubAckBin, emqttd_parser:init([]))), ok. parse_subscribe_test() -> @@ -150,7 +149,7 @@ parse_disconnect_test() -> dup = false, qos = 0, retain = false} - }, <<>>}, emqtt_parser:parse(Bin, emqtt_parser:init([]))). + }, <<>>}, emqttd_parser:parse(Bin, emqttd_parser:init([]))). -endif. diff --git a/apps/emqtt/test/emqtt_serialiser_tests.erl b/apps/emqttd/test/emqttd_serialiser_tests.erl similarity index 67% rename from apps/emqtt/test/emqtt_serialiser_tests.erl rename to apps/emqttd/test/emqttd_serialiser_tests.erl index 5cec36a18..1f3a2b4c0 100644 --- a/apps/emqtt/test/emqtt_serialiser_tests.erl +++ b/apps/emqttd/test/emqttd_serialiser_tests.erl @@ -20,58 +20,59 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt_serialiser tests. +%%% emqttd_serialiser tests. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqtt_serialiser_tests). - --include("emqtt.hrl"). --include("emqtt_packet.hrl"). +-module(emqttd_serialiser_tests). -ifdef(TEST). +-include("emqttd_protocol.hrl"). + -include_lib("eunit/include/eunit.hrl"). +-import(emqttd_serialiser, [serialise/1]). + serialise_connect_test() -> - emqtt_serialiser:serialise(?CONNECT_PACKET(#mqtt_packet_connect{})). + serialise(?CONNECT_PACKET(#mqtt_packet_connect{})). serialise_connack_test() -> ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}}, - ?assertEqual(<<32,2,0,0>>, emqtt_serialiser:serialise(ConnAck)). + ?assertEqual(<<32,2,0,0>>, serialise(ConnAck)). serialise_publish_test() -> - emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)), - emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)). + serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)), + serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)). serialise_puback_test() -> - emqtt_serialiser:serialise(?PUBACK_PACKET(?PUBACK, 10384)). + serialise(?PUBACK_PACKET(?PUBACK, 10384)). serialise_pubrel_test() -> - emqtt_serialiser:serialise(?PUBREL_PACKET(10384)). + serialise(?PUBREL_PACKET(10384)). serialise_subscribe_test() -> TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}], - emqtt_serialiser:serialise(?SUBSCRIBE_PACKET(10, TopicTable)). + serialise(?SUBSCRIBE_PACKET(10, TopicTable)). serialise_suback_test() -> - emqtt_serialiser:serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])). + serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])). serialise_unsubscribe_test() -> - emqtt_serialiser:serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])). + serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])). serialise_unsuback_test() -> - emqtt_serialiser:serialise(?UNSUBACK_PACKET(10)). + serialise(?UNSUBACK_PACKET(10)). serialise_pingreq_test() -> - emqtt_serialiser:serialise(?PACKET(?PINGREQ)). + serialise(?PACKET(?PINGREQ)). serialise_pingresp_test() -> - emqtt_serialiser:serialise(?PACKET(?PINGRESP)). + serialise(?PACKET(?PINGRESP)). serialise_disconnect_test() -> - emqtt_serialiser:serialise(?PACKET(?DISCONNECT)). + serialise(?PACKET(?DISCONNECT)). -endif. diff --git a/apps/emqtt/test/emqtt_topic_tests.erl b/apps/emqttd/test/emqttd_topic_tests.erl similarity index 92% rename from apps/emqtt/test/emqtt_topic_tests.erl rename to apps/emqttd/test/emqttd_topic_tests.erl index c4fb75377..a21b6c520 100644 --- a/apps/emqtt/test/emqtt_topic_tests.erl +++ b/apps/emqttd/test/emqttd_topic_tests.erl @@ -19,14 +19,14 @@ %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %% SOFTWARE. %%------------------------------------------------------------------------------ --module(emqtt_topic_tests). - --import(emqtt_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]). +-module(emqttd_topic_tests). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-import(emqttd_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]). + -define(N, 100000). validate_test() -> @@ -116,11 +116,11 @@ words_test() -> ok. feed_var_test() -> - ?assertEqual(<<"$Q/client/clientId">>, emqtt_topic:feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)). + ?assertEqual(<<"$Q/client/clientId">>, emqttd_topic:feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)). join_test() -> - ?assertEqual(<<"/ab/cd/ef/">>, emqtt_topic:join(words(<<"/ab/cd/ef/">>))), - ?assertEqual(<<"ab/+/#">>, emqtt_topic:join(words(<<"ab/+/#">>))). + ?assertEqual(<<"/ab/cd/ef/">>, emqttd_topic:join(words(<<"/ab/cd/ef/">>))), + ?assertEqual(<<"ab/+/#">>, emqttd_topic:join(words(<<"ab/+/#">>))). -endif. diff --git a/rel/reltool.config b/rel/reltool.config index 2dbb76ea5..497676d87 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -25,7 +25,6 @@ gproc, esockd, mochiweb, - {emqtt, load}, emqttd ]}, {rel, "start_clean", "", @@ -62,7 +61,6 @@ {app, gproc, [{incl_cond, include}]}, {app, esockd, [{mod_cond, app}, {incl_cond, include}]}, {app, mochiweb, [{mod_cond, app}, {incl_cond, include}]}, - {app, emqtt, [{mod_cond, app}, {incl_cond, include}]}, {app, emqttd, [{mod_cond, app}, {incl_cond, include}]} ]}.