Merge branch 'dev-feng' of github.com:emqtt/emqtt into dev-feng
This commit is contained in:
commit
989443b150
|
@ -16,7 +16,7 @@
|
|||
{error_logger_redirect, false},
|
||||
{crash_log, "log/emqttd_crash.log"},
|
||||
{handlers, [
|
||||
{lager_console_backend, info},
|
||||
%%{lager_console_backend, info},
|
||||
{lager_file_backend, [
|
||||
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
|
||||
{file, "log/emqttd_error.log"},
|
||||
|
@ -95,7 +95,7 @@
|
|||
{max_awaiting_rel, 0},
|
||||
|
||||
%% Statistics Collection Interval(seconds)
|
||||
{collect_interval, 20},
|
||||
{collect_interval, 0},
|
||||
|
||||
%% Expired after 2 days
|
||||
{expired_after, 48}
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
#!/bin/sh
|
||||
# -*- tab-width:4;indent-tabs-mode:nil -*-
|
||||
# ex: ts=4 sw=4 et
|
||||
|
||||
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
||||
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
#!/bin/sh
|
||||
# -*- tab-width:4;indent-tabs-mode:nil -*-
|
||||
# ex: ts=4 sw=4 et
|
||||
|
||||
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
||||
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
||||
POSIX_SHELL="true"
|
||||
export POSIX_SHELL
|
||||
# To support 'whoami' add /usr/ucb to path
|
||||
PATH=/usr/ucb:$PATH
|
||||
export PATH
|
||||
exec /usr/bin/ksh $0 "$@"
|
||||
fi
|
||||
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well
|
||||
|
||||
RUNNER_SCRIPT_DIR={{runner_script_dir}}
|
||||
RUNNER_SCRIPT=${0##*/}
|
||||
|
||||
RUNNER_BASE_DIR={{runner_base_dir}}
|
||||
RUNNER_ETC_DIR={{runner_etc_dir}}
|
||||
RUNNER_LIB_DIR={{platform_lib_dir}}
|
||||
RUNNER_USER={{runner_user}}
|
||||
|
||||
WHOAMI=$(whoami)
|
||||
|
||||
# Make sure this script is running as the appropriate user
|
||||
if ([ "$RUNNER_USER" ] && [ "x$WHOAMI" != "x$RUNNER_USER" ]); then
|
||||
type sudo > /dev/null 2>&1
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "sudo doesn't appear to be installed and your EUID isn't $RUNNER_USER" 1>&2
|
||||
exit 1
|
||||
fi
|
||||
echo "Attempting to restart script through sudo -H -u $RUNNER_USER" >&2
|
||||
exec sudo -H -u $RUNNER_USER -i $RUNNER_SCRIPT_DIR/$RUNNER_SCRIPT $@
|
||||
fi
|
||||
|
||||
# Make sure CWD is set to runner base dir
|
||||
cd $RUNNER_BASE_DIR
|
||||
|
||||
# Extract the target node name from node.args
|
||||
NAME_ARG=`egrep "^ *-s?name" $RUNNER_ETC_DIR/vm.args`
|
||||
if [ -z "$NAME_ARG" ]; then
|
||||
echo "vm.args needs to have either -name or -sname parameter."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Learn how to specify node name for connection from remote nodes
|
||||
echo "$NAME_ARG" | grep '^-sname' > /dev/null 2>&1
|
||||
if [ "X$?" = "X0" ]; then
|
||||
NAME_PARAM="-sname"
|
||||
NAME_HOST=""
|
||||
else
|
||||
NAME_PARAM="-name"
|
||||
echo "$NAME_ARG" | grep '@.*' > /dev/null 2>&1
|
||||
if [ "X$?" = "X0" ]; then
|
||||
NAME_HOST=`echo "${NAME_ARG}" | sed -e 's/.*\(@.*\)$/\1/'`
|
||||
else
|
||||
NAME_HOST=""
|
||||
fi
|
||||
fi
|
||||
|
||||
# Extract the target cookie
|
||||
COOKIE_ARG=`grep '\-setcookie' $RUNNER_ETC_DIR/vm.args`
|
||||
if [ -z "$COOKIE_ARG" ]; then
|
||||
echo "vm.args needs to have a -setcookie parameter."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Identify the script name
|
||||
SCRIPT=`basename $0`
|
||||
|
||||
# Parse out release and erts info
|
||||
START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data`
|
||||
ERTS_VSN=${START_ERL% *}
|
||||
APP_VSN=${START_ERL#* }
|
||||
|
||||
# Add ERTS bin dir to our path
|
||||
ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin
|
||||
|
||||
NODE_NAME=${NAME_ARG#* }
|
||||
|
||||
# Setup command to control the node
|
||||
NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG"
|
||||
|
||||
RES=`$NODETOOL ping`
|
||||
if [ "$RES" != "pong" ]; then
|
||||
echo "Node is not running!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
case "$1" in
|
||||
runtime)
|
||||
SORTBY="runtime"
|
||||
;;
|
||||
reductions)
|
||||
SORTBY="reductions"
|
||||
;;
|
||||
memory)
|
||||
SORTBY="memory"
|
||||
;;
|
||||
msg_q)
|
||||
SORTBY="msg_q"
|
||||
;;
|
||||
*)
|
||||
echo "Usage: $SCRIPT {runtime | reductions | memory | msg_q}"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
MYPID=$$
|
||||
ETOP_ARGS="-sort $SORTBY -interval 10 -lines 50 -tracing off"
|
||||
$ERTS_PATH/erl -noshell -noinput \
|
||||
-pa $RUNNER_LIB_DIR/basho-patches \
|
||||
-hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \
|
||||
-s etop -s erlang halt -output text \
|
||||
-node $NODE_NAME $ETOP_ARGS
|
||||
|
|
@ -19,6 +19,7 @@
|
|||
inets,
|
||||
goldrush,
|
||||
compiler,
|
||||
runtime_tools,
|
||||
lager,
|
||||
{gen_logger, load},
|
||||
gproc,
|
||||
|
@ -54,6 +55,8 @@
|
|||
{app, eldap, [{incl_cond, include}]},
|
||||
{app, inets, [{incl_cond, include}]},
|
||||
{app, compiler, [{incl_cond, include}]},
|
||||
{app, runtime_tools, [{incl_cond, include}]},
|
||||
{app, observer, [{incl_cond, include}]},
|
||||
{app, goldrush, [{incl_cond, include}]},
|
||||
{app, gen_logger, [{incl_cond, include}]},
|
||||
{app, lager, [{incl_cond, include}]},
|
||||
|
@ -78,13 +81,14 @@
|
|||
{template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"},
|
||||
{template, "files/emqttd", "bin/emqttd"},
|
||||
{template, "files/emqttd_ctl", "bin/emqttd_ctl"},
|
||||
{template, "files/emqttd_top", "bin/emqttd_top"},
|
||||
{template, "files/emqttd.cmd", "bin/emqttd.cmd"},
|
||||
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
|
||||
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
|
||||
{copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"},
|
||||
{copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"},
|
||||
{template, "files/emqttd.config.development", "etc/emqttd.config"},
|
||||
{template, "files/emqttd.config.production", "etc/emqttd.config.production"},
|
||||
{template, "files/emqttd.config.production", "etc/emqttd.config"},
|
||||
{template, "files/emqttd.config.development", "etc/emqttd.config.development"},
|
||||
{template, "files/acl.config", "etc/acl.config"},
|
||||
{template, "files/rewrite.config", "etc/rewrite.config"},
|
||||
{template, "files/clients.config", "etc/clients.config"},
|
||||
|
|
|
@ -34,7 +34,10 @@
|
|||
-include("emqttd_protocol.hrl").
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/2, session/1, info/1, kick/1, subscribe/2]).
|
||||
-export([start_link/2, session/1, info/1, kick/1]).
|
||||
|
||||
%% SUB/UNSUB Asynchronously
|
||||
-export([subscribe/2, unsubscribe/2]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
|
@ -59,7 +62,7 @@ start_link(SockArgs, MqttEnv) ->
|
|||
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
|
||||
|
||||
session(CPid) ->
|
||||
gen_server:call(CPid, session).
|
||||
gen_server:call(CPid, session, infinity).
|
||||
|
||||
info(CPid) ->
|
||||
gen_server:call(CPid, info, infinity).
|
||||
|
@ -70,6 +73,9 @@ kick(CPid) ->
|
|||
subscribe(CPid, TopicTable) ->
|
||||
gen_server:cast(CPid, {subscribe, TopicTable}).
|
||||
|
||||
unsubscribe(CPid, Topics) ->
|
||||
gen_server:cast(CPid, {unsubscribe, Topics}).
|
||||
|
||||
init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
|
||||
% Transform if ssl.
|
||||
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
||||
|
@ -107,9 +113,11 @@ handle_call(Req, _From, State = #state{peername = Peername}) ->
|
|||
lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
|
||||
{reply, {error, unsupported_request}, State}.
|
||||
|
||||
handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
|
||||
noreply(State#state{proto_state = ProtoState1});
|
||||
handle_cast({subscribe, TopicTable}, State) ->
|
||||
with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State);
|
||||
|
||||
handle_cast({unsubscribe, Topics}, State) ->
|
||||
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
|
||||
|
||||
handle_cast(Msg, State = #state{peername = Peername}) ->
|
||||
lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
|
||||
|
@ -149,17 +157,26 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer
|
|||
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
|
||||
lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
|
||||
KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
|
||||
StatFun = fun() ->
|
||||
case Transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
{error, Error} -> {error, Error}
|
||||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
|
||||
noreply(State#state{keepalive = KeepAlive});
|
||||
|
||||
handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:resume(KeepAlive) of
|
||||
timeout ->
|
||||
handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
|
||||
noreply(State#state{keepalive = KeepAlive1});
|
||||
{error, timeout} ->
|
||||
lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
|
||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
||||
{resumed, KeepAlive1} ->
|
||||
lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
|
||||
noreply(State#state{keepalive = KeepAlive1})
|
||||
{error, Error} ->
|
||||
lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
|
||||
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
||||
end;
|
||||
|
||||
handle_info(Info, State = #state{peername = Peername}) ->
|
||||
|
@ -188,12 +205,20 @@ terminate(Reason, #state{peername = Peername,
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Internal functions
|
||||
%%%=============================================================================
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
%-------------------------------------------------------
|
||||
% receive and parse tcp data
|
||||
%-------------------------------------------------------
|
||||
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
with_session(Fun, State = #state{proto_state = ProtoState}) ->
|
||||
Fun(emqttd_protocol:session(ProtoState)), noreply(State).
|
||||
|
||||
%% receive and parse tcp data
|
||||
received(<<>>, State) ->
|
||||
{noreply, State, hibernate};
|
||||
|
||||
|
@ -244,12 +269,8 @@ control_throttle(State = #state{conn_state = Flow,
|
|||
{_, _} -> run_socket(State)
|
||||
end.
|
||||
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
received_stats(?PACKET(Type)) ->
|
||||
emqttd_metrics:inc('packets/received'),
|
||||
inc(Type).
|
||||
emqttd_metrics:inc('packets/received'), inc(Type).
|
||||
inc(?CONNECT) ->
|
||||
emqttd_metrics:inc('packets/connect');
|
||||
inc(?PUBLISH) ->
|
||||
|
|
|
@ -23,62 +23,61 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_keepalive).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-export([new/3, resume/1, cancel/1]).
|
||||
-export([start/3, check/1, cancel/1]).
|
||||
|
||||
-record(keepalive, {transport,
|
||||
socket,
|
||||
recv_oct,
|
||||
timeout_sec,
|
||||
timeout_msg,
|
||||
timer_ref}).
|
||||
-record(keepalive, {statfun, statval,
|
||||
tsec, tmsg, tref,
|
||||
repeat = 0}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Create a keepalive
|
||||
%% @doc Start a keepalive
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
|
||||
{ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]),
|
||||
Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg),
|
||||
#keepalive {transport = Transport,
|
||||
socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref}.
|
||||
start(_, 0, _) ->
|
||||
undefined;
|
||||
start(StatFun, TimeoutSec, TimeoutMsg) ->
|
||||
{ok, StatVal} = StatFun(),
|
||||
#keepalive{statfun = StatFun, statval = StatVal,
|
||||
tsec = TimeoutSec, tmsg = TimeoutMsg,
|
||||
tref = timer(TimeoutSec, TimeoutMsg)}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Try to resume keepalive, called when timeout
|
||||
%% @doc Check keepalive, called when timeout.
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
resume(KeepAlive = #keepalive {transport = Transport,
|
||||
socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }) ->
|
||||
{ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]),
|
||||
if
|
||||
NewRecvOct =:= RecvOct ->
|
||||
timeout;
|
||||
true ->
|
||||
%need?
|
||||
cancel(Ref),
|
||||
NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg),
|
||||
{resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}}
|
||||
check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
|
||||
case StatFun() of
|
||||
{ok, NewVal} ->
|
||||
if NewVal =/= LastVal ->
|
||||
{ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
|
||||
Repeat < 1 ->
|
||||
{ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
|
||||
true ->
|
||||
{error, timeout}
|
||||
end;
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
|
||||
KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Cancel Keepalive
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
cancel(#keepalive{timer_ref = Ref}) ->
|
||||
cancel(Ref);
|
||||
cancel(#keepalive{tref = TRef}) ->
|
||||
cancel(TRef);
|
||||
cancel(undefined) ->
|
||||
undefined;
|
||||
cancel(Ref) ->
|
||||
catch erlang:cancel_timer(Ref).
|
||||
ok;
|
||||
cancel(TRef) ->
|
||||
catch erlang:cancel_timer(TRef).
|
||||
|
||||
timer(Sec, Msg) ->
|
||||
erlang:send_after(timer:seconds(Sec), self(), Msg).
|
||||
|
||||
|
|
|
@ -239,16 +239,11 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id =
|
|||
case lists:member(deny, AllowDenies) of
|
||||
true ->
|
||||
%%TODO: return 128 QoS when deny... no need to SUBACK?
|
||||
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
||||
{ok, State};
|
||||
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]);
|
||||
false ->
|
||||
%%TODO: GrantedQos should be renamed.
|
||||
{ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
|
||||
send(?SUBACK_PACKET(PacketId, GrantedQos), State)
|
||||
end;
|
||||
|
||||
handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
|
||||
{ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
|
||||
Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end,
|
||||
emqttd_session:subscribe(Session, TopicTable, Callback)
|
||||
end,
|
||||
{ok, State};
|
||||
|
||||
%% protect from empty topic list
|
||||
|
@ -256,7 +251,7 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
|
|||
send(?UNSUBACK_PACKET(PacketId), State);
|
||||
|
||||
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
||||
ok = emqttd_session:unsubscribe(Session, Topics),
|
||||
emqttd_session:unsubscribe(Session, Topics),
|
||||
send(?UNSUBACK_PACKET(PacketId), State);
|
||||
|
||||
handle(?PACKET(?PINGREQ), State) ->
|
||||
|
@ -349,7 +344,7 @@ send_willmsg(ClientId, WillMsg) ->
|
|||
start_keepalive(0) -> ignore;
|
||||
|
||||
start_keepalive(Sec) when Sec > 0 ->
|
||||
self() ! {keepalive, start, round(Sec * 1.5)}.
|
||||
self() ! {keepalive, start, round(Sec * 1.2)}.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
%% Validate Packets
|
||||
|
|
|
@ -59,7 +59,7 @@
|
|||
%% PubSub APIs
|
||||
-export([publish/2,
|
||||
puback/2, pubrec/2, pubrel/2, pubcomp/2,
|
||||
subscribe/2, unsubscribe/2]).
|
||||
subscribe/2, subscribe/3, unsubscribe/2]).
|
||||
|
||||
-behaviour(gen_server2).
|
||||
|
||||
|
@ -166,9 +166,13 @@ destroy(SessPid, ClientId) ->
|
|||
%% @doc Subscribe Topics
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}.
|
||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
|
||||
subscribe(SessPid, TopicTable) ->
|
||||
gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT).
|
||||
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
||||
|
||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok.
|
||||
subscribe(SessPid, TopicTable, Callback) ->
|
||||
gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Publish message
|
||||
|
@ -213,7 +217,7 @@ pubcomp(SessPid, PktId) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec unsubscribe(pid(), [binary()]) -> ok.
|
||||
unsubscribe(SessPid, Topics) ->
|
||||
gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT).
|
||||
gen_server2:cast(SessPid, {unsubscribe, Topics}).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
|
@ -247,26 +251,24 @@ init([CleanSess, ClientId, ClientPid]) ->
|
|||
{ok, start_collector(Session#session{client_mon = MRef}), hibernate}.
|
||||
|
||||
prioritise_call(Msg, _From, _Len, _State) ->
|
||||
case Msg of
|
||||
{unsubscribe, _} -> 2;
|
||||
{subscribe, _} -> 1;
|
||||
_ -> 0
|
||||
end.
|
||||
case Msg of _ -> 0 end.
|
||||
|
||||
prioritise_cast(Msg, _Len, _State) ->
|
||||
case Msg of
|
||||
{destroy, _} -> 10;
|
||||
{resume, _, _} -> 9;
|
||||
{pubrel, _PktId} -> 8;
|
||||
{pubcomp, _PktId} -> 8;
|
||||
{pubrec, _PktId} -> 8;
|
||||
{puback, _PktId} -> 7;
|
||||
_ -> 0
|
||||
{destroy, _} -> 10;
|
||||
{resume, _, _} -> 9;
|
||||
{pubrel, _PktId} -> 8;
|
||||
{pubcomp, _PktId} -> 8;
|
||||
{pubrec, _PktId} -> 8;
|
||||
{puback, _PktId} -> 7;
|
||||
{unsubscribe, _, _} -> 6;
|
||||
{subscribe, _, _} -> 5;
|
||||
_ -> 0
|
||||
end.
|
||||
|
||||
prioritise_info(Msg, _Len, _State) ->
|
||||
case Msg of
|
||||
{'DOWN', _, process, _, _} -> 10;
|
||||
{'DOWN', _, _, _, _} -> 10;
|
||||
{'EXIT', _, _} -> 10;
|
||||
session_expired -> 10;
|
||||
{timeout, _, _} -> 5;
|
||||
|
@ -275,17 +277,40 @@ prioritise_info(Msg, _Len, _State) ->
|
|||
_ -> 0
|
||||
end.
|
||||
|
||||
handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
|
||||
Session = #session{client_id = ClientId,
|
||||
awaiting_rel = AwaitingRel,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
case check_awaiting_rel(Session) of
|
||||
true ->
|
||||
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
|
||||
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
|
||||
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
|
||||
false ->
|
||||
lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message "
|
||||
"for too many awaiting_rel: ~p", [ClientId, Msg]),
|
||||
{reply, {error, dropped}, Session}
|
||||
end;
|
||||
|
||||
case TopicTable0 -- Subscriptions of
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:critical("Unexpected Request: ~p", [Req]),
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast({subscribe, TopicTable0, Callback}, Session = #session{
|
||||
client_id = ClientId, subscriptions = Subscriptions}) ->
|
||||
|
||||
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
||||
|
||||
case TopicTable -- Subscriptions of
|
||||
[] ->
|
||||
{reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session};
|
||||
catch Callback([Qos || {_, Qos} <- TopicTable]),
|
||||
noreply(Session);
|
||||
_ ->
|
||||
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
||||
%% subscribe first and don't care if the subscriptions have been existed
|
||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
||||
|
||||
catch Callback(GrantedQos),
|
||||
|
||||
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
||||
|
||||
lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p",
|
||||
|
@ -310,11 +335,11 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie
|
|||
[{Topic, Qos} | Acc]
|
||||
end
|
||||
end, Subscriptions, TopicTable),
|
||||
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}
|
||||
noreply(Session#session{subscriptions = Subscriptions1})
|
||||
end;
|
||||
|
||||
handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
|
||||
Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
|
||||
|
||||
|
@ -333,26 +358,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client
|
|||
end
|
||||
end, Subscriptions, Topics),
|
||||
|
||||
{reply, ok, Session#session{subscriptions = Subscriptions1}};
|
||||
|
||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
|
||||
Session = #session{client_id = ClientId,
|
||||
awaiting_rel = AwaitingRel,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
case check_awaiting_rel(Session) of
|
||||
true ->
|
||||
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
|
||||
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
|
||||
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
|
||||
false ->
|
||||
lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message "
|
||||
"for too many awaiting_rel: ~p", [ClientId, Msg]),
|
||||
{reply, {error, dropped}, Session}
|
||||
end;
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:critical("Unexpected Request: ~p", [Req]),
|
||||
{reply, ok, State}.
|
||||
noreply(Session#session{subscriptions = Subscriptions1});
|
||||
|
||||
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
|
||||
lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]),
|
||||
|
|
|
@ -34,7 +34,10 @@
|
|||
-include("emqttd_protocol.hrl").
|
||||
|
||||
%% API Exports
|
||||
-export([start_link/1, ws_loop/3, subscribe/2]).
|
||||
-export([start_link/1, ws_loop/3, session/1, info/1, kick/1]).
|
||||
|
||||
%% SUB/UNSUB Asynchronously
|
||||
-export([subscribe/2, unsubscribe/2]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
|
@ -61,9 +64,21 @@ start_link(Req) ->
|
|||
packet_opts = PktOpts,
|
||||
parser = emqttd_parser:new(PktOpts)}).
|
||||
|
||||
session(CPid) ->
|
||||
gen_server:call(CPid, session, infinity).
|
||||
|
||||
info(CPid) ->
|
||||
gen_server:call(CPid, info, infinity).
|
||||
|
||||
kick(CPid) ->
|
||||
gen_server:call(CPid, kick).
|
||||
|
||||
subscribe(CPid, TopicTable) ->
|
||||
gen_server:cast(CPid, {subscribe, TopicTable}).
|
||||
|
||||
unsubscribe(CPid, Topics) ->
|
||||
gen_server:cast(CPid, {unsubscribe, Topics}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @private
|
||||
%% @doc Start WebSocket client.
|
||||
|
@ -112,17 +127,30 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
|||
ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]),
|
||||
{ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
|
||||
|
||||
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
|
||||
{reply, emqttd_protocol:session(ProtoState), State};
|
||||
|
||||
handle_call(info, _From, State = #client_state{request = Req,
|
||||
proto_state = ProtoState}) ->
|
||||
{reply, [{websocket, true}, {peer, Req:get(peer)}
|
||||
| emqttd_protocol:info(ProtoState)], State};
|
||||
|
||||
handle_call(kick, _From, State) ->
|
||||
{stop, {shutdown, kick}, ok, State};
|
||||
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, error, State}.
|
||||
|
||||
handle_cast({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
|
||||
{noreply, State#client_state{proto_state = ProtoState1}, hibernate};
|
||||
handle_cast({subscribe, TopicTable}, State) ->
|
||||
with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State);
|
||||
|
||||
handle_cast({unsubscribe, Topics}, State) ->
|
||||
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
|
||||
|
||||
handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
|
||||
case emqttd_protocol:received(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||
noreply(State#client_state{proto_state = ProtoState1});
|
||||
{error, Error} ->
|
||||
lager:error("MQTT protocol error ~p", [Error]),
|
||||
stop({shutdown, Error}, State);
|
||||
|
@ -137,11 +165,11 @@ handle_cast(_Msg, State) ->
|
|||
|
||||
handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
||||
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||
noreply(State#client_state{proto_state = ProtoState1});
|
||||
|
||||
handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
||||
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||
noreply(State#client_state{proto_state = ProtoState1});
|
||||
|
||||
handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) ->
|
||||
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
|
||||
|
@ -149,18 +177,27 @@ handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = P
|
|||
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) ->
|
||||
lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
|
||||
KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)},
|
||||
TimeoutSec, {keepalive, timeout}),
|
||||
{noreply, State#client_state{keepalive = KeepAlive}};
|
||||
Socket = Req:get(socket),
|
||||
StatFun = fun() ->
|
||||
case esockd_transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
{error, Error} -> {error, Error}
|
||||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
|
||||
noreply(State#client_state{keepalive = KeepAlive});
|
||||
|
||||
handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:resume(KeepAlive) of
|
||||
timeout ->
|
||||
handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
|
||||
noreply(State#client_state{keepalive = KeepAlive1});
|
||||
{error, timeout} ->
|
||||
lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
|
||||
stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
|
||||
{resumed, KeepAlive1} ->
|
||||
lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
|
||||
{noreply, State#client_state{keepalive = KeepAlive1}}
|
||||
{error, Error} ->
|
||||
lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]),
|
||||
stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined})
|
||||
end;
|
||||
|
||||
handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) ->
|
||||
|
@ -170,7 +207,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto
|
|||
|
||||
handle_info(Info, State = #client_state{request = Req}) ->
|
||||
lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]),
|
||||
{noreply, State}.
|
||||
noreply(State).
|
||||
|
||||
terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
|
||||
lager:info("WebSocket client terminated: ~p", [Reason]),
|
||||
|
@ -189,6 +226,12 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%=============================================================================
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
stop(Reason, State ) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
|
||||
Fun(emqttd_protocol:session(ProtoState)), noreply(State).
|
||||
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqttd_keepalive_tests).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
keepalive_test() ->
|
||||
KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
|
||||
?assertEqual([resumed, timeout], lists:reverse(loop(KA, []))).
|
||||
|
||||
loop(KA, Acc) ->
|
||||
receive
|
||||
{keepalive, timeout} ->
|
||||
case emqttd_keepalive:check(KA) of
|
||||
{ok, KA1} -> loop(KA1, [resumed | Acc]);
|
||||
{error, timeout} -> [timeout | Acc]
|
||||
end
|
||||
after 4000 ->
|
||||
Acc
|
||||
end.
|
||||
|
||||
-endif.
|
Loading…
Reference in New Issue