diff --git a/.gitmodules b/.gitmodules index 7db3aaa3c..3e84cb96d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -16,3 +16,6 @@ [submodule "plugins/emqttd_stomp"] path = plugins/emqttd_stomp url = https://github.com/emqtt/emqttd_stomp.git +[submodule "plugins/emqttd_recon"] + path = plugins/emqttd_recon + url = https://github.com/emqtt/emqttd_recon.git diff --git a/include/emqttd_cli.hrl b/include/emqttd_cli.hrl new file mode 100644 index 000000000..c2dc1ddba --- /dev/null +++ b/include/emqttd_cli.hrl @@ -0,0 +1,35 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- + +-define(PRINT(Format, Args), + io:format(Format, Args)). + +-define(PRINT_MSG(Msg), + io:format(Msg)). + +-define(PRINT_CMD(Cmd, Descr), + io:format("~-40s#~s~n", [Cmd, Descr])). + +-define(USAGE(CmdList), + [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]). + + diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index 07a0b3c8f..8c8fab9bb 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit 07a0b3c8fab4a6e77f12552667617d8732bf86a7 +Subproject commit 8c8fab9bbb7a4de36ddf81dab7858f628efc5511 diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql index 01cb44bed..6323f8a54 160000 --- a/plugins/emqttd_plugin_mysql +++ b/plugins/emqttd_plugin_mysql @@ -1 +1 @@ -Subproject commit 01cb44bed2cec5a8d667d1342bf6f452c1bd335a +Subproject commit 6323f8a54c2c21c60c38d3065659c7c13a2afe26 diff --git a/plugins/emqttd_plugin_pgsql b/plugins/emqttd_plugin_pgsql index fd610be85..80f0b866d 160000 --- a/plugins/emqttd_plugin_pgsql +++ b/plugins/emqttd_plugin_pgsql @@ -1 +1 @@ -Subproject commit fd610be85d0466ddcac661e0733b621abfb15b91 +Subproject commit 80f0b866d99a02ba89de94ccdaa9ee1d687566ce diff --git a/plugins/emqttd_recon b/plugins/emqttd_recon new file mode 160000 index 000000000..7f725bc34 --- /dev/null +++ b/plugins/emqttd_recon @@ -0,0 +1 @@ +Subproject commit 7f725bc3438d4c25a1f10e90286095271bf7a0f9 diff --git a/plugins/emqttd_sockjs b/plugins/emqttd_sockjs index 9caeefc42..6d5ba0dfe 160000 --- a/plugins/emqttd_sockjs +++ b/plugins/emqttd_sockjs @@ -1 +1 @@ -Subproject commit 9caeefc425e2119be754be6342d7b6481217bbf8 +Subproject commit 6d5ba0dfe62d375da09f1d53823b8aa54046aa11 diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index ff0df40c9..f34e2f9d6 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -16,7 +16,7 @@ {error_logger_redirect, false}, {crash_log, "log/emqttd_crash.log"}, {handlers, [ - {lager_console_backend, info}, + %%{lager_console_backend, info}, {lager_file_backend, [ {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, {file, "log/emqttd_error.log"}, @@ -95,7 +95,7 @@ {max_awaiting_rel, 0}, %% Statistics Collection Interval(seconds) - {collect_interval, 20}, + {collect_interval, 0}, %% Expired after 2 days {expired_after, 48} diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index ccba93740..4292b893e 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -1,4 +1,6 @@ #!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then @@ -79,303 +81,11 @@ ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin # Setup command to control the node NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" -# Check the first argument for instructions -case "$1" in - status) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT status" - exit 1 - fi +RES=`$NODETOOL ping` +if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 +fi - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "Node is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl status $@ - ;; - - cluster) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT cluster []" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl cluster $@ - ;; - - useradd) - if [ $# -ne 3 ]; then - echo "Usage: $SCRIPT useradd " - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl useradd $@ - ;; - - userdel) - if [ $# -ne 2 ]; then - echo "Usage: $SCRIPT userdel " - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl userdel $@ - ;; - - vm) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT vm [ load | memory | process | io ]" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl vm $@ - ;; - - broker) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT broker" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl broker $@ - ;; - - stats) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT stats" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl stats $@ - ;; - - metrics) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT metrics" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl metrics $@ - ;; - - bridges) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then - $NODETOOL rpc emqttd_ctl bridges list - elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then - $NODETOOL rpc emqttd_ctl bridges options - elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then - shift - $NODETOOL rpc emqttd_ctl bridges $@ - elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then - shift - $NODETOOL rpc emqttd_ctl bridges $@ - else - echo "Usage: " - echo "$SCRIPT bridges list" - echo "$SCRIPT bridges start " - echo "$SCRIPT bridges start " - echo "$SCRIPT bridges stop " - exit 1 - fi - ;; - clients) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl clients list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl clients $@ - else - echo "Usage: " - echo "$SCRIPT clients list" - echo "$SCRIPT clients show " - echo "$SCRIPT clients kick " - exit 1 - fi - ;; - sessions) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl sessions list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl sessions $@ - else - echo "Usage: " - echo "$SCRIPT sessions list" - echo "$SCRIPT sessions show " - exit 1 - fi - ;; - plugins) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl plugins list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl plugins $@ - else - echo "Usage: " - echo "$SCRIPT plugins list" - echo "$SCRIPT plugins load " - echo "$SCRIPT plugins unload " - exit 1 - fi - ;; - listeners) - if [ $# -gt 1 ]; then - echo "Usage: $SCRIPT listeners" - exit 1 - fi - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl listeners $@ - ;; - trace) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl trace list - elif [ $# -eq 4 ]; then - shift - $NODETOOL rpc emqttd_ctl trace $@ - else - echo "Usage: " - echo "$SCRIPT trace list" - echo "$SCRIPT trace client " - echo "$SCRIPT trace client off" - echo "$SCRIPT trace topic " - echo "$SCRIPT trace topic off" - exit 1 - fi - ;; - - *) - echo "Usage: $SCRIPT" - echo " status #query broker status" - echo " vm [ load | memory | process | io ] #query load, memory, process and io of erlang vm" - echo " broker #query broker version, uptime and description" - echo " stats #query broker statistics of clients, topics, subscribers" - echo " metrics #query broker metrics" - echo " cluster [] #query or cluster nodes" - echo " ----------------------------------------------------------------" - echo " clients list #list all clients" - echo " clients show #show a client" - echo " clients kick #kick a client" - echo " sessions list #list all sessions" - echo " sessions show #show a sessions" - echo " ----------------------------------------------------------------" - echo " plugins list #query loaded plugins" - echo " plugins load #load plugin" - echo " plugins unload #unload plugin" - echo " ----------------------------------------------------------------" - echo " bridges list #query bridges" - echo " bridges options #bridge options" - echo " bridges start #start bridge" - echo " bridges start #start bridge with options" - echo " bridges stop #stop bridge" - echo " ----------------------------------------------------------------" - echo " useradd #add user" - echo " userdel #delete user" - echo " ----------------------------------------------------------------" - echo " listeners #query broker listeners" - echo " ----------------------------------------------------------------" - echo " trace list #query all traces" - echo " trace client #trace client with ClientId" - echo " trace client off #stop to trace client" - echo " trace topic #trace topic with Topic" - echo " trace topic off #stop to trace Topic" - exit 1 - ;; - -esac +$NODETOOL rpc emqttd_ctl run $@ diff --git a/rel/files/emqttd_top b/rel/files/emqttd_top new file mode 100755 index 000000000..24533c436 --- /dev/null +++ b/rel/files/emqttd_top @@ -0,0 +1,117 @@ +#!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et + +# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. +if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then + POSIX_SHELL="true" + export POSIX_SHELL + # To support 'whoami' add /usr/ucb to path + PATH=/usr/ucb:$PATH + export PATH + exec /usr/bin/ksh $0 "$@" +fi +unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well + +RUNNER_SCRIPT_DIR={{runner_script_dir}} +RUNNER_SCRIPT=${0##*/} + +RUNNER_BASE_DIR={{runner_base_dir}} +RUNNER_ETC_DIR={{runner_etc_dir}} +RUNNER_LIB_DIR={{platform_lib_dir}} +RUNNER_USER={{runner_user}} + +WHOAMI=$(whoami) + +# Make sure this script is running as the appropriate user +if ([ "$RUNNER_USER" ] && [ "x$WHOAMI" != "x$RUNNER_USER" ]); then + type sudo > /dev/null 2>&1 + if [ $? -ne 0 ]; then + echo "sudo doesn't appear to be installed and your EUID isn't $RUNNER_USER" 1>&2 + exit 1 + fi + echo "Attempting to restart script through sudo -H -u $RUNNER_USER" >&2 + exec sudo -H -u $RUNNER_USER -i $RUNNER_SCRIPT_DIR/$RUNNER_SCRIPT $@ +fi + +# Make sure CWD is set to runner base dir +cd $RUNNER_BASE_DIR + +# Extract the target node name from node.args +NAME_ARG=`egrep "^ *-s?name" $RUNNER_ETC_DIR/vm.args` +if [ -z "$NAME_ARG" ]; then + echo "vm.args needs to have either -name or -sname parameter." + exit 1 +fi + +# Learn how to specify node name for connection from remote nodes +echo "$NAME_ARG" | grep '^-sname' > /dev/null 2>&1 +if [ "X$?" = "X0" ]; then + NAME_PARAM="-sname" + NAME_HOST="" +else + NAME_PARAM="-name" + echo "$NAME_ARG" | grep '@.*' > /dev/null 2>&1 + if [ "X$?" = "X0" ]; then + NAME_HOST=`echo "${NAME_ARG}" | sed -e 's/.*\(@.*\)$/\1/'` + else + NAME_HOST="" + fi +fi + +# Extract the target cookie +COOKIE_ARG=`grep '\-setcookie' $RUNNER_ETC_DIR/vm.args` +if [ -z "$COOKIE_ARG" ]; then + echo "vm.args needs to have a -setcookie parameter." + exit 1 +fi + +# Identify the script name +SCRIPT=`basename $0` + +# Parse out release and erts info +START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data` +ERTS_VSN=${START_ERL% *} +APP_VSN=${START_ERL#* } + +# Add ERTS bin dir to our path +ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin + +NODE_NAME=${NAME_ARG#* } + +# Setup command to control the node +NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" + +RES=`$NODETOOL ping` +if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 +fi + +case "$1" in + runtime) + SORTBY="runtime" + ;; + reductions) + SORTBY="reductions" + ;; + memory) + SORTBY="memory" + ;; + msg_q) + SORTBY="msg_q" + ;; + *) + echo "Usage: $SCRIPT {runtime | reductions | memory | msg_q}" + exit 1 + ;; +esac + +MYPID=$$ +ETOP_ARGS="-sort $SORTBY -interval 10 -lines 50 -tracing off" +$ERTS_PATH/erl -noshell -noinput \ + -pa $RUNNER_LIB_DIR/basho-patches \ + -hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \ + -s etop -s erlang halt -output text \ + -node $NODE_NAME $ETOP_ARGS + diff --git a/rel/reltool.config b/rel/reltool.config index 53ce02f86..33d985545 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -19,6 +19,7 @@ inets, goldrush, compiler, + runtime_tools, lager, {gen_logger, load}, gproc, @@ -54,6 +55,8 @@ {app, eldap, [{incl_cond, include}]}, {app, inets, [{incl_cond, include}]}, {app, compiler, [{incl_cond, include}]}, + {app, runtime_tools, [{incl_cond, include}]}, + {app, observer, [{incl_cond, include}]}, {app, goldrush, [{incl_cond, include}]}, {app, gen_logger, [{incl_cond, include}]}, {app, lager, [{incl_cond, include}]}, @@ -78,13 +81,14 @@ {template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"}, {template, "files/emqttd", "bin/emqttd"}, {template, "files/emqttd_ctl", "bin/emqttd_ctl"}, + {template, "files/emqttd_top", "bin/emqttd_top"}, {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, - {template, "files/emqttd.config.development", "etc/emqttd.config"}, - {template, "files/emqttd.config.production", "etc/emqttd.config.production"}, + {template, "files/emqttd.config.production", "etc/emqttd.config"}, + {template, "files/emqttd.config.development", "etc/emqttd.config.development"}, {template, "files/acl.config", "etc/acl.config"}, {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 446bff8a9..c5fd94bdf 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_app). -author("Feng Lee "). @@ -51,6 +52,7 @@ start(_StartType, _StartArgs) -> emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), + emqttd_cli:load(), emqttd:load_all_mods(), emqttd_plugins:load(), start_listeners(), @@ -71,7 +73,8 @@ start_listeners() -> emqttd:open_listeners(Listeners). start_servers(Sup) -> - Servers = [{"emqttd trace", emqttd_trace}, + Servers = [{"emqttd ctl", emqttd_ctl}, + {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 789313e3a..1c05d4e59 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -30,6 +30,11 @@ -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +%% CLI callbacks +-export([useradd/1, userdel/1]). + -behaviour(emqttd_auth_mod). -export([add_user/2, remove_user/1, @@ -42,6 +47,22 @@ -record(?AUTH_USERNAME_TAB, {username, password}). +%%%============================================================================= +%%% CLI +%%%============================================================================= + +useradd([Username, Password]) -> + ?PRINT("~p~n", [add_user(list_to_binary(Username), list_to_binary(Password))]); + +useradd(_) -> + ?PRINT_CMD("useradd ", "add user"). + +userdel([Username]) -> + ?PRINT("~p~n", [remove_user(list_to_binary(Username))]); + +userdel(_) -> + ?PRINT_CMD("userdel ", "delete user"). + %%%============================================================================= %%% API %%%============================================================================= @@ -67,6 +88,8 @@ init(Opts) -> {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), + emqttd_ctl:register_cmd(useradd, {?MODULE, useradd}, []), + emqttd_ctl:register_cmd(userdel, {?MODULE, userdel}, []), {ok, Opts}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 0df4b1343..132eca200 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -38,6 +38,12 @@ -export([init/1]). +%%%============================================================================= +%%% CLI +%%%============================================================================= + + + %%%============================================================================= %%% API %%%============================================================================= diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 2b1210221..5a21eb451 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -28,7 +28,7 @@ -author("Feng Lee "). --include_lib("emqttd.hrl"). +-include("emqttd.hrl"). %% API Function Exports -export([start_link/0]). @@ -279,7 +279,8 @@ handle_info(_Info, State) -> terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> stop_tick(Hb), - stop_tick(TRef). + stop_tick(TRef), + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl new file mode 100644 index 000000000..6f78d4745 --- /dev/null +++ b/src/emqttd_cli.erl @@ -0,0 +1,465 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd cli. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_cli). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-include("emqttd_cli.hrl"). + +-import(lists, [foreach/2]). + +-import(proplists, [get_value/2]). + +-export([load/0]). + +-export([status/1, broker/1, cluster/1, bridges/1, + clients/1, sessions/1, plugins/1, listeners/1, + vm/1, mnesia/1, trace/1]). + +-define(PROC_INFOKEYS, [status, + memory, + message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions]). + +load() -> + Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], + [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds]. + +is_cmd(Fun) -> + not lists:member(Fun, [init, load, module_info]). + +%%%============================================================================= +%%% Commands +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Node status +%% @end +%%------------------------------------------------------------------------------ +status([]) -> + {InternalStatus, _ProvidedStatus} = init:get_status(), + ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), + case lists:keysearch(emqttd, 1, application:which_applications()) of + false -> + ?PRINT_MSG("emqttd is not running~n"); + {value,_Version} -> + ?PRINT_MSG("emqttd is running~n") + end; +status(_) -> + ?PRINT_CMD("status", "query broker status"). + +%%------------------------------------------------------------------------------ +%% @doc Query broker +%% @end +%%------------------------------------------------------------------------------ +broker([]) -> + Funs = [sysdescr, version, uptime, datetime], + foreach(fun(Fun) -> + ?PRINT("~-10s: ~s~n", [Fun, emqttd_broker:Fun()]) + end, Funs); + +broker(["stats"]) -> + foreach(fun({Stat, Val}) -> + ?PRINT("~-20s: ~w~n", [Stat, Val]) + end, emqttd_stats:getstats()); + +broker(["metrics"]) -> + foreach(fun({Metric, Val}) -> + ?PRINT("~-24s: ~w~n", [Metric, Val]) + end, lists:sort(emqttd_metrics:all())); + +broker(["pubsub"]) -> + Pubsubs = supervisor:which_children(emqttd_pubsub_sup), + foreach(fun({{_, Id}, Pid, _, _}) -> + ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), + ?PRINT("pubsub: ~w~n", [Id]), + foreach(fun({Key, Val}) -> + ?PRINT(" ~-18s: ~w~n", [Key, Val]) + end, ProcInfo) + end, lists:reverse(Pubsubs)); + +broker(_) -> + ?USAGE([{"broker", "query broker version, uptime and description"}, + {"broker pubsub", "query process_info of pubsub"}, + {"borker stats", "query broker statistics of clients, topics, subscribers"}, + {"broker metrics", "query broker metrics"}]). + +%%------------------------------------------------------------------------------ +%% @doc Cluster with other node +%% @end +%%------------------------------------------------------------------------------ +cluster([]) -> + Nodes = emqttd_broker:running_nodes(), + ?PRINT("cluster nodes: ~p~n", [Nodes]); + +cluster(usage) -> + ?PRINT_CMD("cluster []", "cluster with node, query cluster info "); + +cluster([SNode]) -> + Node = node_name(SNode), + case lists:member(Node, emqttd_broker:running_nodes()) of + true -> + ?PRINT("~s is already clustered~n", [Node]); + false -> + cluster(Node, fun() -> + emqttd_plugins:unload(), + application:stop(emqttd), + application:stop(esockd), + application:stop(gproc), + emqttd_mnesia:cluster(Node), + application:start(gproc), + application:start(esockd), + application:start(emqttd) + end) + end; + +cluster(_) -> + cluster(usage). + +cluster(Node, DoCluster) -> + cluster(net_adm:ping(Node), Node, DoCluster). + +cluster(pong, Node, DoCluster) -> + case emqttd:is_running(Node) of + true -> + DoCluster(), + ?PRINT("cluster with ~s successfully.~n", [Node]); + false -> + ?PRINT("emqttd is not running on ~s~n", [Node]) + end; + +cluster(pang, Node, _DoCluster) -> + ?PRINT("Failed to connect ~s~n", [Node]). + +%%------------------------------------------------------------------------------ +%% @doc Query clients +%% @end +%%------------------------------------------------------------------------------ +clients(["list"]) -> + emqttd_mnesia:dump(ets, mqtt_client, fun print/1); + +clients(["show", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> ?PRINT_MSG("Not Found.~n"); + Client -> print(Client) + end; + +clients(["kick", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> + ?PRINT_MSG("Not Found.~n"); + #mqtt_client{client_pid = Pid} -> + emqttd_client:kick(Pid) + end; + +clients(_) -> + ?USAGE([{"clients list", "list all clients"}, + {"clients show ", "show a client"}, + {"clients kick ", "kick a client"}]). + +%%------------------------------------------------------------------------------ +%% @doc Sessions Command +%% @end +%%------------------------------------------------------------------------------ +sessions(["list"]) -> + [sessions(["list", Type]) || Type <- ["persistent", "transient"]]; + +sessions(["list", "persistent"]) -> + emqttd_mnesia:dump(ets, mqtt_persistent_session, fun print/1); + +sessions(["list", "transient"]) -> + emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1); + +sessions(["show", ClientId]) -> + MP = {{list_to_binary(ClientId), '_'}, '_'}, + case {ets:match_object(mqtt_transient_session, MP), + ets:match_object(mqtt_persistent_session, MP)} of + {[], []} -> + ?PRINT_MSG("Not Found.~n"); + {[SessInfo], _} -> + print(SessInfo); + {_, [SessInfo]} -> + print(SessInfo) + end; + +sessions(_) -> + ?USAGE([{"sessions list", "list all sessions"}, + {"sessions list persistent", "list all persistent sessions"}, + {"sessions list transient", "list all transient sessions"}, + {"sessions show ", "show a session"}]). + +plugins(["list"]) -> + foreach(fun print/1, emqttd_plugins:list()); + +plugins(["load", Name]) -> + case emqttd_plugins:load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) + end; + +plugins(["unload", Name]) -> + case emqttd_plugins:unload(list_to_atom(Name)) of + ok -> + ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); + {error, Reason} -> + ?PRINT("unload plugin error: ~p~n", [Reason]) + end; + +plugins(_) -> + ?USAGE([{"plugins list", "query loaded plugins"}, + {"plugins load ", "load plugin"}, + {"plugins unload ", "unload plugin"}]). + +%%------------------------------------------------------------------------------ +%% @doc Bridges command +%% @end +%%------------------------------------------------------------------------------ + +bridges(["list"]) -> + foreach(fun({{Node, Topic}, _Pid}) -> + ?PRINT("bridge: ~s ~s~n", [Node, Topic]) + end, emqttd_bridge_sup:bridges()); + +bridges(["options"]) -> + ?PRINT_MSG("Options:~n"), + ?PRINT_MSG(" qos = 0 | 1 | 2~n"), + ?PRINT_MSG(" prefix = string~n"), + ?PRINT_MSG(" suffix = string~n"), + ?PRINT_MSG(" queue = integer~n"), + ?PRINT_MSG("Example:~n"), + ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); + +bridges(["start", SNode, Topic]) -> + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(["start", SNode, Topic, OptStr]) -> + Opts = parse_opts(bridge, OptStr), + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(["stop", SNode, Topic]) -> + case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + ok -> ?PRINT_MSG("bridge is stopped.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(_) -> + ?USAGE([{"bridges list", "query bridges"}, + {"bridges options", "bridge options"}, + {"bridges start ", "start bridge"}, + {"bridges start ", "start bridge with options"}, + {"bridges stop ", "stop bridge"}]). + +parse_opts(Cmd, OptStr) -> + Tokens = string:tokens(OptStr, ","), + [parse_opt(Cmd, list_to_atom(Opt), Val) + || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. +parse_opt(bridge, qos, Qos) -> + {qos, list_to_integer(Qos)}; +parse_opt(bridge, suffix, Suffix) -> + {topic_suffix, list_to_binary(Suffix)}; +parse_opt(bridge, prefix, Prefix) -> + {topic_prefix, list_to_binary(Prefix)}; +parse_opt(bridge, queue, Len) -> + {max_queue_len, list_to_integer(Len)}; +parse_opt(_Cmd, Opt, _Val) -> + ?PRINT("Bad Option: ~s~n", [Opt]). + +%%------------------------------------------------------------------------------ +%% @doc vm command +%% @end +%%------------------------------------------------------------------------------ +vm([]) -> + vm(["all"]); + +vm(["all"]) -> + [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; + +vm(["load"]) -> + [?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; + +vm(["memory"]) -> + [?PRINT("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; + +vm(["process"]) -> + foreach(fun({Name, Key}) -> + ?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{limit, process_limit}, {count, process_count}]); + +vm(["io"]) -> + IoInfo = erlang:system_info(check_io), + foreach(fun(Key) -> + ?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) + end, [max_fds, active_fds]); + +vm(_) -> + ?USAGE([{"vm all", "query info of erlang vm"}, + {"vm load", "query load of erlang vm"}, + {"vm memory", "query memory of erlang vm"}, + {"vm process", "query process of erlang vm"}, + {"vm io", "queue io of erlang vm"}]). + +%%------------------------------------------------------------------------------ +%% @doc mnesia Command +%% @end +%%------------------------------------------------------------------------------ +mnesia([]) -> + mnesia:system_info(); + +mnesia(_) -> + ?PRINT_CMD("mnesia", "mnesia system info"). + +%%------------------------------------------------------------------------------ +%% @doc Trace Command +%% @end +%%------------------------------------------------------------------------------ +trace(["list"]) -> + foreach(fun({{Who, Name}, LogFile}) -> + ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) + end, emqttd_trace:all_traces()); + +trace(["client", ClientId, "off"]) -> + trace_off(client, ClientId); + +trace(["client", ClientId, LogFile]) -> + trace_on(client, ClientId, LogFile); + +trace(["topic", Topic, "off"]) -> + trace_off(topic, Topic); + +trace(["topic", Topic, LogFile]) -> + trace_on(topic, Topic, LogFile); + +trace(_) -> + ?USAGE([{"trace list", "query all traces"}, + {"trace client ","trace client with ClientId"}, + {"trace client off", "stop to trace client"}, + {"trace topic ", "trace topic with Topic"}, + {"trace topic off", "stop to trace Topic"}]). + +trace_on(Who, Name, LogFile) -> + case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of + ok -> + ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. + +trace_off(Who, Name) -> + case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of + ok -> + ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + +%%------------------------------------------------------------------------------ +%% @doc Listeners Command +%% @end +%%------------------------------------------------------------------------------ +listeners([]) -> + foreach(fun({{Protocol, Port}, Pid}) -> + Info = [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients,esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}], + ?PRINT("listener on ~s:~w~n", [Protocol, Port]), + foreach(fun({Key, Val}) -> + ?PRINT(" ~-16s: ~w~n", [Key, Val]) + end, Info) + end, esockd:listeners()); + +listeners(_) -> + ?PRINT_CMD("listeners", "query broker listeners"). + +node_name(SNode) -> + SNode1 = + case string:tokens(SNode, "@") of + [_Node, _Server] -> + SNode; + _ -> + case net_kernel:longnames() of + true -> + SNode ++ "@" ++ inet_db:gethostname() ++ + "." ++ inet_db:res_option(domain); + false -> + SNode ++ "@" ++ inet_db:gethostname(); + _ -> + SNode + end + end, + list_to_atom(SNode1). + +print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", + [Name, Ver, Descr, Active]); + +print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, + username = Username, peername = Peername, + connected_at = ConnectedAt}) -> + ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", + [ClientId, CleanSess, Username, + emqttd_net:format(Peername), + emqttd_util:now_to_secs(ConnectedAt)]); + +print({{ClientId, _ClientPid}, SessInfo}) -> + InfoKeys = [clean_sess, + max_inflight, + inflight_queue, + message_queue, + message_dropped, + awaiting_rel, + awaiting_ack, + awaiting_comp, + created_at, + subscriptions], + ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " + "message_queue=~w, message_dropped=~w, " + "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " + "created_at=~w, subscriptions=~s)~n", + [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). + +format(created_at, Val) -> + emqttd_util:now_to_secs(Val); + +format(subscriptions, List) -> + string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); + +format(_, Val) -> + Val. + diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index da38d9d8d..7eb4be8f4 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,10 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, session/1, info/1, kick/1, subscribe/2]). +-export([start_link/2, session/1, info/1, kick/1]). + +%% SUB/UNSUB Asynchronously +-export([subscribe/2, unsubscribe/2]). -behaviour(gen_server). @@ -59,7 +62,7 @@ start_link(SockArgs, MqttEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. session(CPid) -> - gen_server:call(CPid, session). + gen_server:call(CPid, session, infinity). info(CPid) -> gen_server:call(CPid, info, infinity). @@ -70,6 +73,9 @@ kick(CPid) -> subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). +unsubscribe(CPid, Topics) -> + gen_server:cast(CPid, {unsubscribe, Topics}). + init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), @@ -107,9 +113,11 @@ handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. -handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - noreply(State#state{proto_state = ProtoState1}); +handle_cast({subscribe, TopicTable}, State) -> + with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + +handle_cast({unsubscribe, Topics}, State) -> + with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast(Msg, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), @@ -149,17 +157,26 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), - KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), + StatFun = fun() -> + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), noreply(State#state{keepalive = KeepAlive}); -handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> +handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) -> + case emqttd_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), + noreply(State#state{keepalive = KeepAlive1}); + {error, timeout} -> lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); - {resumed, KeepAlive1} -> - lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), - noreply(State#state{keepalive = KeepAlive1}) + {error, Error} -> + lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), + stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; handle_info(Info, State = #state{peername = Peername}) -> @@ -188,12 +205,20 @@ terminate(Reason, #state{peername = Peername, code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + noreply(State) -> {noreply, State, hibernate}. - -%------------------------------------------------------- -% receive and parse tcp data -%------------------------------------------------------- + +stop(Reason, State) -> + {stop, Reason, State}. + +with_session(Fun, State = #state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + +%% receive and parse tcp data received(<<>>, State) -> {noreply, State, hibernate}; @@ -244,12 +269,8 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -stop(Reason, State) -> - {stop, Reason, State}. - received_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/received'), - inc(Type). + emqttd_metrics:inc('packets/received'), inc(Type). inc(?CONNECT) -> emqttd_metrics:inc('packets/connect'); inc(?PUBLISH) -> diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 04902c9ab..d60ddc78d 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -30,22 +30,22 @@ -include("emqttd.hrl"). --behaviour(gen_server2). - --define(SERVER, ?MODULE). - %% API Exports -export([start_link/2, pool/0]). -export([lookup/1, register/1, unregister/1]). +-behaviour(gen_server2). + +-define(SERVER, ?MODULE). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, {id, statsfun}). --define(CM_POOL, cm_pool). +-define(CM_POOL, ?MODULE). %%%============================================================================= %%% API diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 48863c084..86f7d8b3b 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -20,331 +20,120 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd control commands. +%%% emqttd control. %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_ctl). -author("Feng Lee "). -include("emqttd.hrl"). --define(PRINT_MSG(Msg), - io:format(Msg)). +-include("emqttd_cli.hrl"). --define(PRINT(Format, Args), - io:format(Format, Args)). +-behaviour(gen_server). --export([status/1, - vm/1, - broker/1, - stats/1, - metrics/1, - cluster/1, - clients/1, - sessions/1, - listeners/1, - bridges/1, - plugins/1, - trace/1, - useradd/1, - userdel/1]). +-define(SERVER, ?MODULE). + +%% API Function Exports +-export([start_link/0, + register_cmd/3, + unregister_cmd/1, + run/1]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {seq = 0}). + +-define(CMD_TAB, mqttd_ctl_cmd). + +%%%============================================================================= +%%% API +%%%============================================================================= + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%------------------------------------------------------------------------------ -%% @doc Query node status +%% @doc Register a command %% @end %%------------------------------------------------------------------------------ -status([]) -> - {InternalStatus, _ProvidedStatus} = init:get_status(), - ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), - case lists:keysearch(emqttd, 1, application:which_applications()) of - false -> - ?PRINT_MSG("emqttd is not running~n"); - {value,_Version} -> - ?PRINT_MSG("emqttd is running~n") - end. +-spec register_cmd(atom(), {module(), atom()}, list()) -> true. +register_cmd(Cmd, MF, Opts) -> + gen_server:cast(?SERVER, {register_cmd, Cmd, MF, Opts}). %%------------------------------------------------------------------------------ -%% @doc Cluster with other node +%% @doc Unregister a command %% @end %%------------------------------------------------------------------------------ -cluster([]) -> - Nodes = emqttd_broker:running_nodes(), - ?PRINT("cluster nodes: ~p~n", [Nodes]); - -cluster([SNode]) -> - Node = node_name(SNode), - case net_adm:ping(Node) of - pong -> - case emqttd:is_running(Node) of - true -> - emqttd_plugins:unload(), - application:stop(emqttd), - application:stop(esockd), - application:stop(gproc), - emqttd_mnesia:cluster(Node), - application:start(gproc), - application:start(esockd), - application:start(emqttd), - ?PRINT("cluster with ~p successfully.~n", [Node]); - false -> - ?PRINT("emqttd is not running on ~p~n", [Node]) - end; - pang -> - ?PRINT("failed to connect to ~p~n", [Node]) - end. +-spec unregister_cmd(atom()) -> true. +unregister_cmd(Cmd) -> + gen_server:cast(?SERVER, {unregister_cmd, Cmd}). %%------------------------------------------------------------------------------ -%% @doc Add user +%% @doc Run a command %% @end %%------------------------------------------------------------------------------ -useradd([Username, Password]) -> - ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]). +run([]) -> usage(); -%%------------------------------------------------------------------------------ -%% @doc Delete user -%% @end -%%------------------------------------------------------------------------------ -userdel([Username]) -> - ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]). +run(["help"]) -> usage(); -vm([]) -> - [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; - -vm(["load"]) -> - ?PRINT_MSG("Load: ~n"), - [?PRINT(" ~s:~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; - -vm(["memory"]) -> - ?PRINT_MSG("Memory: ~n"), - [?PRINT(" ~s:~p~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; - -vm(["process"]) -> - ?PRINT_MSG("Process: ~n"), - ?PRINT(" process_limit:~p~n", [erlang:system_info(process_limit)]), - ?PRINT(" process_count:~p~n", [erlang:system_info(process_count)]); - -vm(["io"]) -> - ?PRINT_MSG("IO: ~n"), - ?PRINT(" max_fds:~p~n", [proplists:get_value(max_fds, erlang:system_info(check_io))]). - -broker([]) -> - Funs = [sysdescr, version, uptime, datetime], - [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]. - -stats([]) -> - [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()]. - -metrics([]) -> - [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. - -clients(["list"]) -> - dump(client, mqtt_client); - -clients(["show", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> - ?PRINT_MSG("Not Found.~n"); - Client -> - print(client, Client) - end; - -clients(["kick", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> - ?PRINT_MSG("Not Found.~n"); - #mqtt_client{client_pid = Pid} -> - emqttd_client:kick(Pid) - end. - -sessions(["list"]) -> - dump(session, mqtt_transient_session), - dump(session, mqtt_persistent_session); - -sessions(["show", ClientId]) -> - MP = {{list_to_binary(ClientId), '_'}, '_'}, - case {ets:match_object(mqtt_transient_session, MP), - ets:match_object(mqtt_persistent_session, MP)} of - {[], []} -> - ?PRINT_MSG("Not Found.~n"); - {[SessInfo], _} -> - print(session, SessInfo); - {_, [SessInfo]} -> - print(session, SessInfo) +run([CmdS|Args]) -> + Cmd = list_to_atom(CmdS), + case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of + [[{Mod, Fun}]] -> Mod:Fun(Args); + [] -> usage() end. -listeners([]) -> - lists:foreach(fun({{Protocol, Port}, Pid}) -> - ?PRINT("listener ~s:~w~n", [Protocol, Port]), - ?PRINT(" acceptors: ~w~n", [esockd:get_acceptors(Pid)]), - ?PRINT(" max_clients: ~w~n", [esockd:get_max_clients(Pid)]), - ?PRINT(" current_clients: ~w~n", [esockd:get_current_clients(Pid)]), - ?PRINT(" shutdown_count: ~p~n", [esockd:get_shutdown_count(Pid)]) - end, esockd:listeners()). +%%------------------------------------------------------------------------------ +%% @doc Usage +%% @end +%%------------------------------------------------------------------------------ +usage() -> + ?PRINT("Usage: ~s~n", [?MODULE]), + [begin ?PRINT("~80..-s~n", [""]), Mod:Cmd(usage) end + || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)]. -bridges(["list"]) -> - lists:foreach(fun({{Node, Topic}, _Pid}) -> - ?PRINT("bridge: ~s ~s~n", [Node, Topic]) - end, emqttd_bridge_sup:bridges()); +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= -bridges(["options"]) -> - ?PRINT_MSG("Options:~n"), - ?PRINT_MSG(" qos = 0 | 1 | 2~n"), - ?PRINT_MSG(" prefix = string~n"), - ?PRINT_MSG(" suffix = string~n"), - ?PRINT_MSG(" queue = integer~n"), - ?PRINT_MSG("Example:~n"), - ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); +init([]) -> + ets:new(?CMD_TAB, [ordered_set, named_table, protected]), + {ok, #state{seq = 0}}. -bridges(["start", SNode, Topic]) -> - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; +handle_call(_Request, _From, State) -> + {reply, ok, State}. -bridges(["start", SNode, Topic, OptStr]) -> - Opts = parse_opts(bridge, OptStr), - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; +handle_cast({register_cmd, Cmd, MF, Opts}, State = #state{seq = Seq}) -> + ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}), + noreply(next_seq(State)); -bridges(["stop", SNode, Topic]) -> - case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), bin(Topic)) of - ok -> ?PRINT_MSG("bridge is stopped.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end. +handle_cast({unregister_cmd, Cmd}, State) -> + ets:match_delete(?CMD_TAB, {{'_', Cmd}, '_', '_'}), + noreply(State); -plugins(["list"]) -> - lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list()); +handle_cast(_Msg, State) -> + noreply(State). -plugins(["load", Name]) -> - case emqttd_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) - end; +handle_info(_Info, State) -> + noreply(State). -plugins(["unload", Name]) -> - case emqttd_plugins:unload(list_to_atom(Name)) of - ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason]) - end. +terminate(_Reason, _State) -> + ok. -trace(["list"]) -> - lists:foreach(fun({{Who, Name}, LogFile}) -> - ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) - end, emqttd_trace:all_traces()); +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -trace(["client", ClientId, "off"]) -> - stop_trace(client, ClientId); -trace(["client", ClientId, LogFile]) -> - start_trace(client, ClientId, LogFile); -trace(["topic", Topic, "off"]) -> - stop_trace(topic, Topic); -trace(["topic", Topic, LogFile]) -> - start_trace(topic, Topic, LogFile). +%%%============================================================================= +%%% Internal Function Definitions +%%%============================================================================= -start_trace(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of - ok -> - ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) - end. -stop_trace(Who, Name) -> - case emqttd_trace:stop_trace({Who, bin(Name)}) of - ok -> - ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) - end. +noreply(State) -> {noreply, State, hibernate}. - -node_name(SNode) -> - SNode1 = - case string:tokens(SNode, "@") of - [_Node, _Server] -> - SNode; - _ -> - case net_kernel:longnames() of - true -> - SNode ++ "@" ++ inet_db:gethostname() ++ - "." ++ inet_db:res_option(domain); - false -> - SNode ++ "@" ++ inet_db:gethostname(); - _ -> - SNode - end - end, - list_to_atom(SNode1). - -bin(S) when is_list(S) -> list_to_binary(S); -bin(B) when is_binary(B) -> B. - -parse_opts(Cmd, OptStr) -> - Tokens = string:tokens(OptStr, ","), - [parse_opt(Cmd, list_to_atom(Opt), Val) - || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. - -parse_opt(bridge, qos, Qos) -> - {qos, list_to_integer(Qos)}; -parse_opt(bridge, suffix, Suffix) -> - {topic_suffix, list_to_binary(Suffix)}; -parse_opt(bridge, prefix, Prefix) -> - {topic_prefix, list_to_binary(Prefix)}; -parse_opt(bridge, queue, Len) -> - {max_queue_len, list_to_integer(Len)}; -parse_opt(_Cmd, Opt, _Val) -> - ?PRINT("Bad Option: ~s~n", [Opt]). - -dump(Type, Table) -> - dump(Type, Table, ets:first(Table)). - -dump(_Type, _Table, '$end_of_table') -> - ok; -dump(Type, Table, Key) -> - case ets:lookup(Table, Key) of - [Record] -> print(Type, Record); - [] -> ignore - end, - dump(Type, Table, ets:next(Table, Key)). - -print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess, - username = Username, peername = Peername, - connected_at = ConnectedAt}) -> - ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", - [ClientId, CleanSess, Username, - emqttd_net:format(Peername), - emqttd_util:now_to_secs(ConnectedAt)]); - -print(session, {{ClientId, _ClientPid}, SessInfo}) -> - InfoKeys = [clean_sess, - max_inflight, - inflight_queue, - message_queue, - message_dropped, - awaiting_rel, - awaiting_ack, - awaiting_comp, - created_at, - subscriptions], - ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " - "message_queue=~w, message_dropped=~w, " - "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " - "created_at=~w, subscriptions=~s)~n", - [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]); - -print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", - [Name, Ver, Descr, Active]). - -format(created_at, Val) -> - emqttd_util:now_to_secs(Val); - -format(subscriptions, List) -> - string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); - -format(_, Val) -> - Val. +next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}. diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index f5c7f2ac7..e06382207 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -23,62 +23,61 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_keepalive). -author("Feng Lee "). --export([new/3, resume/1, cancel/1]). +-export([start/3, check/1, cancel/1]). --record(keepalive, {transport, - socket, - recv_oct, - timeout_sec, - timeout_msg, - timer_ref}). +-record(keepalive, {statfun, statval, + tsec, tmsg, tref, + repeat = 0}). %%------------------------------------------------------------------------------ -%% @doc Create a keepalive +%% @doc Start a keepalive %% @end %%------------------------------------------------------------------------------ -new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> - {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), - Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref}. +start(_, 0, _) -> + undefined; +start(StatFun, TimeoutSec, TimeoutMsg) -> + {ok, StatVal} = StatFun(), + #keepalive{statfun = StatFun, statval = StatVal, + tsec = TimeoutSec, tmsg = TimeoutMsg, + tref = timer(TimeoutSec, TimeoutMsg)}. %%------------------------------------------------------------------------------ -%% @doc Try to resume keepalive, called when timeout +%% @doc Check keepalive, called when timeout. %% @end %%------------------------------------------------------------------------------ -resume(KeepAlive = #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }) -> - {ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), - if - NewRecvOct =:= RecvOct -> - timeout; - true -> - %need? - cancel(Ref), - NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - {resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}} +check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> + case StatFun() of + {ok, NewVal} -> + if NewVal =/= LastVal -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; + Repeat < 1 -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; + true -> + {error, timeout} + end; + {error, Error} -> + {error, Error} end. +resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> + KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. + %%------------------------------------------------------------------------------ %% @doc Cancel Keepalive %% @end %%------------------------------------------------------------------------------ -cancel(#keepalive{timer_ref = Ref}) -> - cancel(Ref); +cancel(#keepalive{tref = TRef}) -> + cancel(TRef); cancel(undefined) -> - undefined; -cancel(Ref) -> - catch erlang:cancel_timer(Ref). + ok; +cancel(TRef) -> + catch erlang:cancel_timer(TRef). + +timer(Sec, Msg) -> + erlang:send_after(timer:seconds(Sec), self(), Msg). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index b6752df29..707c2a9f9 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_metrics). -author("Feng Lee "). diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 86fa118f8..355f3d547 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -35,6 +35,8 @@ -export([create_table/2, copy_table/1]). +-export([dump/3]). + start() -> case init_schema() of ok -> @@ -168,3 +170,16 @@ wait_for_mnesia(stop) -> {error, mnesia_unexpectedly_starting} end. +dump(ets, Table, Fun) -> + dump(ets, Table, ets:first(Table), Fun). + +dump(ets, _Table, '$end_of_table', _Fun) -> + ok; + +dump(ets, Table, Key, Fun) -> + case ets:lookup(Table, Key) of + [Record] -> Fun(Record); + [] -> ignore + end, + dump(ets, Table, ets:next(Table, Key), Fun). + diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index dcd120035..840339819 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -239,16 +239,11 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = case lists:member(deny, AllowDenies) of true -> %%TODO: return 128 QoS when deny... no need to SUBACK? - lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), - {ok, State}; + lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]); false -> - %%TODO: GrantedQos should be renamed. - {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), - send(?SUBACK_PACKET(PacketId, GrantedQos), State) - end; - -handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> - {ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end, + emqttd_session:subscribe(Session, TopicTable, Callback) + end, {ok, State}; %% protect from empty topic list @@ -256,7 +251,7 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - ok = emqttd_session:unsubscribe(Session, Topics), + emqttd_session:unsubscribe(Session, Topics), send(?UNSUBACK_PACKET(PacketId), State); handle(?PACKET(?PINGREQ), State) -> @@ -349,7 +344,7 @@ send_willmsg(ClientId, WillMsg) -> start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> - self() ! {keepalive, start, round(Sec * 1.5)}. + self() ! {keepalive, start, round(Sec * 1.2)}. %%---------------------------------------------------------------------------- %% Validate Packets diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 74da5026f..8bd4534e0 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -231,6 +231,7 @@ match(Topic) when is_binary(Topic) -> %%%============================================================================= init([Id, _Opts]) -> + process_flag(priority, high), %%process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 1d3caa98f..a23f76f7d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -59,7 +59,7 @@ %% PubSub APIs -export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2, - subscribe/2, unsubscribe/2]). + subscribe/2, subscribe/3, unsubscribe/2]). -behaviour(gen_server2). @@ -166,9 +166,13 @@ destroy(SessPid, ClientId) -> %% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ --spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. +-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok. subscribe(SessPid, TopicTable) -> - gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT). + subscribe(SessPid, TopicTable, fun(_) -> ok end). + +-spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok. +subscribe(SessPid, TopicTable, Callback) -> + gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}). %%------------------------------------------------------------------------------ %% @doc Publish message @@ -213,7 +217,7 @@ pubcomp(SessPid, PktId) -> %%------------------------------------------------------------------------------ -spec unsubscribe(pid(), [binary()]) -> ok. unsubscribe(SessPid, Topics) -> - gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT). + gen_server2:cast(SessPid, {unsubscribe, Topics}). %%%============================================================================= %%% gen_server callbacks @@ -247,26 +251,24 @@ init([CleanSess, ClientId, ClientPid]) -> {ok, start_collector(Session#session{client_mon = MRef}), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> - case Msg of - {unsubscribe, _} -> 2; - {subscribe, _} -> 1; - _ -> 0 - end. + case Msg of _ -> 0 end. prioritise_cast(Msg, _Len, _State) -> case Msg of - {destroy, _} -> 10; - {resume, _, _} -> 9; - {pubrel, _PktId} -> 8; - {pubcomp, _PktId} -> 8; - {pubrec, _PktId} -> 8; - {puback, _PktId} -> 7; - _ -> 0 + {destroy, _} -> 10; + {resume, _, _} -> 9; + {pubrel, _PktId} -> 8; + {pubcomp, _PktId} -> 8; + {pubrec, _PktId} -> 8; + {puback, _PktId} -> 7; + {unsubscribe, _, _} -> 6; + {subscribe, _, _} -> 5; + _ -> 0 end. prioritise_info(Msg, _Len, _State) -> case Msg of - {'DOWN', _, process, _, _} -> 10; + {'DOWN', _, _, _, _} -> 10; {'EXIT', _, _} -> 10; session_expired -> 10; {timeout, _, _} -> 5; @@ -275,17 +277,40 @@ prioritise_info(Msg, _Len, _State) -> _ -> 0 end. -handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, + Session = #session{client_id = ClientId, + awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> + case check_awaiting_rel(Session) of + true -> + TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), + AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), + {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; + false -> + lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " + "for too many awaiting_rel: ~p", [ClientId, Msg]), + {reply, {error, dropped}, Session} + end; - case TopicTable0 -- Subscriptions of +handle_call(Req, _From, State) -> + lager:critical("Unexpected Request: ~p", [Req]), + {reply, ok, State}. + +handle_cast({subscribe, TopicTable0, Callback}, Session = #session{ + client_id = ClientId, subscriptions = Subscriptions}) -> + + TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), + + case TopicTable -- Subscriptions of [] -> - {reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session}; + catch Callback([Qos || {_, Qos} <- TopicTable]), + noreply(Session); _ -> - TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), + catch Callback(GrantedQos), + emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", @@ -310,11 +335,11 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie [{Topic, Qos} | Acc] end end, Subscriptions, TopicTable), - {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}} + noreply(Session#session{subscriptions = Subscriptions1}) end; -handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), @@ -333,26 +358,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client end end, Subscriptions, Topics), - {reply, ok, Session#session{subscriptions = Subscriptions1}}; - -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, - Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> - case check_awaiting_rel(Session) of - true -> - TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), - AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), - {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; - false -> - lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " - "for too many awaiting_rel: ~p", [ClientId, Msg]), - {reply, {error, dropped}, Session} - end; - -handle_call(Req, _From, State) -> - lager:critical("Unexpected Request: ~p", [Req]), - {reply, ok, State}. + noreply(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]), diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index c17203f80..99c99c83f 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -55,9 +55,9 @@ -record(state, {id}). --define(SM_POOL, sm_pool). +-define(SM_POOL, ?MODULE). -%% todo... +%% TODO... -define(SESSION_TIMEOUT, 60000). %%%============================================================================= diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index bfdf5209d..345dc2d26 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -31,12 +31,12 @@ -include("emqttd.hrl"). --export([start_link/0]). - -behaviour(gen_server). -define(SERVER, ?MODULE). +-export([start_link/0]). + %% statistics API. -export([statsfun/1, statsfun/2, getstats/0, getstat/1, @@ -52,8 +52,8 @@ %% $SYS Topics for Clients -define(SYSTOP_CLIENTS, [ - 'clients/count', % clients connected current - 'clients/max' % max clients connected + 'clients/count', % clients connected current + 'clients/max' % max clients connected ]). %% $SYS Topics for Sessions @@ -72,6 +72,7 @@ 'queues/max' % ... ]). + %%%============================================================================= %%% API %%%============================================================================= @@ -102,7 +103,7 @@ statsfun(Stat, MaxStat) -> %%------------------------------------------------------------------------------ -spec getstats() -> [{atom(), non_neg_integer()}]. getstats() -> - ets:tab2list(?STATS_TAB). + lists:sort(ets:tab2list(?STATS_TAB)). %%------------------------------------------------------------------------------ %% @doc Get stats by name diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 82cd4d8a7..8edcebad0 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -24,17 +24,18 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_trace). -author("Feng Lee "). --behaviour(gen_server). - %% API Function Exports -export([start_link/0]). -export([start_trace/2, stop_trace/1, all_traces/0]). +-behaviour(gen_server). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 6b4a001b2..827d7de18 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -34,7 +34,10 @@ -include("emqttd_protocol.hrl"). %% API Exports --export([start_link/1, ws_loop/3, subscribe/2]). +-export([start_link/1, ws_loop/3, session/1, info/1, kick/1]). + +%% SUB/UNSUB Asynchronously +-export([subscribe/2, unsubscribe/2]). -behaviour(gen_server). @@ -61,9 +64,21 @@ start_link(Req) -> packet_opts = PktOpts, parser = emqttd_parser:new(PktOpts)}). +session(CPid) -> + gen_server:call(CPid, session, infinity). + +info(CPid) -> + gen_server:call(CPid, info, infinity). + +kick(CPid) -> + gen_server:call(CPid, kick). + subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). +unsubscribe(CPid, Topics) -> + gen_server:cast(CPid, {unsubscribe, Topics}). + %%------------------------------------------------------------------------------ %% @private %% @doc Start WebSocket client. @@ -112,17 +127,30 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]), {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. +handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> + {reply, emqttd_protocol:session(ProtoState), State}; + +handle_call(info, _From, State = #client_state{request = Req, + proto_state = ProtoState}) -> + {reply, [{websocket, true}, {peer, Req:get(peer)} + | emqttd_protocol:info(ProtoState)], State}; + +handle_call(kick, _From, State) -> + {stop, {shutdown, kick}, ok, State}; + handle_call(_Req, _From, State) -> {reply, error, State}. -handle_cast({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}, hibernate}; +handle_cast({subscribe, TopicTable}, State) -> + with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + +handle_cast({unsubscribe, Topics}, State) -> + with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p", [Error]), stop({shutdown, Error}, State); @@ -137,11 +165,11 @@ handle_cast(_Msg, State) -> handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) -> lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), @@ -149,18 +177,27 @@ handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = P handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), - KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)}, - TimeoutSec, {keepalive, timeout}), - {noreply, State#client_state{keepalive = KeepAlive}}; + Socket = Req:get(socket), + StatFun = fun() -> + case esockd_transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), + noreply(State#client_state{keepalive = KeepAlive}); -handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> +handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> + case emqttd_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), + noreply(State#client_state{keepalive = KeepAlive1}); + {error, timeout} -> lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); - {resumed, KeepAlive1} -> - lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), - {noreply, State#client_state{keepalive = KeepAlive1}} + {error, Error} -> + lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), + stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) end; handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> @@ -170,7 +207,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto handle_info(Info, State = #client_state{request = Req}) -> lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), - {noreply, State}. + noreply(State). terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> lager:info("WebSocket client terminated: ~p", [Reason]), @@ -189,6 +226,12 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +noreply(State) -> + {noreply, State, hibernate}. + stop(Reason, State ) -> {stop, Reason, State}. +with_session(Fun, State = #client_state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + diff --git a/test/emqttd_keepalive_tests.erl b/test/emqttd_keepalive_tests.erl new file mode 100644 index 000000000..96f84450a --- /dev/null +++ b/test/emqttd_keepalive_tests.erl @@ -0,0 +1,44 @@ + +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +-module(emqttd_keepalive_tests). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +keepalive_test() -> + KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), + ?assertEqual([resumed, timeout], lists:reverse(loop(KA, []))). + +loop(KA, Acc) -> + receive + {keepalive, timeout} -> + case emqttd_keepalive:check(KA) of + {ok, KA1} -> loop(KA1, [resumed | Acc]); + {error, timeout} -> [timeout | Acc] + end + after 4000 -> + Acc + end. + +-endif.