diff --git a/include/emqx.hrl b/include/emqx.hrl index 433e733ff..386136a9c 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,6 +26,16 @@ %%-define(ERTS_MINIMUM, "9.0"). +%%-------------------------------------------------------------------- +%% Sys/Queue/Share Topics' Prefix +%%-------------------------------------------------------------------- + +-define(SYSTOP, <<"$SYS/">>). %% System Topic + +-define(QUEUE, <<"$queue/">>). %% Queue Topic + +-define(SHARE, <<"$share/">>). %% Shared Topic + %%-------------------------------------------------------------------- %% Message and Delivery %%-------------------------------------------------------------------- @@ -34,7 +44,8 @@ -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). --type(message_from() :: #{node := atom(), +-type(message_from() :: #{zone := atom(), + node := atom(), clientid := binary(), protocol := protocol(), connector => atom(), @@ -60,7 +71,7 @@ from :: message_from(), %% Message from sender :: pid(), %% The pid of the sender/publisher flags :: message_flags(), %% Message flags - headers :: message_headers() %% Message headers + headers :: message_headers(), %% Message headers topic :: binary(), %% Message topic properties :: map(), %% Message user properties payload :: binary(), %% Message payload @@ -70,24 +81,12 @@ -type(message() :: #message{}). -record(delivery, - { %sender :: pid(), %% The pid of the sender/publisher - message :: message(), %% Message + { message :: message(), flows :: list() }). -type(delivery() :: #delivery{}). - -%%-------------------------------------------------------------------- -%% Sys/Queue/Share Topics' Prefix -%%-------------------------------------------------------------------- - --define(SYSTOP, <<"$SYS/">>). %% System Topic - --define(QUEUE, <<"$queue/">>). %% Queue Topic - --define(SHARE, <<"$share/">>). %% Shared Topic - %%-------------------------------------------------------------------- %% PubSub %%-------------------------------------------------------------------- @@ -97,20 +96,16 @@ -define(PS(PS), (PS =:= publish orelse PS =:= subscribe)). %%-------------------------------------------------------------------- -%% MQTT Topic +%% Subscription %%-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% MQTT Subscription -%%-------------------------------------------------------------------- - --record(mqtt_subscription, - { subid :: binary() | atom(), - topic :: binary(), - qos :: 0 | 1 | 2 +-record(subscription, + { subid :: binary() | atom(), + topic :: binary(), + subopts :: list() }). --type(mqtt_subscription() :: #mqtt_subscription{}). +-type(subscription() :: #subscription{}). %%-------------------------------------------------------------------- %% MQTT Client @@ -149,57 +144,6 @@ -type(mqtt_session() :: #mqtt_session{}). -%%-------------------------------------------------------------------- -%% MQTT Message -%%-------------------------------------------------------------------- - --type(mqtt_msg_id() :: binary() | undefined). - --type(mqtt_pktid() :: 1..16#ffff | undefined). - --type(mqtt_msg_from() :: atom() | {binary(), undefined | binary()}). - --record(mqtt_message, - { %% Global unique message ID - id :: mqtt_msg_id(), - %% PacketId - pktid :: mqtt_pktid(), - %% ClientId and Username - from :: mqtt_msg_from(), - %% Topic that the message is published to - topic :: binary(), - %% Message QoS - qos = 0 :: 0 | 1 | 2, - %% Message Flags - flags = [] :: [retain | dup | sys], - %% Retain flag - retain = false :: boolean(), - %% Dup flag - dup = false :: boolean(), - %% $SYS flag - sys = false :: boolean(), - %% Headers - headers = [] :: list(), - %% Payload - payload :: binary(), - %% Timestamp - timestamp :: erlang:timestamp() - }). - --type(mqtt_message() :: #mqtt_message{}). - -%%-------------------------------------------------------------------- -%% MQTT Delivery -%%-------------------------------------------------------------------- - --record(mqtt_delivery, - { sender :: pid(), %% Pid of the sender/publisher - message :: mqtt_message(), %% Message - flows :: list() - }). - --type(mqtt_delivery() :: #mqtt_delivery{}). - %%-------------------------------------------------------------------- %% Route %%-------------------------------------------------------------------- @@ -254,10 +198,10 @@ -type(plugin() :: #plugin{}). %%-------------------------------------------------------------------- -%% MQTT CLI Command. For example: 'broker metrics' +%% Command %%-------------------------------------------------------------------- --record(mqtt_cli, { name, action, args = [], opts = [], usage, descr }). +-record(command, { name, action, args = [], opts = [], usage, descr }). --type(mqtt_cli() :: #mqtt_cli{}). +-type(command() :: #command{}). diff --git a/include/emqx_internal.hrl b/include/emqx_internal.hrl deleted file mode 100644 index 2241c2fc8..000000000 --- a/include/emqx_internal.hrl +++ /dev/null @@ -1,54 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% Internal Header File - --define(GPROC_POOL(JoinOrLeave, Pool, Id), - (begin - case JoinOrLeave of - join -> gproc_pool:connect_worker(Pool, {Pool, Id}); - leave -> gproc_pool:disconnect_worker(Pool, {Pool, Id}) - end - end)). - --define(PROC_NAME(M, I), (list_to_atom(lists:concat([M, "_", I])))). - --define(UNEXPECTED_REQ(Req, State), - (begin - lager:error("[~s] Unexpected Request: ~p", [?MODULE, Req]), - {reply, {error, unexpected_request}, State} - end)). - --define(UNEXPECTED_MSG(Msg, State), - (begin - lager:error("[~s] Unexpected Message: ~p", [?MODULE, Msg]), - {noreply, State} - end)). - --define(UNEXPECTED_INFO(Info, State), - (begin - lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), - {noreply, State} - end)). - --define(IF(Cond, TrueFun, FalseFun), - (case (Cond) of - true -> (TrueFun); - false-> (FalseFun) - end)). - --define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]). - diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index a4946cc16..62ee0e7d3 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -311,10 +311,10 @@ #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, variable = #mqtt_packet_puback{packet_id = PacketId}}). --define(SUBSCRIBE_PACKET(PacketId, TopicTable), +-define(SUBSCRIBE_PACKET(PacketId, TopicFilters), #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1}, - variable = #mqtt_packet_subscribe{packet_id = PacketId, - topic_table = TopicTable}}). + variable = #mqtt_packet_subscribe{packet_id = PacketId, + topic_filters = TopicFilters}}). -define(SUBACK_PACKET(PacketId, ReasonCodes), #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, @@ -337,3 +337,52 @@ -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). +%%-------------------------------------------------------------------- +%% MQTT Message +%%-------------------------------------------------------------------- + +-type(mqtt_msg_id() :: binary() | undefined). + +-type(mqtt_msg_from() :: atom() | {binary(), undefined | binary()}). + +-record(mqtt_message, + { %% Global unique message ID + id :: mqtt_msg_id(), + %% PacketId + packet_id :: mqtt_packet_id(), + %% ClientId and Username + from :: mqtt_msg_from(), + %% Topic that the message is published to + topic :: binary(), + %% Message QoS + qos = 0 :: mqtt_qos(), + %% Message Flags + flags = [] :: [retain | dup | sys], + %% Retain flag + retain = false :: boolean(), + %% Dup flag + dup = false :: boolean(), + %% $SYS flag + sys = false :: boolean(), + %% Headers + headers = [] :: list(), + %% Payload + payload :: binary(), + %% Timestamp + timestamp :: erlang:timestamp() + }). + +-type(mqtt_message() :: #mqtt_message{}). + +%%-------------------------------------------------------------------- +%% MQTT Delivery +%%-------------------------------------------------------------------- + +-record(mqtt_delivery, + { sender :: pid(), + message :: mqtt_message(), + flows :: list() + }). + +-type(mqtt_delivery() :: #mqtt_delivery{}). + diff --git a/src/emqx.erl b/src/emqx.erl index 6142cfa7d..73a750894 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -14,12 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc EMQ X Main Module. - -module(emqx). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 409b98065..05d3cbacb 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -18,8 +18,6 @@ -behaviour(gen_server). --author("Feng Lee "). - -include("emqx.hrl"). %% API Function Exports diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 39cae031e..9c1d3f4d1 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_access_rule). --author("Feng Lee "). - -include("emqx.hrl"). -type(who() :: all | binary() | diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 3353176c5..2cd1e5acb 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,12 +18,8 @@ -behaviour(emqx_acl_mod). --author("Feng Lee "). - -include("emqx.hrl"). --include("emqx_cli.hrl"). - -export([all_rules/0]). %% ACL callbacks @@ -116,7 +112,7 @@ reload_acl(#state{config = undefined}) -> reload_acl(State) -> case catch load_rules_from_file(State) of {'EXIT', Error} -> {error, Error}; - true -> ?PRINT("~s~n", ["reload acl_internal successfully"]), ok + true -> io:format("~s~n", ["reload acl_internal successfully"]), ok end. %% @doc ACL Module Description diff --git a/src/emqx_acl_mod.erl b/src/emqx_acl_mod.erl index fcdaf012d..9da3396b4 100644 --- a/src/emqx_acl_mod.erl +++ b/src/emqx_acl_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_acl_mod). --author("Feng Lee "). - -include("emqx.hrl"). %%-------------------------------------------------------------------- diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index f82ed5f2d..1688e7bb2 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_alarm). --author("Feng Lee "). - -behaviour(gen_event). -include("emqx.hrl"). @@ -88,17 +86,25 @@ handle_event({set_alarm, Alarm = #alarm{id = AlarmId, title = Title, summary = Summary}}, Alarms)-> TS = os:timestamp(), - Json = mochijson2:encode([{id, AlarmId}, - {severity, Severity}, - {title, iolist_to_binary(Title)}, - {summary, iolist_to_binary(Summary)}, - {ts, emqx_time:now_secs(TS)}]), - emqx:publish(alarm_msg(alert, AlarmId, Json)), + case catch emqx_json:encode([{id, AlarmId}, + {severity, Severity}, + {title, iolist_to_binary(Title)}, + {summary, iolist_to_binary(Summary)}, + {ts, emqx_time:now_secs(TS)}]) of + {'EXIT', Reason} -> + lager:error("Failed to encode set_alarm: ~p", [Reason]); + JSON -> + emqx_broker:publish(alarm_msg(alert, AlarmId, JSON)) + end, {ok, [Alarm#alarm{timestamp = TS} | Alarms]}; handle_event({clear_alarm, AlarmId}, Alarms) -> - Json = mochijson2:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]), - emqx:publish(alarm_msg(clear, AlarmId, Json)), + case catch emqx_json:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of + {'EXIT', Reason} -> + lager:error("Failed to encode clear_alarm: ~p", [Reason]); + JSON -> + emqx_broker:publish(alarm_msg(clear, AlarmId, JSON)) + end, {ok, lists:keydelete(AlarmId, 2, Alarms), hibernate}; handle_event(_, Alarms)-> diff --git a/src/emqx_app.erl b/src/emqx_app.erl index ab83e870b..48648d64e 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,10 +18,6 @@ -behaviour(application). --author("Feng Lee "). - --include("emqx_cli.hrl"). - -include("emqx_mqtt.hrl"). %% Application callbacks @@ -54,11 +50,11 @@ stop(_State) -> %%-------------------------------------------------------------------- print_banner() -> - ?PRINT("Starting ~s on node ~s~n", [?APP, node()]). + io:format("Starting ~s on node ~s~n", [?APP, node()]). print_vsn() -> {ok, Vsn} = application:get_key(vsn), - ?PRINT("~s ~s is running now!~n", [?APP, Vsn]). + io:format("~s ~s is running now!~n", [?APP, Vsn]). %%-------------------------------------------------------------------- %% Register default ACL File diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index b72494a72..658eea9de 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_auth_mod). --author("Feng Lee "). - -include("emqx.hrl"). -export([passwd_hash/2]). diff --git a/src/emqx_boot.erl b/src/emqx_boot.erl index 6a2d76b8c..8d103f3f0 100644 --- a/src/emqx_boot.erl +++ b/src/emqx_boot.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index e9858793f..2ef5a16a2 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -22,8 +22,6 @@ -include("emqx_mqtt.hrl"). --include("emqx_internal.hrl"). - %% API Function Exports -export([start_link/5]). @@ -104,10 +102,12 @@ qname(Node, Topic) -> iolist_to_binary(["Bridge:", Node, ":", Topic]). handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]), + {reply, ignore, State}. handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), + {noreply, State}. handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) -> {noreply, State#state{mqueue = emqx_mqueue:in(Msg, MQ)}}; @@ -148,7 +148,8 @@ handle_info({'EXIT', _Pid, normal}, State) -> {noreply, State}; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), + {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> gproc_pool:disconnect_worker(Pool, {Pool, Id}). diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index 668023319..5e0deed34 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_bridge_sup). --author("Feng Lee "). - -export([start_link/3]). %%-------------------------------------------------------------------- diff --git a/src/emqx_bridge_sup_sup.erl b/src/emqx_bridge_sup_sup.erl index 91deec7dd..762a5e8a4 100644 --- a/src/emqx_bridge_sup_sup.erl +++ b/src/emqx_bridge_sup_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ -behavior(supervisor). --author("Feng Lee "). - -export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]). -export([init/1]). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 3bb9a1835..36b174404 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ -behaviour(gen_server). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_internal.hrl"). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 4956b199d..2c186571b 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 229e474bb..bbaf0d855 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_config.erl b/src/emqx_config.erl index e7fca24a4..c9d71feef 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_conn.erl b/src/emqx_conn.erl index a731bacc9..bb80cb0fa 100644 --- a/src/emqx_conn.erl +++ b/src/emqx_conn.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -203,7 +203,7 @@ handle_info({suback, PacketId, GrantedQos}, State) -> %% Fastlane handle_info({dispatch, _Topic, Message}, State) -> - handle_info({deliver, Message#message{qos = ?QOS_0}}, State); + handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State); handle_info({deliver, Message}, State) -> with_proto( @@ -259,16 +259,16 @@ handle_info({keepalive, start, Interval}, State = #state{connection = Conn}) -> end, case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of {ok, KeepAlive} -> - {noreply, State#client_state{keepalive = KeepAlive}}; + {noreply, State#state{keepalive = KeepAlive}}; {error, Error} -> ?LOG(warning, "Keepalive error - ~p", [Error], State), shutdown(Error, State) end; -handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> +handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> case emqx_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - {noreply, State#client_state{keepalive = KeepAlive1}}; + {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> ?LOG(debug, "Keepalive timeout", [], State), shutdown(keepalive_timeout, State); diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 4bcfb4bc8..59d7ff8cc 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_gen_mod.erl b/src/emqx_gen_mod.erl index 18a8732a2..9b3e0ee3c 100644 --- a/src/emqx_gen_mod.erl +++ b/src/emqx_gen_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_gen_mod). --author("Feng Lee "). - -ifdef(use_specs). -callback(load(Opts :: any()) -> ok | {error, term()}). diff --git a/src/emqx_guid.erl b/src/emqx_guid.erl index bedd0f0b7..e03baab4d 100644 --- a/src/emqx_guid.erl +++ b/src/emqx_guid.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index f8bef6ecf..530bf0ad3 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ -behaviour(gen_server). --author("Feng Lee "). - %% Start -export([start_link/0]). diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index 4e26366e9..b3ea24841 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,12 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Inflight Window that wraps the gb_trees. - -module(emqx_inflight). --author("Feng Lee "). - -export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). diff --git a/src/emqx_json.erl b/src/emqx_json.erl new file mode 100644 index 000000000..6b204474f --- /dev/null +++ b/src/emqx_json.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_json). + +-export([encode/1, encode/2, decode/1, decode/2]). + +-spec(encode(jsx:json_term()) -> jsx:json_text()). +encode(Term) -> + jsx:encode(Term). + +-spec(encode(jsx:json_term(), jsx_to_json:config()) -> jsx:json_text()). +encode(Term, Opts) -> + jsx:encode(Term, Opts). + +-spec(decode(jsx:json_text()) -> jsx:json_term()). +decode(JSON) -> + jsx:decode(JSON). + +-spec(decode(jsx:json_text(), jsx_to_json:config()) -> jsx:json_term()). +decode(JSON, Opts) -> + jsx:decode(JSON, Opts). + diff --git a/src/emqx_lager_backend.erl b/src/emqx_lager_backend.erl index 60e7cd5bf..c675481c5 100644 --- a/src/emqx_lager_backend.erl +++ b/src/emqx_lager_backend.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_lager_backend). --author("Feng Lee "). - -behaviour(gen_event). -include_lib("lager/include/lager.hrl"). diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 912cf356f..2574418a9 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,12 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Message Functions - -module(emqx_message). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). @@ -57,7 +53,7 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, packet_id = PacketId}, payload = Payload}) -> #mqtt_message{id = msgid(), - pktid = PacketId, + packet_id = PacketId, qos = Qos, retain = Retain, dup = Dup, @@ -95,7 +91,7 @@ msgid() -> emqx_guid:gen(). %% @doc Message to Packet -spec(to_packet(mqtt_message()) -> mqtt_packet()). -to_packet(#mqtt_message{pktid = PkgId, +to_packet(#mqtt_message{packet_id = PkgId, qos = Qos, retain = Retain, dup = Dup, @@ -141,13 +137,13 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) -> unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message -format(#mqtt_message{id = MsgId, pktid = PktId, from = {ClientId, Username}, +format(#mqtt_message{id = MsgId, packet_id = PktId, from = {ClientId, Username}, qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)", [i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]); %% TODO:... -format(#mqtt_message{id = MsgId, pktid = PktId, from = From, +format(#mqtt_message{id = MsgId, packet_id = PktId, from = From, qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)", [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]). diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 347213e31..a0d8228d5 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index ca8d02551..81480f212 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ -include_lib("emqx.hrl"). +-include_lib("emqx_mqtt.hrl"). + -export([load/1, unload/1]). -export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]). diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index cae25842a..7230a6dd2 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mod_sup.erl b/src/emqx_mod_sup.erl index 1f71dcc74..016708351 100644 --- a/src/emqx_mod_sup.erl +++ b/src/emqx_mod_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqtt5_rscode.erl b/src/emqx_mqtt5_rscode.erl deleted file mode 100644 index 10c137678..000000000 --- a/src/emqx_mqtt5_rscode.erl +++ /dev/null @@ -1,195 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mqtt5_rscode). - --author("Feng Lee "). - --export([name/1, value/1]). - -%%-------------------------------------------------------------------- -%% Reason code to name -%%-------------------------------------------------------------------- - -0 -name(0x00 -Success -CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH -0 -name(0x00 -Normal disconnection -DISCONNECT -0 -name(0x00 -Granted QoS 0 -SUBACK -1 -name(0x01 -Granted QoS 1 -SUBACK -2 -name(0x02 -Granted QoS 2 -SUBACK -4 -name(0x04 -Disconnect with Will Message -DISCONNECT -16 -name(0x10 -No matching subscribers -PUBACK, PUBREC -17 -name(0x11 -No subscription existed -UNSUBACK -24 -name(0x18 -Continue authentication -AUTH -25 -name(0x19 -Re-authenticate -AUTH -128 -name(0x80 -Unspecified error -CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT -129 -name(0x81 -Malformed Packet -CONNACK, DISCONNECT -130 -name(0x82 -Protocol Error -CONNACK, DISCONNECT -131 -name(0x83 -Implementation specific error -CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT -132 -name(0x84 -Unsupported Protocol Version -CONNACK -133 -name(0x85 -Client Identifier not valid -CONNACK -134 -name(0x86 -Bad User Name or Password -CONNACK -135 -name(0x87 -Not authorized -CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT -136 -name(0x88 -Server unavailable -CONNACK -137 -name(0x89 -Server busy -CONNACK, DISCONNECT -138 -name(0x8A -Banned -CONNACK -139 -name(0x8B -Server shutting down -DISCONNECT -140 -name(0x8C -Bad authentication method -CONNACK, DISCONNECT -141 -name(0x8D -Keep Alive timeout -DISCONNECT -142 -name(0x8E -Session taken over -DISCONNECT -143 -name(0x8F -Topic Filter invalid -SUBACK, UNSUBACK, DISCONNECT -144 -name(0x90 -Topic Name invalid -CONNACK, PUBACK, PUBREC, DISCONNECT -145 -name(0x91 -Packet Identifier in use -PUBACK, PUBREC, SUBACK, UNSUBACK -146 -name(0x92 -Packet Identifier not found -PUBREL, PUBCOMP -147 -name(0x93 -Receive Maximum exceeded -DISCONNECT -148 -name(0x94 -Topic Alias invalid -DISCONNECT -149 -name(0x95 -Packet too large -CONNACK, DISCONNECT -150 -name(0x96 -Message rate too high -DISCONNECT -151 -name(0x97 -Quota exceeded -CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT -%% 152 -name(0x98 -Administrative action -DISCONNECT -%% 153 -name(0x99 -Payload format invalid -CONNACK, PUBACK, PUBREC, DISCONNECT -%% 154 -name(0x9A -Retain not supported -CONNACK, DISCONNECT -%% 155 -name(0x9B -QoS not supported -CONNACK, DISCONNECT -%% 156 -name(0x9C -Use another server -CONNACK, DISCONNECT -%% 157: CONNACK, DISCONNECT -name(0x9D) -> 'Server-Moved'; -%% 158: SUBACK, DISCONNECT -name(0x9E) -> 'Shared-Subscriptions-Not-Supported'; -%% 159: CONNACK, DISCONNECT -name(0x9F) -> 'Connection-Rate-Exceeded'; -%% 160: DISCONNECT -name(0xA0) -> 'Maximum-Connect-Time'; -%% 161: SUBACK, DISCONNECT -name(0xA1) -> 'Subscription-Identifiers-Not-Supported'; -%% 162: SUBACK, DISCONNECT -name(0xA2) -> 'Wildcard-Subscriptions-Not-Supported'; - diff --git a/src/emqx_mqtt_app.erl b/src/emqx_mqtt_app.erl index bde8f4633..e947855e1 100644 --- a/src/emqx_mqtt_app.erl +++ b/src/emqx_mqtt_app.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqtt5_props.erl b/src/emqx_mqtt_props.erl similarity index 97% rename from src/emqx_mqtt5_props.erl rename to src/emqx_mqtt_props.erl index 70857d18c..6001b5211 100644 --- a/src/emqx_mqtt5_props.erl +++ b/src/emqx_mqtt_props.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,9 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mqtt5_props). - --author("Feng Lee "). +-module(emqx_mqtt_props). -export([name/1, id/1]). diff --git a/src/emqx_mqtt_rscode.erl b/src/emqx_mqtt_rscode.erl new file mode 100644 index 000000000..b5658c17f --- /dev/null +++ b/src/emqx_mqtt_rscode.erl @@ -0,0 +1,115 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt_rscode). + +-export([value/1]). + +%%-------------------------------------------------------------------- +%% Reason code to name +%%-------------------------------------------------------------------- + +%% 00: Success; CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH +value('Success') -> 16#00; +%% 00: Normal disconnection; DISCONNECT +value('Normal-Disconnection') -> 16#00; +%% 00: Granted QoS 0; SUBACK +value('Granted-QoS0') -> 16#00; +%% 01: Granted QoS 1; SUBACK +value('Granted-QoS1') -> 16#01; +%% 02: Granted QoS 2; SUBACK +value('Granted-QoS2') -> 16#02; +%% 04: Disconnect with Will Message; DISCONNECT +value('Disconnect-With-Will-Message') -> 16#04; +%% 16: No matching subscribers; PUBACK, PUBREC +value('No-Matching-Subscribers') -> 16#10; +%% 17: No subscription existed; UNSUBACK +value('No-Subscription-Existed') -> 16#11; +%% 24: Continue authentication; AUTH +value('Continue-Authentication') -> 16#18; +%% 25: Re-Authenticate; AUTH +value('Re-Authenticate') -> 16#19; +%% 128: Unspecified error; CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT +value('Unspecified-Error') -> 16#80; +%% 129: Malformed Packet; CONNACK, DISCONNECT +value('Malformed-Packet') -> 16#81; +%% 130: Protocol Error; CONNACK, DISCONNECT +value('Protocol-Error') -> 16#82; +%% 131: Implementation specific error; CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT +value('Implementation-Specific-Error') -> 16#83; +%% 132: Unsupported Protocol Version; CONNACK +value('Unsupported-Protocol-Version') -> 16#84; +%% 133: Client Identifier not valid; CONNACK +value('Client-Identifier-not-Valid') -> 16#85; +%% 134: Bad User Name or Password; CONNACK +value('Bad-Username-or-Password') -> 16#86; +%% 135: Not authorized; CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT +value('Not-Authorized') -> 16#87; +%% 136: Server unavailable; CONNACK +value('Server-Unavailable') -> 16#88; +%% 137: Server busy; CONNACK, DISCONNECT +value('Server-Busy') -> 16#89; +%% 138: Banned; CONNACK +value('Banned') -> 16#8A; +%% 139: Server shutting down; DISCONNECT +value('Server-Shutting-Down') -> 16#8B; +%% 140: Bad authentication method; CONNACK, DISCONNECT +value('Bad-Authentication-Method') -> 16#8C; +%% 141: Keep Alive timeout; DISCONNECT +value('Keep-Alive-Timeout') -> 16#8D; +%% 142: Session taken over; DISCONNECT +value('Session-Taken-Over') -> 16#8E; +%% 143: Topic Filter invalid; SUBACK, UNSUBACK, DISCONNECT +value('Topic-Filter-Invalid') -> 16#8F; +%% 144: Topic Name invalid; CONNACK, PUBACK, PUBREC, DISCONNECT +value('Topic-Name-Invalid') -> 16#90; +%% 145: Packet Identifier in use; PUBACK, PUBREC, SUBACK, UNSUBACK +value('Packet-Identifier-Inuse') -> 16#91; +%% 146: Packet Identifier not found; PUBREL, PUBCOMP +value('Packet-Identifier-Not-Found') -> 16#92; +%% 147: Receive Maximum exceeded; DISCONNECT +value('Receive-Maximum-Exceeded') -> 16#93; +%% 148: Topic Alias invalid; DISCONNECT +value('Topic-Alias-Invalid') -> 16#94; +%% 149: Packet too large; CONNACK, DISCONNECT +value('Packet-Too-Large') -> 16#95; +%% 150: Message rate too high; DISCONNECT +value('Message-Rate-Too-High') -> 16#96; +%% 151: Quota exceeded; CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT +value('Quota-Exceeded') -> 16#97; +%% 152: Administrative action; DISCONNECT +value('Administrative-Action') -> 16#98; +%% 153: Payload format invalid; CONNACK, PUBACK, PUBREC, DISCONNECT +value('Payload-Format-Invalid') -> 16#99; +%% 154: Retain not supported; CONNACK, DISCONNECT +value('Retain-Not-Supported') -> 16#9A; +%% 155: QoS not supported; CONNACK, DISCONNECT +value('QoS-Not-Supported') -> 16#9B; +%% 156: Use another server; CONNACK, DISCONNECT +value('Use-Another-Server') -> 16#9C; +%% 157: Server moved; CONNACK, DISCONNECT +value('Server-Moved') -> 16#9D; +%% 158: Shared Subscriptions not supported; SUBACK, DISCONNECT +value('Shared-Subscriptions-Not-Supported') -> 16#9E; +%% 159: Connection rate exceeded; CONNACK, DISCONNECT +value('Connection-Rate-Exceeded') -> 16#9F; +%% 160: Maximum connect time; DISCONNECT +value('Maximum-Connect-Time') -> 16#A0; +%% 161: Subscription Identifiers not supported; SUBACK, DISCONNECT +value('Subscription-Identifiers-Not-Supported') -> 16#A1; +%% 162: Wildcard-Subscriptions-Not-Supported; SUBACK, DISCONNECT +value('Wildcard-Subscriptions-Not-Supported') -> 16#A2. + diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 190e525fc..c139a6bdd 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_net.erl b/src/emqx_net.erl index 0e666dfb1..ee186ee5a 100644 --- a/src/emqx_net.erl +++ b/src/emqx_net.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index ad94f5d0c..39681f27f 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_packet). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). @@ -92,8 +90,8 @@ format_variable(#mqtt_packet_connect{ io_lib:format(Format1, Args1); format_variable(#mqtt_packet_connack{ack_flags = AckFlags, - return_code = ReturnCode}) -> - io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); + reason_code = ReasonCode}) -> + io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReasonCode]); format_variable(#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}) -> @@ -102,17 +100,17 @@ format_variable(#mqtt_packet_publish{topic_name = TopicName, format_variable(#mqtt_packet_puback{packet_id = PacketId}) -> io_lib:format("PacketId=~p", [PacketId]); -format_variable(#mqtt_packet_subscribe{packet_id = PacketId, - topic_table = TopicTable}) -> - io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, TopicTable]); +format_variable(#mqtt_packet_subscribe{packet_id = PacketId, + topic_filters = TopicFilters}) -> + io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]); format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, topics = Topics}) -> io_lib:format("PacketId=~p, Topics=~p", [PacketId, Topics]); format_variable(#mqtt_packet_suback{packet_id = PacketId, - qos_table = QosTable}) -> - io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]); + reason_codes = ReasonCodes}) -> + io_lib:format("PacketId=~p, ReasonCodes=~p", [PacketId, ReasonCodes]); format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) -> io_lib:format("PacketId=~p", [PacketId]); diff --git a/src/emqx_parser.erl b/src/emqx_parser.erl index c7539112e..a65b70894 100644 --- a/src/emqx_parser.erl +++ b/src/emqx_parser.erl @@ -16,8 +16,6 @@ -module(emqx_parser). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). @@ -27,53 +25,55 @@ -type(max_packet_size() :: 1..?MAX_PACKET_SIZE). --spec(initial_state() -> {none, max_packet_size()}). +-type(state() :: #{maxlen := max_packet_size(), vsn := mqtt_vsn()}). + +-spec(initial_state() -> {none, state()}). initial_state() -> initial_state(?MAX_PACKET_SIZE). %% @doc Initialize a parser --spec(initial_state(max_packet_size()) -> {none, max_packet_size()}). +-spec(initial_state(max_packet_size()) -> {none, state()}). initial_state(MaxSize) -> - {none, MaxSize}. + {none, #{maxlen => MaxSize, vsn => ?MQTT_PROTO_V4}}. %% @doc Parse MQTT Packet --spec(parse(binary(), {none, pos_integer()} | fun()) +-spec(parse(binary(), {none, state()} | fun()) -> {ok, mqtt_packet()} | {error, term()} | {more, fun()}). -parse(<<>>, {none, MaxLen}) -> - {more, fun(Bin) -> parse(Bin, {none, MaxLen}) end}; -parse(<>, {none, Limit}) -> +parse(<<>>, {none, State}) -> + {more, fun(Bin) -> parse(Bin, {none, State}) end}; +parse(<>, {none, State}) -> parse_remaining_len(Rest, #mqtt_packet_header{type = Type, dup = bool(Dup), qos = fixqos(Type, QoS), - retain = bool(Retain)}, Limit); + retain = bool(Retain)}, State); parse(Bin, Cont) -> Cont(Bin). -parse_remaining_len(<<>>, Header, Limit) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Limit) end}; -parse_remaining_len(Rest, Header, Limit) -> - parse_remaining_len(Rest, Header, 1, 0, Limit). +parse_remaining_len(<<>>, Header, State) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, State) end}; +parse_remaining_len(Rest, Header, State) -> + parse_remaining_len(Rest, Header, 1, 0, State). -parse_remaining_len(_Bin, _Header, _Multiplier, Length, MaxLen) +parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{maxlen := MaxLen}) when Length > MaxLen -> {error, invalid_mqtt_frame_len}; -parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Limit) end}; -%% optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... -parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, _Limit) -> - parse_frame(Rest, Header, 2); +parse_remaining_len(<<>>, Header, Multiplier, Length, State) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, State) end}; +%% Optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... +parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, State) -> + parse_frame(Rest, Header, 2, State); %% optimize: match PINGREQ... -parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, _Limit) -> - parse_frame(Rest, Header, 0); -parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) -> - parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit); -parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, MaxLen) -> +parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, State) -> + parse_frame(Rest, Header, 0, State); +parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, State) -> + parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, State); +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, State = #{maxlen := MaxLen}) -> FrameLen = Value + Len * Multiplier, if FrameLen > MaxLen -> {error, invalid_mqtt_frame_len}; - true -> parse_frame(Rest, Header, FrameLen) + true -> parse_frame(Rest, Header, FrameLen, State) end. -parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -> +parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length, State = #{vsn := Vsn}) -> case {Type, Bin} of {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), @@ -95,7 +95,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) - {WillMsg, Rest8} = parse_msg(Rest7, WillFlag), {UserName, Rest9} = parse_utf(Rest8, UsernameFlag), {PasssWord, <<>>} = parse_utf(Rest9, PasswordFlag), - case protocol_name_approved(ProtoVersion, ProtoName) of + case protocol_name_approved(ProtoVer, ProtoName) of true -> wrap(Header, #mqtt_packet_connect{ @@ -128,7 +128,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) - _ -> <> = Rest1, {Id, R} end, - {Properties, Payload} = parse_properties(ProtoVer, Rest), + {Properties, Payload} = parse_properties(Vsn, Rest2), wrap(fixdup(Header), #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Properties}, @@ -136,10 +136,10 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) - {PubAck, <>} when PubAck == ?PUBACK; PubAck == ?PUBREC; PubAck == ?PUBREL; PubAck == ?PUBCOMP -> <> = FrameBin, - case ProtoVer == ?MQTT_PROTO_V5 of + case Vsn == ?MQTT_PROTO_V5 of true -> <> = Rest1, - {Properties, Rest3} = parse_properties(ProtoVer, Rest2), + {Properties, Rest3} = parse_properties(Vsn, Rest2), wrap(Header, #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, properties = Properties}, Rest3); @@ -149,11 +149,11 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) - {?SUBSCRIBE, <>} -> %% 1 = Qos, <> = FrameBin, - {Properties, Rest2} = parse_properties(ProtoVer, Rest1), - TopicTable = parse_topics(?SUBSCRIBE, Rest1, []), - wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId, - properties = Properties, - topic_table = TopicTable}, Rest); + {Properties, Rest2} = parse_properties(Vsn, Rest1), + TopicFilters = parse_topics(?SUBSCRIBE, Rest2, []), + wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters}, Rest); %{?SUBACK, <>} -> % <> = FrameBin, % {Properties, Rest2/binary>> = parse_properties(ProtoVer, Rest1), @@ -162,7 +162,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) - {?UNSUBSCRIBE, <>} -> %% 1 = Qos, <> = FrameBin, - {Properties, Rest2} = parse_properties(ProtoVer, Rest1), + {Properties, Rest2} = parse_properties(Vsn, Rest1), Topics = parse_topics(?UNSUBSCRIBE, Rest2, []), wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId, properties = Properties, @@ -180,20 +180,19 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) - % Length = 0, % wrap(Header, Rest); {?DISCONNECT, <>} -> - case ProtoVer == ?MQTT_PROTO_V5 of + if + Vsn == ?MQTT_PROTO_V5 -> + <> = FrameBin, + {Properties, Rest2} = parse_properties(Vsn, Rest1), + wrap(Header, #mqtt_packet_disconnect{reason_code = ReasonCode, + properties = Properties}, Rest2); true -> - <> = Rest, - {Properties, Rest2} = parse_properties(ProtoVer, Rest1), - wrap(Header, #mqtt_packet_disconnect{reason_code = Reason, - properties = Properties}, Rest2); - false -> - Lenght = 0, wrap(Header, Rest) + Length = 0, wrap(Header, Rest) end; {_, TooShortBin} -> {more, fun(BinMore) -> - parse_frame(<>, - Header, Length) - end} + parse_frame(<>, Header, Length, State) + end} end. wrap(Header, Variable, Payload, Rest) -> @@ -205,12 +204,12 @@ wrap(Header, Rest) -> parse_will_props(Bin, ProtoVer = ?MQTT_PROTO_V5, 1) -> parse_properties(ProtoVer, Bin); -parse_will_props(Bin, _ProtoVer, _WillFlag), +parse_will_props(Bin, _ProtoVer, _WillFlag) -> {#{}, Bin}. parse_properties(?MQTT_PROTO_V5, Bin) -> {Len, Rest} = parse_variable_byte_integer(Bin), - <> = Rest, {parse_property(PropsBin, #{}), Rest1}; parse_properties(_MQTT_PROTO_V3, Bin) -> {#{}, Bin}. %% No properties. @@ -228,11 +227,11 @@ parse_property(<<16#03, Bin/binary>>, Props) -> {Val, Rest} = parse_utf(Bin), parse_property(Rest, Props#{'Content-Type' => Val}); %% 08: 'Response-Topic', UTF-8 Encoded String; -parse_property(<<16#08, Bin/binary>>) -> +parse_property(<<16#08, Bin/binary>>, Props) -> {Val, Rest} = parse_utf(Bin), parse_property(Rest, Props#{'Response-Topic' => Val}); %% 09: 'Correlation-Data', Binary Data; -parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>) -> +parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> parse_property(Bin, Props#{'Correlation-Data' => Val}); %% 11: 'Subscription-Identifier', Variable Byte Integer; parse_property(<<16#0B, Bin/binary>>, Props) -> @@ -242,18 +241,18 @@ parse_property(<<16#0B, Bin/binary>>, Props) -> parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); %% 18: 'Assigned-Client-Identifier', UTF-8 Encoded String; -parse_property(<<16#12, Bin/binary>>) -> +parse_property(<<16#12, Bin/binary>>, Props) -> {Val, Rest} = parse_utf(Bin), parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); %% 19: 'Server-Keep-Alive', Two Byte Integer; -parse_property(<<16#13, Val:16, Bin/binary>>) -> +parse_property(<<16#13, Val:16, Bin/binary>>, Props) -> parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); %% 21: 'Authentication-Method', UTF-8 Encoded String; parse_property(<<16#15, Bin/binary>>, Props) -> {Val, Rest} = parse_utf(Bin), - parse_property(Rest, Props#{'Authentication-Method' => Val}) + parse_property(Rest, Props#{'Authentication-Method' => Val}); %% 22: 'Authentication-Data', Binary Data; -parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>) -> +parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> parse_property(Bin, Props#{'Authentication-Data' => Val}); %% 23: 'Request-Problem-Information', Byte; parse_property(<<16#17, Val, Bin/binary>>, Props) -> @@ -273,7 +272,7 @@ parse_property(<<16#1C, Bin/binary>>, Props) -> {Val, Rest} = parse_utf(Bin), parse_property(Rest, Props#{'Server-Reference' => Val}); %% 31: 'Reason-String', UTF-8 Encoded String; -parse_property(<<16#1F, Bin/binary, Props) -> +parse_property(<<16#1F, Bin/binary>>, Props) -> {Val, Rest} = parse_utf(Bin), parse_property(Rest, Props#{'Reason-String' => Val}); %% 33: 'Receive-Maximum', Two Byte Integer; @@ -300,7 +299,7 @@ parse_property(<<16#26, Bin/binary>>, Props) -> end); %% 39: 'Maximum-Packet-Size', Four Byte Integer; parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> - parse_property(Rest, Props#{'Maximum-Packet-Size' => Val}); + parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}); %% 40: 'Wildcard-Subscription-Available', Byte; parse_property(<<16#28, Val, Bin/binary>>, Props) -> parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); @@ -321,8 +320,8 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_topics(_Packet, <<>>, Topics) -> lists:reverse(Topics); parse_topics(?SUBSCRIBE = Sub, Bin, Topics) -> - {Name, <<<<_Reserved:2, RetainHandling:2, KeepRetain:1, NoLocal:1, QoS:2>>, Rest/binary>>} = parse_utf(Bin), - SubOpts = [{qos, Qos}, {retain_handling, RetainHandling}, {keep_retain, KeepRetain}, {no_local, NoLocal}], + {Name, <<_Reserved:2, RetainHandling:2, KeepRetain:1, NoLocal:1, QoS:2, Rest/binary>>} = parse_utf(Bin), + SubOpts = [{qos, QoS}, {retain_handling, RetainHandling}, {keep_retain, KeepRetain}, {no_local, NoLocal}], parse_topics(Sub, Rest, [{Name, SubOpts}| Topics]); parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> {Name, <>} = parse_utf(Bin), diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 4eddf7331..0ec84245f 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_plugins). --author("Feng Lee "). - -include("emqx.hrl"). -export([init/0]). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c754774a3..c2ab59be3 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,12 +16,12 @@ -module(emqx_protocol). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_misc.hrl"). + -import(proplists, [get_value/2, get_value/3]). %% API @@ -241,8 +241,8 @@ process(?CONNECT_PACKET(Var), State0) -> end, %% Run hooks emqx_hooks:run('client.connected', [ReturnCode1], client(State3)), - %% Send connack - send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), + %%TODO: Send Connack + %% send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), %% stop if authentication failure stop_if_auth_failure(ReturnCode1, State3); @@ -567,10 +567,10 @@ sp(false) -> 0. %% The retained flag should be propagated for bridge. %%-------------------------------------------------------------------- -clean_retain(false, Msg = #message{retain = true, headers = Headers}) -> +clean_retain(false, Msg = #mqtt_message{retain = true, headers = Headers}) -> case lists:member(retained, Headers) of true -> Msg; - false -> Msg#message{retain = false} + false -> Msg#mqtt_message{retain = false} end; clean_retain(_IsBridge, Msg) -> Msg. diff --git a/src/emqx_pubsub.erl b/src/emqx_pubsub.erl index 9554f30ad..9099afc83 100644 --- a/src/emqx_pubsub.erl +++ b/src/emqx_pubsub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ -include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + -export([start_link/3]). %% PubSub API. @@ -173,7 +175,8 @@ handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) -> reply(ok, setstats(State)); handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]), + {reply, ignore, State}. handle_cast({subscribe, Topic, Subscriber, Options}, State) -> add_subscriber(Topic, Subscriber, Options), @@ -184,10 +187,12 @@ handle_cast({unsubscribe, Topic, Subscriber, Options}, State) -> noreply(setstats(State)); handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), + {noreply, State}. handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), + {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> gproc_pool:disconnect_worker(Pool, {Pool, Id}). diff --git a/src/emqx_pubsub_sup.erl b/src/emqx_pubsub_sup.erl index 6611e1bbd..a9978d011 100644 --- a/src/emqx_pubsub_sup.erl +++ b/src/emqx_pubsub_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc PubSub Supervisor. -module(emqx_pubsub_sup). --author("Feng Lee "). - -behaviour(supervisor). %% API diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 060786590..e46add4bf 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_router). --author("Feng Lee "). - -behaviour(gen_server). -include("emqx.hrl"). diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index a137b543c..f4ef07809 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -13,18 +13,12 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -%% -%% @doc EMQ X Distributed RPC. -%% -%%-------------------------------------------------------------------- -module(emqx_rpc). --author("Feng Lee "). - -export([cast/4]). -%% @doc Wraps gen_rpc first. cast(Node, Mod, Fun, Args) -> - emqx_metrics:inc('messages/forward'), rpc:cast(Node, Mod, Fun, Args). + emqx_metrics:inc('messages/forward'), + rpc:cast(Node, Mod, Fun, Args). diff --git a/src/emqx_serializer.erl b/src/emqx_serializer.erl index f99497d84..63906e426 100644 --- a/src/emqx_serializer.erl +++ b/src/emqx_serializer.erl @@ -16,8 +16,6 @@ -module(emqx_serializer). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). @@ -83,11 +81,11 @@ serialize_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode, properties = Properties}, undefined) -> PropsBin = serialize_properties(Properties), - {<>, <<>>}; + {<>, <<>>}; serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, - topic_table = Topics }, undefined) -> - {<>, serialize_topics(Topics)}; + topic_filters = TopicFilters}, undefined) -> + {<>, serialize_topics(TopicFilters)}; serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId, properties = Properties, @@ -124,7 +122,7 @@ serialize_variable(?PINGRESP, undefined, undefined) -> serialize_variable(?DISCONNECT, #mqtt_packet_disconnect{reason_code = ReasonCode, properties = Properties}, undefined) -> - {<>, <<>>}. + {<>, <<>>}; serialize_variable(?AUTH, #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}, undefined) -> @@ -138,7 +136,7 @@ serialize_payload(Bin) when is_binary(Bin) -> serialize_properties(undefined) -> <<>>; serialize_properties(Props) -> - << serialize_property(Prop, Val) || {Prop, Val} <= (maps:to_list(Props)) >>. + << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>. %% 01: Byte; serialize_property('Payload-Format-Indicator', Val) -> diff --git a/src/emqx_server.erl b/src/emqx_server.erl index 6aa9acce8..1f7ee65ba 100644 --- a/src/emqx_server.erl +++ b/src/emqx_server.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 4ce6f7cfd..4f89e73ae 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -47,12 +47,12 @@ -behaviour(gen_server). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_misc.hrl"). + -import(emqx_misc, [start_timer/2]). -import(proplists, [get_value/2, get_value/3]). @@ -192,15 +192,15 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... %% @doc Publish Message -spec(publish(pid(), message()) -> ok | {error, term()}). -publish(_Session, Msg = #message{qos = ?QOS_0}) -> +publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> %% Publish QoS0 Directly emqx_server:publish(Msg), ok; -publish(_Session, Msg = #message{qos = ?QOS_1}) -> +publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> %% Publish QoS1 message directly for client will PubAck automatically emqx_server:publish(Msg), ok; -publish(Session, Msg = #message{qos = ?QOS_2}) -> +publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> %% Publish QoS2 to Session gen_server:call(Session, {publish, Msg}, ?TIMEOUT). @@ -320,7 +320,7 @@ binding(ClientPid) -> handle_pre_hibernate(State) -> {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From, +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, packet_id = PacketId}}, _From, State = #state{awaiting_rel = AwaitingRel, await_rel_timer = Timer, await_rel_timeout = Timeout}) -> @@ -347,7 +347,8 @@ handle_call(state, _From, State) -> reply(?record_to_proplist(state, State, ?STATE_KEYS), State); handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]), + {reply, ignore, State}. handle_cast({subscribe, From, TopicTable, AckFun}, State = #state{client_id = ClientId, @@ -512,10 +513,11 @@ handle_cast({destroy, ClientId}, shutdown(conflict, State); handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), + {noreply, State}. %% Ignore Messages delivered by self -handle_info({dispatch, _Topic, #message{from = {ClientId, _}}}, +handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}}, State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> {noreply, State}; @@ -560,8 +562,9 @@ handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> [ClientPid, Pid, Reason], State), {noreply, State, hibernate}; -handle_info(Info, Session) -> - ?UNEXPECTED_INFO(Info, Session). +handle_info(Info, State) -> + lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), + {noreply, State}. terminate(Reason, #state{client_id = ClientId, username = Username}) -> %% Move to emqx_sm to avoid race condition @@ -608,7 +611,7 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now, if Force orelse (Diff >= Interval) -> case {Type, Msg} of - {publish, Msg = #mqtt_message{pktid = PacketId}} -> + {publish, Msg = #mqtt_message{packet_id = PacketId}} -> redeliver(Msg, State), Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); @@ -635,7 +638,7 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], +expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, TS) div 1000) of @@ -691,7 +694,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, true -> enqueue_msg(Msg, State); false -> - Msg1 = Msg#mqtt_message{pktid = MsgId}, + Msg1 = Msg#mqtt_message{packet_id = MsgId}, deliver(Msg1, State), await(Msg1, next_msg_id(State)) end. @@ -719,12 +722,15 @@ deliver(Msg, #state{client_pid = Pid, binding = remote}) -> %% Awaiting ACK for QoS1/QoS2 Messages %%-------------------------------------------------------------------- -await(Msg = #mqtt_message{pktid = PacketId}, +await(Msg = #mqtt_message{packet_id = PacketId}, State = #state{inflight = Inflight, retry_timer = RetryTimer, retry_interval = Interval}) -> %% Start retry timer if the Inflight is still empty - State1 = ?IF(RetryTimer == undefined, State#state{retry_timer = start_timer(Interval, retry_delivery)}, State), + State1 = case RetryTimer == undefined of + true -> State#state{retry_timer = start_timer(Interval, retry_delivery)}; + false -> State + end, State1#state{inflight = Inflight:insert(PacketId, {publish, Msg, os:timestamp()})}. acked(puback, PacketId, State = #state{client_id = ClientId, diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index 6453bcba9..8a5fa8b2e 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 5440bbded..30983d0b5 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_sm_helper.erl b/src/emqx_sm_helper.erl index 9f8970c08..6751fd54d 100644 --- a/src/emqx_sm_helper.erl +++ b/src/emqx_sm_helper.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_sm_helper). --author("Feng Lee "). - -behaviour(gen_server). -include("emqx.hrl"). diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index 1c9098715..feef64e2b 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 7146beb96..1fbc7b245 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index cdcf03255..e115674f9 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ -behaviour(supervisor). --author("Feng Lee "). - -export([start_link/0, start_child/1, start_child/2, stop_child/1]). %% Supervisor callbacks diff --git a/src/emqx_sysmon_sup.erl b/src/emqx_sysmon_sup.erl index cfe6e7657..6bcbf76c8 100644 --- a/src/emqx_sysmon_sup.erl +++ b/src/emqx_sysmon_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_sysmon_sup). --author("Feng Lee "). - -behaviour(supervisor). %% API diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 7f177e8d0..267786943 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_trace.erl b/src/emqx_trace.erl index 2cc4cd29e..99da331f7 100644 --- a/src/emqx_trace.erl +++ b/src/emqx_trace.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ -behaviour(gen_server). --author("Feng Lee "). - %% API Function Exports -export([start_link/0]). diff --git a/src/emqx_trace_sup.erl b/src/emqx_trace_sup.erl index 89557ba45..861b3ef79 100644 --- a/src/emqx_trace_sup.erl +++ b/src/emqx_trace_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_trace_sup). --author("Feng Lee "). - -behaviour(supervisor). %% API diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index a421ab54a..32959cc0c 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index 3ff2551bb..e854bd1cd 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_ws). --author("Feng Lee "). - -include("emqx_mqtt.hrl"). -import(proplists, [get_value/3]). diff --git a/test/emqx_lib_SUITE.erl b/test/emqx_lib_SUITE.erl index 398d6995f..0b24fb059 100644 --- a/test/emqx_lib_SUITE.erl +++ b/test/emqx_lib_SUITE.erl @@ -16,8 +16,6 @@ -module(emqx_lib_SUITE). --author("Feng Lee "). - -include_lib("eunit/include/eunit.hrl"). -compile(export_all). diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index a3b085354..3123ab94f 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_mqueue_SUITE). --author("Feng Lee "). - -compile(export_all). -include("emqx.hrl"). diff --git a/test/emqx_topic_SUITE.erl b/test/emqx_topic_SUITE.erl index fb812b520..a60921ada 100644 --- a/test/emqx_topic_SUITE.erl +++ b/test/emqx_topic_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ -module(emqx_topic_SUITE). --author("Feng Lee "). - -include_lib("eunit/include/eunit.hrl"). %% CT