From aec571a0b03d1c9d1d55ca30e199f3bc87e7fe91 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 3 Oct 2015 15:43:28 +0800 Subject: [PATCH 1/8] etop --- rel/files/emqttd_top | 117 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100755 rel/files/emqttd_top diff --git a/rel/files/emqttd_top b/rel/files/emqttd_top new file mode 100755 index 000000000..714542a2a --- /dev/null +++ b/rel/files/emqttd_top @@ -0,0 +1,117 @@ +#!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et + +# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. +if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then + POSIX_SHELL="true" + export POSIX_SHELL + # To support 'whoami' add /usr/ucb to path + PATH=/usr/ucb:$PATH + export PATH + exec /usr/bin/ksh $0 "$@" +fi +unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well + +RUNNER_SCRIPT_DIR={{runner_script_dir}} +RUNNER_SCRIPT=${0##*/} + +RUNNER_BASE_DIR={{runner_base_dir}} +RUNNER_ETC_DIR={{runner_etc_dir}} +RUNNER_LIB_DIR={{platform_lib_dir}} +RUNNER_USER={{runner_user}} + +WHOAMI=$(whoami) + +# Make sure this script is running as the appropriate user +if ([ "$RUNNER_USER" ] && [ "x$WHOAMI" != "x$RUNNER_USER" ]); then + type sudo > /dev/null 2>&1 + if [ $? -ne 0 ]; then + echo "sudo doesn't appear to be installed and your EUID isn't $RUNNER_USER" 1>&2 + exit 1 + fi + echo "Attempting to restart script through sudo -H -u $RUNNER_USER" >&2 + exec sudo -H -u $RUNNER_USER -i $RUNNER_SCRIPT_DIR/$RUNNER_SCRIPT $@ +fi + +# Make sure CWD is set to runner base dir +cd $RUNNER_BASE_DIR + +# Extract the target node name from node.args +NAME_ARG=`egrep "^ *-s?name" $RUNNER_ETC_DIR/vm.args` +if [ -z "$NAME_ARG" ]; then + echo "vm.args needs to have either -name or -sname parameter." + exit 1 +fi + +# Learn how to specify node name for connection from remote nodes +echo "$NAME_ARG" | grep '^-sname' > /dev/null 2>&1 +if [ "X$?" = "X0" ]; then + NAME_PARAM="-sname" + NAME_HOST="" +else + NAME_PARAM="-name" + echo "$NAME_ARG" | grep '@.*' > /dev/null 2>&1 + if [ "X$?" = "X0" ]; then + NAME_HOST=`echo "${NAME_ARG}" | sed -e 's/.*\(@.*\)$/\1/'` + else + NAME_HOST="" + fi +fi + +# Extract the target cookie +COOKIE_ARG=`grep '\-setcookie' $RUNNER_ETC_DIR/vm.args` +if [ -z "$COOKIE_ARG" ]; then + echo "vm.args needs to have a -setcookie parameter." + exit 1 +fi + +# Identify the script name +SCRIPT=`basename $0` + +# Parse out release and erts info +START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data` +ERTS_VSN=${START_ERL% *} +APP_VSN=${START_ERL#* } + +# Add ERTS bin dir to our path +ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin + +NODE_NAME=${NAME_ARG#* } + +# Setup command to control the node +NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" + +RES=`$NODETOOL ping` +if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 +fi + +case "$1" in + runtime) + SORTBY="runtime" + ;; + reductions) + SORTBY="reductions" + ;; + memory) + SORTBY="memory" + ;; + msg_q) + SORTBY="msg_q" + ;; + *) + echo "Usage: $SCRIPT {runtime | reductions | memory | msg_q}" + exit 1 + ;; +esac + +MYPID=$$ +ETOP_ARGS="-sort $SORTBY -interval 10 -lines 100 -tracing off" +$ERTS_PATH/erl -noshell -noinput \ + -pa $RUNNER_LIB_DIR/basho-patches \ + -hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \ + -s etop -s erlang halt -output text \ + -node $NODE_NAME $ETOP_ARGS + From 1fc79bd9a1acbbad40f7115869218909f1b230e8 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 3 Oct 2015 15:44:23 +0800 Subject: [PATCH 2/8] runtime_tools --- rel/reltool.config | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rel/reltool.config b/rel/reltool.config index 53ce02f86..f8cda1e7b 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -19,6 +19,7 @@ inets, goldrush, compiler, + runtime_tools, lager, {gen_logger, load}, gproc, @@ -54,6 +55,8 @@ {app, eldap, [{incl_cond, include}]}, {app, inets, [{incl_cond, include}]}, {app, compiler, [{incl_cond, include}]}, + {app, runtime_tools, [{incl_cond, include}]}, + {app, observer, [{incl_cond, include}]}, {app, goldrush, [{incl_cond, include}]}, {app, gen_logger, [{incl_cond, include}]}, {app, lager, [{incl_cond, include}]}, @@ -78,6 +81,7 @@ {template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"}, {template, "files/emqttd", "bin/emqttd"}, {template, "files/emqttd_ctl", "bin/emqttd_ctl"}, + {template, "files/emqttd_top", "bin/emqttd_top"}, {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, From 4e5b499aa799a1025e9d11f32a18b1fb27013020 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 3 Oct 2015 15:45:04 +0800 Subject: [PATCH 3/8] tab --- rel/files/emqttd_ctl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index 8a65bd6d3..4292b893e 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -1,4 +1,6 @@ #!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then From 273149f633560491bb24498b6072812424368800 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 11:23:24 +0800 Subject: [PATCH 4/8] keepalive tests --- test/emqttd_keepalive_tests.erl | 44 +++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 test/emqttd_keepalive_tests.erl diff --git a/test/emqttd_keepalive_tests.erl b/test/emqttd_keepalive_tests.erl new file mode 100644 index 000000000..96f84450a --- /dev/null +++ b/test/emqttd_keepalive_tests.erl @@ -0,0 +1,44 @@ + +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +-module(emqttd_keepalive_tests). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +keepalive_test() -> + KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), + ?assertEqual([resumed, timeout], lists:reverse(loop(KA, []))). + +loop(KA, Acc) -> + receive + {keepalive, timeout} -> + case emqttd_keepalive:check(KA) of + {ok, KA1} -> loop(KA1, [resumed | Acc]); + {error, timeout} -> [timeout | Acc] + end + after 4000 -> + Acc + end. + +-endif. From 78288e80886466ac2f96c6bfef06e47b747c2de1 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:43:58 +0800 Subject: [PATCH 5/8] improve keepalie --- src/emqttd_keepalive.erl | 75 ++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index f5c7f2ac7..e06382207 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -23,62 +23,61 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_keepalive). -author("Feng Lee "). --export([new/3, resume/1, cancel/1]). +-export([start/3, check/1, cancel/1]). --record(keepalive, {transport, - socket, - recv_oct, - timeout_sec, - timeout_msg, - timer_ref}). +-record(keepalive, {statfun, statval, + tsec, tmsg, tref, + repeat = 0}). %%------------------------------------------------------------------------------ -%% @doc Create a keepalive +%% @doc Start a keepalive %% @end %%------------------------------------------------------------------------------ -new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> - {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), - Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref}. +start(_, 0, _) -> + undefined; +start(StatFun, TimeoutSec, TimeoutMsg) -> + {ok, StatVal} = StatFun(), + #keepalive{statfun = StatFun, statval = StatVal, + tsec = TimeoutSec, tmsg = TimeoutMsg, + tref = timer(TimeoutSec, TimeoutMsg)}. %%------------------------------------------------------------------------------ -%% @doc Try to resume keepalive, called when timeout +%% @doc Check keepalive, called when timeout. %% @end %%------------------------------------------------------------------------------ -resume(KeepAlive = #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }) -> - {ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), - if - NewRecvOct =:= RecvOct -> - timeout; - true -> - %need? - cancel(Ref), - NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - {resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}} +check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> + case StatFun() of + {ok, NewVal} -> + if NewVal =/= LastVal -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; + Repeat < 1 -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; + true -> + {error, timeout} + end; + {error, Error} -> + {error, Error} end. +resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> + KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. + %%------------------------------------------------------------------------------ %% @doc Cancel Keepalive %% @end %%------------------------------------------------------------------------------ -cancel(#keepalive{timer_ref = Ref}) -> - cancel(Ref); +cancel(#keepalive{tref = TRef}) -> + cancel(TRef); cancel(undefined) -> - undefined; -cancel(Ref) -> - catch erlang:cancel_timer(Ref). + ok; +cancel(TRef) -> + catch erlang:cancel_timer(TRef). + +timer(Sec, Msg) -> + erlang:send_after(timer:seconds(Sec), self(), Msg). From 9f643ea267336eed71c84b49aa739d9e9a7dfbc7 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:44:14 +0800 Subject: [PATCH 6/8] 50 lines --- rel/files/emqttd_top | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/emqttd_top b/rel/files/emqttd_top index 714542a2a..24533c436 100755 --- a/rel/files/emqttd_top +++ b/rel/files/emqttd_top @@ -108,7 +108,7 @@ case "$1" in esac MYPID=$$ -ETOP_ARGS="-sort $SORTBY -interval 10 -lines 100 -tracing off" +ETOP_ARGS="-sort $SORTBY -interval 10 -lines 50 -tracing off" $ERTS_PATH/erl -noshell -noinput \ -pa $RUNNER_LIB_DIR/basho-patches \ -hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \ From d5a400c308b17c21ff7361d38ba3a75bf238a5b8 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:48:50 +0800 Subject: [PATCH 7/8] fix issue #292 - async sub/unsub --- src/emqttd_client.erl | 63 +++++++++++++++++--------- src/emqttd_protocol.erl | 17 +++---- src/emqttd_session.erl | 96 +++++++++++++++++++++------------------- src/emqttd_ws_client.erl | 77 +++++++++++++++++++++++++------- 4 files changed, 159 insertions(+), 94 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index da38d9d8d..7eb4be8f4 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,10 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, session/1, info/1, kick/1, subscribe/2]). +-export([start_link/2, session/1, info/1, kick/1]). + +%% SUB/UNSUB Asynchronously +-export([subscribe/2, unsubscribe/2]). -behaviour(gen_server). @@ -59,7 +62,7 @@ start_link(SockArgs, MqttEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. session(CPid) -> - gen_server:call(CPid, session). + gen_server:call(CPid, session, infinity). info(CPid) -> gen_server:call(CPid, info, infinity). @@ -70,6 +73,9 @@ kick(CPid) -> subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). +unsubscribe(CPid, Topics) -> + gen_server:cast(CPid, {unsubscribe, Topics}). + init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), @@ -107,9 +113,11 @@ handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. -handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - noreply(State#state{proto_state = ProtoState1}); +handle_cast({subscribe, TopicTable}, State) -> + with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + +handle_cast({unsubscribe, Topics}, State) -> + with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast(Msg, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), @@ -149,17 +157,26 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), - KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), + StatFun = fun() -> + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), noreply(State#state{keepalive = KeepAlive}); -handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> +handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) -> + case emqttd_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), + noreply(State#state{keepalive = KeepAlive1}); + {error, timeout} -> lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); - {resumed, KeepAlive1} -> - lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), - noreply(State#state{keepalive = KeepAlive1}) + {error, Error} -> + lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), + stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; handle_info(Info, State = #state{peername = Peername}) -> @@ -188,12 +205,20 @@ terminate(Reason, #state{peername = Peername, code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + noreply(State) -> {noreply, State, hibernate}. - -%------------------------------------------------------- -% receive and parse tcp data -%------------------------------------------------------- + +stop(Reason, State) -> + {stop, Reason, State}. + +with_session(Fun, State = #state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + +%% receive and parse tcp data received(<<>>, State) -> {noreply, State, hibernate}; @@ -244,12 +269,8 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -stop(Reason, State) -> - {stop, Reason, State}. - received_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/received'), - inc(Type). + emqttd_metrics:inc('packets/received'), inc(Type). inc(?CONNECT) -> emqttd_metrics:inc('packets/connect'); inc(?PUBLISH) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index dcd120035..840339819 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -239,16 +239,11 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = case lists:member(deny, AllowDenies) of true -> %%TODO: return 128 QoS when deny... no need to SUBACK? - lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), - {ok, State}; + lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]); false -> - %%TODO: GrantedQos should be renamed. - {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), - send(?SUBACK_PACKET(PacketId, GrantedQos), State) - end; - -handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> - {ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end, + emqttd_session:subscribe(Session, TopicTable, Callback) + end, {ok, State}; %% protect from empty topic list @@ -256,7 +251,7 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - ok = emqttd_session:unsubscribe(Session, Topics), + emqttd_session:unsubscribe(Session, Topics), send(?UNSUBACK_PACKET(PacketId), State); handle(?PACKET(?PINGREQ), State) -> @@ -349,7 +344,7 @@ send_willmsg(ClientId, WillMsg) -> start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> - self() ! {keepalive, start, round(Sec * 1.5)}. + self() ! {keepalive, start, round(Sec * 1.2)}. %%---------------------------------------------------------------------------- %% Validate Packets diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 1d3caa98f..a23f76f7d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -59,7 +59,7 @@ %% PubSub APIs -export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2, - subscribe/2, unsubscribe/2]). + subscribe/2, subscribe/3, unsubscribe/2]). -behaviour(gen_server2). @@ -166,9 +166,13 @@ destroy(SessPid, ClientId) -> %% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ --spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. +-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok. subscribe(SessPid, TopicTable) -> - gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT). + subscribe(SessPid, TopicTable, fun(_) -> ok end). + +-spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok. +subscribe(SessPid, TopicTable, Callback) -> + gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}). %%------------------------------------------------------------------------------ %% @doc Publish message @@ -213,7 +217,7 @@ pubcomp(SessPid, PktId) -> %%------------------------------------------------------------------------------ -spec unsubscribe(pid(), [binary()]) -> ok. unsubscribe(SessPid, Topics) -> - gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT). + gen_server2:cast(SessPid, {unsubscribe, Topics}). %%%============================================================================= %%% gen_server callbacks @@ -247,26 +251,24 @@ init([CleanSess, ClientId, ClientPid]) -> {ok, start_collector(Session#session{client_mon = MRef}), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> - case Msg of - {unsubscribe, _} -> 2; - {subscribe, _} -> 1; - _ -> 0 - end. + case Msg of _ -> 0 end. prioritise_cast(Msg, _Len, _State) -> case Msg of - {destroy, _} -> 10; - {resume, _, _} -> 9; - {pubrel, _PktId} -> 8; - {pubcomp, _PktId} -> 8; - {pubrec, _PktId} -> 8; - {puback, _PktId} -> 7; - _ -> 0 + {destroy, _} -> 10; + {resume, _, _} -> 9; + {pubrel, _PktId} -> 8; + {pubcomp, _PktId} -> 8; + {pubrec, _PktId} -> 8; + {puback, _PktId} -> 7; + {unsubscribe, _, _} -> 6; + {subscribe, _, _} -> 5; + _ -> 0 end. prioritise_info(Msg, _Len, _State) -> case Msg of - {'DOWN', _, process, _, _} -> 10; + {'DOWN', _, _, _, _} -> 10; {'EXIT', _, _} -> 10; session_expired -> 10; {timeout, _, _} -> 5; @@ -275,17 +277,40 @@ prioritise_info(Msg, _Len, _State) -> _ -> 0 end. -handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, + Session = #session{client_id = ClientId, + awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> + case check_awaiting_rel(Session) of + true -> + TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), + AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), + {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; + false -> + lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " + "for too many awaiting_rel: ~p", [ClientId, Msg]), + {reply, {error, dropped}, Session} + end; - case TopicTable0 -- Subscriptions of +handle_call(Req, _From, State) -> + lager:critical("Unexpected Request: ~p", [Req]), + {reply, ok, State}. + +handle_cast({subscribe, TopicTable0, Callback}, Session = #session{ + client_id = ClientId, subscriptions = Subscriptions}) -> + + TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), + + case TopicTable -- Subscriptions of [] -> - {reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session}; + catch Callback([Qos || {_, Qos} <- TopicTable]), + noreply(Session); _ -> - TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), + catch Callback(GrantedQos), + emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", @@ -310,11 +335,11 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie [{Topic, Qos} | Acc] end end, Subscriptions, TopicTable), - {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}} + noreply(Session#session{subscriptions = Subscriptions1}) end; -handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), @@ -333,26 +358,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client end end, Subscriptions, Topics), - {reply, ok, Session#session{subscriptions = Subscriptions1}}; - -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, - Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> - case check_awaiting_rel(Session) of - true -> - TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), - AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), - {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; - false -> - lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " - "for too many awaiting_rel: ~p", [ClientId, Msg]), - {reply, {error, dropped}, Session} - end; - -handle_call(Req, _From, State) -> - lager:critical("Unexpected Request: ~p", [Req]), - {reply, ok, State}. + noreply(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 6b4a001b2..827d7de18 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -34,7 +34,10 @@ -include("emqttd_protocol.hrl"). %% API Exports --export([start_link/1, ws_loop/3, subscribe/2]). +-export([start_link/1, ws_loop/3, session/1, info/1, kick/1]). + +%% SUB/UNSUB Asynchronously +-export([subscribe/2, unsubscribe/2]). -behaviour(gen_server). @@ -61,9 +64,21 @@ start_link(Req) -> packet_opts = PktOpts, parser = emqttd_parser:new(PktOpts)}). +session(CPid) -> + gen_server:call(CPid, session, infinity). + +info(CPid) -> + gen_server:call(CPid, info, infinity). + +kick(CPid) -> + gen_server:call(CPid, kick). + subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). +unsubscribe(CPid, Topics) -> + gen_server:cast(CPid, {unsubscribe, Topics}). + %%------------------------------------------------------------------------------ %% @private %% @doc Start WebSocket client. @@ -112,17 +127,30 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]), {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. +handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> + {reply, emqttd_protocol:session(ProtoState), State}; + +handle_call(info, _From, State = #client_state{request = Req, + proto_state = ProtoState}) -> + {reply, [{websocket, true}, {peer, Req:get(peer)} + | emqttd_protocol:info(ProtoState)], State}; + +handle_call(kick, _From, State) -> + {stop, {shutdown, kick}, ok, State}; + handle_call(_Req, _From, State) -> {reply, error, State}. -handle_cast({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}, hibernate}; +handle_cast({subscribe, TopicTable}, State) -> + with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + +handle_cast({unsubscribe, Topics}, State) -> + with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p", [Error]), stop({shutdown, Error}, State); @@ -137,11 +165,11 @@ handle_cast(_Msg, State) -> handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) -> lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), @@ -149,18 +177,27 @@ handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = P handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), - KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)}, - TimeoutSec, {keepalive, timeout}), - {noreply, State#client_state{keepalive = KeepAlive}}; + Socket = Req:get(socket), + StatFun = fun() -> + case esockd_transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), + noreply(State#client_state{keepalive = KeepAlive}); -handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> +handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> + case emqttd_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), + noreply(State#client_state{keepalive = KeepAlive1}); + {error, timeout} -> lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); - {resumed, KeepAlive1} -> - lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), - {noreply, State#client_state{keepalive = KeepAlive1}} + {error, Error} -> + lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), + stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) end; handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> @@ -170,7 +207,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto handle_info(Info, State = #client_state{request = Req}) -> lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), - {noreply, State}. + noreply(State). terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> lager:info("WebSocket client terminated: ~p", [Reason]), @@ -189,6 +226,12 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +noreply(State) -> + {noreply, State, hibernate}. + stop(Reason, State ) -> {stop, Reason, State}. +with_session(Fun, State = #client_state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + From b270399ec759138c694b30d081e53648e628234e Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 7 Oct 2015 22:54:25 +0800 Subject: [PATCH 8/8] production config as default --- rel/files/emqttd.config.production | 4 ++-- rel/reltool.config | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index ff0df40c9..f34e2f9d6 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -16,7 +16,7 @@ {error_logger_redirect, false}, {crash_log, "log/emqttd_crash.log"}, {handlers, [ - {lager_console_backend, info}, + %%{lager_console_backend, info}, {lager_file_backend, [ {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, {file, "log/emqttd_error.log"}, @@ -95,7 +95,7 @@ {max_awaiting_rel, 0}, %% Statistics Collection Interval(seconds) - {collect_interval, 20}, + {collect_interval, 0}, %% Expired after 2 days {expired_after, 48} diff --git a/rel/reltool.config b/rel/reltool.config index f8cda1e7b..33d985545 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -87,8 +87,8 @@ {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, - {template, "files/emqttd.config.development", "etc/emqttd.config"}, - {template, "files/emqttd.config.production", "etc/emqttd.config.production"}, + {template, "files/emqttd.config.production", "etc/emqttd.config"}, + {template, "files/emqttd.config.development", "etc/emqttd.config.development"}, {template, "files/acl.config", "etc/acl.config"}, {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"},