Merge the emqx-mqtt5 library

This commit is contained in:
Feng Lee 2018-02-28 21:22:39 +08:00
parent f7f0f27e4d
commit d7eae533ac
29 changed files with 233 additions and 287 deletions

View File

@ -48,13 +48,6 @@
%% MQTT Topic
%%--------------------------------------------------------------------
-record(mqtt_topic,
{ topic :: binary(),
flags = [] :: [retained | static]
}).
-type(mqtt_topic() :: #mqtt_topic{}).
%%--------------------------------------------------------------------
%% MQTT Subscription
%%--------------------------------------------------------------------
@ -163,6 +156,29 @@
-type(route() :: #route{}).
%%--------------------------------------------------------------------
%% Trie
%%--------------------------------------------------------------------
-type(trie_node_id() :: binary() | atom()).
-record(trie_node,
{ node_id :: trie_node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: binary() | undefined,
flags :: list(atom())
}).
-record(trie_edge,
{ node_id :: trie_node_id(),
word :: binary() | atom()
}).
-record(trie,
{ edge :: #trie_edge{},
node_id :: trie_node_id()
}).
%%--------------------------------------------------------------------
%% Alarm
%%--------------------------------------------------------------------

View File

@ -1,24 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)).
-define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])).
-define(USAGE(CmdList), [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]).

View File

@ -26,13 +26,6 @@
-define(PROC_NAME(M, I), (list_to_atom(lists:concat([M, "_", I])))).
-define(record_to_proplist(Def, Rec),
lists:zip(record_info(fields, Def), tl(tuple_to_list(Rec)))).
-define(record_to_proplist(Def, Rec, Fields),
[{K, V} || {K, V} <- ?record_to_proplist(Def, Rec),
lists:member(K, Fields)]).
-define(UNEXPECTED_REQ(Req, State),
(begin
lager:error("[~s] Unexpected Request: ~p", [?MODULE, Req]),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -115,7 +115,7 @@
'DISCONNECT',
'AUTH']).
-type(mqtt_packet_type() :: ?RESERVED..?DISCONNECT).
-type(mqtt_packet_type() :: ?RESERVED..?AUTH).
%%--------------------------------------------------------------------
%% MQTT Connect Return Codes
@ -158,10 +158,23 @@
%% MQTT Packets
%%--------------------------------------------------------------------
-type(mqtt_topic() :: binary()).
-type(mqtt_client_id() :: binary()).
-type(mqtt_username() :: binary() | undefined).
-type(mqtt_packet_id() :: 1..16#ffff | undefined).
-type(mqtt_reason_code() :: 1..16#ff | undefined).
-type(mqtt_properties() :: undefined | map()).
-type(mqtt_subopt() :: list({qos, mqtt_qos()}
| {retain_handling, boolean()}
| {keep_retain, boolean()}
| {no_local, boolean()})).
-record(mqtt_packet_connect,
{ client_id = <<>> :: mqtt_client_id(),
proto_ver = ?MQTT_PROTO_V4 :: mqtt_vsn(),
@ -170,44 +183,68 @@
will_qos = ?QOS_1 :: mqtt_qos(),
will_flag = false :: boolean(),
clean_sess = false :: boolean(),
clean_start = true :: boolean(),
keep_alive = 60 :: non_neg_integer(),
will_props = undefined :: undefined | map(),
will_topic = undefined :: undefined | binary(),
will_msg = undefined :: undefined | binary(),
username = undefined :: undefined | binary(),
password = undefined :: undefined | binary(),
is_bridge = false :: boolean()
is_bridge = false :: boolean(),
properties = undefined :: mqtt_properties() %% MQTT Version 5.0
}).
-record(mqtt_packet_connack,
{ ack_flags = ?RESERVED :: 0 | 1,
return_code :: mqtt_connack()
reason_code :: mqtt_connack(),
properties :: map()
}).
-record(mqtt_packet_publish,
{ topic_name :: binary(),
packet_id :: mqtt_packet_id()
packet_id :: mqtt_packet_id(),
properties :: mqtt_properties()
}).
-record(mqtt_packet_puback,
{ packet_id :: mqtt_packet_id() }).
{ packet_id :: mqtt_packet_id(),
reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
}).
-record(mqtt_packet_subscribe,
{ packet_id :: mqtt_packet_id(),
topic_table :: list({binary(), mqtt_qos()})
{ packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
topic_filters :: list({binary(), mqtt_subopt()})
}).
-record(mqtt_packet_unsubscribe,
{ packet_id :: mqtt_packet_id(),
topics :: list(binary())
{ packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
topics :: list(binary())
}).
-record(mqtt_packet_suback,
{ packet_id :: mqtt_packet_id(),
qos_table :: list(mqtt_qos() | 128)
{ packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
reason_codes :: list(mqtt_reason_code())
}).
-record(mqtt_packet_unsuback,
{ packet_id :: mqtt_packet_id() }).
{ packet_id :: mqtt_packet_id(),
properties :: mqtt_properties(),
reason_codes :: list(mqtt_reason_code())
}).
-record(mqtt_packet_disconnect,
{ reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
}).
-record(mqtt_packet_auth,
{ reason_code :: mqtt_reason_code(),
properties :: mqtt_properties()
}).
%%--------------------------------------------------------------------
%% MQTT Control Packet
@ -215,11 +252,18 @@
-record(mqtt_packet,
{ header :: #mqtt_packet_header{},
variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
| #mqtt_packet_publish{} | #mqtt_packet_puback{}
| #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
| #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
| mqtt_packet_id() | undefined,
variable :: #mqtt_packet_connect{}
| #mqtt_packet_connack{}
| #mqtt_packet_publish{}
| #mqtt_packet_puback{}
| #mqtt_packet_subscribe{}
| #mqtt_packet_suback{}
| #mqtt_packet_unsubscribe{}
| #mqtt_packet_unsuback{}
| #mqtt_packet_disconnect{}
| #mqtt_packet_auth{}
| mqtt_packet_id()
| undefined,
payload :: binary() | undefined
}).
@ -232,14 +276,20 @@
-define(CONNECT_PACKET(Var),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}).
-define(CONNACK_PACKET(ReturnCode),
-define(CONNACK_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{return_code = ReturnCode}}).
variable = #mqtt_packet_connack{reason_code = ReturnCode}}).
-define(CONNACK_PACKET(ReturnCode, SessPresent),
-define(CONNACK_PACKET(ReasonCode, SessPresent),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent,
return_code = ReturnCode}}).
reason_code = ReturnCode}}).
-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent,
reason_code = ReasonCode,
properties = Properties}}).
-define(PUBLISH_PACKET(Qos, PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
@ -265,10 +315,17 @@
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1},
variable = #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = TopicTable}}).
-define(SUBACK_PACKET(PacketId, QosTable),
-define(SUBACK_PACKET(PacketId, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
variable = #mqtt_packet_suback{packet_id = PacketId,
qos_table = QosTable}}).
variable = #mqtt_packet_suback{packet_id = PacketId,
reason_codes = ReasonCodes}}).
-define(SUBACK_PACKET(PacketId, Properties, ReasonCodes),
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
variable = #mqtt_packet_suback{packet_id = PacketId,
properties = Properties,
reason_codes = ReasonCodes}}).
-define(UNSUBSCRIBE_PACKET(PacketId, Topics),
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1},
variable = #mqtt_packet_unsubscribe{packet_id = PacketId,

View File

@ -1,17 +0,0 @@
-define(SUCCESS, 0). %% Success
-define(ERROR1, 101). %% badrpc
-define(ERROR2, 102). %% Unknown error
-define(ERROR3, 103). %% Username or password error
-define(ERROR4, 104). %% Empty username or password
-define(ERROR5, 105). %% User does not exist
-define(ERROR6, 106). %% Admin can not be deleted
-define(ERROR7, 107). %% Missing request parameter
-define(ERROR8, 108). %% Request parameter type error
-define(ERROR9, 109). %% Request parameter is not a json
-define(ERROR10, 110). %% Plugin has been loaded
-define(ERROR11, 111). %% Plugin has been loaded
-define(ERROR12, 112). %% Client not online
-define(ERROR13, 113). %% User already exist
-define(ERROR14, 114). %% OldPassword error

View File

@ -1,35 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-type(trie_node_id() :: binary() | atom()).
-record(trie_node,
{ node_id :: trie_node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: binary() | undefined,
flags :: list(atom())
}).
-record(trie_edge,
{ node_id :: trie_node_id(),
word :: binary() | atom()
}).
-record(trie,
{ edge :: #trie_edge{},
node_id :: trie_node_id()
}).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -66,7 +66,7 @@ start_link(Pool, Id, Node, Topic, Options) ->
%%--------------------------------------------------------------------
init([Pool, Id, Node, Topic, Options]) ->
?GPROC_POOL(join, Pool, Id),
gproc_pool:connect_worker(Pool, {Pool, Id}),
process_flag(trap_exit, true),
case net_kernel:connect_node(Node) of
true ->
@ -151,8 +151,7 @@ handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id),
ok.
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -14,18 +14,12 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Client Manager
-module(emqx_cm).
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_internal.hrl").
%% API Exports
-export([start_link/3]).
@ -78,7 +72,7 @@ pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId).
%%--------------------------------------------------------------------
init([Pool, Id, StatsFun]) ->
?GPROC_POOL(join, Pool, Id),
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}.
handle_call({reg, Client = #mqtt_client{client_id = ClientId,
@ -92,7 +86,8 @@ handle_call({reg, Client = #mqtt_client{client_id = ClientId,
end;
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[MQTT-CM] Unexpected Call: ~p", [Req]),
{reply, ignore, State}.
handle_cast({unreg, ClientId, Pid}, State) ->
case lookup_proc(ClientId) of
@ -104,7 +99,8 @@ handle_cast({unreg, ClientId, Pid}, State) ->
end;
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[MQTT-CM] Unexpected Cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of
@ -123,10 +119,11 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
end;
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[CM] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id), ok.
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -14,24 +14,16 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Client Manager Supervisor.
-module(emqx_cm_sup).
-behaviour(supervisor).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CM, emqx_cm).
-define(TAB, mqtt_client).
start_link() ->
@ -42,8 +34,8 @@ init([]) ->
create_client_tab(),
%% CM Pool Sup
MFA = {?CM, start_link, [emqx_stats:statsfun('clients/count', 'clients/max')]},
PoolSup = emqx_pool_sup:spec([?CM, hash, erlang:system_info(schedulers), MFA]),
MFA = {emqx_cm, start_link, [emqx_stats:statsfun('clients/count', 'clients/max')]},
PoolSup = emqx_pool_sup:spec([emqx_cm, hash, erlang:system_info(schedulers), MFA]),
{ok, {{one_for_all, 10, 3600}, [PoolSup]}}.

View File

@ -14,9 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT/TCP Connection.
-module(emqx_client).
-module(emqx_conn).
-behaviour(gen_server).
@ -24,7 +22,7 @@
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
-include("emqx_misc.hrl").
-import(proplists, [get_value/2, get_value/3]).
@ -49,11 +47,10 @@
%% TODO: How to emit stats?
-export([handle_pre_hibernate/1]).
%% Client State
%% Unused fields: connname, peerhost, peerport
-record(client_state, {connection, peername, conn_state, await_recv,
rate_limit, packet_size, parser, proto_state,
keepalive, enable_stats, idle_timeout, force_gc_count}).
-record(state, {connection, peername, conn_state, await_recv,
rate_limit, packet_size, parser, proto_state,
keepalive, enable_stats, idle_timeout, force_gc_count}).
-define(INFO_KEYS, [peername, conn_state, await_recv]).
@ -61,7 +58,7 @@
-define(LOG(Level, Format, Args, State),
lager:Level("Client(~s): " ++ Format,
[esockd_net:format(State#client_state.peername) | Args])).
[esockd_net:format(State#state.peername) | Args])).
start_link(Conn, Env) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
@ -117,17 +114,17 @@ do_init(Conn, Env, Peername) ->
EnableStats = get_value(client_enable_stats, Env, false),
IdleTimout = get_value(client_idle_timeout, Env, 30000),
ForceGcCount = emqx_gc:conn_max_gc_count(),
State = run_socket(#client_state{connection = Conn,
peername = Peername,
await_recv = false,
conn_state = running,
rate_limit = RateLimit,
packet_size = PacketSize,
parser = Parser,
proto_state = ProtoState,
enable_stats = EnableStats,
idle_timeout = IdleTimout,
force_gc_count = ForceGcCount}),
State = run_socket(#state{connection = Conn,
peername = Peername,
await_recv = false,
conn_state = running,
rate_limit = RateLimit,
packet_size = PacketSize,
parser = Parser,
proto_state = ProtoState,
enable_stats = EnableStats,
idle_timeout = IdleTimout,
force_gc_count = ForceGcCount}),
gen_server:enter_loop(?MODULE, [{hibernate_after, 10000}],
State, self(), IdleTimout).
@ -135,7 +132,7 @@ send_fun(Conn, Peername) ->
Self = self(),
fun(Packet) ->
Data = emqx_serializer:serialize(Packet),
?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}),
emqx_metrics:inc('bytes/sent', iolist_size(Data)),
try Conn:async_send(Data) of
ok -> ok;
@ -147,15 +144,15 @@ send_fun(Conn, Peername) ->
end.
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#client_state.force_gc_count, emit_stats(State))}.
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
handle_call(info, From, State = #client_state{proto_state = ProtoState}) ->
handle_call(info, From, State = #state{proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS),
ClientInfo = ?record_to_proplist(state, State, ?INFO_KEYS),
{reply, Stats, _, _} = handle_call(stats, From, State),
reply(lists:append([ClientInfo, ProtoInfo, Stats]), State);
handle_call(stats, _From, State = #client_state{proto_state = ProtoState}) ->
handle_call(stats, _From, State = #state{proto_state = ProtoState}) ->
reply(lists:append([emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState),
sock_stats(State)]), State);
@ -164,12 +161,12 @@ handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State};
handle_call({set_rate_limit, Rl}, _From, State) ->
reply(ok, State#client_state{rate_limit = Rl});
reply(ok, State#state{rate_limit = Rl});
handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) ->
handle_call(get_rate_limit, _From, State = #state{rate_limit = Rl}) ->
reply(Rl, State);
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
reply(emqx_protocol:session(ProtoState), State);
handle_call({clean_acl_cache, Topic}, _From, State) ->
@ -177,10 +174,12 @@ handle_call({clean_acl_cache, Topic}, _From, State) ->
reply(ok, State);
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
?LOG(error, "Unexpected Call: ~p", [Req], State),
{reply, ignore, State}.
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
?LOG(error, "Unexpected Cast: ~p", [Msg], State),
{noreply, State}.
handle_info({subscribe, TopicTable}, State) ->
with_proto(
@ -204,7 +203,7 @@ handle_info({suback, PacketId, GrantedQos}, State) ->
%% Fastlane
handle_info({dispatch, _Topic, Message}, State) ->
handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State);
handle_info({deliver, Message#message{qos = ?QOS_0}}, State);
handle_info({deliver, Message}, State) ->
with_proto(
@ -233,13 +232,13 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
shutdown(conflict, State);
handle_info(activate_sock, State) ->
{noreply, run_socket(State#client_state{conn_state = running})};
{noreply, run_socket(State#state{conn_state = running})};
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
Size = iolist_size(Data),
?LOG(debug, "RECV ~p", [Data], State),
emqx_metrics:inc('bytes/received', Size),
received(Data, rate_limit(Size, State#client_state{await_recv = false}));
received(Data, rate_limit(Size, State#state{await_recv = false}));
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
shutdown(Reason, State);
@ -250,7 +249,7 @@ handle_info({inet_reply, _Sock, ok}, State) ->
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State);
handle_info({keepalive, start, Interval}, State = #client_state{connection = Conn}) ->
handle_info({keepalive, start, Interval}, State = #state{connection = Conn}) ->
?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
StatFun = fun() ->
case Conn:getstat([recv_oct]) of
@ -279,11 +278,12 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
end;
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
?LOG(error, "Unexpected Info: ~p", [Info], State),
{noreply, State}.
terminate(Reason, State = #client_state{connection = Conn,
keepalive = KeepAlive,
proto_state = ProtoState}) ->
terminate(Reason, State = #state{connection = Conn,
keepalive = KeepAlive,
proto_state = ProtoState}) ->
?LOG(debug, "Terminated for ~p", [Reason], State),
Conn:fast_close(),
@ -308,26 +308,26 @@ code_change(_OldVsn, State, _Extra) ->
received(<<>>, State) ->
{noreply, gc(State)};
received(Bytes, State = #client_state{parser = Parser,
packet_size = PacketSize,
proto_state = ProtoState,
idle_timeout = IdleTimeout}) ->
received(Bytes, State = #state{parser = Parser,
packet_size = PacketSize,
proto_state = ProtoState,
idle_timeout = IdleTimeout}) ->
case catch emqx_parser:parse(Bytes, Parser) of
{more, NewParser} ->
{noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout};
{noreply, run_socket(State#state{parser = NewParser}), IdleTimeout};
{ok, Packet, Rest} ->
emqx_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
received(Rest, State#client_state{parser = emqx_parser:initial_state(PacketSize),
proto_state = ProtoState1});
received(Rest, State#state{parser = emqx_parser:initial_state(PacketSize),
proto_state = ProtoState1});
{error, Error} ->
?LOG(error, "Protocol error - ~p", [Error], State),
shutdown(Error, State);
{error, Error, ProtoState1} ->
shutdown(Error, State#client_state{proto_state = ProtoState1});
shutdown(Error, State#state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} ->
stop(Reason, State#client_state{proto_state = ProtoState1})
stop(Reason, State#state{proto_state = ProtoState1})
end;
{error, Error} ->
?LOG(error, "Framing error - ~p", [Error], State),
@ -338,34 +338,34 @@ received(Bytes, State = #client_state{parser = Parser,
shutdown(parser_error, State)
end.
rate_limit(_Size, State = #client_state{rate_limit = undefined}) ->
rate_limit(_Size, State = #state{rate_limit = undefined}) ->
run_socket(State);
rate_limit(Size, State = #client_state{rate_limit = Rl}) ->
rate_limit(Size, State = #state{rate_limit = Rl}) ->
case Rl:check(Size) of
{0, Rl1} ->
run_socket(State#client_state{conn_state = running, rate_limit = Rl1});
run_socket(State#state{conn_state = running, rate_limit = Rl1});
{Pause, Rl1} ->
?LOG(warning, "Rate limiter pause for ~p", [Pause], State),
erlang:send_after(Pause, self(), activate_sock),
State#client_state{conn_state = blocked, rate_limit = Rl1}
State#state{conn_state = blocked, rate_limit = Rl1}
end.
run_socket(State = #client_state{conn_state = blocked}) ->
run_socket(State = #state{conn_state = blocked}) ->
State;
run_socket(State = #client_state{await_recv = true}) ->
run_socket(State = #state{await_recv = true}) ->
State;
run_socket(State = #client_state{connection = Conn}) ->
run_socket(State = #state{connection = Conn}) ->
Conn:async_recv(0, infinity),
State#client_state{await_recv = true}.
State#state{await_recv = true}.
with_proto(Fun, State = #client_state{proto_state = ProtoState}) ->
with_proto(Fun, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
{noreply, State#client_state{proto_state = ProtoState1}}.
{noreply, State#state{proto_state = ProtoState1}}.
emit_stats(State = #client_state{proto_state = ProtoState}) ->
emit_stats(State = #state{proto_state = ProtoState}) ->
emit_stats(emqx_protocol:clientid(ProtoState), State).
emit_stats(_ClientId, State = #client_state{enable_stats = false}) ->
emit_stats(_ClientId, State = #state{enable_stats = false}) ->
State;
emit_stats(undefined, State) ->
State;
@ -374,7 +374,7 @@ emit_stats(ClientId, State) ->
emqx_stats:set_client_stats(ClientId, Stats),
State.
sock_stats(#client_state{connection = Conn}) ->
sock_stats(#state{connection = Conn}) ->
case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end.
reply(Reply, State) ->
@ -386,7 +386,7 @@ shutdown(Reason, State) ->
stop(Reason, State) ->
{stop, Reason, State}.
gc(State = #client_state{connection = Conn}) ->
gc(State = #state{connection = Conn}) ->
Cb = fun() -> Conn:gc(), emit_stats(State) end,
emqx_gc:maybe_force_gc(#client_state.force_gc_count, State, Cb).
emqx_gc:maybe_force_gc(#state.force_gc_count, State, Cb).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -16,8 +16,6 @@
-module(emqx_pooler).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
%% Start the pool supervisor

View File

@ -22,8 +22,6 @@
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
-import(proplists, [get_value/2, get_value/3]).
%% API
@ -64,7 +62,7 @@
%% @doc Init protocol
init(Peername, SendFun, Opts) ->
Backoff = get_value(keepalive_backoff, Opts, 1.25),
Backoff = get_value(keepalive_backoff, Opts, 0.75),
EnableStats = get_value(client_enable_stats, Opts, false),
MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
WsInitialHeaders = get_value(ws_initial_headers, Opts),
@ -569,10 +567,10 @@ sp(false) -> 0.
%% The retained flag should be propagated for bridge.
%%--------------------------------------------------------------------
clean_retain(false, Msg = #mqtt_message{retain = true, headers = Headers}) ->
clean_retain(false, Msg = #message{retain = true, headers = Headers}) ->
case lists:member(retained, Headers) of
true -> Msg;
false -> Msg#mqtt_message{retain = false}
false -> Msg#message{retain = false}
end;
clean_retain(_IsBridge, Msg) ->
Msg.

View File

@ -20,8 +20,6 @@
-include("emqx.hrl").
-include("emqx_internal.hrl").
-export([start_link/3]).
%% PubSub API.
@ -46,8 +44,7 @@
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Env) ->
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)},
?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
@ -164,7 +161,7 @@ pick(Topic) ->
%%--------------------------------------------------------------------
init([Pool, Id, Env]) ->
?GPROC_POOL(join, Pool, Id),
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, env = Env}, hibernate}.
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
@ -193,7 +190,7 @@ handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id).
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -49,8 +49,7 @@
%% @doc Start the server
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Env) ->
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)},
?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
@ -186,7 +185,7 @@ dump() ->
%%--------------------------------------------------------------------
init([Pool, Id, Env]) ->
?GPROC_POOL(join, Pool, Id),
gproc_pool:connect_worker(Pool, {Pool, Id}),
State = #state{pool = Pool, id = Id, env = Env,
subids = #{}, submon = emqx_pmon:new()},
{ok, State, hibernate}.
@ -245,7 +244,7 @@ handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id).
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -53,8 +53,6 @@
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
-import(emqx_misc, [start_timer/2]).
-import(proplists, [get_value/2, get_value/3]).
@ -193,16 +191,16 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
gen_server:cast(Session, {subscribe, From, TopicTable, AckFun}).
%% @doc Publish Message
-spec(publish(pid(), mqtt_message()) -> ok | {error, term()}).
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
-spec(publish(pid(), message()) -> ok | {error, term()}).
publish(_Session, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 Directly
emqx_server:publish(Msg), ok;
publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
publish(_Session, Msg = #message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly for client will PubAck automatically
emqx_server:publish(Msg), ok;
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
publish(Session, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session
gen_server:call(Session, {publish, Msg}, ?TIMEOUT).
@ -517,7 +515,7 @@ handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
%% Ignore Messages delivered by self
handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}},
handle_info({dispatch, _Topic, #message{from = {ClientId, _}}},
State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
{noreply, State};
@ -637,7 +635,7 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
expire_awaiting_rel([], _Now, State) ->
State#state{await_rel_timer = undefined};
expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs],
Now, State = #state{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, TS) div 1000) of

View File

@ -14,11 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Session Supervisor.
-module(emqx_session_sup).
-author("Feng Lee <feng@emqtt.io>").
-behavior(supervisor).
-export([start_link/0, start_session/3]).

View File

@ -20,8 +20,6 @@
-include("emqx.hrl").
-include("emqx_internal.hrl").
%% Mnesia Callbacks
-export([mnesia/1]).
@ -73,7 +71,7 @@ mnesia(copy) ->
%% @doc Start a session manager
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id) ->
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
gen_server:start_link(?MODULE, [Pool, Id], []).
%% @doc Start a session
-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}).
@ -129,7 +127,7 @@ local_sessions() ->
%%--------------------------------------------------------------------
init([Pool, Id]) ->
?GPROC_POOL(join, Pool, Id),
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, monitors = dict:new()}}.
%% Persistent Session
@ -163,10 +161,12 @@ handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State
end;
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[MQTT-SM] Unexpected Request: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[MQTT-SM] Unexpected Message: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of
@ -186,10 +186,11 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
end;
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[MQTT-SM] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id).
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -202,7 +203,8 @@ code_change(_OldVsn, State, _Extra) ->
create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
case create_session(CleanSess, {ClientId, Username}, ClientPid) of
{ok, SessPid} ->
{reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)};
{reply, {ok, SessPid, false},
monitor_session(ClientId, SessPid, State)};
{error, Error} ->
{reply, {error, Error}, State}
end.

View File

@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Session Helper.
-module(emqx_sm_helper).
-author("Feng Lee <feng@emqtt.io>").
@ -23,8 +22,6 @@
-include("emqx.hrl").
-include("emqx_internal.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% API Function Exports
@ -49,10 +46,12 @@ init([StatsFun]) ->
{ok, #state{stats_fun = StatsFun, ticker = TRef}}.
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[SM-HELPER] Unexpected Call: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[SM-HELPER] Unexpected Cast: ~p", [Msg]),
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
Fun = fun() ->
@ -71,7 +70,8 @@ handle_info(tick, State) ->
{noreply, setstats(State), hibernate};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[SM-HELPER] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State = #state{ticker = TRef}) ->
timer:cancel(TRef),

View File

@ -14,15 +14,10 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Session Manager Supervisor.
-module(emqx_sm_sup).
-behaviour(supervisor).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-define(SM, emqx_sm).
@ -39,7 +34,7 @@ start_link() ->
init([]) ->
%% Create session tables
ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]),
_ = ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]),
%% Helper
StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,15 +14,9 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Topic Trie:
%% [Trie](http://en.wikipedia.org/wiki/Trie)
%% @end
-module(emqx_trie).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx_trie.hrl").
-include("emqx.hrl").
%% Mnesia Callbacks
-export([mnesia/1]).

View File

@ -52,7 +52,7 @@ handle_request('GET', "/mqtt", Req) ->
Parser = emqx_parser:initial_state(PacketSize),
%% Upgrade WebSocket.
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
{ok, ClientPid} = emqx_ws_client_sup:start_client(self(), Req, ReplyChannel),
{ok, ClientPid} = emqx_ws_conn_sup:start_connection(self(), Req, ReplyChannel),
ReentryWs(#wsocket_state{peername = Peername,
parser = Parser,
max_packet_size = PacketSize,

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,20 +14,14 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT WebSocket Connection.
-module(emqx_ws_client).
-module(emqx_ws_conn).
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
-import(proplists, [get_value/2, get_value/3]).
%% API Exports

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,24 +14,21 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ws_client_sup).
-author("Feng Lee <feng@emqtt.io>").
-module(emqx_ws_conn_sup).
-behavior(supervisor).
-export([start_link/0, start_client/3]).
-export([start_link/0, start_connection/3]).
-export([init/1]).
%% @doc Start websocket client supervisor
-spec(start_link() -> {ok, pid()}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Start a WebSocket Connection.
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
start_client(WsPid, Req, ReplyChannel) ->
%% @doc Start a MQTT/WebSocket Connection.
-spec(start_connection(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
start_connection(WsPid, Req, ReplyChannel) ->
supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).
%%--------------------------------------------------------------------
@ -39,8 +36,9 @@ start_client(WsPid, Req, ReplyChannel) ->
%%--------------------------------------------------------------------
init([]) ->
%%TODO: Cannot upgrade the environments, Use zone?
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])),
{ok, {{simple_one_for_one, 0, 1},
[{ws_client, {emqx_ws_client, start_link, [Env]},
temporary, 5000, worker, [emqx_ws_client]}]}}.
[{ws_conn, {emqx_ws_conn, start_link, [Env]},
temporary, 5000, worker, [emqx_ws_conn]}]}}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,11 +16,9 @@
-module(emqx_trie_SUITE).
-author("Feng Lee <feng@emqtt.io>").
-compile(export_all).
-include("emqx_trie.hrl").
-include("emqx.hrl").
-define(TRIE, emqx_trie).