Merge pull request #917 from emqtt/emq20
V2.1.0-beta.1: Tune the CPU/Memory usage of Client/Session Processes
This commit is contained in:
commit
331d016c8e
12
Makefile
12
Makefile
|
@ -1,6 +1,8 @@
|
|||
PROJECT = emqttd
|
||||
PROJECT_DESCRIPTION = Erlang MQTT Broker
|
||||
PROJECT_VERSION = 2.1
|
||||
PROJECT_VERSION = 2.1.0
|
||||
|
||||
NO_AUTOPATCH = cuttlefish
|
||||
|
||||
DEPS = gproc lager esockd mochiweb lager_syslog
|
||||
|
||||
|
@ -13,11 +15,11 @@ dep_lager_syslog = git https://github.com/basho/lager_syslog
|
|||
|
||||
ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
||||
TEST_DEPS = cuttlefish emqttc
|
||||
BUILD_DEPS = cuttlefish
|
||||
dep_cuttlefish = git https://github.com/emqtt/cuttlefish
|
||||
dep_emqttc = git https://github.com/emqtt/emqttc
|
||||
|
||||
NO_AUTOPATCH = cuttlefish
|
||||
TEST_DEPS = emqttc
|
||||
dep_emqttc = git https://github.com/emqtt/emqttc
|
||||
|
||||
TEST_ERLC_OPTS += +debug_info
|
||||
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
@ -38,5 +40,5 @@ include erlang.mk
|
|||
app:: rebar.config
|
||||
|
||||
app.config::
|
||||
cuttlefish -l info -e etc/ -c etc/emq.conf -i priv/emq.schema -d data/
|
||||
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq.conf -i priv/emq.schema -d data/
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ Documentation on [emqtt.io/docs/v2/](http://emqtt.io/docs/v2/install.html), [doc
|
|||
|
||||
## Build From Source
|
||||
|
||||
The *EMQ* broker requires Erlang/OTP R18+ to build.
|
||||
The *EMQ* broker requires Erlang/OTP R19+ to build since 2.1 release.
|
||||
|
||||
```
|
||||
git clone https://github.com/emqtt/emq-relx.git
|
||||
|
|
11
etc/emq.conf
11
etc/emq.conf
|
@ -106,6 +106,13 @@ mqtt.max_clientid_len = 1024
|
|||
## Max Packet Size Allowed, 64K by default.
|
||||
mqtt.max_packet_size = 64KB
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## MQTT Connection
|
||||
##--------------------------------------------------------------------
|
||||
|
||||
## Force GC: integer. Value 0 disabled the Force GC.
|
||||
mqtt.conn.force_gc_count = 100
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## MQTT Client
|
||||
##--------------------------------------------------------------------
|
||||
|
@ -113,7 +120,7 @@ mqtt.max_packet_size = 64KB
|
|||
## Client Idle Timeout (Second)
|
||||
mqtt.client.idle_timeout = 30s
|
||||
|
||||
## Enable client Stats: seconds or off
|
||||
## Enable client Stats: on | off
|
||||
mqtt.client.enable_stats = off
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
|
@ -136,7 +143,7 @@ mqtt.session.max_awaiting_rel = 100
|
|||
## Awaiting PUBREL Timeout
|
||||
mqtt.session.await_rel_timeout = 20s
|
||||
|
||||
## Enable Statistics at the Interval(seconds)
|
||||
## Enable Statistics: on | off
|
||||
mqtt.session.enable_stats = off
|
||||
|
||||
## Expired after 1 day:
|
||||
|
|
|
@ -58,6 +58,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% MQTT Subscription
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-record(mqtt_subscription,
|
||||
{ subid :: binary() | atom(),
|
||||
topic :: binary(),
|
||||
|
@ -104,17 +105,19 @@
|
|||
%% MQTT Message
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-type(mqtt_msgid() :: binary() | undefined).
|
||||
-type(mqtt_msg_id() :: binary() | undefined).
|
||||
|
||||
-type(mqtt_pktid() :: 1..16#ffff | undefined).
|
||||
|
||||
-type(mqtt_msg_from() :: atom() | {binary(), undefined | binary()}).
|
||||
|
||||
-record(mqtt_message,
|
||||
{ %% Global unique message ID
|
||||
id :: mqtt_msgid(),
|
||||
id :: mqtt_msg_id(),
|
||||
%% PacketId
|
||||
pktid :: mqtt_pktid(),
|
||||
%% ClientId and Username
|
||||
from :: {binary(), undefined | binary()},
|
||||
from :: mqtt_msg_from(),
|
||||
%% Topic that the message is published to
|
||||
topic :: binary(),
|
||||
%% Message QoS
|
||||
|
@ -127,12 +130,13 @@
|
|||
dup = false :: boolean(),
|
||||
%% $SYS flag
|
||||
sys = false :: boolean(),
|
||||
%% Headers
|
||||
headers = [] :: list(),
|
||||
%% Payload
|
||||
payload :: binary(),
|
||||
%% Timestamp
|
||||
timestamp :: erlang:timestamp()
|
||||
}).
|
||||
}).
|
||||
|
||||
-type(mqtt_message() :: #mqtt_message{}).
|
||||
|
||||
|
|
|
@ -58,3 +58,5 @@
|
|||
false-> (FalseFun)
|
||||
end)).
|
||||
|
||||
-define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]).
|
||||
|
||||
|
|
|
@ -129,11 +129,16 @@
|
|||
|
||||
-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Max MQTT Packet Length
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(MAX_PACKET_SIZE, 16#fffffff).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Parser and Serializer
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(MAX_LEN, 16#fffffff).
|
||||
-define(HIGHBIT, 2#10000000).
|
||||
-define(LOWBITS, 2#01111111).
|
||||
|
||||
|
|
|
@ -316,6 +316,15 @@ end}.
|
|||
{max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}]
|
||||
end}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Connection
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Force the client to GC: integer
|
||||
{mapping, "mqtt.conn.force_gc_count", "emqttd.conn_force_gc_count", [
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Client
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -329,7 +338,7 @@ end}.
|
|||
%% @doc Enable Stats of Client.
|
||||
{mapping, "mqtt.client.enable_stats", "emqttd.client", [
|
||||
{default, off},
|
||||
{datatype, [{duration, ms}, flag]}
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
%% @doc Client
|
||||
|
@ -375,7 +384,7 @@ end}.
|
|||
%% @doc Enable Stats
|
||||
{mapping, "mqtt.session.enable_stats", "emqttd.session", [
|
||||
{default, off},
|
||||
{datatype, [{duration, ms}, flag]}
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
%% @doc Session Expiry Interval
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{deps, [
|
||||
{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}}
|
||||
{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}}
|
||||
]}.
|
||||
{erl_opts, [{parse_transform,lager_transform}]}.
|
||||
|
|
|
@ -138,17 +138,20 @@ subscriber_down(Subscriber) ->
|
|||
%% Hooks API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(hook(atom(), function(), list(any())) -> ok | {error, any()}).
|
||||
hook(Hook, Function, InitArgs) ->
|
||||
emqttd_hooks:add(Hook, Function, InitArgs).
|
||||
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()))
|
||||
-> ok | {error, any()}).
|
||||
hook(Hook, TagFunction, InitArgs) ->
|
||||
emqttd_hooks:add(Hook, TagFunction, InitArgs).
|
||||
|
||||
-spec(hook(atom(), function(), list(any()), integer()) -> ok | {error, any()}).
|
||||
hook(Hook, Function, InitArgs, Priority) ->
|
||||
emqttd_hooks:add(Hook, Function, InitArgs, Priority).
|
||||
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()), integer())
|
||||
-> ok | {error, any()}).
|
||||
hook(Hook, TagFunction, InitArgs, Priority) ->
|
||||
emqttd_hooks:add(Hook, TagFunction, InitArgs, Priority).
|
||||
|
||||
-spec(unhook(atom(), function()) -> ok | {error, any()}).
|
||||
unhook(Hook, Function) ->
|
||||
emqttd_hooks:delete(Hook, Function).
|
||||
-spec(unhook(atom(), function() | {emqttd_hooks:hooktag(), function()})
|
||||
-> ok | {error, any()}).
|
||||
unhook(Hook, TagFunction) ->
|
||||
emqttd_hooks:delete(Hook, TagFunction).
|
||||
|
||||
-spec(run_hooks(atom(), list(any())) -> ok | stop).
|
||||
run_hooks(Hook, Args) ->
|
||||
|
|
|
@ -40,16 +40,16 @@
|
|||
qos = ?QOS_2,
|
||||
topic_suffix = <<>>,
|
||||
topic_prefix = <<>>,
|
||||
mqueue :: emqttd_mqueue:mqueue(),
|
||||
mqueue :: emqttd_mqueue:mqueue(),
|
||||
max_queue_len = 10000,
|
||||
ping_down_interval = ?PING_DOWN_INTERVAL,
|
||||
status = up}).
|
||||
|
||||
-type(option() :: {qos, mqtt_qos()} |
|
||||
{topic_suffix, binary()} |
|
||||
{topic_prefix, binary()} |
|
||||
{max_queue_len, pos_integer()} |
|
||||
{ping_down_interval, pos_integer()}).
|
||||
-type(option() :: {qos, mqtt_qos()} |
|
||||
{topic_suffix, binary()} |
|
||||
{topic_prefix, binary()} |
|
||||
{max_queue_len, pos_integer()} |
|
||||
{ping_down_interval, pos_integer()}).
|
||||
|
||||
-export_type([option/0]).
|
||||
|
||||
|
@ -79,9 +79,10 @@ init([Pool, Id, Node, Topic, Options]) ->
|
|||
MQueue = emqttd_mqueue:new(qname(Node, Topic),
|
||||
[{max_len, State#state.max_queue_len}],
|
||||
emqttd_alarm:alarm_fun()),
|
||||
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
|
||||
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue},
|
||||
hibernate, {backoff, 1000, 1000, 10000}};
|
||||
false ->
|
||||
{stop, {cannot_connect, Node}}
|
||||
{stop, {cannot_connect_node, Node}}
|
||||
end.
|
||||
|
||||
parse_opts([], State) ->
|
||||
|
|
|
@ -14,11 +14,11 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc MQTT/TCP Connection
|
||||
%% @doc MQTT/TCP Connection.
|
||||
|
||||
-module(emqttd_client).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-behaviour(gen_server2).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
|
@ -48,36 +48,40 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
code_change/3, terminate/2]).
|
||||
|
||||
%% Client State
|
||||
-record(client_state, {connection, connname, peername, peerhost, peerport,
|
||||
await_recv, conn_state, rate_limit, parser_fun,
|
||||
proto_state, packet_opts, keepalive, enable_stats,
|
||||
stats_timer}).
|
||||
%% gen_server2 Callbacks
|
||||
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
|
||||
|
||||
-define(INFO_KEYS, [connname, peername, peerhost, peerport, await_recv, conn_state]).
|
||||
%% Client State
|
||||
%% Unused fields: connname, peerhost, peerport
|
||||
-record(client_state, {connection, peername, conn_state, await_recv,
|
||||
rate_limit, packet_size, parser, proto_state,
|
||||
keepalive, enable_stats, force_gc_count}).
|
||||
|
||||
-define(INFO_KEYS, [peername, conn_state, await_recv]).
|
||||
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||
|
||||
-define(LOG(Level, Format, Args, State),
|
||||
lager:Level("Client(~s): " ++ Format, [State#client_state.connname | Args])).
|
||||
lager:Level("Client(~s): " ++ Format,
|
||||
[esockd_net:format(State#client_state.peername) | Args])).
|
||||
|
||||
start_link(Conn, Env) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
|
||||
{ok, proc_lib:spawn_opt(?MODULE, init, [[Conn, Env]], [link | ?FULLSWEEP_OPTS])}.
|
||||
|
||||
info(CPid) ->
|
||||
gen_server:call(CPid, info).
|
||||
gen_server2:call(CPid, info).
|
||||
|
||||
stats(CPid) ->
|
||||
gen_server:call(CPid, stats).
|
||||
gen_server2:call(CPid, stats).
|
||||
|
||||
kick(CPid) ->
|
||||
gen_server:call(CPid, kick).
|
||||
gen_server2:call(CPid, kick).
|
||||
|
||||
set_rate_limit(Cpid, Rl) ->
|
||||
gen_server:call(Cpid, {set_rate_limit, Rl}).
|
||||
gen_server2:call(Cpid, {set_rate_limit, Rl}).
|
||||
|
||||
get_rate_limit(Cpid) ->
|
||||
gen_server:call(Cpid, get_rate_limit).
|
||||
gen_server2:call(Cpid, get_rate_limit).
|
||||
|
||||
subscribe(CPid, TopicTable) ->
|
||||
CPid ! {subscribe, TopicTable}.
|
||||
|
@ -94,71 +98,81 @@ session(CPid) ->
|
|||
|
||||
init([Conn0, Env]) ->
|
||||
{ok, Conn} = Conn0:wait(),
|
||||
{PeerHost, PeerPort, PeerName} =
|
||||
case Conn:peername() of
|
||||
{ok, Peer = {Host, Port}} ->
|
||||
{Host, Port, Peer};
|
||||
{error, enotconn} ->
|
||||
Conn:fast_close(),
|
||||
exit(normal);
|
||||
{error, Reason} ->
|
||||
Conn:fast_close(),
|
||||
exit({shutdown, Reason})
|
||||
end,
|
||||
ConnName = esockd_net:format(PeerName),
|
||||
{ok, Peername} -> do_init(Conn, Env, Peername);
|
||||
{error, enotconn} -> Conn:fast_close(),
|
||||
exit(normal);
|
||||
{error, Reason} -> Conn:fast_close(),
|
||||
exit({shutdown, Reason})
|
||||
end.
|
||||
|
||||
do_init(Conn, Env, Peername) ->
|
||||
%% Send Fun
|
||||
SendFun = send_fun(Conn, Peername),
|
||||
RateLimit = get_value(rate_limit, Conn:opts()),
|
||||
PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
|
||||
Parser = emqttd_parser:initial_state(PacketSize),
|
||||
ProtoState = emqttd_protocol:init(Peername, SendFun, Env),
|
||||
EnableStats = get_value(client_enable_stats, Env, false),
|
||||
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
||||
State = run_socket(#client_state{connection = Conn,
|
||||
peername = Peername,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
rate_limit = RateLimit,
|
||||
packet_size = PacketSize,
|
||||
parser = Parser,
|
||||
proto_state = ProtoState,
|
||||
enable_stats = EnableStats,
|
||||
force_gc_count = ForceGcCount}),
|
||||
IdleTimout = get_value(client_idle_timeout, Env, 30000),
|
||||
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
|
||||
{backoff, 1000, 1000, 10000}).
|
||||
|
||||
send_fun(Conn, Peername) ->
|
||||
Self = self(),
|
||||
%% Send Packet...
|
||||
SendFun = fun(Packet) ->
|
||||
fun(Packet) ->
|
||||
Data = emqttd_serializer:serialize(Packet),
|
||||
?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}),
|
||||
?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
|
||||
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
|
||||
try Conn:async_send(Data) of
|
||||
true -> ok
|
||||
catch
|
||||
error:Error -> Self ! {shutdown, Error}
|
||||
end
|
||||
end,
|
||||
ParserFun = emqttd_parser:new(Env),
|
||||
ProtoState = emqttd_protocol:init(PeerName, SendFun, Env),
|
||||
RateLimit = get_value(rate_limit, Conn:opts()),
|
||||
EnableStats = get_value(client_enable_stats, Env, false),
|
||||
State = run_socket(#client_state{connection = Conn,
|
||||
connname = ConnName,
|
||||
peername = PeerName,
|
||||
peerhost = PeerHost,
|
||||
peerport = PeerPort,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
rate_limit = RateLimit,
|
||||
parser_fun = ParserFun,
|
||||
proto_state = ProtoState,
|
||||
packet_opts = Env,
|
||||
enable_stats = EnableStats}),
|
||||
IdleTimout = get_value(client_idle_timeout, Env, 30000),
|
||||
gen_server:enter_loop(?MODULE, [], maybe_enable_stats(State), IdleTimout).
|
||||
end.
|
||||
|
||||
prioritise_call(Msg, _From, _Len, _State) ->
|
||||
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
|
||||
|
||||
prioritise_info(Msg, _Len, _State) ->
|
||||
case Msg of {redeliver, _} -> 5; _ -> 0 end.
|
||||
|
||||
handle_pre_hibernate(State) ->
|
||||
{hibernate, emqttd_gc:reset_conn_gc_count(#client_state.force_gc_count, emit_stats(State))}.
|
||||
|
||||
handle_call(info, From, State = #client_state{proto_state = ProtoState}) ->
|
||||
ProtoInfo = emqttd_protocol:info(ProtoState),
|
||||
ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS),
|
||||
{reply, Stats, _} = handle_call(stats, From, State),
|
||||
{reply, lists:append([ClientInfo, ProtoInfo, Stats]), State};
|
||||
{reply, Stats, _, _} = handle_call(stats, From, State),
|
||||
reply(lists:append([ClientInfo, ProtoInfo, Stats]), State);
|
||||
|
||||
handle_call(stats, _From, State = #client_state{proto_state = ProtoState}) ->
|
||||
{reply, lists:append([emqttd_misc:proc_stats(),
|
||||
emqttd_protocol:stats(ProtoState),
|
||||
sock_stats(State)]), State};
|
||||
reply(lists:append([emqttd_misc:proc_stats(),
|
||||
emqttd_protocol:stats(ProtoState),
|
||||
sock_stats(State)]), State);
|
||||
|
||||
handle_call(kick, _From, State) ->
|
||||
{stop, {shutdown, kick}, ok, State};
|
||||
|
||||
handle_call({set_rate_limit, Rl}, _From, State) ->
|
||||
{reply, ok, State#client_state{rate_limit = Rl}};
|
||||
reply(ok, State#client_state{rate_limit = Rl});
|
||||
|
||||
handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) ->
|
||||
{reply, Rl, State};
|
||||
reply(Rl, State);
|
||||
|
||||
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
|
||||
{reply, emqttd_protocol:session(ProtoState), State};
|
||||
reply(emqttd_protocol:session(ProtoState), State);
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?UNEXPECTED_REQ(Req, State).
|
||||
|
@ -198,12 +212,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
|
|||
emqttd_protocol:pubrel(PacketId, ProtoState)
|
||||
end, State);
|
||||
|
||||
handle_info(emit_stats, State) ->
|
||||
{noreply, emit_stats(State), hibernate};
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
handle_info({timeout, _Timer, emit_stats}, State) ->
|
||||
hibernate(maybe_enable_stats(emit_stats(State)));
|
||||
|
||||
%% Fix issue #535
|
||||
handle_info({shutdown, Error}, State) ->
|
||||
shutdown(Error, State);
|
||||
|
@ -213,7 +227,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
|||
shutdown(conflict, State);
|
||||
|
||||
handle_info(activate_sock, State) ->
|
||||
hibernate(run_socket(State#client_state{conn_state = running}));
|
||||
{noreply, run_socket(State#client_state{conn_state = running}), hibernate};
|
||||
|
||||
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
|
||||
Size = iolist_size(Data),
|
||||
|
@ -225,7 +239,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
|||
shutdown(Reason, State);
|
||||
|
||||
handle_info({inet_reply, _Sock, ok}, State) ->
|
||||
{noreply, State};
|
||||
{noreply, gc(State), hibernate}; %% Tune GC
|
||||
|
||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
shutdown(Reason, State);
|
||||
|
@ -239,12 +253,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
|
|||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
|
||||
{noreply, stats_by_keepalive(State#client_state{keepalive = KeepAlive})};
|
||||
{noreply, State#client_state{keepalive = KeepAlive}, hibernate};
|
||||
|
||||
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
hibernate(emit_stats(State#client_state{keepalive = KeepAlive1}));
|
||||
{noreply, State#client_state{keepalive = KeepAlive1}, hibernate};
|
||||
{error, timeout} ->
|
||||
?LOG(debug, "Keepalive timeout", [], State),
|
||||
shutdown(keepalive_timeout, State);
|
||||
|
@ -279,19 +293,19 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
%% Receive and parse tcp data
|
||||
received(<<>>, State) ->
|
||||
hibernate(State);
|
||||
{noreply, gc(State), hibernate};
|
||||
|
||||
received(Bytes, State = #client_state{parser_fun = ParserFun,
|
||||
packet_opts = PacketOpts,
|
||||
received(Bytes, State = #client_state{parser = Parser,
|
||||
packet_size = PacketSize,
|
||||
proto_state = ProtoState}) ->
|
||||
case catch ParserFun(Bytes) of
|
||||
{more, NewParser} ->
|
||||
{noreply, run_socket(State#client_state{parser_fun = NewParser})};
|
||||
case catch emqttd_parser:parse(Bytes, Parser) of
|
||||
{more, NewParser} ->
|
||||
{noreply, run_socket(State#client_state{parser = NewParser}), hibernate};
|
||||
{ok, Packet, Rest} ->
|
||||
emqttd_metrics:received(Packet),
|
||||
case emqttd_protocol:received(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
received(Rest, State#client_state{parser_fun = emqttd_parser:new(PacketOpts),
|
||||
received(Rest, State#client_state{parser = emqttd_parser:initial_state(PacketSize),
|
||||
proto_state = ProtoState1});
|
||||
{error, Error} ->
|
||||
?LOG(error, "Protocol error - ~p", [Error], State),
|
||||
|
@ -332,33 +346,25 @@ run_socket(State = #client_state{connection = Conn}) ->
|
|||
|
||||
with_proto(Fun, State = #client_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = Fun(ProtoState),
|
||||
{noreply, State#client_state{proto_state = ProtoState1}}.
|
||||
{noreply, State#client_state{proto_state = ProtoState1}, hibernate}.
|
||||
|
||||
maybe_enable_stats(State = #client_state{enable_stats = false}) ->
|
||||
State;
|
||||
maybe_enable_stats(State = #client_state{enable_stats = keepalive}) ->
|
||||
State;
|
||||
maybe_enable_stats(State = #client_state{enable_stats = Interval}) ->
|
||||
State#client_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}.
|
||||
|
||||
stats_by_keepalive(State) ->
|
||||
State#client_state{enable_stats = keepalive}.
|
||||
|
||||
emit_stats(State = #client_state{enable_stats = false}) ->
|
||||
State;
|
||||
emit_stats(State = #client_state{proto_state = ProtoState}) ->
|
||||
{reply, Stats, _} = handle_call(stats, undefined, State),
|
||||
emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats),
|
||||
emit_stats(emqttd_protocol:clientid(ProtoState), State).
|
||||
|
||||
emit_stats(_ClientId, State = #client_state{enable_stats = false}) ->
|
||||
State;
|
||||
emit_stats(undefined, State) ->
|
||||
State;
|
||||
emit_stats(ClientId, State) ->
|
||||
{reply, Stats, _, _} = handle_call(stats, undefined, State),
|
||||
emqttd_stats:set_client_stats(ClientId, Stats),
|
||||
State.
|
||||
|
||||
sock_stats(#client_state{connection = Conn}) ->
|
||||
case Conn:getstat(?SOCK_STATS) of
|
||||
{ok, Ss} -> Ss;
|
||||
{error, _} -> []
|
||||
end.
|
||||
case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end.
|
||||
|
||||
hibernate(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
reply(Reply, State) ->
|
||||
{reply, Reply, State, hibernate}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
stop({shutdown, Reason}, State).
|
||||
|
@ -366,3 +372,5 @@ shutdown(Reason, State) ->
|
|||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
gc(State) ->
|
||||
emqttd_gc:maybe_force_gc(#client_state.force_gc_count, State).
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% GC Utility functions.
|
||||
|
||||
-module(emqttd_gc).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2]).
|
||||
|
||||
-spec(conn_max_gc_count() -> integer()).
|
||||
conn_max_gc_count() ->
|
||||
case emqttd:env(conn_force_gc_count) of
|
||||
{ok, I} when I > 0 -> I + rand:uniform(I);
|
||||
{ok, I} when I =< 0 -> undefined;
|
||||
undefined -> undefined
|
||||
end.
|
||||
|
||||
-spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()).
|
||||
reset_conn_gc_count(Pos, State) ->
|
||||
case element(Pos, State) of
|
||||
undefined -> State;
|
||||
_I -> setelement(Pos, State, conn_max_gc_count())
|
||||
end.
|
||||
|
||||
maybe_force_gc(Pos, State) ->
|
||||
case element(Pos, State) of
|
||||
undefined -> State;
|
||||
I when I =< 0 -> garbage_collect(),
|
||||
reset_conn_gc_count(Pos, State);
|
||||
I -> setelement(Pos, State, I - 1)
|
||||
end.
|
||||
|
|
@ -32,7 +32,12 @@
|
|||
|
||||
-record(state, {}).
|
||||
|
||||
-record(callback, {function :: function(),
|
||||
-type(hooktag() :: atom() | string() | binary()).
|
||||
|
||||
-export_type([hooktag/0]).
|
||||
|
||||
-record(callback, {tag :: hooktag(),
|
||||
function :: function(),
|
||||
init_args = [] :: list(any()),
|
||||
priority = 0 :: integer()}).
|
||||
|
||||
|
@ -47,17 +52,24 @@ start_link() ->
|
|||
%% Hooks API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(add(atom(), function(), list(any())) -> ok).
|
||||
add(HookPoint, Function, InitArgs) ->
|
||||
add(HookPoint, Function, InitArgs, 0).
|
||||
-spec(add(atom(), function() | {hooktag(), function()}, list(any())) -> ok).
|
||||
add(HookPoint, Function, InitArgs) when is_function(Function) ->
|
||||
add(HookPoint, {undefined, Function}, InitArgs, 0);
|
||||
|
||||
-spec(add(atom(), function(), list(any()), integer()) -> ok).
|
||||
add(HookPoint, Function, InitArgs, Priority) ->
|
||||
gen_server:call(?MODULE, {add, HookPoint, Function, InitArgs, Priority}).
|
||||
add(HookPoint, {Tag, Function}, InitArgs) when is_function(Function) ->
|
||||
add(HookPoint, {Tag, Function}, InitArgs, 0).
|
||||
|
||||
-spec(delete(atom(), function()) -> ok).
|
||||
delete(HookPoint, Function) ->
|
||||
gen_server:call(?MODULE, {delete, HookPoint, Function}).
|
||||
-spec(add(atom(), function() | {hooktag(), function()}, list(any()), integer()) -> ok).
|
||||
add(HookPoint, Function, InitArgs, Priority) when is_function(Function) ->
|
||||
add(HookPoint, {undefined, Function}, InitArgs, Priority);
|
||||
add(HookPoint, {Tag, Function}, InitArgs, Priority) when is_function(Function) ->
|
||||
gen_server:call(?MODULE, {add, HookPoint, {Tag, Function}, InitArgs, Priority}).
|
||||
|
||||
-spec(delete(atom(), function() | {hooktag(), function()}) -> ok).
|
||||
delete(HookPoint, Function) when is_function(Function) ->
|
||||
delete(HookPoint, {undefined, Function});
|
||||
delete(HookPoint, {Tag, Function}) when is_function(Function) ->
|
||||
gen_server:call(?MODULE, {delete, HookPoint, {Tag, Function}}).
|
||||
|
||||
%% @doc Run hooks without Acc.
|
||||
-spec(run(atom(), list(Arg :: any())) -> ok | stop).
|
||||
|
@ -85,7 +97,8 @@ run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) -
|
|||
ok -> run_(Callbacks, Args, Acc);
|
||||
{ok, NewAcc} -> run_(Callbacks, Args, NewAcc);
|
||||
stop -> {stop, Acc};
|
||||
{stop, NewAcc} -> {stop, NewAcc}
|
||||
{stop, NewAcc} -> {stop, NewAcc};
|
||||
_Any -> run_(Callbacks, Args, Acc)
|
||||
end;
|
||||
|
||||
run_([], _Args, Acc) ->
|
||||
|
@ -94,8 +107,8 @@ run_([], _Args, Acc) ->
|
|||
-spec(lookup(atom()) -> [#callback{}]).
|
||||
lookup(HookPoint) ->
|
||||
case ets:lookup(?HOOK_TAB, HookPoint) of
|
||||
[] -> [];
|
||||
[#hook{callbacks = Callbacks}] -> Callbacks
|
||||
[#hook{callbacks = Callbacks}] -> Callbacks;
|
||||
[] -> []
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -106,39 +119,38 @@ init([]) ->
|
|||
ets:new(?HOOK_TAB, [set, protected, named_table, {keypos, #hook.name}]),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call({add, HookPoint, Function, InitArgs, Priority}, _From, State) ->
|
||||
Reply =
|
||||
case ets:lookup(?HOOK_TAB, HookPoint) of
|
||||
[#hook{callbacks = Callbacks}] ->
|
||||
case lists:keyfind(Function, #callback.function, Callbacks) of
|
||||
false ->
|
||||
Callback = #callback{function = Function,
|
||||
init_args = InitArgs,
|
||||
priority = Priority},
|
||||
insert_hook_(HookPoint, add_callback_(Callback, Callbacks));
|
||||
_Callback ->
|
||||
{error, already_hooked}
|
||||
end;
|
||||
[] ->
|
||||
Callback = #callback{function = Function,
|
||||
init_args = InitArgs,
|
||||
priority = Priority},
|
||||
insert_hook_(HookPoint, [Callback])
|
||||
end,
|
||||
{reply, Reply, State};
|
||||
handle_call({add, HookPoint, {Tag, Function}, InitArgs, Priority}, _From, State) ->
|
||||
Callback = #callback{tag = Tag, function = Function,
|
||||
init_args = InitArgs, priority = Priority},
|
||||
{reply,
|
||||
case ets:lookup(?HOOK_TAB, HookPoint) of
|
||||
[#hook{callbacks = Callbacks}] ->
|
||||
case contain_(Tag, Function, Callbacks) of
|
||||
false ->
|
||||
insert_hook_(HookPoint, add_callback_(Callback, Callbacks));
|
||||
true ->
|
||||
{error, already_hooked}
|
||||
end;
|
||||
[] ->
|
||||
insert_hook_(HookPoint, [Callback])
|
||||
end, State};
|
||||
|
||||
handle_call({delete, HookPoint, Function}, _From, State) ->
|
||||
Reply =
|
||||
case ets:lookup(?HOOK_TAB, HookPoint) of
|
||||
[#hook{callbacks = Callbacks}] ->
|
||||
insert_hook_(HookPoint, del_callback_(Function, Callbacks));
|
||||
[] ->
|
||||
{error, not_found}
|
||||
end,
|
||||
{reply, Reply, State};
|
||||
handle_call({delete, HookPoint, {Tag, Function}}, _From, State) ->
|
||||
{reply,
|
||||
case ets:lookup(?HOOK_TAB, HookPoint) of
|
||||
[#hook{callbacks = Callbacks}] ->
|
||||
case contain_(Tag, Function, Callbacks) of
|
||||
true ->
|
||||
insert_hook_(HookPoint, del_callback_(Tag, Function, Callbacks));
|
||||
false ->
|
||||
{error, not_found}
|
||||
end;
|
||||
[] ->
|
||||
{error, not_found}
|
||||
end, State};
|
||||
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, ignore, State}.
|
||||
handle_call(Req, _From, State) ->
|
||||
{reply, {error, {unexpected_request, Req}}, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
@ -162,6 +174,16 @@ insert_hook_(HookPoint, Callbacks) ->
|
|||
add_callback_(Callback, Callbacks) ->
|
||||
lists:keymerge(#callback.priority, Callbacks, [Callback]).
|
||||
|
||||
del_callback_(Function, Callbacks) ->
|
||||
lists:keydelete(Function, #callback.function, Callbacks).
|
||||
del_callback_(Tag, Function, Callbacks) ->
|
||||
lists:filter(
|
||||
fun(#callback{tag = Tag1, function = Func1}) ->
|
||||
not ((Tag =:= Tag1) andalso (Function =:= Func1))
|
||||
end, Callbacks).
|
||||
|
||||
contain_(_Tag, _Function, []) ->
|
||||
false;
|
||||
contain_(Tag, Function, [#callback{tag = Tag, function = Function}|_Callbacks]) ->
|
||||
true;
|
||||
contain_(Tag, Function, [_Callback | Callbacks]) ->
|
||||
contain_(Tag, Function, Callbacks).
|
||||
|
||||
|
|
|
@ -53,9 +53,11 @@ cancel_timer(Timer) ->
|
|||
_ -> ok
|
||||
end.
|
||||
|
||||
-spec(proc_stats() -> list()).
|
||||
proc_stats() ->
|
||||
proc_stats(self()).
|
||||
|
||||
-spec(proc_stats(pid()) -> list()).
|
||||
proc_stats(Pid) ->
|
||||
Stats = process_info(Pid, [message_queue_len, heap_size, reductions]),
|
||||
{value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
|
||||
|
|
|
@ -76,7 +76,7 @@
|
|||
%% priority table
|
||||
pseq = 0, priorities = [],
|
||||
%% len of simple queue
|
||||
len = 0, max_len = ?MAX_LEN,
|
||||
len = 0, max_len = infinity,
|
||||
low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
|
||||
qos0 = false, dropped = 0,
|
||||
alarm_fun}).
|
||||
|
|
|
@ -24,27 +24,24 @@
|
|||
-include("emqttd_protocol.hrl").
|
||||
|
||||
%% API
|
||||
-export([new/1, parse/2]).
|
||||
-export([initial_state/0, initial_state/1, parse/2]).
|
||||
|
||||
-record(mqtt_packet_limit, {max_packet_size}).
|
||||
-type(max_packet_size() :: 1..?MAX_PACKET_SIZE).
|
||||
|
||||
-type(option() :: {atom(), any()}).
|
||||
|
||||
-type(parser() :: fun( (binary()) -> any() )).
|
||||
-spec(initial_state() -> {none, max_packet_size()}).
|
||||
initial_state() ->
|
||||
initial_state(?MAX_PACKET_SIZE).
|
||||
|
||||
%% @doc Initialize a parser
|
||||
-spec(new(Opts :: [option()]) -> parser()).
|
||||
new(Opts) ->
|
||||
fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
|
||||
|
||||
limit(Opts) ->
|
||||
#mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
|
||||
-spec(initial_state(max_packet_size()) -> {none, max_packet_size()}).
|
||||
initial_state(MaxSize) ->
|
||||
{none, MaxSize}.
|
||||
|
||||
%% @doc Parse MQTT Packet
|
||||
-spec(parse(binary(), {none, [option()]} | fun())
|
||||
-spec(parse(binary(), {none, pos_integer()} | fun())
|
||||
-> {ok, mqtt_packet()} | {error, any()} | {more, fun()}).
|
||||
parse(<<>>, {none, Limit}) ->
|
||||
{more, fun(Bin) -> parse(Bin, {none, Limit}) end};
|
||||
parse(<<>>, {none, MaxLen}) ->
|
||||
{more, fun(Bin) -> parse(Bin, {none, MaxLen}) end};
|
||||
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
|
||||
parse_remaining_len(Rest, #mqtt_packet_header{type = Type,
|
||||
dup = bool(Dup),
|
||||
|
@ -57,7 +54,7 @@ parse_remaining_len(<<>>, Header, Limit) ->
|
|||
parse_remaining_len(Rest, Header, Limit) ->
|
||||
parse_remaining_len(Rest, Header, 1, 0, Limit).
|
||||
|
||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #mqtt_packet_limit{max_packet_size = MaxLen})
|
||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length, MaxLen)
|
||||
when Length > MaxLen ->
|
||||
{error, invalid_mqtt_frame_len};
|
||||
parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) ->
|
||||
|
@ -70,7 +67,7 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, _Limit) ->
|
|||
parse_frame(Rest, Header, 0);
|
||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) ->
|
||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit);
|
||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, #mqtt_packet_limit{max_packet_size = MaxLen}) ->
|
||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, MaxLen) ->
|
||||
FrameLen = Value + Len * Multiplier,
|
||||
if
|
||||
FrameLen > MaxLen -> {error, invalid_mqtt_frame_len};
|
||||
|
|
|
@ -35,13 +35,15 @@
|
|||
|
||||
-export([process/2]).
|
||||
|
||||
-record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0,
|
||||
send_pkt = 0, send_msg = 0}).
|
||||
|
||||
%% Protocol State
|
||||
-record(proto_state, {peername, sendfun, connected = false,
|
||||
client_id, client_pid, clean_sess,
|
||||
proto_ver, proto_name, username, is_superuser = false,
|
||||
will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
|
||||
session, ws_initial_headers, %% Headers from first HTTP request for websocket client
|
||||
connected_at}).
|
||||
%% ws_initial_headers: Headers from first HTTP request for WebSocket Client.
|
||||
-record(proto_state, {peername, sendfun, connected = false, client_id, client_pid,
|
||||
clean_sess, proto_ver, proto_name, username, is_superuser,
|
||||
will_msg, keepalive, max_clientid_len, session, stats_data,
|
||||
ws_initial_headers, connected_at}).
|
||||
|
||||
-type(proto_state() :: #proto_state{}).
|
||||
|
||||
|
@ -56,20 +58,22 @@
|
|||
|
||||
%% @doc Init protocol
|
||||
init(Peername, SendFun, Opts) ->
|
||||
lists:foreach(fun(K) -> put(K, 0) end, ?STATS_KEYS),
|
||||
EnableStats = get_value(client_enable_stats, Opts, false),
|
||||
MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
|
||||
WsInitialHeaders = get_value(ws_initial_headers, Opts),
|
||||
#proto_state{peername = Peername,
|
||||
sendfun = SendFun,
|
||||
max_clientid_len = MaxLen,
|
||||
client_pid = self(),
|
||||
ws_initial_headers = WsInitialHeaders}.
|
||||
max_clientid_len = MaxLen,
|
||||
is_superuser = false,
|
||||
ws_initial_headers = WsInitialHeaders,
|
||||
stats_data = #proto_stats{enable_stats = EnableStats}}.
|
||||
|
||||
info(ProtoState) ->
|
||||
?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
|
||||
|
||||
stats(_ProtoState) ->
|
||||
[{K, get(K)} || K <- ?STATS_KEYS].
|
||||
stats(#proto_state{stats_data = Stats}) ->
|
||||
tl(?record_to_proplist(proto_stats, Stats)).
|
||||
|
||||
clientid(#proto_state{client_id = ClientId}) ->
|
||||
ClientId.
|
||||
|
@ -106,8 +110,10 @@ session(#proto_state{session = Session}) ->
|
|||
|
||||
%% A Client can only send the CONNECT Packet once over a Network Connection.
|
||||
-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
|
||||
received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
|
||||
process(Packet, State#proto_state{connected = true});
|
||||
received(Packet = ?PACKET(?CONNECT),
|
||||
State = #proto_state{connected = false, stats_data = Stats}) ->
|
||||
trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats),
|
||||
process(Packet, State#proto_state{connected = true, stats_data = Stats1});
|
||||
|
||||
received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
|
||||
{error, protocol_bad_connect, State};
|
||||
|
@ -116,11 +122,11 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
|
|||
received(_Packet, State = #proto_state{connected = false}) ->
|
||||
{error, protocol_not_connected, State};
|
||||
|
||||
received(Packet = ?PACKET(_Type), State) ->
|
||||
trace(recv, Packet, State),
|
||||
received(Packet = ?PACKET(Type), State = #proto_state{stats_data = Stats}) ->
|
||||
trace(recv, Packet, State), Stats1 = inc_stats(recv, Type, Stats),
|
||||
case validate_packet(Packet) of
|
||||
ok ->
|
||||
process(Packet, State);
|
||||
process(Packet, State#proto_state{stats_data = Stats1});
|
||||
{error, Reason} ->
|
||||
{error, Reason, State}
|
||||
end.
|
||||
|
@ -151,7 +157,7 @@ unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId,
|
|||
%% @doc Send PUBREL
|
||||
pubrel(PacketId, State) -> send(?PUBREL_PACKET(PacketId), State).
|
||||
|
||||
process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
||||
process(?CONNECT_PACKET(Var), State0) ->
|
||||
|
||||
#mqtt_packet_connect{proto_ver = ProtoVer,
|
||||
proto_name = ProtoName,
|
||||
|
@ -170,8 +176,6 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|||
will_msg = willmsg(Var),
|
||||
connected_at = os:timestamp()},
|
||||
|
||||
trace(recv, Packet, State1),
|
||||
|
||||
{ReturnCode1, SessPresent, State3} =
|
||||
case validate_connect(Var, State1) of
|
||||
?CONNACK_ACCEPT ->
|
||||
|
@ -187,6 +191,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|||
emqttd_cm:reg(client(State2)),
|
||||
%% Start keepalive
|
||||
start_keepalive(KeepAlive),
|
||||
%% Emit Stats
|
||||
self() ! emit_stats,
|
||||
%% ACCEPT
|
||||
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
|
||||
{error, Error} ->
|
||||
|
@ -310,22 +316,37 @@ send(Msg, State = #proto_state{client_id = ClientId, username = Username})
|
|||
emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
|
||||
send(emqttd_message:to_packet(Msg), State);
|
||||
|
||||
send(Packet, State = #proto_state{sendfun = SendFun})
|
||||
when is_record(Packet, mqtt_packet) ->
|
||||
send(Packet = ?PACKET(Type),
|
||||
State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
|
||||
trace(send, Packet, State),
|
||||
emqttd_metrics:sent(Packet),
|
||||
SendFun(Packet),
|
||||
{ok, State}.
|
||||
Stats1 = inc_stats(send, Type, Stats),
|
||||
{ok, State#proto_state{stats_data = Stats1}}.
|
||||
|
||||
trace(recv, Packet = ?PACKET(Type), ProtoState) ->
|
||||
inc(recv_pkt), ?IF(Type =:= ?PUBLISH, inc(recv_msg), ok),
|
||||
trace(recv, Packet, ProtoState) ->
|
||||
?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
|
||||
|
||||
trace(send, Packet = ?PACKET(Type), ProtoState) ->
|
||||
inc(send_pkt), ?IF(Type =:= ?PUBLISH, inc(send_msg), ok),
|
||||
trace(send, Packet, ProtoState) ->
|
||||
?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
|
||||
|
||||
inc(Key) -> put(Key, get(Key) + 1).
|
||||
inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) ->
|
||||
Stats;
|
||||
|
||||
inc_stats(recv, Type, Stats) ->
|
||||
#proto_stats{recv_pkt = PktCnt, recv_msg = MsgCnt} = Stats,
|
||||
inc_stats(Type, #proto_stats.recv_pkt, PktCnt, #proto_stats.recv_msg, MsgCnt, Stats);
|
||||
|
||||
inc_stats(send, Type, Stats) ->
|
||||
#proto_stats{send_pkt = PktCnt, send_msg = MsgCnt} = Stats,
|
||||
inc_stats(Type, #proto_stats.send_pkt, PktCnt, #proto_stats.send_msg, MsgCnt, Stats).
|
||||
|
||||
inc_stats(Type, PktPos, PktCnt, MsgPos, MsgCnt, Stats) ->
|
||||
Stats1 = setelement(PktPos, Stats, PktCnt + 1),
|
||||
case Type =:= ?PUBLISH of
|
||||
true -> setelement(MsgPos, Stats1, MsgCnt + 1);
|
||||
false -> Stats1
|
||||
end.
|
||||
|
||||
stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH ->
|
||||
{stop, {shutdown, auth_failure}, State};
|
||||
|
|
|
@ -164,11 +164,12 @@ pick(Subscriber) ->
|
|||
|
||||
init([Pool, Id, Env]) ->
|
||||
?GPROC_POOL(join, Pool, Id),
|
||||
{ok, #state{pool = Pool, id = Id, env = Env}}.
|
||||
{ok, #state{pool = Pool, id = Id, env = Env},
|
||||
hibernate, {backoff, 2000, 2000, 20000}}.
|
||||
|
||||
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
||||
add_subscriber(Topic, Subscriber, Options),
|
||||
{reply, ok, setstats(State)};
|
||||
{reply, ok, setstats(State), hibernate};
|
||||
|
||||
handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) ->
|
||||
del_subscriber(Topic, Subscriber, Options),
|
||||
|
@ -179,7 +180,7 @@ handle_call(Req, _From, State) ->
|
|||
|
||||
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
||||
add_subscriber(Topic, Subscriber, Options),
|
||||
{noreply, setstats(State)};
|
||||
{noreply, setstats(State), hibernate};
|
||||
|
||||
handle_cast({unsubscribe, Topic, Subscriber, Options}, State) ->
|
||||
del_subscriber(Topic, Subscriber, Options),
|
||||
|
|
|
@ -42,7 +42,7 @@ serialize_header(#mqtt_packet_header{type = Type,
|
|||
{VariableBin, PayloadBin})
|
||||
when ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
|
||||
Len = byte_size(VariableBin) + byte_size(PayloadBin),
|
||||
true = (Len =< ?MAX_LEN),
|
||||
true = (Len =< ?MAX_PACKET_SIZE),
|
||||
[<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1>>,
|
||||
serialize_len(Len), VariableBin, PayloadBin].
|
||||
|
||||
|
|
|
@ -74,7 +74,8 @@
|
|||
terminate/2, code_change/3]).
|
||||
|
||||
%% gen_server2 Message Priorities
|
||||
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
|
||||
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3,
|
||||
handle_pre_hibernate/1]).
|
||||
|
||||
-record(state,
|
||||
{
|
||||
|
@ -114,7 +115,7 @@
|
|||
max_inflight = 32 :: non_neg_integer(),
|
||||
|
||||
%% Retry interval for redelivering QoS1/2 messages
|
||||
retry_interval = 20000 :: pos_integer(),
|
||||
retry_interval = 20000 :: timeout(),
|
||||
|
||||
%% Retry Timer
|
||||
retry_timer :: reference(),
|
||||
|
@ -129,7 +130,7 @@
|
|||
awaiting_rel :: map(),
|
||||
|
||||
%% Awaiting PUBREL timeout
|
||||
await_rel_timeout = 20000 :: pos_integer(),
|
||||
await_rel_timeout = 20000 :: timeout(),
|
||||
|
||||
%% Max Packets that Awaiting PUBREL
|
||||
max_awaiting_rel = 100 :: non_neg_integer(),
|
||||
|
@ -138,16 +139,16 @@
|
|||
await_rel_timer :: reference(),
|
||||
|
||||
%% Session Expiry Interval
|
||||
expiry_interval = 7200000 :: pos_integer(),
|
||||
expiry_interval = 7200000 :: timeout(),
|
||||
|
||||
%% Expired Timer
|
||||
expiry_timer :: reference(),
|
||||
|
||||
%% Enable Stats
|
||||
enable_stats :: false | pos_integer(),
|
||||
enable_stats :: boolean(),
|
||||
|
||||
%% Stats Timer
|
||||
stats_timer :: reference(),
|
||||
%% Force GC Count
|
||||
force_gc_count :: undefined | integer(),
|
||||
|
||||
created_at :: erlang:timestamp()
|
||||
}).
|
||||
|
@ -159,7 +160,8 @@
|
|||
-define(STATE_KEYS, [clean_sess, client_id, username, binding, client_pid, old_client_pid,
|
||||
next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
|
||||
max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
|
||||
await_rel_timeout, expiry_interval, enable_stats, created_at]).
|
||||
await_rel_timeout, expiry_interval, enable_stats, force_gc_count,
|
||||
created_at]).
|
||||
|
||||
-define(LOG(Level, Format, Args, State),
|
||||
lager:Level([{client, State#state.client_id}],
|
||||
|
@ -168,7 +170,8 @@
|
|||
%% @doc Start a Session
|
||||
-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}).
|
||||
start_link(CleanSess, {ClientId, Username}, ClientPid) ->
|
||||
gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
|
||||
gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid],
|
||||
[{spawn_opt, ?FULLSWEEP_OPTS}]). %% Tune GC.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% PubSub API
|
||||
|
@ -282,6 +285,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
|
|||
{ok, QEnv} = emqttd:env(queue),
|
||||
MaxInflight = get_value(max_inflight, Env, 0),
|
||||
EnableStats = get_value(enable_stats, Env, false),
|
||||
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
||||
MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
|
||||
State = #state{clean_sess = CleanSess,
|
||||
binding = binding(ClientPid),
|
||||
|
@ -300,11 +304,11 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
|
|||
max_awaiting_rel = get_value(max_awaiting_rel, Env),
|
||||
expiry_interval = get_value(expiry_interval, Env),
|
||||
enable_stats = EnableStats,
|
||||
force_gc_count = ForceGcCount,
|
||||
created_at = os:timestamp()},
|
||||
emqttd_stats:set_session_stats(ClientId, stats(State)),
|
||||
emqttd_sm:register_session(ClientId, CleanSess, info(State)),
|
||||
emqttd_hooks:run('session.created', [ClientId, Username]),
|
||||
{ok, State, hibernate, {backoff, 1000, 1000, 5000}, ?MODULE}.
|
||||
{ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}.
|
||||
|
||||
init_stats(Keys) ->
|
||||
lists:foreach(fun(K) -> put(K, 0) end, Keys).
|
||||
|
@ -336,10 +340,13 @@ prioritise_info(Msg, _Len, _State) ->
|
|||
_ -> 0
|
||||
end.
|
||||
|
||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}},
|
||||
_From, State = #state{awaiting_rel = AwaitingRel,
|
||||
await_rel_timer = Timer,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
handle_pre_hibernate(State) ->
|
||||
{hibernate, emqttd_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
|
||||
|
||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From,
|
||||
State = #state{awaiting_rel = AwaitingRel,
|
||||
await_rel_timer = Timer,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
case is_awaiting_full(State) of
|
||||
false ->
|
||||
State1 = case Timer == undefined of
|
||||
|
@ -391,7 +398,7 @@ handle_cast({subscribe, _From, TopicTable, AckFun},
|
|||
{[NewQos|QosAcc], SubMap1}
|
||||
end, {[], Subscriptions}, TopicTable),
|
||||
AckFun(lists:reverse(GrantedQos)),
|
||||
noreply(emit_stats(State#state{subscriptions = Subscriptions1}));
|
||||
hibernate(emit_stats(State#state{subscriptions = Subscriptions1}));
|
||||
|
||||
handle_cast({unsubscribe, _From, TopicTable},
|
||||
State = #state{client_id = ClientId,
|
||||
|
@ -409,55 +416,59 @@ handle_cast({unsubscribe, _From, TopicTable},
|
|||
SubMap
|
||||
end
|
||||
end, Subscriptions, TopicTable),
|
||||
noreply(emit_stats(State#state{subscriptions = Subscriptions1}));
|
||||
hibernate(emit_stats(State#state{subscriptions = Subscriptions1}));
|
||||
|
||||
%% PUBACK:
|
||||
handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) ->
|
||||
case Inflight:contain(PacketId) of
|
||||
true ->
|
||||
noreply(dequeue(acked(puback, PacketId, State)));
|
||||
false ->
|
||||
?LOG(warning, "The PUBACK ~p is not inflight: ~p",
|
||||
[PacketId, Inflight:window()], State),
|
||||
emqttd_metrics:inc('packets/puback/missed'),
|
||||
noreply(State)
|
||||
end;
|
||||
{noreply,
|
||||
case Inflight:contain(PacketId) of
|
||||
true ->
|
||||
dequeue(acked(puback, PacketId, State));
|
||||
false ->
|
||||
?LOG(warning, "PUBACK ~p missed inflight: ~p",
|
||||
[PacketId, Inflight:window()], State),
|
||||
emqttd_metrics:inc('packets/puback/missed'),
|
||||
State
|
||||
end, hibernate};
|
||||
|
||||
%% PUBREC:
|
||||
handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) ->
|
||||
case Inflight:contain(PacketId) of
|
||||
true ->
|
||||
noreply(acked(pubrec, PacketId, State));
|
||||
false ->
|
||||
?LOG(warning, "The PUBREC ~p is not inflight: ~p",
|
||||
[PacketId, Inflight:window()], State),
|
||||
emqttd_metrics:inc('packets/pubrec/missed'),
|
||||
noreply(State)
|
||||
end;
|
||||
{noreply,
|
||||
case Inflight:contain(PacketId) of
|
||||
true ->
|
||||
acked(pubrec, PacketId, State);
|
||||
false ->
|
||||
?LOG(warning, "PUBREC ~p missed inflight: ~p",
|
||||
[PacketId, Inflight:window()], State),
|
||||
emqttd_metrics:inc('packets/pubrec/missed'),
|
||||
State
|
||||
end, hibernate};
|
||||
|
||||
%% PUBREL:
|
||||
handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
|
||||
case maps:take(PacketId, AwaitingRel) of
|
||||
{Msg, AwaitingRel1} ->
|
||||
spawn(emqttd_server, publish, [Msg]),%%:)
|
||||
noreply(State#state{awaiting_rel = AwaitingRel1});
|
||||
error ->
|
||||
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
|
||||
emqttd_metrics:inc('packets/pubrel/missed'),
|
||||
noreply(State)
|
||||
end;
|
||||
{noreply,
|
||||
case maps:take(PacketId, AwaitingRel) of
|
||||
{Msg, AwaitingRel1} ->
|
||||
spawn(emqttd_server, publish, [Msg]), %%:)
|
||||
gc(State#state{awaiting_rel = AwaitingRel1});
|
||||
error ->
|
||||
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
|
||||
emqttd_metrics:inc('packets/pubrel/missed'),
|
||||
State
|
||||
end, hibernate};
|
||||
|
||||
%% PUBCOMP:
|
||||
handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) ->
|
||||
case Inflight:contain(PacketId) of
|
||||
true ->
|
||||
noreply(dequeue(acked(pubcomp, PacketId, State)));
|
||||
false ->
|
||||
?LOG(warning, "The PUBCOMP ~p is not inflight: ~p",
|
||||
[PacketId, Inflight:window()], State),
|
||||
emqttd_metrics:inc('packets/pubcomp/missed'),
|
||||
noreply(State)
|
||||
end;
|
||||
{noreply,
|
||||
case Inflight:contain(PacketId) of
|
||||
true ->
|
||||
dequeue(acked(pubcomp, PacketId, State));
|
||||
false ->
|
||||
?LOG(warning, "The PUBCOMP ~p is not inflight: ~p",
|
||||
[PacketId, Inflight:window()], State),
|
||||
emqttd_metrics:inc('packets/pubcomp/missed'),
|
||||
State
|
||||
end, hibernate};
|
||||
|
||||
%% RESUME:
|
||||
handle_cast({resume, ClientId, ClientPid},
|
||||
|
@ -466,14 +477,13 @@ handle_cast({resume, ClientId, ClientPid},
|
|||
clean_sess = CleanSess,
|
||||
retry_timer = RetryTimer,
|
||||
await_rel_timer = AwaitTimer,
|
||||
stats_timer = StatsTimer,
|
||||
expiry_timer = ExpireTimer}) ->
|
||||
|
||||
?LOG(info, "Resumed by ~p", [ClientPid], State),
|
||||
|
||||
%% Cancel Timers
|
||||
lists:foreach(fun emqttd_misc:cancel_timer/1,
|
||||
[RetryTimer, AwaitTimer, StatsTimer, ExpireTimer]),
|
||||
[RetryTimer, AwaitTimer, ExpireTimer]),
|
||||
|
||||
case kick(ClientId, OldClientPid, ClientPid) of
|
||||
ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State);
|
||||
|
@ -501,15 +511,15 @@ handle_cast({resume, ClientId, ClientPid},
|
|||
end,
|
||||
|
||||
%% Replay delivery and Dequeue pending messages
|
||||
noreply(emit_stats(dequeue(retry_delivery(true, State1))));
|
||||
hibernate(emit_stats(dequeue(retry_delivery(true, State1))));
|
||||
|
||||
handle_cast({destroy, ClientId}, State = #state{client_id = ClientId,
|
||||
client_pid = undefined}) ->
|
||||
handle_cast({destroy, ClientId},
|
||||
State = #state{client_id = ClientId, client_pid = undefined}) ->
|
||||
?LOG(warning, "Destroyed", [], State),
|
||||
shutdown(destroy, State);
|
||||
|
||||
handle_cast({destroy, ClientId}, State = #state{client_id = ClientId,
|
||||
client_pid = OldClientPid}) ->
|
||||
handle_cast({destroy, ClientId},
|
||||
State = #state{client_id = ClientId, client_pid = OldClientPid}) ->
|
||||
?LOG(warning, "kickout ~p", [OldClientPid], State),
|
||||
shutdown(conflict, State);
|
||||
|
||||
|
@ -518,20 +528,17 @@ handle_cast(Msg, State) ->
|
|||
|
||||
%% Dispatch Message
|
||||
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
|
||||
noreply(dispatch(tune_qos(Topic, Msg, State), State));
|
||||
{noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate};
|
||||
|
||||
%% Do nothing if the client has been disconnected.
|
||||
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
|
||||
hibernate(emit_stats(State#state{retry_timer = undefined}));
|
||||
|
||||
handle_info({timeout, _Timer, retry_delivery}, State) ->
|
||||
noreply(emit_stats(retry_delivery(false, State#state{retry_timer = undefined})));
|
||||
hibernate(emit_stats(retry_delivery(false, State#state{retry_timer = undefined})));
|
||||
|
||||
handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
|
||||
noreply(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
|
||||
|
||||
handle_info({timeout, _Timer, emit_stats}, State) ->
|
||||
hibernate(maybe_enable_stats(emit_stats(State)));
|
||||
hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
|
||||
|
||||
handle_info({timeout, _Timer, expired}, State) ->
|
||||
?LOG(info, "Expired, shutdown now.", [], State),
|
||||
|
@ -548,7 +555,7 @@ handle_info({'EXIT', ClientPid, Reason},
|
|||
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
|
||||
ExpireTimer = start_timer(Interval, expired),
|
||||
State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
|
||||
hibernate(maybe_enable_stats(emit_stats(State1)));
|
||||
hibernate(emit_stats(State1));
|
||||
|
||||
handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
|
||||
%%ignore
|
||||
|
@ -690,7 +697,8 @@ dispatch(Msg = #mqtt_message{qos = QoS},
|
|||
end.
|
||||
|
||||
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
||||
inc(enqueue_msg), State#state{mqueue = emqttd_mqueue:in(Msg, Q)}.
|
||||
inc_stats(enqueue_msg),
|
||||
State#state{mqueue = emqttd_mqueue:in(Msg, Q)}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Deliver
|
||||
|
@ -700,7 +708,8 @@ redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
|
|||
deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State).
|
||||
|
||||
deliver(Msg, #state{client_pid = Pid}) ->
|
||||
inc(deliver_msg), Pid ! {deliver, Msg}.
|
||||
inc_stats(deliver_msg),
|
||||
Pid ! {deliver, Msg}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Awaiting ACK for QoS1/QoS2 Messages
|
||||
|
@ -785,31 +794,20 @@ next_msg_id(State = #state{next_msg_id = Id}) ->
|
|||
%% Emit session stats
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
maybe_enable_stats(State = #state{enable_stats = false}) ->
|
||||
State;
|
||||
maybe_enable_stats(State = #state{client_pid = Pid}) when is_pid(Pid) ->
|
||||
State;
|
||||
maybe_enable_stats(State = #state{enable_stats = Interval}) ->
|
||||
StatsTimer = start_timer(Interval, emit_stats),
|
||||
State#state{stats_timer = StatsTimer}.
|
||||
|
||||
emit_stats(State = #state{enable_stats = false}) ->
|
||||
State;
|
||||
emit_stats(State = #state{client_id = ClientId}) ->
|
||||
emqttd_stats:set_session_stats(ClientId, stats(State)),
|
||||
State.
|
||||
|
||||
inc_stats(Key) -> put(Key, get(Key) + 1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
inc(Key) -> put(Key, get(Key) + 1).
|
||||
|
||||
reply(Reply, State) ->
|
||||
{reply, Reply, State}.
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State}.
|
||||
{reply, Reply, State, hibernate}.
|
||||
|
||||
hibernate(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
@ -817,3 +815,6 @@ hibernate(State) ->
|
|||
shutdown(Reason, State) ->
|
||||
{stop, {shutdown, Reason}, State}.
|
||||
|
||||
gc(State) ->
|
||||
emqttd_gc:maybe_force_gc(#state.force_gc_count, State).
|
||||
|
||||
|
|
|
@ -21,10 +21,7 @@
|
|||
-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1, ts_from_ms/1]).
|
||||
|
||||
seed() ->
|
||||
case erlang:function_exported(erlang, timestamp, 0) of
|
||||
true -> rand:seed(exsplus, erlang:timestamp()); %% R18
|
||||
false -> random:seed(os:timestamp()) %% Compress now() deprecated warning...
|
||||
end.
|
||||
rand:seed(exsplus, erlang:timestamp()).
|
||||
|
||||
now_ms() ->
|
||||
now_ms(os:timestamp()).
|
||||
|
@ -40,3 +37,4 @@ now_secs({MegaSecs, Secs, _MicroSecs}) ->
|
|||
|
||||
ts_from_ms(Ms) ->
|
||||
{Ms div 1000000, Ms rem 1000000, 0}.
|
||||
|
||||
|
|
|
@ -18,13 +18,18 @@
|
|||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-include("emqttd_protocol.hrl").
|
||||
|
||||
-import(proplists, [get_value/3]).
|
||||
|
||||
-export([handle_request/1, ws_loop/3]).
|
||||
|
||||
%% WebSocket Loop State
|
||||
-record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}).
|
||||
-record(wsocket_state, {peername, client_pid, max_packet_size, parser}).
|
||||
|
||||
-define(WSLOG(Level, Peer, Format, Args),
|
||||
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
|
||||
-define(WSLOG(Level, Format, Args, State),
|
||||
lager:Level("WsClient(~s): " ++ Format,
|
||||
[esockd_net:format(State#wsocket_state.peername) | Args])).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle WebSocket Request
|
||||
|
@ -32,18 +37,14 @@
|
|||
|
||||
%% @doc Handle WebSocket Request.
|
||||
handle_request(Req) ->
|
||||
Peer = Req:get(peer),
|
||||
{ok, PktOpts} = emqttd:env(protocol),
|
||||
ParserFun = emqttd_parser:new(PktOpts),
|
||||
{ReentryWs, ReplyChannel} = upgrade(Req),
|
||||
{ok, Env} = emqttd:env(protocol),
|
||||
PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
|
||||
Parser = emqttd_parser:initial_state(PacketSize),
|
||||
%% Upgrade WebSocket.
|
||||
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
||||
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
|
||||
ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid,
|
||||
packet_opts = PktOpts, parser_fun = ParserFun}).
|
||||
|
||||
%% @doc Upgrade WebSocket.
|
||||
%% @private
|
||||
upgrade(Req) ->
|
||||
mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
|
||||
ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
|
||||
max_packet_size = PacketSize, client_pid = ClientPid}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Receive Loop
|
||||
|
@ -54,25 +55,24 @@ ws_loop(<<>>, State, _ReplyChannel) ->
|
|||
State;
|
||||
ws_loop([<<>>], State, _ReplyChannel) ->
|
||||
State;
|
||||
ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
|
||||
parser_fun = ParserFun}, ReplyChannel) ->
|
||||
?WSLOG(debug, Peer, "RECV ~p", [Data]),
|
||||
ws_loop(Data, State = #wsocket_state{client_pid = ClientPid, parser = Parser}, ReplyChannel) ->
|
||||
?WSLOG(debug, "RECV ~p", [Data], State),
|
||||
emqttd_metrics:inc('bytes/received', iolist_size(Data)),
|
||||
case catch ParserFun(iolist_to_binary(Data)) of
|
||||
case catch emqttd_parser:parse(iolist_to_binary(Data), Parser) of
|
||||
{more, NewParser} ->
|
||||
State#wsocket_state{parser_fun = NewParser};
|
||||
State#wsocket_state{parser = NewParser};
|
||||
{ok, Packet, Rest} ->
|
||||
gen_server:cast(ClientPid, {received, Packet}),
|
||||
ws_loop(Rest, reset_parser(State), ReplyChannel);
|
||||
{error, Error} ->
|
||||
?WSLOG(error, Peer, "Frame error: ~p", [Error]),
|
||||
?WSLOG(error, "Frame error: ~p", [Error], State),
|
||||
exit({shutdown, Error});
|
||||
{'EXIT', Reason} ->
|
||||
?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
|
||||
?WSLOG(error, Peer, "Error data: ~p", [Data]),
|
||||
?WSLOG(error, "Frame error: ~p", [Reason], State),
|
||||
?WSLOG(error, "Error data: ~p", [Data], State),
|
||||
exit({shutdown, parser_error})
|
||||
end.
|
||||
|
||||
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
|
||||
State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
|
||||
reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) ->
|
||||
State#wsocket_state{parser = emqttd_parser:initial_state(PacketSize)}.
|
||||
|
||||
|
|
|
@ -14,9 +14,11 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc MQTT WebSocket Connection.
|
||||
|
||||
-module(emqttd_ws_client).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-behaviour(gen_server2).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
|
@ -24,6 +26,10 @@
|
|||
|
||||
-include("emqttd_protocol.hrl").
|
||||
|
||||
-include("emqttd_internal.hrl").
|
||||
|
||||
-import(proplists, [get_value/3]).
|
||||
|
||||
%% API Exports
|
||||
-export([start_link/4]).
|
||||
|
||||
|
@ -40,27 +46,32 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
%% gen_server2 Callbacks
|
||||
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
|
||||
|
||||
%% WebSocket Client State
|
||||
-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive,
|
||||
enable_stats, stats_timer}).
|
||||
-record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive,
|
||||
enable_stats, force_gc_count}).
|
||||
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||
|
||||
-define(WSLOG(Level, Peer, Format, Args),
|
||||
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
|
||||
-define(WSLOG(Level, Format, Args, State),
|
||||
lager:Level("WsClient(~s): " ++ Format,
|
||||
[esockd_net:format(State#wsclient_state.peername) | Args])).
|
||||
|
||||
%% @doc Start WebSocket Client.
|
||||
start_link(Env, WsPid, Req, ReplyChannel) ->
|
||||
gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []).
|
||||
gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel],
|
||||
[{spawn_opt, ?FULLSWEEP_OPTS}]). %% Tune GC.
|
||||
|
||||
info(CPid) ->
|
||||
gen_server:call(CPid, info).
|
||||
gen_server2:call(CPid, info).
|
||||
|
||||
stats(CPid) ->
|
||||
gen_server:call(CPid, stats).
|
||||
gen_server2:call(CPid, stats).
|
||||
|
||||
kick(CPid) ->
|
||||
gen_server:call(CPid, kick).
|
||||
gen_server2:call(CPid, kick).
|
||||
|
||||
subscribe(CPid, TopicTable) ->
|
||||
CPid ! {subscribe, TopicTable}.
|
||||
|
@ -69,7 +80,7 @@ unsubscribe(CPid, Topics) ->
|
|||
CPid ! {unsubscribe, Topics}.
|
||||
|
||||
session(CPid) ->
|
||||
gen_server:call(CPid, session).
|
||||
gen_server2:call(CPid, session).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server Callbacks
|
||||
|
@ -81,49 +92,57 @@ init([Env, WsPid, Req, ReplyChannel]) ->
|
|||
{ok, Peername} = Req:get(peername),
|
||||
Headers = mochiweb_headers:to_list(
|
||||
mochiweb_request:get(headers, Req)),
|
||||
%% SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
|
||||
SendFun = fun(Packet) ->
|
||||
Data = emqttd_serializer:serialize(Packet),
|
||||
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
|
||||
ReplyChannel({binary, Data})
|
||||
end,
|
||||
EnableStats = proplists:get_value(client_enable_stats, Env, false),
|
||||
ProtoState = emqttd_protocol:init(Peername, SendFun,
|
||||
ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel),
|
||||
[{ws_initial_headers, Headers} | Env]),
|
||||
{ok, maybe_enable_stats(#wsclient_state{ws_pid = WsPid,
|
||||
peer = Req:get(peer),
|
||||
connection = Req:get(connection),
|
||||
proto_state = ProtoState,
|
||||
enable_stats = EnableStats}),
|
||||
proplists:get_value(client_idle_timeout, Env, 30000)}.
|
||||
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
|
||||
EnableStats = get_value(client_enable_stats, Env, false),
|
||||
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
||||
{ok, #wsclient_state{ws_pid = WsPid,
|
||||
peername = Peername,
|
||||
connection = Req:get(connection),
|
||||
proto_state = ProtoState,
|
||||
enable_stats = EnableStats,
|
||||
force_gc_count = ForceGcCount},
|
||||
IdleTimeout, {backoff, 1000, 1000, 10000}, ?MODULE}.
|
||||
|
||||
handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
|
||||
Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)],
|
||||
{reply, Stats, _} = handle_call(stats, From, State),
|
||||
{reply, lists:append(Info, Stats), State};
|
||||
prioritise_call(Msg, _From, _Len, _State) ->
|
||||
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
|
||||
|
||||
prioritise_info(Msg, _Len, _State) ->
|
||||
case Msg of {redeliver, _} -> 5; _ -> 0 end.
|
||||
|
||||
handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) ->
|
||||
erlang:garbage_collect(WsPid),
|
||||
{hibernate, emqttd_gc:reset_conn_gc_count(#wsclient_state.force_gc_count, emit_stats(State))}.
|
||||
|
||||
handle_call(info, From, State = #wsclient_state{peername = Peername,
|
||||
proto_state = ProtoState}) ->
|
||||
Info = [{websocket, true}, {peername, Peername} | emqttd_protocol:info(ProtoState)],
|
||||
{reply, Stats, _, _} = handle_call(stats, From, State),
|
||||
reply(lists:append(Info, Stats), State);
|
||||
|
||||
handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||
{reply, lists:append([emqttd_misc:proc_stats(),
|
||||
wsock_stats(State),
|
||||
emqttd_protocol:stats(ProtoState)]), State};
|
||||
reply(lists:append([emqttd_misc:proc_stats(),
|
||||
wsock_stats(State),
|
||||
emqttd_protocol:stats(ProtoState)]), State);
|
||||
|
||||
handle_call(kick, _From, State) ->
|
||||
{stop, {shutdown, kick}, ok, State};
|
||||
|
||||
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||
{reply, emqttd_protocol:session(ProtoState), State};
|
||||
reply(emqttd_protocol:session(ProtoState), State);
|
||||
|
||||
handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
|
||||
?WSLOG(error, Peer, "Unexpected request: ~p", [Req]),
|
||||
{reply, {error, unsupported_request}, State}.
|
||||
handle_call(Req, _From, State) ->
|
||||
?WSLOG(error, "Unexpected request: ~p", [Req], State),
|
||||
reply({error, unexpected_request}, State).
|
||||
|
||||
handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
|
||||
handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||
emqttd_metrics:received(Packet),
|
||||
case emqttd_protocol:received(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
{noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate};
|
||||
{noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate};
|
||||
{error, Error} ->
|
||||
?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
|
||||
?WSLOG(error, "Protocol error - ~p", [Error], State),
|
||||
shutdown(Error, State);
|
||||
{error, Error, ProtoState1} ->
|
||||
shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
|
||||
|
@ -131,9 +150,9 @@ handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state
|
|||
stop(Reason, State#wsclient_state{proto_state = ProtoState1})
|
||||
end;
|
||||
|
||||
handle_cast(Msg, State = #wsclient_state{peer = Peer}) ->
|
||||
?WSLOG(error, Peer, "Unexpected msg: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
handle_cast(Msg, State) ->
|
||||
?WSLOG(error, "Unexpected Msg: ~p", [Msg], State),
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
handle_info({subscribe, TopicTable}, State) ->
|
||||
with_proto(
|
||||
|
@ -158,7 +177,7 @@ handle_info({deliver, Message}, State) ->
|
|||
with_proto(
|
||||
fun(ProtoState) ->
|
||||
emqttd_protocol:send(Message, ProtoState)
|
||||
end, State);
|
||||
end, gc(State));
|
||||
|
||||
handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
|
||||
with_proto(
|
||||
|
@ -166,50 +185,43 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
|
|||
emqttd_protocol:pubrel(PacketId, ProtoState)
|
||||
end, State);
|
||||
|
||||
handle_info({timeout, _Timer, emit_stats}, State) ->
|
||||
{noreply, maybe_enable_stats(emit_stats(State)), hibernate};
|
||||
handle_info(emit_stats, State) ->
|
||||
{noreply, emit_stats(State), hibernate};
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) ->
|
||||
?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
|
||||
handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
||||
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
|
||||
shutdown(conflict, State);
|
||||
|
||||
handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) ->
|
||||
?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]),
|
||||
StatFun = fun() ->
|
||||
case Conn:getstat([recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
{error, Error} -> {error, Error}
|
||||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
|
||||
{noreply, stats_by_keepalive(State#wsclient_state{keepalive = KeepAlive})};
|
||||
handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) ->
|
||||
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
||||
KeepAlive = emqttd_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}),
|
||||
{noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
|
||||
|
||||
handle_info({keepalive, check}, State = #wsclient_state{peer = Peer,
|
||||
keepalive = KeepAlive}) ->
|
||||
handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
{noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate};
|
||||
{error, timeout} ->
|
||||
?WSLOG(debug, Peer, "Keepalive Timeout!", []),
|
||||
?WSLOG(debug, "Keepalive Timeout!", [], State),
|
||||
shutdown(keepalive_timeout, State);
|
||||
{error, Error} ->
|
||||
?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]),
|
||||
?WSLOG(warning, "Keepalive error - ~p", [Error], State),
|
||||
shutdown(keepalive_error, State)
|
||||
end;
|
||||
|
||||
handle_info({'EXIT', WsPid, normal}, State = #wsclient_state{ws_pid = WsPid}) ->
|
||||
stop(normal, State);
|
||||
|
||||
handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid = WsPid}) ->
|
||||
?WSLOG(error, Peer, "shutdown: ~p",[Reason]),
|
||||
handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
|
||||
?WSLOG(error, "shutdown: ~p",[Reason], State),
|
||||
shutdown(Reason, State);
|
||||
|
||||
handle_info(Info, State = #wsclient_state{peer = Peer}) ->
|
||||
?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
handle_info(Info, State) ->
|
||||
?WSLOG(error, "Unexpected Info: ~p", [Info], State),
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
|
||||
emqttd_keepalive:cancel(KeepAlive),
|
||||
|
@ -227,21 +239,31 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
maybe_enable_stats(State = #wsclient_state{enable_stats = false}) ->
|
||||
State;
|
||||
maybe_enable_stats(State = #wsclient_state{enable_stats = keepalive}) ->
|
||||
State;
|
||||
maybe_enable_stats(State = #wsclient_state{enable_stats = Interval}) ->
|
||||
State#wsclient_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}.
|
||||
send_fun(ReplyChannel) ->
|
||||
fun(Packet) ->
|
||||
Data = emqttd_serializer:serialize(Packet),
|
||||
emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
|
||||
ReplyChannel({binary, Data})
|
||||
end.
|
||||
|
||||
stats_by_keepalive(State) ->
|
||||
State#wsclient_state{enable_stats = keepalive}.
|
||||
stat_fun(Conn) ->
|
||||
fun() ->
|
||||
case Conn:getstat([recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
{error, Error} -> {error, Error}
|
||||
end
|
||||
end.
|
||||
|
||||
emit_stats(State = #wsclient_state{enable_stats = false}) ->
|
||||
State;
|
||||
emit_stats(State = #wsclient_state{proto_state = ProtoState}) ->
|
||||
{reply, Stats, _} = handle_call(stats, undefined, State),
|
||||
emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats),
|
||||
emit_stats(emqttd_protocol:clientid(ProtoState), State).
|
||||
|
||||
emit_stats(_ClientId, State = #wsclient_state{enable_stats = false}) ->
|
||||
State;
|
||||
emit_stats(undefined, State) ->
|
||||
State;
|
||||
emit_stats(ClientId, State) ->
|
||||
{reply, Stats, _, _} = handle_call(stats, undefined, State),
|
||||
emqttd_stats:set_client_stats(ClientId, Stats),
|
||||
State.
|
||||
|
||||
wsock_stats(#wsclient_state{connection = Conn}) ->
|
||||
|
@ -252,11 +274,17 @@ wsock_stats(#wsclient_state{connection = Conn}) ->
|
|||
|
||||
with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = Fun(ProtoState),
|
||||
{noreply, State#wsclient_state{proto_state = ProtoState1}}.
|
||||
{noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}.
|
||||
|
||||
reply(Reply, State) ->
|
||||
{reply, Reply, State, hibernate}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
stop({shutdown, Reason}, State).
|
||||
|
||||
stop(Reason, State ) ->
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
gc(State) ->
|
||||
emqttd_gc:maybe_force_gc(#wsclient_state.force_gc_count, State).
|
||||
|
||||
|
|
|
@ -366,37 +366,39 @@ set_get_stat(_) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
add_delete_hook(_) ->
|
||||
emqttd:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
|
||||
emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
|
||||
{error, already_hooked} = emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
|
||||
Callbacks = [{callback, fun ?MODULE:hook_fun1/1, [], 0},
|
||||
{callback, fun ?MODULE:hook_fun2/1, [], 0}],
|
||||
ok = emqttd:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
|
||||
ok = emqttd:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
|
||||
{error, already_hooked} = emqttd:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
|
||||
Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0},
|
||||
{callback, tag, fun ?MODULE:hook_fun2/1, [], 0}],
|
||||
Callbacks = emqttd_hooks:lookup(test_hook),
|
||||
emqttd:unhook(test_hook, fun ?MODULE:hook_fun1/1),
|
||||
emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1),
|
||||
ok = emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1),
|
||||
{error, not_found} = emqttd:unhook(test_hook1, fun ?MODULE:hook_fun2/1),
|
||||
ok = emqttd:unhook(test_hook, fun ?MODULE:hook_fun1/1),
|
||||
ct:print("Callbacks: ~p~n", [emqttd_hooks:lookup(test_hook)]),
|
||||
ok = emqttd:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}),
|
||||
{error, not_found} = emqttd:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}),
|
||||
[] = emqttd_hooks:lookup(test_hook),
|
||||
|
||||
emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun1/1, [], 9),
|
||||
emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun2/1, [], 8),
|
||||
Callbacks2 = [{callback, fun ?MODULE:hook_fun2/1, [], 8},
|
||||
{callback, fun ?MODULE:hook_fun1/1, [], 9}],
|
||||
ok = emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun1/1, [], 9),
|
||||
ok = emqttd:hook(emqttd_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8),
|
||||
Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8},
|
||||
{callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}],
|
||||
Callbacks2 = emqttd_hooks:lookup(emqttd_hook),
|
||||
emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun1/1),
|
||||
emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun2/1),
|
||||
ok = emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun1/1),
|
||||
ok = emqttd:unhook(emqttd_hook, {"tag", fun ?MODULE:hook_fun2/1}),
|
||||
[] = emqttd_hooks:lookup(emqttd_hook).
|
||||
|
||||
run_hooks(_) ->
|
||||
emqttd:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
|
||||
emqttd:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
|
||||
emqttd:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
|
||||
ok = emqttd:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
|
||||
ok = emqttd:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]),
|
||||
ok = emqttd:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
|
||||
ok = emqttd:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
|
||||
{stop, [r3, r2]} = emqttd:run_hooks(foldl_hook, [arg1, arg2], []),
|
||||
{ok, []} = emqttd:run_hooks(unknown_hook, [], []),
|
||||
|
||||
emqttd:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
|
||||
emqttd:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
|
||||
emqttd:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
|
||||
ok = emqttd:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
|
||||
ok = emqttd:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]),
|
||||
ok = emqttd:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
|
||||
ok = emqttd:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
|
||||
stop = emqttd:run_hooks(foreach_hook, [arg]).
|
||||
|
||||
hook_fun1([]) -> ok.
|
||||
|
|
|
@ -71,7 +71,7 @@ groups() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
parse_connect(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
|
||||
|
@ -82,7 +82,7 @@ parse_connect(_) ->
|
|||
proto_name = <<"MQIsdp">>,
|
||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60}}, <<>>} = Parser(V31ConnBin),
|
||||
keep_alive = 60}}, <<>>} = emqttd_parser:parse(V31ConnBin, Parser),
|
||||
%% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
|
||||
|
@ -93,7 +93,7 @@ parse_connect(_) ->
|
|||
proto_name = <<"MQTT">>,
|
||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>} = Parser(V311ConnBin),
|
||||
keep_alive = 60 } }, <<>>} = emqttd_parser:parse(V311ConnBin, Parser),
|
||||
|
||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
||||
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||
|
@ -105,7 +105,7 @@ parse_connect(_) ->
|
|||
proto_name = <<"MQTT">>,
|
||||
client_id = <<>>,
|
||||
clean_sess = true,
|
||||
keep_alive = 60 } }, <<>>} = Parser(V311ConnWithoutClientId),
|
||||
keep_alive = 60 } }, <<>>} = emqttd_parser:parse(V311ConnWithoutClientId, Parser),
|
||||
%%CONNECT(Q0, R0, D0, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60,
|
||||
%% Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
||||
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
||||
|
@ -124,18 +124,18 @@ parse_connect(_) ->
|
|||
will_topic = <<"/will">>,
|
||||
will_msg = <<"willmsg">>,
|
||||
username = <<"test">>,
|
||||
password = <<"public">>}}, <<>>} = Parser(ConnBinWithWill),
|
||||
password = <<"public">>}}, <<>>} = emqttd_parser:parse(ConnBinWithWill, Parser),
|
||||
ok.
|
||||
|
||||
parse_bridge(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
Data = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67,58,50,57,58,50,66,58,55,55,58,53,50,
|
||||
0,48,36,83,89,83,47,98,114,111,107,101,114,47,99,111,110,110,101,99,116,105,111,110,47,67,95,48,48,58,48,
|
||||
67,58,50,57,58,50,66,58,55,55,58,53,50,47,115,116,97,116,101,0,1,48>>,
|
||||
|
||||
%% CONNECT(Q0, R0, D0, ClientId=C_00:0C:29:2B:77:52, ProtoName=MQIsdp, ProtoVsn=131, CleanSess=false, KeepAlive=60,
|
||||
%% Username=undefined, Password=undefined, Will(Q1, R1, Topic=$SYS/broker/connection/C_00:0C:29:2B:77:52/state, Msg=0))
|
||||
{ok, #mqtt_packet{variable = Variable}, <<>>} = Parser(Data),
|
||||
{ok, #mqtt_packet{variable = Variable}, <<>>} = emqttd_parser:parse(Data, Parser),
|
||||
#mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>,
|
||||
proto_ver = 16#03,
|
||||
proto_name = <<"MQIsdp">>,
|
||||
|
@ -148,7 +148,7 @@ parse_bridge(_) ->
|
|||
will_msg = <<"0">>} = Variable.
|
||||
|
||||
parse_publish(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
||||
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
|
@ -157,7 +157,7 @@ parse_publish(_) ->
|
|||
retain = false},
|
||||
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
||||
packet_id = 1},
|
||||
payload = <<"hahah">> }, <<>>} = Parser(PubBin),
|
||||
payload = <<"hahah">> }, <<>>} = emqttd_parser:parse(PubBin, Parser),
|
||||
|
||||
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||
|
@ -168,43 +168,43 @@ parse_publish(_) ->
|
|||
retain = false},
|
||||
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
||||
packet_id = undefined},
|
||||
payload = <<"hello">> }, <<224,0>>} = Parser(PubBin1),
|
||||
payload = <<"hello">> }, <<224,0>>} = emqttd_parser:parse(PubBin1, Parser),
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}}, <<>>} = Parser(<<224, 0>>).
|
||||
retain = false}}, <<>>} = emqttd_parser:parse(<<224, 0>>, Parser).
|
||||
|
||||
parse_puback(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}}, <<>>} = Parser(<<64,2,0,1>>).
|
||||
retain = false}}, <<>>} = emqttd_parser:parse(<<64,2,0,1>>, Parser).
|
||||
parse_pubrec(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%%PUBREC(Qos=0, Retain=false, Dup=false, PacketId=1)
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}}, <<>>} = Parser(<<5:4,0:4,2,0,1>>).
|
||||
retain = false}}, <<>>} = emqttd_parser:parse(<<5:4,0:4,2,0,1>>, Parser).
|
||||
|
||||
parse_pubrel(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
|
||||
dup = false,
|
||||
qos = 1,
|
||||
retain = false}}, <<>>} = Parser(<<6:4,2:4,2,0,1>>).
|
||||
retain = false}}, <<>>} = emqttd_parser:parse(<<6:4,2:4,2,0,1>>, Parser).
|
||||
|
||||
parse_pubcomp(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}}, <<>>} = Parser(<<7:4,0:4,2,0,1>>).
|
||||
retain = false}}, <<>>} = emqttd_parser:parse(<<7:4,0:4,2,0,1>>, Parser).
|
||||
|
||||
parse_subscribe(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE,
|
||||
dup = false,
|
||||
|
@ -212,10 +212,10 @@ parse_subscribe(_) ->
|
|||
retain = false},
|
||||
variable = #mqtt_packet_subscribe{packet_id = 2,
|
||||
topic_table = [{<<"TopicA">>,2}]} }, <<>>}
|
||||
= Parser(<<130,11,0,2,0,6,84,111,112,105,99,65,2>>).
|
||||
= emqttd_parser:parse(<<130,11,0,2,0,6,84,111,112,105,99,65,2>>, Parser).
|
||||
|
||||
parse_unsubscribe(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE,
|
||||
dup = false,
|
||||
|
@ -223,24 +223,24 @@ parse_unsubscribe(_) ->
|
|||
retain = false},
|
||||
variable = #mqtt_packet_unsubscribe{packet_id = 2,
|
||||
topics = [<<"TopicA">>]}}, <<>>}
|
||||
= Parser(<<162,10,0,2,0,6,84,111,112,105,99,65>>).
|
||||
= emqttd_parser:parse(<<162,10,0,2,0,6,84,111,112,105,99,65>>, Parser).
|
||||
|
||||
parse_pingreq(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PINGREQ,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}}, <<>>}
|
||||
= Parser(<<?PINGREQ:4, 0:4, 0:8>>).
|
||||
= emqttd_parser:parse(<<?PINGREQ:4, 0:4, 0:8>>, Parser).
|
||||
|
||||
parse_disconnect(_) ->
|
||||
Parser = emqttd_parser:new([]),
|
||||
Parser = emqttd_parser:initial_state(),
|
||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||
Bin = <<224, 0>>,
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT,
|
||||
dup = false,
|
||||
qos = 0,
|
||||
retain = false}}, <<>>} = Parser(Bin).
|
||||
retain = false}}, <<>>} = emqttd_parser:parse(Bin, Parser).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Serialize Cases
|
||||
|
@ -260,7 +260,7 @@ serialize_connect(_) ->
|
|||
serialize_connack(_) ->
|
||||
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
|
||||
<<32,2,0,0>> = serialize(ConnAck).
|
||||
<<32,2,0,0>> = iolist_to_binary(serialize(ConnAck)).
|
||||
|
||||
serialize_publish(_) ->
|
||||
serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
|
||||
|
|
Loading…
Reference in New Issue