Support MQTT Version 5.0

This commit is contained in:
Feng Lee 2018-03-02 20:18:27 +08:00
parent aef5a20697
commit 6a957e1b33
64 changed files with 424 additions and 587 deletions

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -26,6 +26,16 @@
%%-define(ERTS_MINIMUM, "9.0").
%%--------------------------------------------------------------------
%% Sys/Queue/Share Topics' Prefix
%%--------------------------------------------------------------------
-define(SYSTOP, <<"$SYS/">>). %% System Topic
-define(QUEUE, <<"$queue/">>). %% Queue Topic
-define(SHARE, <<"$share/">>). %% Shared Topic
%%--------------------------------------------------------------------
%% Message and Delivery
%%--------------------------------------------------------------------
@ -34,7 +44,8 @@
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()).
-type(message_from() :: #{node := atom(),
-type(message_from() :: #{zone := atom(),
node := atom(),
clientid := binary(),
protocol := protocol(),
connector => atom(),
@ -60,7 +71,7 @@
from :: message_from(), %% Message from
sender :: pid(), %% The pid of the sender/publisher
flags :: message_flags(), %% Message flags
headers :: message_headers() %% Message headers
headers :: message_headers(), %% Message headers
topic :: binary(), %% Message topic
properties :: map(), %% Message user properties
payload :: binary(), %% Message payload
@ -70,24 +81,12 @@
-type(message() :: #message{}).
-record(delivery,
{ %sender :: pid(), %% The pid of the sender/publisher
message :: message(), %% Message
{ message :: message(),
flows :: list()
}).
-type(delivery() :: #delivery{}).
%%--------------------------------------------------------------------
%% Sys/Queue/Share Topics' Prefix
%%--------------------------------------------------------------------
-define(SYSTOP, <<"$SYS/">>). %% System Topic
-define(QUEUE, <<"$queue/">>). %% Queue Topic
-define(SHARE, <<"$share/">>). %% Shared Topic
%%--------------------------------------------------------------------
%% PubSub
%%--------------------------------------------------------------------
@ -97,20 +96,16 @@
-define(PS(PS), (PS =:= publish orelse PS =:= subscribe)).
%%--------------------------------------------------------------------
%% MQTT Topic
%% Subscription
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% MQTT Subscription
%%--------------------------------------------------------------------
-record(mqtt_subscription,
{ subid :: binary() | atom(),
topic :: binary(),
qos :: 0 | 1 | 2
-record(subscription,
{ subid :: binary() | atom(),
topic :: binary(),
subopts :: list()
}).
-type(mqtt_subscription() :: #mqtt_subscription{}).
-type(subscription() :: #subscription{}).
%%--------------------------------------------------------------------
%% MQTT Client
@ -149,57 +144,6 @@
-type(mqtt_session() :: #mqtt_session{}).
%%--------------------------------------------------------------------
%% MQTT Message
%%--------------------------------------------------------------------
-type(mqtt_msg_id() :: binary() | undefined).
-type(mqtt_pktid() :: 1..16#ffff | undefined).
-type(mqtt_msg_from() :: atom() | {binary(), undefined | binary()}).
-record(mqtt_message,
{ %% Global unique message ID
id :: mqtt_msg_id(),
%% PacketId
pktid :: mqtt_pktid(),
%% ClientId and Username
from :: mqtt_msg_from(),
%% Topic that the message is published to
topic :: binary(),
%% Message QoS
qos = 0 :: 0 | 1 | 2,
%% Message Flags
flags = [] :: [retain | dup | sys],
%% Retain flag
retain = false :: boolean(),
%% Dup flag
dup = false :: boolean(),
%% $SYS flag
sys = false :: boolean(),
%% Headers
headers = [] :: list(),
%% Payload
payload :: binary(),
%% Timestamp
timestamp :: erlang:timestamp()
}).
-type(mqtt_message() :: #mqtt_message{}).
%%--------------------------------------------------------------------
%% MQTT Delivery
%%--------------------------------------------------------------------
-record(mqtt_delivery,
{ sender :: pid(), %% Pid of the sender/publisher
message :: mqtt_message(), %% Message
flows :: list()
}).
-type(mqtt_delivery() :: #mqtt_delivery{}).
%%--------------------------------------------------------------------
%% Route
%%--------------------------------------------------------------------
@ -254,10 +198,10 @@
-type(plugin() :: #plugin{}).
%%--------------------------------------------------------------------
%% MQTT CLI Command. For example: 'broker metrics'
%% Command
%%--------------------------------------------------------------------
-record(mqtt_cli, { name, action, args = [], opts = [], usage, descr }).
-record(command, { name, action, args = [], opts = [], usage, descr }).
-type(mqtt_cli() :: #mqtt_cli{}).
-type(command() :: #command{}).

View File

@ -1,54 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Internal Header File
-define(GPROC_POOL(JoinOrLeave, Pool, Id),
(begin
case JoinOrLeave of
join -> gproc_pool:connect_worker(Pool, {Pool, Id});
leave -> gproc_pool:disconnect_worker(Pool, {Pool, Id})
end
end)).
-define(PROC_NAME(M, I), (list_to_atom(lists:concat([M, "_", I])))).
-define(UNEXPECTED_REQ(Req, State),
(begin
lager:error("[~s] Unexpected Request: ~p", [?MODULE, Req]),
{reply, {error, unexpected_request}, State}
end)).
-define(UNEXPECTED_MSG(Msg, State),
(begin
lager:error("[~s] Unexpected Message: ~p", [?MODULE, Msg]),
{noreply, State}
end)).
-define(UNEXPECTED_INFO(Info, State),
(begin
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]),
{noreply, State}
end)).
-define(IF(Cond, TrueFun, FalseFun),
(case (Cond) of
true -> (TrueFun);
false-> (FalseFun)
end)).
-define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]).

View File

@ -311,10 +311,10 @@
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
variable = #mqtt_packet_puback{packet_id = PacketId}}).
-define(SUBSCRIBE_PACKET(PacketId, TopicTable),
-define(SUBSCRIBE_PACKET(PacketId, TopicFilters),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1},
variable = #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = TopicTable}}).
variable = #mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}}).
-define(SUBACK_PACKET(PacketId, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
@ -337,3 +337,52 @@
-define(PACKET(Type),
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
%%--------------------------------------------------------------------
%% MQTT Message
%%--------------------------------------------------------------------
-type(mqtt_msg_id() :: binary() | undefined).
-type(mqtt_msg_from() :: atom() | {binary(), undefined | binary()}).
-record(mqtt_message,
{ %% Global unique message ID
id :: mqtt_msg_id(),
%% PacketId
packet_id :: mqtt_packet_id(),
%% ClientId and Username
from :: mqtt_msg_from(),
%% Topic that the message is published to
topic :: binary(),
%% Message QoS
qos = 0 :: mqtt_qos(),
%% Message Flags
flags = [] :: [retain | dup | sys],
%% Retain flag
retain = false :: boolean(),
%% Dup flag
dup = false :: boolean(),
%% $SYS flag
sys = false :: boolean(),
%% Headers
headers = [] :: list(),
%% Payload
payload :: binary(),
%% Timestamp
timestamp :: erlang:timestamp()
}).
-type(mqtt_message() :: #mqtt_message{}).
%%--------------------------------------------------------------------
%% MQTT Delivery
%%--------------------------------------------------------------------
-record(mqtt_delivery,
{ sender :: pid(),
message :: mqtt_message(),
flows :: list()
}).
-type(mqtt_delivery() :: #mqtt_delivery{}).

View File

@ -14,12 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc EMQ X Main Module.
-module(emqx).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").

View File

@ -18,8 +18,6 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
%% API Function Exports

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_access_rule).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-type(who() :: all | binary() |

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,12 +18,8 @@
-behaviour(emqx_acl_mod).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_cli.hrl").
-export([all_rules/0]).
%% ACL callbacks
@ -116,7 +112,7 @@ reload_acl(#state{config = undefined}) ->
reload_acl(State) ->
case catch load_rules_from_file(State) of
{'EXIT', Error} -> {error, Error};
true -> ?PRINT("~s~n", ["reload acl_internal successfully"]), ok
true -> io:format("~s~n", ["reload acl_internal successfully"]), ok
end.
%% @doc ACL Module Description

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_acl_mod).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
%%--------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_alarm).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_event).
-include("emqx.hrl").
@ -88,17 +86,25 @@ handle_event({set_alarm, Alarm = #alarm{id = AlarmId,
title = Title,
summary = Summary}}, Alarms)->
TS = os:timestamp(),
Json = mochijson2:encode([{id, AlarmId},
{severity, Severity},
{title, iolist_to_binary(Title)},
{summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(TS)}]),
emqx:publish(alarm_msg(alert, AlarmId, Json)),
case catch emqx_json:encode([{id, AlarmId},
{severity, Severity},
{title, iolist_to_binary(Title)},
{summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(TS)}]) of
{'EXIT', Reason} ->
lager:error("Failed to encode set_alarm: ~p", [Reason]);
JSON ->
emqx_broker:publish(alarm_msg(alert, AlarmId, JSON))
end,
{ok, [Alarm#alarm{timestamp = TS} | Alarms]};
handle_event({clear_alarm, AlarmId}, Alarms) ->
Json = mochijson2:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]),
emqx:publish(alarm_msg(clear, AlarmId, Json)),
case catch emqx_json:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of
{'EXIT', Reason} ->
lager:error("Failed to encode clear_alarm: ~p", [Reason]);
JSON ->
emqx_broker:publish(alarm_msg(clear, AlarmId, JSON))
end,
{ok, lists:keydelete(AlarmId, 2, Alarms), hibernate};
handle_event(_, Alarms)->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,10 +18,6 @@
-behaviour(application).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx_cli.hrl").
-include("emqx_mqtt.hrl").
%% Application callbacks
@ -54,11 +50,11 @@ stop(_State) ->
%%--------------------------------------------------------------------
print_banner() ->
?PRINT("Starting ~s on node ~s~n", [?APP, node()]).
io:format("Starting ~s on node ~s~n", [?APP, node()]).
print_vsn() ->
{ok, Vsn} = application:get_key(vsn),
?PRINT("~s ~s is running now!~n", [?APP, Vsn]).
io:format("~s ~s is running now!~n", [?APP, Vsn]).
%%--------------------------------------------------------------------
%% Register default ACL File

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_auth_mod).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-export([passwd_hash/2]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -22,8 +22,6 @@
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
%% API Function Exports
-export([start_link/5]).
@ -104,10 +102,12 @@ qname(Node, Topic) ->
iolist_to_binary(["Bridge:", Node, ":", Topic]).
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]),
{noreply, State}.
handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) ->
{noreply, State#state{mqueue = emqx_mqueue:in(Msg, MQ)}};
@ -148,7 +148,8 @@ handle_info({'EXIT', _Pid, normal}, State) ->
{noreply, State};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_bridge_sup).
-author("Feng Lee <feng@emqtt.io>").
-export([start_link/3]).
%%--------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,8 +18,6 @@
-behavior(supervisor).
-author("Feng Lee <feng@emqtt.io>").
-export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]).
-export([init/1]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,8 +18,6 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_internal.hrl").

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -203,7 +203,7 @@ handle_info({suback, PacketId, GrantedQos}, State) ->
%% Fastlane
handle_info({dispatch, _Topic, Message}, State) ->
handle_info({deliver, Message#message{qos = ?QOS_0}}, State);
handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State);
handle_info({deliver, Message}, State) ->
with_proto(
@ -259,16 +259,16 @@ handle_info({keepalive, start, Interval}, State = #state{connection = Conn}) ->
end,
case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
{ok, KeepAlive} ->
{noreply, State#client_state{keepalive = KeepAlive}};
{noreply, State#state{keepalive = KeepAlive}};
{error, Error} ->
?LOG(warning, "Keepalive error - ~p", [Error], State),
shutdown(Error, State)
end;
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
case emqx_keepalive:check(KeepAlive) of
{ok, KeepAlive1} ->
{noreply, State#client_state{keepalive = KeepAlive1}};
{noreply, State#state{keepalive = KeepAlive1}};
{error, timeout} ->
?LOG(debug, "Keepalive timeout", [], State),
shutdown(keepalive_timeout, State);

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_gen_mod).
-author("Feng Lee <feng@emqtt.io>").
-ifdef(use_specs).
-callback(load(Opts :: any()) -> ok | {error, term()}).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,8 +18,6 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
%% Start
-export([start_link/0]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,12 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Inflight Window that wraps the gb_trees.
-module(emqx_inflight).
-author("Feng Lee <feng@emqtt.io>").
-export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1,
to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]).

36
src/emqx_json.erl Normal file
View File

@ -0,0 +1,36 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_json).
-export([encode/1, encode/2, decode/1, decode/2]).
-spec(encode(jsx:json_term()) -> jsx:json_text()).
encode(Term) ->
jsx:encode(Term).
-spec(encode(jsx:json_term(), jsx_to_json:config()) -> jsx:json_text()).
encode(Term, Opts) ->
jsx:encode(Term, Opts).
-spec(decode(jsx:json_text()) -> jsx:json_term()).
decode(JSON) ->
jsx:decode(JSON).
-spec(decode(jsx:json_text(), jsx_to_json:config()) -> jsx:json_term()).
decode(JSON, Opts) ->
jsx:decode(JSON, Opts).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_lager_backend).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_event).
-include_lib("lager/include/lager.hrl").

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,12 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Message Functions
-module(emqx_message).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
@ -57,7 +53,7 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
packet_id = PacketId},
payload = Payload}) ->
#mqtt_message{id = msgid(),
pktid = PacketId,
packet_id = PacketId,
qos = Qos,
retain = Retain,
dup = Dup,
@ -95,7 +91,7 @@ msgid() -> emqx_guid:gen().
%% @doc Message to Packet
-spec(to_packet(mqtt_message()) -> mqtt_packet()).
to_packet(#mqtt_message{pktid = PkgId,
to_packet(#mqtt_message{packet_id = PkgId,
qos = Qos,
retain = Retain,
dup = Dup,
@ -141,13 +137,13 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%% @doc Format MQTT Message
format(#mqtt_message{id = MsgId, pktid = PktId, from = {ClientId, Username},
format(#mqtt_message{id = MsgId, packet_id = PktId, from = {ClientId, Username},
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]);
%% TODO:...
format(#mqtt_message{id = MsgId, pktid = PktId, from = From,
format(#mqtt_message{id = MsgId, packet_id = PktId, from = From,
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,6 +18,8 @@
-include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl").
-export([load/1, unload/1]).
-export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,195 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt5_rscode).
-author("Feng Lee <feng@emqtt.io>").
-export([name/1, value/1]).
%%--------------------------------------------------------------------
%% Reason code to name
%%--------------------------------------------------------------------
0
name(0x00
Success
CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH
0
name(0x00
Normal disconnection
DISCONNECT
0
name(0x00
Granted QoS 0
SUBACK
1
name(0x01
Granted QoS 1
SUBACK
2
name(0x02
Granted QoS 2
SUBACK
4
name(0x04
Disconnect with Will Message
DISCONNECT
16
name(0x10
No matching subscribers
PUBACK, PUBREC
17
name(0x11
No subscription existed
UNSUBACK
24
name(0x18
Continue authentication
AUTH
25
name(0x19
Re-authenticate
AUTH
128
name(0x80
Unspecified error
CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
129
name(0x81
Malformed Packet
CONNACK, DISCONNECT
130
name(0x82
Protocol Error
CONNACK, DISCONNECT
131
name(0x83
Implementation specific error
CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
132
name(0x84
Unsupported Protocol Version
CONNACK
133
name(0x85
Client Identifier not valid
CONNACK
134
name(0x86
Bad User Name or Password
CONNACK
135
name(0x87
Not authorized
CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
136
name(0x88
Server unavailable
CONNACK
137
name(0x89
Server busy
CONNACK, DISCONNECT
138
name(0x8A
Banned
CONNACK
139
name(0x8B
Server shutting down
DISCONNECT
140
name(0x8C
Bad authentication method
CONNACK, DISCONNECT
141
name(0x8D
Keep Alive timeout
DISCONNECT
142
name(0x8E
Session taken over
DISCONNECT
143
name(0x8F
Topic Filter invalid
SUBACK, UNSUBACK, DISCONNECT
144
name(0x90
Topic Name invalid
CONNACK, PUBACK, PUBREC, DISCONNECT
145
name(0x91
Packet Identifier in use
PUBACK, PUBREC, SUBACK, UNSUBACK
146
name(0x92
Packet Identifier not found
PUBREL, PUBCOMP
147
name(0x93
Receive Maximum exceeded
DISCONNECT
148
name(0x94
Topic Alias invalid
DISCONNECT
149
name(0x95
Packet too large
CONNACK, DISCONNECT
150
name(0x96
Message rate too high
DISCONNECT
151
name(0x97
Quota exceeded
CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT
%% 152
name(0x98
Administrative action
DISCONNECT
%% 153
name(0x99
Payload format invalid
CONNACK, PUBACK, PUBREC, DISCONNECT
%% 154
name(0x9A
Retain not supported
CONNACK, DISCONNECT
%% 155
name(0x9B
QoS not supported
CONNACK, DISCONNECT
%% 156
name(0x9C
Use another server
CONNACK, DISCONNECT
%% 157: CONNACK, DISCONNECT
name(0x9D) -> 'Server-Moved';
%% 158: SUBACK, DISCONNECT
name(0x9E) -> 'Shared-Subscriptions-Not-Supported';
%% 159: CONNACK, DISCONNECT
name(0x9F) -> 'Connection-Rate-Exceeded';
%% 160: DISCONNECT
name(0xA0) -> 'Maximum-Connect-Time';
%% 161: SUBACK, DISCONNECT
name(0xA1) -> 'Subscription-Identifiers-Not-Supported';
%% 162: SUBACK, DISCONNECT
name(0xA2) -> 'Wildcard-Subscriptions-Not-Supported';

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,9 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt5_props).
-author("Feng Lee <feng@emqtt.io>").
-module(emqx_mqtt_props).
-export([name/1, id/1]).

115
src/emqx_mqtt_rscode.erl Normal file
View File

@ -0,0 +1,115 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt_rscode).
-export([value/1]).
%%--------------------------------------------------------------------
%% Reason code to name
%%--------------------------------------------------------------------
%% 00: Success; CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH
value('Success') -> 16#00;
%% 00: Normal disconnection; DISCONNECT
value('Normal-Disconnection') -> 16#00;
%% 00: Granted QoS 0; SUBACK
value('Granted-QoS0') -> 16#00;
%% 01: Granted QoS 1; SUBACK
value('Granted-QoS1') -> 16#01;
%% 02: Granted QoS 2; SUBACK
value('Granted-QoS2') -> 16#02;
%% 04: Disconnect with Will Message; DISCONNECT
value('Disconnect-With-Will-Message') -> 16#04;
%% 16: No matching subscribers; PUBACK, PUBREC
value('No-Matching-Subscribers') -> 16#10;
%% 17: No subscription existed; UNSUBACK
value('No-Subscription-Existed') -> 16#11;
%% 24: Continue authentication; AUTH
value('Continue-Authentication') -> 16#18;
%% 25: Re-Authenticate; AUTH
value('Re-Authenticate') -> 16#19;
%% 128: Unspecified error; CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
value('Unspecified-Error') -> 16#80;
%% 129: Malformed Packet; CONNACK, DISCONNECT
value('Malformed-Packet') -> 16#81;
%% 130: Protocol Error; CONNACK, DISCONNECT
value('Protocol-Error') -> 16#82;
%% 131: Implementation specific error; CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
value('Implementation-Specific-Error') -> 16#83;
%% 132: Unsupported Protocol Version; CONNACK
value('Unsupported-Protocol-Version') -> 16#84;
%% 133: Client Identifier not valid; CONNACK
value('Client-Identifier-not-Valid') -> 16#85;
%% 134: Bad User Name or Password; CONNACK
value('Bad-Username-or-Password') -> 16#86;
%% 135: Not authorized; CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
value('Not-Authorized') -> 16#87;
%% 136: Server unavailable; CONNACK
value('Server-Unavailable') -> 16#88;
%% 137: Server busy; CONNACK, DISCONNECT
value('Server-Busy') -> 16#89;
%% 138: Banned; CONNACK
value('Banned') -> 16#8A;
%% 139: Server shutting down; DISCONNECT
value('Server-Shutting-Down') -> 16#8B;
%% 140: Bad authentication method; CONNACK, DISCONNECT
value('Bad-Authentication-Method') -> 16#8C;
%% 141: Keep Alive timeout; DISCONNECT
value('Keep-Alive-Timeout') -> 16#8D;
%% 142: Session taken over; DISCONNECT
value('Session-Taken-Over') -> 16#8E;
%% 143: Topic Filter invalid; SUBACK, UNSUBACK, DISCONNECT
value('Topic-Filter-Invalid') -> 16#8F;
%% 144: Topic Name invalid; CONNACK, PUBACK, PUBREC, DISCONNECT
value('Topic-Name-Invalid') -> 16#90;
%% 145: Packet Identifier in use; PUBACK, PUBREC, SUBACK, UNSUBACK
value('Packet-Identifier-Inuse') -> 16#91;
%% 146: Packet Identifier not found; PUBREL, PUBCOMP
value('Packet-Identifier-Not-Found') -> 16#92;
%% 147: Receive Maximum exceeded; DISCONNECT
value('Receive-Maximum-Exceeded') -> 16#93;
%% 148: Topic Alias invalid; DISCONNECT
value('Topic-Alias-Invalid') -> 16#94;
%% 149: Packet too large; CONNACK, DISCONNECT
value('Packet-Too-Large') -> 16#95;
%% 150: Message rate too high; DISCONNECT
value('Message-Rate-Too-High') -> 16#96;
%% 151: Quota exceeded; CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT
value('Quota-Exceeded') -> 16#97;
%% 152: Administrative action; DISCONNECT
value('Administrative-Action') -> 16#98;
%% 153: Payload format invalid; CONNACK, PUBACK, PUBREC, DISCONNECT
value('Payload-Format-Invalid') -> 16#99;
%% 154: Retain not supported; CONNACK, DISCONNECT
value('Retain-Not-Supported') -> 16#9A;
%% 155: QoS not supported; CONNACK, DISCONNECT
value('QoS-Not-Supported') -> 16#9B;
%% 156: Use another server; CONNACK, DISCONNECT
value('Use-Another-Server') -> 16#9C;
%% 157: Server moved; CONNACK, DISCONNECT
value('Server-Moved') -> 16#9D;
%% 158: Shared Subscriptions not supported; SUBACK, DISCONNECT
value('Shared-Subscriptions-Not-Supported') -> 16#9E;
%% 159: Connection rate exceeded; CONNACK, DISCONNECT
value('Connection-Rate-Exceeded') -> 16#9F;
%% 160: Maximum connect time; DISCONNECT
value('Maximum-Connect-Time') -> 16#A0;
%% 161: Subscription Identifiers not supported; SUBACK, DISCONNECT
value('Subscription-Identifiers-Not-Supported') -> 16#A1;
%% 162: Wildcard-Subscriptions-Not-Supported; SUBACK, DISCONNECT
value('Wildcard-Subscriptions-Not-Supported') -> 16#A2.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_packet).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
@ -92,8 +90,8 @@ format_variable(#mqtt_packet_connect{
io_lib:format(Format1, Args1);
format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode}) ->
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]);
reason_code = ReasonCode}) ->
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReasonCode]);
format_variable(#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId}) ->
@ -102,17 +100,17 @@ format_variable(#mqtt_packet_publish{topic_name = TopicName,
format_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
io_lib:format("PacketId=~p", [PacketId]);
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_table = TopicTable}) ->
io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, TopicTable]);
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}) ->
io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]);
format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
topics = Topics}) ->
io_lib:format("PacketId=~p, Topics=~p", [PacketId, Topics]);
format_variable(#mqtt_packet_suback{packet_id = PacketId,
qos_table = QosTable}) ->
io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]);
reason_codes = ReasonCodes}) ->
io_lib:format("PacketId=~p, ReasonCodes=~p", [PacketId, ReasonCodes]);
format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) ->
io_lib:format("PacketId=~p", [PacketId]);

View File

@ -16,8 +16,6 @@
-module(emqx_parser).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
@ -27,53 +25,55 @@
-type(max_packet_size() :: 1..?MAX_PACKET_SIZE).
-spec(initial_state() -> {none, max_packet_size()}).
-type(state() :: #{maxlen := max_packet_size(), vsn := mqtt_vsn()}).
-spec(initial_state() -> {none, state()}).
initial_state() ->
initial_state(?MAX_PACKET_SIZE).
%% @doc Initialize a parser
-spec(initial_state(max_packet_size()) -> {none, max_packet_size()}).
-spec(initial_state(max_packet_size()) -> {none, state()}).
initial_state(MaxSize) ->
{none, MaxSize}.
{none, #{maxlen => MaxSize, vsn => ?MQTT_PROTO_V4}}.
%% @doc Parse MQTT Packet
-spec(parse(binary(), {none, pos_integer()} | fun())
-spec(parse(binary(), {none, state()} | fun())
-> {ok, mqtt_packet()} | {error, term()} | {more, fun()}).
parse(<<>>, {none, MaxLen}) ->
{more, fun(Bin) -> parse(Bin, {none, MaxLen}) end};
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
parse(<<>>, {none, State}) ->
{more, fun(Bin) -> parse(Bin, {none, State}) end};
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, State}) ->
parse_remaining_len(Rest, #mqtt_packet_header{type = Type,
dup = bool(Dup),
qos = fixqos(Type, QoS),
retain = bool(Retain)}, Limit);
retain = bool(Retain)}, State);
parse(Bin, Cont) -> Cont(Bin).
parse_remaining_len(<<>>, Header, Limit) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Limit) end};
parse_remaining_len(Rest, Header, Limit) ->
parse_remaining_len(Rest, Header, 1, 0, Limit).
parse_remaining_len(<<>>, Header, State) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, State) end};
parse_remaining_len(Rest, Header, State) ->
parse_remaining_len(Rest, Header, 1, 0, State).
parse_remaining_len(_Bin, _Header, _Multiplier, Length, MaxLen)
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{maxlen := MaxLen})
when Length > MaxLen ->
{error, invalid_mqtt_frame_len};
parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Limit) end};
%% optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, _Limit) ->
parse_frame(Rest, Header, 2);
parse_remaining_len(<<>>, Header, Multiplier, Length, State) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, State) end};
%% Optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, State) ->
parse_frame(Rest, Header, 2, State);
%% optimize: match PINGREQ...
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, _Limit) ->
parse_frame(Rest, Header, 0);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, MaxLen) ->
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, State) ->
parse_frame(Rest, Header, 0, State);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, State) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, State);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, State = #{maxlen := MaxLen}) ->
FrameLen = Value + Len * Multiplier,
if
FrameLen > MaxLen -> {error, invalid_mqtt_frame_len};
true -> parse_frame(Rest, Header, FrameLen)
true -> parse_frame(Rest, Header, FrameLen, State)
end.
parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) ->
parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length, State = #{vsn := Vsn}) ->
case {Type, Bin} of
{?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
{ProtoName, Rest1} = parse_utf(FrameBin),
@ -95,7 +95,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -
{WillMsg, Rest8} = parse_msg(Rest7, WillFlag),
{UserName, Rest9} = parse_utf(Rest8, UsernameFlag),
{PasssWord, <<>>} = parse_utf(Rest9, PasswordFlag),
case protocol_name_approved(ProtoVersion, ProtoName) of
case protocol_name_approved(ProtoVer, ProtoName) of
true ->
wrap(Header,
#mqtt_packet_connect{
@ -128,7 +128,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -
_ -> <<Id:16/big, R/binary>> = Rest1,
{Id, R}
end,
{Properties, Payload} = parse_properties(ProtoVer, Rest),
{Properties, Payload} = parse_properties(Vsn, Rest2),
wrap(fixdup(Header), #mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId,
properties = Properties},
@ -136,10 +136,10 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -
{PubAck, <<FrameBin:Length/binary, Rest/binary>>}
when PubAck == ?PUBACK; PubAck == ?PUBREC; PubAck == ?PUBREL; PubAck == ?PUBCOMP ->
<<PacketId:16/big, Rest1/binary>> = FrameBin,
case ProtoVer == ?MQTT_PROTO_V5 of
case Vsn == ?MQTT_PROTO_V5 of
true ->
<<ReasonCode, Rest2/binary>> = Rest1,
{Properties, Rest3} = parse_properties(ProtoVer, Rest2),
{Properties, Rest3} = parse_properties(Vsn, Rest2),
wrap(Header, #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode,
properties = Properties}, Rest3);
@ -149,11 +149,11 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -
{?SUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
%% 1 = Qos,
<<PacketId:16/big, Rest1/binary>> = FrameBin,
{Properties, Rest2} = parse_properties(ProtoVer, Rest1),
TopicTable = parse_topics(?SUBSCRIBE, Rest1, []),
wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
properties = Properties,
topic_table = TopicTable}, Rest);
{Properties, Rest2} = parse_properties(Vsn, Rest1),
TopicFilters = parse_topics(?SUBSCRIBE, Rest2, []),
wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
properties = Properties,
topic_filters = TopicFilters}, Rest);
%{?SUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
% <<PacketId:16/big, Rest1/binary>> = FrameBin,
% {Properties, Rest2/binary>> = parse_properties(ProtoVer, Rest1),
@ -162,7 +162,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -
{?UNSUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
%% 1 = Qos,
<<PacketId:16/big, Rest1/binary>> = FrameBin,
{Properties, Rest2} = parse_properties(ProtoVer, Rest1),
{Properties, Rest2} = parse_properties(Vsn, Rest1),
Topics = parse_topics(?UNSUBSCRIBE, Rest2, []),
wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
properties = Properties,
@ -180,20 +180,19 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -
% Length = 0,
% wrap(Header, Rest);
{?DISCONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
case ProtoVer == ?MQTT_PROTO_V5 of
if
Vsn == ?MQTT_PROTO_V5 ->
<<ReasonCode, Rest1/binary>> = FrameBin,
{Properties, Rest2} = parse_properties(Vsn, Rest1),
wrap(Header, #mqtt_packet_disconnect{reason_code = ReasonCode,
properties = Properties}, Rest2);
true ->
<<ReasonCode, Rest1/binary>> = Rest,
{Properties, Rest2} = parse_properties(ProtoVer, Rest1),
wrap(Header, #mqtt_packet_disconnect{reason_code = Reason,
properties = Properties}, Rest2);
false ->
Lenght = 0, wrap(Header, Rest)
Length = 0, wrap(Header, Rest)
end;
{_, TooShortBin} ->
{more, fun(BinMore) ->
parse_frame(<<TooShortBin/binary, BinMore/binary>>,
Header, Length)
end}
parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, State)
end}
end.
wrap(Header, Variable, Payload, Rest) ->
@ -205,12 +204,12 @@ wrap(Header, Rest) ->
parse_will_props(Bin, ProtoVer = ?MQTT_PROTO_V5, 1) ->
parse_properties(ProtoVer, Bin);
parse_will_props(Bin, _ProtoVer, _WillFlag),
parse_will_props(Bin, _ProtoVer, _WillFlag) ->
{#{}, Bin}.
parse_properties(?MQTT_PROTO_V5, Bin) ->
{Len, Rest} = parse_variable_byte_integer(Bin),
<<PropsBin:Len/binary, Rest1} = Rest,
<<PropsBin:Len/binary, Rest1/binary>> = Rest,
{parse_property(PropsBin, #{}), Rest1};
parse_properties(_MQTT_PROTO_V3, Bin) ->
{#{}, Bin}. %% No properties.
@ -228,11 +227,11 @@ parse_property(<<16#03, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Content-Type' => Val});
%% 08: 'Response-Topic', UTF-8 Encoded String;
parse_property(<<16#08, Bin/binary>>) ->
parse_property(<<16#08, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Response-Topic' => Val});
%% 09: 'Correlation-Data', Binary Data;
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>) ->
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Correlation-Data' => Val});
%% 11: 'Subscription-Identifier', Variable Byte Integer;
parse_property(<<16#0B, Bin/binary>>, Props) ->
@ -242,18 +241,18 @@ parse_property(<<16#0B, Bin/binary>>, Props) ->
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val});
%% 18: 'Assigned-Client-Identifier', UTF-8 Encoded String;
parse_property(<<16#12, Bin/binary>>) ->
parse_property(<<16#12, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val});
%% 19: 'Server-Keep-Alive', Two Byte Integer;
parse_property(<<16#13, Val:16, Bin/binary>>) ->
parse_property(<<16#13, Val:16, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Server-Keep-Alive' => Val});
%% 21: 'Authentication-Method', UTF-8 Encoded String;
parse_property(<<16#15, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Authentication-Method' => Val})
parse_property(Rest, Props#{'Authentication-Method' => Val});
%% 22: 'Authentication-Data', Binary Data;
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>) ->
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Authentication-Data' => Val});
%% 23: 'Request-Problem-Information', Byte;
parse_property(<<16#17, Val, Bin/binary>>, Props) ->
@ -273,7 +272,7 @@ parse_property(<<16#1C, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Server-Reference' => Val});
%% 31: 'Reason-String', UTF-8 Encoded String;
parse_property(<<16#1F, Bin/binary, Props) ->
parse_property(<<16#1F, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Reason-String' => Val});
%% 33: 'Receive-Maximum', Two Byte Integer;
@ -300,7 +299,7 @@ parse_property(<<16#26, Bin/binary>>, Props) ->
end);
%% 39: 'Maximum-Packet-Size', Four Byte Integer;
parse_property(<<16#27, Val:32, Bin/binary>>, Props) ->
parse_property(Rest, Props#{'Maximum-Packet-Size' => Val});
parse_property(Bin, Props#{'Maximum-Packet-Size' => Val});
%% 40: 'Wildcard-Subscription-Available', Byte;
parse_property(<<16#28, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val});
@ -321,8 +320,8 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
parse_topics(_Packet, <<>>, Topics) ->
lists:reverse(Topics);
parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
{Name, <<<<_Reserved:2, RetainHandling:2, KeepRetain:1, NoLocal:1, QoS:2>>, Rest/binary>>} = parse_utf(Bin),
SubOpts = [{qos, Qos}, {retain_handling, RetainHandling}, {keep_retain, KeepRetain}, {no_local, NoLocal}],
{Name, <<_Reserved:2, RetainHandling:2, KeepRetain:1, NoLocal:1, QoS:2, Rest/binary>>} = parse_utf(Bin),
SubOpts = [{qos, QoS}, {retain_handling, RetainHandling}, {keep_retain, KeepRetain}, {no_local, NoLocal}],
parse_topics(Sub, Rest, [{Name, SubOpts}| Topics]);
parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
{Name, <<Rest/binary>>} = parse_utf(Bin),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_plugins).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-export([init/0]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,12 +16,12 @@
-module(emqx_protocol).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_misc.hrl").
-import(proplists, [get_value/2, get_value/3]).
%% API
@ -241,8 +241,8 @@ process(?CONNECT_PACKET(Var), State0) ->
end,
%% Run hooks
emqx_hooks:run('client.connected', [ReturnCode1], client(State3)),
%% Send connack
send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
%%TODO: Send Connack
%% send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
%% stop if authentication failure
stop_if_auth_failure(ReturnCode1, State3);
@ -567,10 +567,10 @@ sp(false) -> 0.
%% The retained flag should be propagated for bridge.
%%--------------------------------------------------------------------
clean_retain(false, Msg = #message{retain = true, headers = Headers}) ->
clean_retain(false, Msg = #mqtt_message{retain = true, headers = Headers}) ->
case lists:member(retained, Headers) of
true -> Msg;
false -> Msg#message{retain = false}
false -> Msg#mqtt_message{retain = false}
end;
clean_retain(_IsBridge, Msg) ->
Msg.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -20,6 +20,8 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([start_link/3]).
%% PubSub API.
@ -173,7 +175,8 @@ handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) ->
reply(ok, setstats(State));
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]),
{reply, ignore, State}.
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
add_subscriber(Topic, Subscriber, Options),
@ -184,10 +187,12 @@ handle_cast({unsubscribe, Topic, Subscriber, Options}, State) ->
noreply(setstats(State));
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]),
{noreply, State}.
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,11 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc PubSub Supervisor.
-module(emqx_pubsub_sup).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(supervisor).
%% API

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_router).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
-include("emqx.hrl").

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -13,18 +13,12 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%%
%% @doc EMQ X Distributed RPC.
%%
%%--------------------------------------------------------------------
-module(emqx_rpc).
-author("Feng Lee <feng@emqtt.io>").
-export([cast/4]).
%% @doc Wraps gen_rpc first.
cast(Node, Mod, Fun, Args) ->
emqx_metrics:inc('messages/forward'), rpc:cast(Node, Mod, Fun, Args).
emqx_metrics:inc('messages/forward'),
rpc:cast(Node, Mod, Fun, Args).

View File

@ -16,8 +16,6 @@
-module(emqx_serializer).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
@ -83,11 +81,11 @@ serialize_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags,
reason_code = ReasonCode,
properties = Properties}, undefined) ->
PropsBin = serialize_properties(Properties),
{<<AckFlags:8, ReturnCode:8, PropsBin/binary>>, <<>>};
{<<AckFlags:8, ReasonCode:8, PropsBin/binary>>, <<>>};
serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = Topics }, undefined) ->
{<<PacketId:16/big>>, serialize_topics(Topics)};
topic_filters = TopicFilters}, undefined) ->
{<<PacketId:16/big>>, serialize_topics(TopicFilters)};
serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId,
properties = Properties,
@ -124,7 +122,7 @@ serialize_variable(?PINGRESP, undefined, undefined) ->
serialize_variable(?DISCONNECT, #mqtt_packet_disconnect{reason_code = ReasonCode,
properties = Properties}, undefined) ->
{<<ReasonCode, (serialize_properties(Properties))/binary>>, <<>>}.
{<<ReasonCode, (serialize_properties(Properties))/binary>>, <<>>};
serialize_variable(?AUTH, #mqtt_packet_auth{reason_code = ReasonCode,
properties = Properties}, undefined) ->
@ -138,7 +136,7 @@ serialize_payload(Bin) when is_binary(Bin) ->
serialize_properties(undefined) ->
<<>>;
serialize_properties(Props) ->
<< serialize_property(Prop, Val) || {Prop, Val} <= (maps:to_list(Props)) >>.
<< <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>.
%% 01: Byte;
serialize_property('Payload-Format-Indicator', Val) ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -47,12 +47,12 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_misc.hrl").
-import(emqx_misc, [start_timer/2]).
-import(proplists, [get_value/2, get_value/3]).
@ -192,15 +192,15 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
%% @doc Publish Message
-spec(publish(pid(), message()) -> ok | {error, term()}).
publish(_Session, Msg = #message{qos = ?QOS_0}) ->
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
%% Publish QoS0 Directly
emqx_server:publish(Msg), ok;
publish(_Session, Msg = #message{qos = ?QOS_1}) ->
publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly for client will PubAck automatically
emqx_server:publish(Msg), ok;
publish(Session, Msg = #message{qos = ?QOS_2}) ->
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session
gen_server:call(Session, {publish, Msg}, ?TIMEOUT).
@ -320,7 +320,7 @@ binding(ClientPid) ->
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From,
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, packet_id = PacketId}}, _From,
State = #state{awaiting_rel = AwaitingRel,
await_rel_timer = Timer,
await_rel_timeout = Timeout}) ->
@ -347,7 +347,8 @@ handle_call(state, _From, State) ->
reply(?record_to_proplist(state, State, ?STATE_KEYS), State);
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]),
{reply, ignore, State}.
handle_cast({subscribe, From, TopicTable, AckFun},
State = #state{client_id = ClientId,
@ -512,10 +513,11 @@ handle_cast({destroy, ClientId},
shutdown(conflict, State);
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]),
{noreply, State}.
%% Ignore Messages delivered by self
handle_info({dispatch, _Topic, #message{from = {ClientId, _}}},
handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}},
State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
{noreply, State};
@ -560,8 +562,9 @@ handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
[ClientPid, Pid, Reason], State),
{noreply, State, hibernate};
handle_info(Info, Session) ->
?UNEXPECTED_INFO(Info, Session).
handle_info(Info, State) ->
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]),
{noreply, State}.
terminate(Reason, #state{client_id = ClientId, username = Username}) ->
%% Move to emqx_sm to avoid race condition
@ -608,7 +611,7 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
if
Force orelse (Diff >= Interval) ->
case {Type, Msg} of
{publish, Msg = #mqtt_message{pktid = PacketId}} ->
{publish, Msg = #mqtt_message{packet_id = PacketId}} ->
redeliver(Msg, State),
Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
@ -635,7 +638,7 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
expire_awaiting_rel([], _Now, State) ->
State#state{await_rel_timer = undefined};
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs],
expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
Now, State = #state{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, TS) div 1000) of
@ -691,7 +694,7 @@ dispatch(Msg = #mqtt_message{qos = QoS},
true ->
enqueue_msg(Msg, State);
false ->
Msg1 = Msg#mqtt_message{pktid = MsgId},
Msg1 = Msg#mqtt_message{packet_id = MsgId},
deliver(Msg1, State),
await(Msg1, next_msg_id(State))
end.
@ -719,12 +722,15 @@ deliver(Msg, #state{client_pid = Pid, binding = remote}) ->
%% Awaiting ACK for QoS1/QoS2 Messages
%%--------------------------------------------------------------------
await(Msg = #mqtt_message{pktid = PacketId},
await(Msg = #mqtt_message{packet_id = PacketId},
State = #state{inflight = Inflight,
retry_timer = RetryTimer,
retry_interval = Interval}) ->
%% Start retry timer if the Inflight is still empty
State1 = ?IF(RetryTimer == undefined, State#state{retry_timer = start_timer(Interval, retry_delivery)}, State),
State1 = case RetryTimer == undefined of
true -> State#state{retry_timer = start_timer(Interval, retry_delivery)};
false -> State
end,
State1#state{inflight = Inflight:insert(PacketId, {publish, Msg, os:timestamp()})}.
acked(puback, PacketId, State = #state{client_id = ClientId,

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_sm_helper).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
-include("emqx.hrl").

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,8 +18,6 @@
-behaviour(supervisor).
-author("Feng Lee <feng@emqtt.io>").
-export([start_link/0, start_child/1, start_child/2, stop_child/1]).
%% Supervisor callbacks

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_sysmon_sup).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(supervisor).
%% API

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,8 +18,6 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
%% API Function Exports
-export([start_link/0]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_trace_sup).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(supervisor).
%% API

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_ws).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx_mqtt.hrl").
-import(proplists, [get_value/3]).

View File

@ -16,8 +16,6 @@
-module(emqx_lib_SUITE).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_mqueue_SUITE).
-author("Feng Lee <feng@emqtt.io>").
-compile(export_all).
-include("emqx.hrl").

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
-module(emqx_topic_SUITE).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("eunit/include/eunit.hrl").
%% CT