Merge pull request #326 from emqtt/dev-feng

0.11.1
This commit is contained in:
Feng Lee 2015-10-08 14:33:38 +08:00
commit d33c7c1937
30 changed files with 1027 additions and 743 deletions

3
.gitmodules vendored
View File

@ -16,3 +16,6 @@
[submodule "plugins/emqttd_stomp"] [submodule "plugins/emqttd_stomp"]
path = plugins/emqttd_stomp path = plugins/emqttd_stomp
url = https://github.com/emqtt/emqttd_stomp.git url = https://github.com/emqtt/emqttd_stomp.git
[submodule "plugins/emqttd_recon"]
path = plugins/emqttd_recon
url = https://github.com/emqtt/emqttd_recon.git

35
include/emqttd_cli.hrl Normal file
View File

@ -0,0 +1,35 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
-define(PRINT(Format, Args),
io:format(Format, Args)).
-define(PRINT_MSG(Msg),
io:format(Msg)).
-define(PRINT_CMD(Cmd, Descr),
io:format("~-40s#~s~n", [Cmd, Descr])).
-define(USAGE(CmdList),
[?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]).

@ -1 +1 @@
Subproject commit 07a0b3c8fab4a6e77f12552667617d8732bf86a7 Subproject commit 8c8fab9bbb7a4de36ddf81dab7858f628efc5511

@ -1 +1 @@
Subproject commit 01cb44bed2cec5a8d667d1342bf6f452c1bd335a Subproject commit 6323f8a54c2c21c60c38d3065659c7c13a2afe26

@ -1 +1 @@
Subproject commit fd610be85d0466ddcac661e0733b621abfb15b91 Subproject commit 80f0b866d99a02ba89de94ccdaa9ee1d687566ce

1
plugins/emqttd_recon Submodule

@ -0,0 +1 @@
Subproject commit 7f725bc3438d4c25a1f10e90286095271bf7a0f9

@ -1 +1 @@
Subproject commit 9caeefc425e2119be754be6342d7b6481217bbf8 Subproject commit 6d5ba0dfe62d375da09f1d53823b8aa54046aa11

View File

@ -16,7 +16,7 @@
{error_logger_redirect, false}, {error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"}, {crash_log, "log/emqttd_crash.log"},
{handlers, [ {handlers, [
{lager_console_backend, info}, %%{lager_console_backend, info},
{lager_file_backend, [ {lager_file_backend, [
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_error.log"}, {file, "log/emqttd_error.log"},
@ -95,7 +95,7 @@
{max_awaiting_rel, 0}, {max_awaiting_rel, 0},
%% Statistics Collection Interval(seconds) %% Statistics Collection Interval(seconds)
{collect_interval, 20}, {collect_interval, 0},
%% Expired after 2 days %% Expired after 2 days
{expired_after, 48} {expired_after, 48}

View File

@ -1,4 +1,6 @@
#!/bin/sh #!/bin/sh
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
@ -79,303 +81,11 @@ ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin
# Setup command to control the node # Setup command to control the node
NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG"
# Check the first argument for instructions RES=`$NODETOOL ping`
case "$1" in if [ "$RES" != "pong" ]; then
status) echo "Node is not running!"
if [ $# -ne 1 ]; then exit 1
echo "Usage: $SCRIPT status" fi
exit 1
fi
RES=`$NODETOOL ping` $NODETOOL rpc emqttd_ctl run $@
if [ "$RES" != "pong" ]; then
echo "Node is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl status $@
;;
cluster)
if [ $# -gt 2 ]; then
echo "Usage: $SCRIPT cluster [<Node>]"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl cluster $@
;;
useradd)
if [ $# -ne 3 ]; then
echo "Usage: $SCRIPT useradd <Username> <Password>"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl useradd $@
;;
userdel)
if [ $# -ne 2 ]; then
echo "Usage: $SCRIPT userdel <Username>"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl userdel $@
;;
vm)
if [ $# -gt 2 ]; then
echo "Usage: $SCRIPT vm [ load | memory | process | io ]"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl vm $@
;;
broker)
if [ $# -ne 1 ]; then
echo "Usage: $SCRIPT broker"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl broker $@
;;
stats)
if [ $# -ne 1 ]; then
echo "Usage: $SCRIPT stats"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl stats $@
;;
metrics)
if [ $# -ne 1 ]; then
echo "Usage: $SCRIPT metrics"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl metrics $@
;;
bridges)
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then
$NODETOOL rpc emqttd_ctl bridges list
elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then
$NODETOOL rpc emqttd_ctl bridges options
elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then
shift
$NODETOOL rpc emqttd_ctl bridges $@
elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then
shift
$NODETOOL rpc emqttd_ctl bridges $@
else
echo "Usage: "
echo "$SCRIPT bridges list"
echo "$SCRIPT bridges start <Node> <Topic>"
echo "$SCRIPT bridges start <Node> <Topic> <Options>"
echo "$SCRIPT bridges stop <Node> <Topic>"
exit 1
fi
;;
clients)
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
if [ $# -eq 2 -a $2 = "list" ]; then
$NODETOOL rpc emqttd_ctl clients list
elif [ $# -eq 3 ]; then
shift
$NODETOOL rpc emqttd_ctl clients $@
else
echo "Usage: "
echo "$SCRIPT clients list"
echo "$SCRIPT clients show <ClientId>"
echo "$SCRIPT clients kick <ClientId>"
exit 1
fi
;;
sessions)
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
if [ $# -eq 2 -a $2 = "list" ]; then
$NODETOOL rpc emqttd_ctl sessions list
elif [ $# -eq 3 ]; then
shift
$NODETOOL rpc emqttd_ctl sessions $@
else
echo "Usage: "
echo "$SCRIPT sessions list"
echo "$SCRIPT sessions show <ClientId>"
exit 1
fi
;;
plugins)
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
if [ $# -eq 2 -a $2 = "list" ]; then
$NODETOOL rpc emqttd_ctl plugins list
elif [ $# -eq 3 ]; then
shift
$NODETOOL rpc emqttd_ctl plugins $@
else
echo "Usage: "
echo "$SCRIPT plugins list"
echo "$SCRIPT plugins load <Plugin>"
echo "$SCRIPT plugins unload <Plugin>"
exit 1
fi
;;
listeners)
if [ $# -gt 1 ]; then
echo "Usage: $SCRIPT listeners"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqttd_ctl listeners $@
;;
trace)
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
if [ $# -eq 2 -a $2 = "list" ]; then
$NODETOOL rpc emqttd_ctl trace list
elif [ $# -eq 4 ]; then
shift
$NODETOOL rpc emqttd_ctl trace $@
else
echo "Usage: "
echo "$SCRIPT trace list"
echo "$SCRIPT trace client <ClientId> <LogFile>"
echo "$SCRIPT trace client <ClientId> off"
echo "$SCRIPT trace topic <Topic> <LogFile>"
echo "$SCRIPT trace topic <Topic> off"
exit 1
fi
;;
*)
echo "Usage: $SCRIPT"
echo " status #query broker status"
echo " vm [ load | memory | process | io ] #query load, memory, process and io of erlang vm"
echo " broker #query broker version, uptime and description"
echo " stats #query broker statistics of clients, topics, subscribers"
echo " metrics #query broker metrics"
echo " cluster [<Node>] #query or cluster nodes"
echo " ----------------------------------------------------------------"
echo " clients list #list all clients"
echo " clients show <ClientId> #show a client"
echo " clients kick <ClientId> #kick a client"
echo " sessions list #list all sessions"
echo " sessions show <ClientId> #show a sessions"
echo " ----------------------------------------------------------------"
echo " plugins list #query loaded plugins"
echo " plugins load <Plugin> #load plugin"
echo " plugins unload <Plugin> #unload plugin"
echo " ----------------------------------------------------------------"
echo " bridges list #query bridges"
echo " bridges options #bridge options"
echo " bridges start <Node> <Topic> #start bridge"
echo " bridges start <Node> <Topic> <Options> #start bridge with options"
echo " bridges stop <Node> <Topic> #stop bridge"
echo " ----------------------------------------------------------------"
echo " useradd <Username> <Password> #add user"
echo " userdel <Username> #delete user"
echo " ----------------------------------------------------------------"
echo " listeners #query broker listeners"
echo " ----------------------------------------------------------------"
echo " trace list #query all traces"
echo " trace client <ClientId> <LogFile> #trace client with ClientId"
echo " trace client <ClientId> off #stop to trace client"
echo " trace topic <Topic> <LogFile> #trace topic with Topic"
echo " trace topic <Topic> off #stop to trace Topic"
exit 1
;;
esac

117
rel/files/emqttd_top Executable file
View File

@ -0,0 +1,117 @@
#!/bin/sh
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true"
export POSIX_SHELL
# To support 'whoami' add /usr/ucb to path
PATH=/usr/ucb:$PATH
export PATH
exec /usr/bin/ksh $0 "$@"
fi
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well
RUNNER_SCRIPT_DIR={{runner_script_dir}}
RUNNER_SCRIPT=${0##*/}
RUNNER_BASE_DIR={{runner_base_dir}}
RUNNER_ETC_DIR={{runner_etc_dir}}
RUNNER_LIB_DIR={{platform_lib_dir}}
RUNNER_USER={{runner_user}}
WHOAMI=$(whoami)
# Make sure this script is running as the appropriate user
if ([ "$RUNNER_USER" ] && [ "x$WHOAMI" != "x$RUNNER_USER" ]); then
type sudo > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "sudo doesn't appear to be installed and your EUID isn't $RUNNER_USER" 1>&2
exit 1
fi
echo "Attempting to restart script through sudo -H -u $RUNNER_USER" >&2
exec sudo -H -u $RUNNER_USER -i $RUNNER_SCRIPT_DIR/$RUNNER_SCRIPT $@
fi
# Make sure CWD is set to runner base dir
cd $RUNNER_BASE_DIR
# Extract the target node name from node.args
NAME_ARG=`egrep "^ *-s?name" $RUNNER_ETC_DIR/vm.args`
if [ -z "$NAME_ARG" ]; then
echo "vm.args needs to have either -name or -sname parameter."
exit 1
fi
# Learn how to specify node name for connection from remote nodes
echo "$NAME_ARG" | grep '^-sname' > /dev/null 2>&1
if [ "X$?" = "X0" ]; then
NAME_PARAM="-sname"
NAME_HOST=""
else
NAME_PARAM="-name"
echo "$NAME_ARG" | grep '@.*' > /dev/null 2>&1
if [ "X$?" = "X0" ]; then
NAME_HOST=`echo "${NAME_ARG}" | sed -e 's/.*\(@.*\)$/\1/'`
else
NAME_HOST=""
fi
fi
# Extract the target cookie
COOKIE_ARG=`grep '\-setcookie' $RUNNER_ETC_DIR/vm.args`
if [ -z "$COOKIE_ARG" ]; then
echo "vm.args needs to have a -setcookie parameter."
exit 1
fi
# Identify the script name
SCRIPT=`basename $0`
# Parse out release and erts info
START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data`
ERTS_VSN=${START_ERL% *}
APP_VSN=${START_ERL#* }
# Add ERTS bin dir to our path
ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin
NODE_NAME=${NAME_ARG#* }
# Setup command to control the node
NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG"
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "Node is not running!"
exit 1
fi
case "$1" in
runtime)
SORTBY="runtime"
;;
reductions)
SORTBY="reductions"
;;
memory)
SORTBY="memory"
;;
msg_q)
SORTBY="msg_q"
;;
*)
echo "Usage: $SCRIPT {runtime | reductions | memory | msg_q}"
exit 1
;;
esac
MYPID=$$
ETOP_ARGS="-sort $SORTBY -interval 10 -lines 50 -tracing off"
$ERTS_PATH/erl -noshell -noinput \
-pa $RUNNER_LIB_DIR/basho-patches \
-hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \
-s etop -s erlang halt -output text \
-node $NODE_NAME $ETOP_ARGS

View File

@ -19,6 +19,7 @@
inets, inets,
goldrush, goldrush,
compiler, compiler,
runtime_tools,
lager, lager,
{gen_logger, load}, {gen_logger, load},
gproc, gproc,
@ -54,6 +55,8 @@
{app, eldap, [{incl_cond, include}]}, {app, eldap, [{incl_cond, include}]},
{app, inets, [{incl_cond, include}]}, {app, inets, [{incl_cond, include}]},
{app, compiler, [{incl_cond, include}]}, {app, compiler, [{incl_cond, include}]},
{app, runtime_tools, [{incl_cond, include}]},
{app, observer, [{incl_cond, include}]},
{app, goldrush, [{incl_cond, include}]}, {app, goldrush, [{incl_cond, include}]},
{app, gen_logger, [{incl_cond, include}]}, {app, gen_logger, [{incl_cond, include}]},
{app, lager, [{incl_cond, include}]}, {app, lager, [{incl_cond, include}]},
@ -78,13 +81,14 @@
{template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"}, {template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"},
{template, "files/emqttd", "bin/emqttd"}, {template, "files/emqttd", "bin/emqttd"},
{template, "files/emqttd_ctl", "bin/emqttd_ctl"}, {template, "files/emqttd_ctl", "bin/emqttd_ctl"},
{template, "files/emqttd_top", "bin/emqttd_top"},
{template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {template, "files/emqttd.cmd", "bin/emqttd.cmd"},
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
{copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"},
{copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"},
{template, "files/emqttd.config.development", "etc/emqttd.config"}, {template, "files/emqttd.config.production", "etc/emqttd.config"},
{template, "files/emqttd.config.production", "etc/emqttd.config.production"}, {template, "files/emqttd.config.development", "etc/emqttd.config.development"},
{template, "files/acl.config", "etc/acl.config"}, {template, "files/acl.config", "etc/acl.config"},
{template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/rewrite.config", "etc/rewrite.config"},
{template, "files/clients.config", "etc/clients.config"}, {template, "files/clients.config", "etc/clients.config"},

View File

@ -24,6 +24,7 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_app). -module(emqttd_app).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -51,6 +52,7 @@ start(_StartType, _StartArgs) ->
emqttd_mnesia:start(), emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(), {ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup), start_servers(Sup),
emqttd_cli:load(),
emqttd:load_all_mods(), emqttd:load_all_mods(),
emqttd_plugins:load(), emqttd_plugins:load(),
start_listeners(), start_listeners(),
@ -71,7 +73,8 @@ start_listeners() ->
emqttd:open_listeners(Listeners). emqttd:open_listeners(Listeners).
start_servers(Sup) -> start_servers(Sup) ->
Servers = [{"emqttd trace", emqttd_trace}, Servers = [{"emqttd ctl", emqttd_ctl},
{"emqttd trace", emqttd_trace},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}},
{"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}},

View File

@ -30,6 +30,11 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_cli.hrl").
%% CLI callbacks
-export([useradd/1, userdel/1]).
-behaviour(emqttd_auth_mod). -behaviour(emqttd_auth_mod).
-export([add_user/2, remove_user/1, -export([add_user/2, remove_user/1,
@ -42,6 +47,22 @@
-record(?AUTH_USERNAME_TAB, {username, password}). -record(?AUTH_USERNAME_TAB, {username, password}).
%%%=============================================================================
%%% CLI
%%%=============================================================================
useradd([Username, Password]) ->
?PRINT("~p~n", [add_user(list_to_binary(Username), list_to_binary(Password))]);
useradd(_) ->
?PRINT_CMD("useradd <Username> <Password>", "add user").
userdel([Username]) ->
?PRINT("~p~n", [remove_user(list_to_binary(Username))]);
userdel(_) ->
?PRINT_CMD("userdel <Username>", "delete user").
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%============================================================================= %%%=============================================================================
@ -67,6 +88,8 @@ init(Opts) ->
{disc_copies, [node()]}, {disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies),
emqttd_ctl:register_cmd(useradd, {?MODULE, useradd}, []),
emqttd_ctl:register_cmd(userdel, {?MODULE, userdel}, []),
{ok, Opts}. {ok, Opts}.
check(#mqtt_client{username = undefined}, _Password, _Opts) -> check(#mqtt_client{username = undefined}, _Password, _Opts) ->

View File

@ -38,6 +38,12 @@
-export([init/1]). -export([init/1]).
%%%=============================================================================
%%% CLI
%%%=============================================================================
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%============================================================================= %%%=============================================================================

View File

@ -28,7 +28,7 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd.hrl"). -include("emqttd.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
@ -279,7 +279,8 @@ handle_info(_Info, State) ->
terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) ->
stop_tick(Hb), stop_tick(Hb),
stop_tick(TRef). stop_tick(TRef),
ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.

465
src/emqttd_cli.erl Normal file
View File

@ -0,0 +1,465 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd cli.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_cli).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-include("emqttd_cli.hrl").
-import(lists, [foreach/2]).
-import(proplists, [get_value/2]).
-export([load/0]).
-export([status/1, broker/1, cluster/1, bridges/1,
clients/1, sessions/1, plugins/1, listeners/1,
vm/1, mnesia/1, trace/1]).
-define(PROC_INFOKEYS, [status,
memory,
message_queue_len,
total_heap_size,
heap_size,
stack_size,
reductions]).
load() ->
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
[emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
is_cmd(Fun) ->
not lists:member(Fun, [init, load, module_info]).
%%%=============================================================================
%%% Commands
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Node status
%% @end
%%------------------------------------------------------------------------------
status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
case lists:keysearch(emqttd, 1, application:which_applications()) of
false ->
?PRINT_MSG("emqttd is not running~n");
{value,_Version} ->
?PRINT_MSG("emqttd is running~n")
end;
status(_) ->
?PRINT_CMD("status", "query broker status").
%%------------------------------------------------------------------------------
%% @doc Query broker
%% @end
%%------------------------------------------------------------------------------
broker([]) ->
Funs = [sysdescr, version, uptime, datetime],
foreach(fun(Fun) ->
?PRINT("~-10s: ~s~n", [Fun, emqttd_broker:Fun()])
end, Funs);
broker(["stats"]) ->
foreach(fun({Stat, Val}) ->
?PRINT("~-20s: ~w~n", [Stat, Val])
end, emqttd_stats:getstats());
broker(["metrics"]) ->
foreach(fun({Metric, Val}) ->
?PRINT("~-24s: ~w~n", [Metric, Val])
end, lists:sort(emqttd_metrics:all()));
broker(["pubsub"]) ->
Pubsubs = supervisor:which_children(emqttd_pubsub_sup),
foreach(fun({{_, Id}, Pid, _, _}) ->
ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
?PRINT("pubsub: ~w~n", [Id]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-18s: ~w~n", [Key, Val])
end, ProcInfo)
end, lists:reverse(Pubsubs));
broker(_) ->
?USAGE([{"broker", "query broker version, uptime and description"},
{"broker pubsub", "query process_info of pubsub"},
{"borker stats", "query broker statistics of clients, topics, subscribers"},
{"broker metrics", "query broker metrics"}]).
%%------------------------------------------------------------------------------
%% @doc Cluster with other node
%% @end
%%------------------------------------------------------------------------------
cluster([]) ->
Nodes = emqttd_broker:running_nodes(),
?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster(usage) ->
?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info ");
cluster([SNode]) ->
Node = node_name(SNode),
case lists:member(Node, emqttd_broker:running_nodes()) of
true ->
?PRINT("~s is already clustered~n", [Node]);
false ->
cluster(Node, fun() ->
emqttd_plugins:unload(),
application:stop(emqttd),
application:stop(esockd),
application:stop(gproc),
emqttd_mnesia:cluster(Node),
application:start(gproc),
application:start(esockd),
application:start(emqttd)
end)
end;
cluster(_) ->
cluster(usage).
cluster(Node, DoCluster) ->
cluster(net_adm:ping(Node), Node, DoCluster).
cluster(pong, Node, DoCluster) ->
case emqttd:is_running(Node) of
true ->
DoCluster(),
?PRINT("cluster with ~s successfully.~n", [Node]);
false ->
?PRINT("emqttd is not running on ~s~n", [Node])
end;
cluster(pang, Node, _DoCluster) ->
?PRINT("Failed to connect ~s~n", [Node]).
%%------------------------------------------------------------------------------
%% @doc Query clients
%% @end
%%------------------------------------------------------------------------------
clients(["list"]) ->
emqttd_mnesia:dump(ets, mqtt_client, fun print/1);
clients(["show", ClientId]) ->
case emqttd_cm:lookup(list_to_binary(ClientId)) of
undefined -> ?PRINT_MSG("Not Found.~n");
Client -> print(Client)
end;
clients(["kick", ClientId]) ->
case emqttd_cm:lookup(list_to_binary(ClientId)) of
undefined ->
?PRINT_MSG("Not Found.~n");
#mqtt_client{client_pid = Pid} ->
emqttd_client:kick(Pid)
end;
clients(_) ->
?USAGE([{"clients list", "list all clients"},
{"clients show <ClientId>", "show a client"},
{"clients kick <ClientId>", "kick a client"}]).
%%------------------------------------------------------------------------------
%% @doc Sessions Command
%% @end
%%------------------------------------------------------------------------------
sessions(["list"]) ->
[sessions(["list", Type]) || Type <- ["persistent", "transient"]];
sessions(["list", "persistent"]) ->
emqttd_mnesia:dump(ets, mqtt_persistent_session, fun print/1);
sessions(["list", "transient"]) ->
emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1);
sessions(["show", ClientId]) ->
MP = {{list_to_binary(ClientId), '_'}, '_'},
case {ets:match_object(mqtt_transient_session, MP),
ets:match_object(mqtt_persistent_session, MP)} of
{[], []} ->
?PRINT_MSG("Not Found.~n");
{[SessInfo], _} ->
print(SessInfo);
{_, [SessInfo]} ->
print(SessInfo)
end;
sessions(_) ->
?USAGE([{"sessions list", "list all sessions"},
{"sessions list persistent", "list all persistent sessions"},
{"sessions list transient", "list all transient sessions"},
{"sessions show <ClientId>", "show a session"}]).
plugins(["list"]) ->
foreach(fun print/1, emqttd_plugins:list());
plugins(["load", Name]) ->
case emqttd_plugins:load(list_to_atom(Name)) of
{ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
{error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason])
end;
plugins(["unload", Name]) ->
case emqttd_plugins:unload(list_to_atom(Name)) of
ok ->
?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
{error, Reason} ->
?PRINT("unload plugin error: ~p~n", [Reason])
end;
plugins(_) ->
?USAGE([{"plugins list", "query loaded plugins"},
{"plugins load <Plugin>", "load plugin"},
{"plugins unload <Plugin>", "unload plugin"}]).
%%------------------------------------------------------------------------------
%% @doc Bridges command
%% @end
%%------------------------------------------------------------------------------
bridges(["list"]) ->
foreach(fun({{Node, Topic}, _Pid}) ->
?PRINT("bridge: ~s ~s~n", [Node, Topic])
end, emqttd_bridge_sup:bridges());
bridges(["options"]) ->
?PRINT_MSG("Options:~n"),
?PRINT_MSG(" qos = 0 | 1 | 2~n"),
?PRINT_MSG(" prefix = string~n"),
?PRINT_MSG(" suffix = string~n"),
?PRINT_MSG(" queue = integer~n"),
?PRINT_MSG("Example:~n"),
?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
bridges(["start", SNode, Topic]) ->
case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(["start", SNode, Topic, OptStr]) ->
Opts = parse_opts(bridge, OptStr),
case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(["stop", SNode, Topic]) ->
case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
ok -> ?PRINT_MSG("bridge is stopped.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(_) ->
?USAGE([{"bridges list", "query bridges"},
{"bridges options", "bridge options"},
{"bridges start <Node> <Topic>", "start bridge"},
{"bridges start <Node> <Topic> <Options>", "start bridge with options"},
{"bridges stop <Node> <Topic>", "stop bridge"}]).
parse_opts(Cmd, OptStr) ->
Tokens = string:tokens(OptStr, ","),
[parse_opt(Cmd, list_to_atom(Opt), Val)
|| [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
parse_opt(bridge, qos, Qos) ->
{qos, list_to_integer(Qos)};
parse_opt(bridge, suffix, Suffix) ->
{topic_suffix, list_to_binary(Suffix)};
parse_opt(bridge, prefix, Prefix) ->
{topic_prefix, list_to_binary(Prefix)};
parse_opt(bridge, queue, Len) ->
{max_queue_len, list_to_integer(Len)};
parse_opt(_Cmd, Opt, _Val) ->
?PRINT("Bad Option: ~s~n", [Opt]).
%%------------------------------------------------------------------------------
%% @doc vm command
%% @end
%%------------------------------------------------------------------------------
vm([]) ->
vm(["all"]);
vm(["all"]) ->
[vm([Name]) || Name <- ["load", "memory", "process", "io"]];
vm(["load"]) ->
[?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()];
vm(["memory"]) ->
[?PRINT("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
vm(["process"]) ->
foreach(fun({Name, Key}) ->
?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
end, [{limit, process_limit}, {count, process_count}]);
vm(["io"]) ->
IoInfo = erlang:system_info(check_io),
foreach(fun(Key) ->
?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)])
end, [max_fds, active_fds]);
vm(_) ->
?USAGE([{"vm all", "query info of erlang vm"},
{"vm load", "query load of erlang vm"},
{"vm memory", "query memory of erlang vm"},
{"vm process", "query process of erlang vm"},
{"vm io", "queue io of erlang vm"}]).
%%------------------------------------------------------------------------------
%% @doc mnesia Command
%% @end
%%------------------------------------------------------------------------------
mnesia([]) ->
mnesia:system_info();
mnesia(_) ->
?PRINT_CMD("mnesia", "mnesia system info").
%%------------------------------------------------------------------------------
%% @doc Trace Command
%% @end
%%------------------------------------------------------------------------------
trace(["list"]) ->
foreach(fun({{Who, Name}, LogFile}) ->
?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
end, emqttd_trace:all_traces());
trace(["client", ClientId, "off"]) ->
trace_off(client, ClientId);
trace(["client", ClientId, LogFile]) ->
trace_on(client, ClientId, LogFile);
trace(["topic", Topic, "off"]) ->
trace_off(topic, Topic);
trace(["topic", Topic, LogFile]) ->
trace_on(topic, Topic, LogFile);
trace(_) ->
?USAGE([{"trace list", "query all traces"},
{"trace client <ClientId> <LogFile>","trace client with ClientId"},
{"trace client <ClientId> off", "stop to trace client"},
{"trace topic <Topic> <LogFile>", "trace topic with Topic"},
{"trace topic <Topic> off", "stop to trace Topic"}]).
trace_on(Who, Name, LogFile) ->
case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of
ok ->
?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
end.
trace_off(Who, Name) ->
case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of
ok ->
?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
end.
%%------------------------------------------------------------------------------
%% @doc Listeners Command
%% @end
%%------------------------------------------------------------------------------
listeners([]) ->
foreach(fun({{Protocol, Port}, Pid}) ->
Info = [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)},
{current_clients,esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}],
?PRINT("listener on ~s:~w~n", [Protocol, Port]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-16s: ~w~n", [Key, Val])
end, Info)
end, esockd:listeners());
listeners(_) ->
?PRINT_CMD("listeners", "query broker listeners").
node_name(SNode) ->
SNode1 =
case string:tokens(SNode, "@") of
[_Node, _Server] ->
SNode;
_ ->
case net_kernel:longnames() of
true ->
SNode ++ "@" ++ inet_db:gethostname() ++
"." ++ inet_db:res_option(domain);
false ->
SNode ++ "@" ++ inet_db:gethostname();
_ ->
SNode
end
end,
list_to_atom(SNode1).
print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
[Name, Ver, Descr, Active]);
print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
username = Username, peername = Peername,
connected_at = ConnectedAt}) ->
?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
[ClientId, CleanSess, Username,
emqttd_net:format(Peername),
emqttd_util:now_to_secs(ConnectedAt)]);
print({{ClientId, _ClientPid}, SessInfo}) ->
InfoKeys = [clean_sess,
max_inflight,
inflight_queue,
message_queue,
message_dropped,
awaiting_rel,
awaiting_ack,
awaiting_comp,
created_at,
subscriptions],
?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
"message_queue=~w, message_dropped=~w, "
"awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
"created_at=~w, subscriptions=~s)~n",
[ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
format(created_at, Val) ->
emqttd_util:now_to_secs(Val);
format(subscriptions, List) ->
string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
format(_, Val) ->
Val.

View File

@ -34,7 +34,10 @@
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/2, session/1, info/1, kick/1, subscribe/2]). -export([start_link/2, session/1, info/1, kick/1]).
%% SUB/UNSUB Asynchronously
-export([subscribe/2, unsubscribe/2]).
-behaviour(gen_server). -behaviour(gen_server).
@ -59,7 +62,7 @@ start_link(SockArgs, MqttEnv) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
session(CPid) -> session(CPid) ->
gen_server:call(CPid, session). gen_server:call(CPid, session, infinity).
info(CPid) -> info(CPid) ->
gen_server:call(CPid, info, infinity). gen_server:call(CPid, info, infinity).
@ -70,6 +73,9 @@ kick(CPid) ->
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
gen_server:cast(CPid, {subscribe, TopicTable}). gen_server:cast(CPid, {subscribe, TopicTable}).
unsubscribe(CPid, Topics) ->
gen_server:cast(CPid, {unsubscribe, Topics}).
init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
% Transform if ssl. % Transform if ssl.
{ok, NewSock} = esockd_connection:accept(SockArgs), {ok, NewSock} = esockd_connection:accept(SockArgs),
@ -107,9 +113,11 @@ handle_call(Req, _From, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
{reply, {error, unsupported_request}, State}. {reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> handle_cast({subscribe, TopicTable}, State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State);
noreply(State#state{proto_state = ProtoState1});
handle_cast({unsubscribe, Topics}, State) ->
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
handle_cast(Msg, State = #state{peername = Peername}) -> handle_cast(Msg, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
@ -149,17 +157,26 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), StatFun = fun() ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error}
end
end,
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
noreply(State#state{keepalive = KeepAlive}); noreply(State#state{keepalive = KeepAlive});
handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
case emqttd_keepalive:resume(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
timeout -> {ok, KeepAlive1} ->
lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
noreply(State#state{keepalive = KeepAlive1});
{error, timeout} ->
lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
{resumed, KeepAlive1} -> {error, Error} ->
lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
noreply(State#state{keepalive = KeepAlive1}) stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
end; end;
handle_info(Info, State = #state{peername = Peername}) -> handle_info(Info, State = #state{peername = Peername}) ->
@ -188,12 +205,20 @@ terminate(Reason, #state{peername = Peername,
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
noreply(State) -> noreply(State) ->
{noreply, State, hibernate}. {noreply, State, hibernate}.
%------------------------------------------------------- stop(Reason, State) ->
% receive and parse tcp data {stop, Reason, State}.
%-------------------------------------------------------
with_session(Fun, State = #state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)), noreply(State).
%% receive and parse tcp data
received(<<>>, State) -> received(<<>>, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
@ -244,12 +269,8 @@ control_throttle(State = #state{conn_state = Flow,
{_, _} -> run_socket(State) {_, _} -> run_socket(State)
end. end.
stop(Reason, State) ->
{stop, Reason, State}.
received_stats(?PACKET(Type)) -> received_stats(?PACKET(Type)) ->
emqttd_metrics:inc('packets/received'), emqttd_metrics:inc('packets/received'), inc(Type).
inc(Type).
inc(?CONNECT) -> inc(?CONNECT) ->
emqttd_metrics:inc('packets/connect'); emqttd_metrics:inc('packets/connect');
inc(?PUBLISH) -> inc(?PUBLISH) ->

View File

@ -30,22 +30,22 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-behaviour(gen_server2).
-define(SERVER, ?MODULE).
%% API Exports %% API Exports
-export([start_link/2, pool/0]). -export([start_link/2, pool/0]).
-export([lookup/1, register/1, unregister/1]). -export([lookup/1, register/1, unregister/1]).
-behaviour(gen_server2).
-define(SERVER, ?MODULE).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {id, statsfun}). -record(state, {id, statsfun}).
-define(CM_POOL, cm_pool). -define(CM_POOL, ?MODULE).
%%%============================================================================= %%%=============================================================================
%%% API %%% API

View File

@ -20,331 +20,120 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd control commands. %%% emqttd control.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_ctl). -module(emqttd_ctl).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-define(PRINT_MSG(Msg), -include("emqttd_cli.hrl").
io:format(Msg)).
-define(PRINT(Format, Args), -behaviour(gen_server).
io:format(Format, Args)).
-export([status/1, -define(SERVER, ?MODULE).
vm/1,
broker/1, %% API Function Exports
stats/1, -export([start_link/0,
metrics/1, register_cmd/3,
cluster/1, unregister_cmd/1,
clients/1, run/1]).
sessions/1,
listeners/1, %% gen_server Function Exports
bridges/1, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
plugins/1, terminate/2, code_change/3]).
trace/1,
useradd/1, -record(state, {seq = 0}).
userdel/1]).
-define(CMD_TAB, mqttd_ctl_cmd).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Query node status %% @doc Register a command
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
status([]) -> -spec register_cmd(atom(), {module(), atom()}, list()) -> true.
{InternalStatus, _ProvidedStatus} = init:get_status(), register_cmd(Cmd, MF, Opts) ->
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), gen_server:cast(?SERVER, {register_cmd, Cmd, MF, Opts}).
case lists:keysearch(emqttd, 1, application:which_applications()) of
false ->
?PRINT_MSG("emqttd is not running~n");
{value,_Version} ->
?PRINT_MSG("emqttd is running~n")
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Cluster with other node %% @doc Unregister a command
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cluster([]) -> -spec unregister_cmd(atom()) -> true.
Nodes = emqttd_broker:running_nodes(), unregister_cmd(Cmd) ->
?PRINT("cluster nodes: ~p~n", [Nodes]); gen_server:cast(?SERVER, {unregister_cmd, Cmd}).
cluster([SNode]) ->
Node = node_name(SNode),
case net_adm:ping(Node) of
pong ->
case emqttd:is_running(Node) of
true ->
emqttd_plugins:unload(),
application:stop(emqttd),
application:stop(esockd),
application:stop(gproc),
emqttd_mnesia:cluster(Node),
application:start(gproc),
application:start(esockd),
application:start(emqttd),
?PRINT("cluster with ~p successfully.~n", [Node]);
false ->
?PRINT("emqttd is not running on ~p~n", [Node])
end;
pang ->
?PRINT("failed to connect to ~p~n", [Node])
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Add user %% @doc Run a command
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
useradd([Username, Password]) -> run([]) -> usage();
?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]).
run(["help"]) -> usage();
run([CmdS|Args]) ->
Cmd = list_to_atom(CmdS),
case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of
[[{Mod, Fun}]] -> Mod:Fun(Args);
[] -> usage()
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Delete user %% @doc Usage
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
userdel([Username]) -> usage() ->
?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]). ?PRINT("Usage: ~s~n", [?MODULE]),
[begin ?PRINT("~80..-s~n", [""]), Mod:Cmd(usage) end
|| {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)].
vm([]) -> %%%=============================================================================
[vm([Name]) || Name <- ["load", "memory", "process", "io"]]; %%% gen_server callbacks
%%%=============================================================================
vm(["load"]) -> init([]) ->
?PRINT_MSG("Load: ~n"), ets:new(?CMD_TAB, [ordered_set, named_table, protected]),
[?PRINT(" ~s:~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; {ok, #state{seq = 0}}.
vm(["memory"]) -> handle_call(_Request, _From, State) ->
?PRINT_MSG("Memory: ~n"), {reply, ok, State}.
[?PRINT(" ~s:~p~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
vm(["process"]) -> handle_cast({register_cmd, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
?PRINT_MSG("Process: ~n"), ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}),
?PRINT(" process_limit:~p~n", [erlang:system_info(process_limit)]), noreply(next_seq(State));
?PRINT(" process_count:~p~n", [erlang:system_info(process_count)]);
vm(["io"]) -> handle_cast({unregister_cmd, Cmd}, State) ->
?PRINT_MSG("IO: ~n"), ets:match_delete(?CMD_TAB, {{'_', Cmd}, '_', '_'}),
?PRINT(" max_fds:~p~n", [proplists:get_value(max_fds, erlang:system_info(check_io))]). noreply(State);
broker([]) -> handle_cast(_Msg, State) ->
Funs = [sysdescr, version, uptime, datetime], noreply(State).
[?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs].
stats([]) -> handle_info(_Info, State) ->
[?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()]. noreply(State).
metrics([]) -> terminate(_Reason, _State) ->
[?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. ok.
clients(["list"]) -> code_change(_OldVsn, State, _Extra) ->
dump(client, mqtt_client); {ok, State}.
clients(["show", ClientId]) -> %%%=============================================================================
case emqttd_cm:lookup(list_to_binary(ClientId)) of %%% Internal Function Definitions
undefined -> %%%=============================================================================
?PRINT_MSG("Not Found.~n");
Client ->
print(client, Client)
end;
clients(["kick", ClientId]) -> noreply(State) -> {noreply, State, hibernate}.
case emqttd_cm:lookup(list_to_binary(ClientId)) of
undefined ->
?PRINT_MSG("Not Found.~n");
#mqtt_client{client_pid = Pid} ->
emqttd_client:kick(Pid)
end.
sessions(["list"]) -> next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}.
dump(session, mqtt_transient_session),
dump(session, mqtt_persistent_session);
sessions(["show", ClientId]) ->
MP = {{list_to_binary(ClientId), '_'}, '_'},
case {ets:match_object(mqtt_transient_session, MP),
ets:match_object(mqtt_persistent_session, MP)} of
{[], []} ->
?PRINT_MSG("Not Found.~n");
{[SessInfo], _} ->
print(session, SessInfo);
{_, [SessInfo]} ->
print(session, SessInfo)
end.
listeners([]) ->
lists:foreach(fun({{Protocol, Port}, Pid}) ->
?PRINT("listener ~s:~w~n", [Protocol, Port]),
?PRINT(" acceptors: ~w~n", [esockd:get_acceptors(Pid)]),
?PRINT(" max_clients: ~w~n", [esockd:get_max_clients(Pid)]),
?PRINT(" current_clients: ~w~n", [esockd:get_current_clients(Pid)]),
?PRINT(" shutdown_count: ~p~n", [esockd:get_shutdown_count(Pid)])
end, esockd:listeners()).
bridges(["list"]) ->
lists:foreach(fun({{Node, Topic}, _Pid}) ->
?PRINT("bridge: ~s ~s~n", [Node, Topic])
end, emqttd_bridge_sup:bridges());
bridges(["options"]) ->
?PRINT_MSG("Options:~n"),
?PRINT_MSG(" qos = 0 | 1 | 2~n"),
?PRINT_MSG(" prefix = string~n"),
?PRINT_MSG(" suffix = string~n"),
?PRINT_MSG(" queue = integer~n"),
?PRINT_MSG("Example:~n"),
?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
bridges(["start", SNode, Topic]) ->
case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(["start", SNode, Topic, OptStr]) ->
Opts = parse_opts(bridge, OptStr),
case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(["stop", SNode, Topic]) ->
case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), bin(Topic)) of
ok -> ?PRINT_MSG("bridge is stopped.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end.
plugins(["list"]) ->
lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list());
plugins(["load", Name]) ->
case emqttd_plugins:load(list_to_atom(Name)) of
{ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
{error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason])
end;
plugins(["unload", Name]) ->
case emqttd_plugins:unload(list_to_atom(Name)) of
ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
{error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason])
end.
trace(["list"]) ->
lists:foreach(fun({{Who, Name}, LogFile}) ->
?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
end, emqttd_trace:all_traces());
trace(["client", ClientId, "off"]) ->
stop_trace(client, ClientId);
trace(["client", ClientId, LogFile]) ->
start_trace(client, ClientId, LogFile);
trace(["topic", Topic, "off"]) ->
stop_trace(topic, Topic);
trace(["topic", Topic, LogFile]) ->
start_trace(topic, Topic, LogFile).
start_trace(Who, Name, LogFile) ->
case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of
ok ->
?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
end.
stop_trace(Who, Name) ->
case emqttd_trace:stop_trace({Who, bin(Name)}) of
ok ->
?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
end.
node_name(SNode) ->
SNode1 =
case string:tokens(SNode, "@") of
[_Node, _Server] ->
SNode;
_ ->
case net_kernel:longnames() of
true ->
SNode ++ "@" ++ inet_db:gethostname() ++
"." ++ inet_db:res_option(domain);
false ->
SNode ++ "@" ++ inet_db:gethostname();
_ ->
SNode
end
end,
list_to_atom(SNode1).
bin(S) when is_list(S) -> list_to_binary(S);
bin(B) when is_binary(B) -> B.
parse_opts(Cmd, OptStr) ->
Tokens = string:tokens(OptStr, ","),
[parse_opt(Cmd, list_to_atom(Opt), Val)
|| [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
parse_opt(bridge, qos, Qos) ->
{qos, list_to_integer(Qos)};
parse_opt(bridge, suffix, Suffix) ->
{topic_suffix, list_to_binary(Suffix)};
parse_opt(bridge, prefix, Prefix) ->
{topic_prefix, list_to_binary(Prefix)};
parse_opt(bridge, queue, Len) ->
{max_queue_len, list_to_integer(Len)};
parse_opt(_Cmd, Opt, _Val) ->
?PRINT("Bad Option: ~s~n", [Opt]).
dump(Type, Table) ->
dump(Type, Table, ets:first(Table)).
dump(_Type, _Table, '$end_of_table') ->
ok;
dump(Type, Table, Key) ->
case ets:lookup(Table, Key) of
[Record] -> print(Type, Record);
[] -> ignore
end,
dump(Type, Table, ets:next(Table, Key)).
print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess,
username = Username, peername = Peername,
connected_at = ConnectedAt}) ->
?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
[ClientId, CleanSess, Username,
emqttd_net:format(Peername),
emqttd_util:now_to_secs(ConnectedAt)]);
print(session, {{ClientId, _ClientPid}, SessInfo}) ->
InfoKeys = [clean_sess,
max_inflight,
inflight_queue,
message_queue,
message_dropped,
awaiting_rel,
awaiting_ack,
awaiting_comp,
created_at,
subscriptions],
?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
"message_queue=~w, message_dropped=~w, "
"awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
"created_at=~w, subscriptions=~s)~n",
[ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]);
print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
[Name, Ver, Descr, Active]).
format(created_at, Val) ->
emqttd_util:now_to_secs(Val);
format(subscriptions, List) ->
string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
format(_, Val) ->
Val.

View File

@ -23,62 +23,61 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_keepalive). -module(emqttd_keepalive).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-export([new/3, resume/1, cancel/1]). -export([start/3, check/1, cancel/1]).
-record(keepalive, {transport, -record(keepalive, {statfun, statval,
socket, tsec, tmsg, tref,
recv_oct, repeat = 0}).
timeout_sec,
timeout_msg,
timer_ref}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Create a keepalive %% @doc Start a keepalive
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> start(_, 0, _) ->
{ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), undefined;
Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), start(StatFun, TimeoutSec, TimeoutMsg) ->
#keepalive {transport = Transport, {ok, StatVal} = StatFun(),
socket = Socket, #keepalive{statfun = StatFun, statval = StatVal,
recv_oct = RecvOct, tsec = TimeoutSec, tmsg = TimeoutMsg,
timeout_sec = TimeoutSec, tref = timer(TimeoutSec, TimeoutMsg)}.
timeout_msg = TimeoutMsg,
timer_ref = Ref}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Try to resume keepalive, called when timeout %% @doc Check keepalive, called when timeout.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
resume(KeepAlive = #keepalive {transport = Transport, check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
socket = Socket, case StatFun() of
recv_oct = RecvOct, {ok, NewVal} ->
timeout_sec = TimeoutSec, if NewVal =/= LastVal ->
timeout_msg = TimeoutMsg, {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
timer_ref = Ref }) -> Repeat < 1 ->
{ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
if true ->
NewRecvOct =:= RecvOct -> {error, timeout}
timeout; end;
true -> {error, Error} ->
%need? {error, Error}
cancel(Ref),
NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg),
{resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}}
end. end.
resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Cancel Keepalive %% @doc Cancel Keepalive
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cancel(#keepalive{timer_ref = Ref}) -> cancel(#keepalive{tref = TRef}) ->
cancel(Ref); cancel(TRef);
cancel(undefined) -> cancel(undefined) ->
undefined; ok;
cancel(Ref) -> cancel(TRef) ->
catch erlang:cancel_timer(Ref). catch erlang:cancel_timer(TRef).
timer(Sec, Msg) ->
erlang:send_after(timer:seconds(Sec), self(), Msg).

View File

@ -24,6 +24,7 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_metrics). -module(emqttd_metrics).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").

View File

@ -35,6 +35,8 @@
-export([create_table/2, copy_table/1]). -export([create_table/2, copy_table/1]).
-export([dump/3]).
start() -> start() ->
case init_schema() of case init_schema() of
ok -> ok ->
@ -168,3 +170,16 @@ wait_for_mnesia(stop) ->
{error, mnesia_unexpectedly_starting} {error, mnesia_unexpectedly_starting}
end. end.
dump(ets, Table, Fun) ->
dump(ets, Table, ets:first(Table), Fun).
dump(ets, _Table, '$end_of_table', _Fun) ->
ok;
dump(ets, Table, Key, Fun) ->
case ets:lookup(Table, Key) of
[Record] -> Fun(Record);
[] -> ignore
end,
dump(ets, Table, ets:next(Table, Key), Fun).

View File

@ -239,16 +239,11 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id =
case lists:member(deny, AllowDenies) of case lists:member(deny, AllowDenies) of
true -> true ->
%%TODO: return 128 QoS when deny... no need to SUBACK? %%TODO: return 128 QoS when deny... no need to SUBACK?
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]);
{ok, State};
false -> false ->
%%TODO: GrantedQos should be renamed. Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end,
{ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), emqttd_session:subscribe(Session, TopicTable, Callback)
send(?SUBACK_PACKET(PacketId, GrantedQos), State) end,
end;
handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
{ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
{ok, State}; {ok, State};
%% protect from empty topic list %% protect from empty topic list
@ -256,7 +251,7 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State); send(?UNSUBACK_PACKET(PacketId), State);
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
ok = emqttd_session:unsubscribe(Session, Topics), emqttd_session:unsubscribe(Session, Topics),
send(?UNSUBACK_PACKET(PacketId), State); send(?UNSUBACK_PACKET(PacketId), State);
handle(?PACKET(?PINGREQ), State) -> handle(?PACKET(?PINGREQ), State) ->
@ -349,7 +344,7 @@ send_willmsg(ClientId, WillMsg) ->
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec) when Sec > 0 ->
self() ! {keepalive, start, round(Sec * 1.5)}. self() ! {keepalive, start, round(Sec * 1.2)}.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
%% Validate Packets %% Validate Packets

View File

@ -231,6 +231,7 @@ match(Topic) when is_binary(Topic) ->
%%%============================================================================= %%%=============================================================================
init([Id, _Opts]) -> init([Id, _Opts]) ->
process_flag(priority, high),
%%process_flag(min_heap_size, 1024*1024), %%process_flag(min_heap_size, 1024*1024),
gproc_pool:connect_worker(pubsub, {?MODULE, Id}), gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
{ok, #state{id = Id, submap = maps:new()}}. {ok, #state{id = Id, submap = maps:new()}}.

View File

@ -59,7 +59,7 @@
%% PubSub APIs %% PubSub APIs
-export([publish/2, -export([publish/2,
puback/2, pubrec/2, pubrel/2, pubcomp/2, puback/2, pubrec/2, pubrel/2, pubcomp/2,
subscribe/2, unsubscribe/2]). subscribe/2, subscribe/3, unsubscribe/2]).
-behaviour(gen_server2). -behaviour(gen_server2).
@ -166,9 +166,13 @@ destroy(SessPid, ClientId) ->
%% @doc Subscribe Topics %% @doc Subscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
subscribe(SessPid, TopicTable) -> subscribe(SessPid, TopicTable) ->
gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT). subscribe(SessPid, TopicTable, fun(_) -> ok end).
-spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok.
subscribe(SessPid, TopicTable, Callback) ->
gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish message %% @doc Publish message
@ -213,7 +217,7 @@ pubcomp(SessPid, PktId) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unsubscribe(pid(), [binary()]) -> ok. -spec unsubscribe(pid(), [binary()]) -> ok.
unsubscribe(SessPid, Topics) -> unsubscribe(SessPid, Topics) ->
gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT). gen_server2:cast(SessPid, {unsubscribe, Topics}).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -247,26 +251,24 @@ init([CleanSess, ClientId, ClientPid]) ->
{ok, start_collector(Session#session{client_mon = MRef}), hibernate}. {ok, start_collector(Session#session{client_mon = MRef}), hibernate}.
prioritise_call(Msg, _From, _Len, _State) -> prioritise_call(Msg, _From, _Len, _State) ->
case Msg of case Msg of _ -> 0 end.
{unsubscribe, _} -> 2;
{subscribe, _} -> 1;
_ -> 0
end.
prioritise_cast(Msg, _Len, _State) -> prioritise_cast(Msg, _Len, _State) ->
case Msg of case Msg of
{destroy, _} -> 10; {destroy, _} -> 10;
{resume, _, _} -> 9; {resume, _, _} -> 9;
{pubrel, _PktId} -> 8; {pubrel, _PktId} -> 8;
{pubcomp, _PktId} -> 8; {pubcomp, _PktId} -> 8;
{pubrec, _PktId} -> 8; {pubrec, _PktId} -> 8;
{puback, _PktId} -> 7; {puback, _PktId} -> 7;
_ -> 0 {unsubscribe, _, _} -> 6;
{subscribe, _, _} -> 5;
_ -> 0
end. end.
prioritise_info(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) ->
case Msg of case Msg of
{'DOWN', _, process, _, _} -> 10; {'DOWN', _, _, _, _} -> 10;
{'EXIT', _, _} -> 10; {'EXIT', _, _} -> 10;
session_expired -> 10; session_expired -> 10;
{timeout, _, _} -> 5; {timeout, _, _} -> 5;
@ -275,17 +277,40 @@ prioritise_info(Msg, _Len, _State) ->
_ -> 0 _ -> 0
end. end.
handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
subscriptions = Subscriptions}) -> Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case check_awaiting_rel(Session) of
true ->
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false ->
lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message "
"for too many awaiting_rel: ~p", [ClientId, Msg]),
{reply, {error, dropped}, Session}
end;
case TopicTable0 -- Subscriptions of handle_call(Req, _From, State) ->
lager:critical("Unexpected Request: ~p", [Req]),
{reply, ok, State}.
handle_cast({subscribe, TopicTable0, Callback}, Session = #session{
client_id = ClientId, subscriptions = Subscriptions}) ->
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
case TopicTable -- Subscriptions of
[] -> [] ->
{reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session}; catch Callback([Qos || {_, Qos} <- TopicTable]),
noreply(Session);
_ -> _ ->
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
catch Callback(GrantedQos),
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p",
@ -310,11 +335,11 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie
[{Topic, Qos} | Acc] [{Topic, Qos} | Acc]
end end
end, Subscriptions, TopicTable), end, Subscriptions, TopicTable),
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}} noreply(Session#session{subscriptions = Subscriptions1})
end; end;
handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId, handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
@ -333,26 +358,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
{reply, ok, Session#session{subscriptions = Subscriptions1}}; noreply(Session#session{subscriptions = Subscriptions1});
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case check_awaiting_rel(Session) of
true ->
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false ->
lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message "
"for too many awaiting_rel: ~p", [ClientId, Msg]),
{reply, {error, dropped}, Session}
end;
handle_call(Req, _From, State) ->
lager:critical("Unexpected Request: ~p", [Req]),
{reply, ok, State}.
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]), lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]),

View File

@ -55,9 +55,9 @@
-record(state, {id}). -record(state, {id}).
-define(SM_POOL, sm_pool). -define(SM_POOL, ?MODULE).
%% todo... %% TODO...
-define(SESSION_TIMEOUT, 60000). -define(SESSION_TIMEOUT, 60000).
%%%============================================================================= %%%=============================================================================

View File

@ -31,12 +31,12 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-export([start_link/0]).
-behaviour(gen_server). -behaviour(gen_server).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-export([start_link/0]).
%% statistics API. %% statistics API.
-export([statsfun/1, statsfun/2, -export([statsfun/1, statsfun/2,
getstats/0, getstat/1, getstats/0, getstat/1,
@ -52,8 +52,8 @@
%% $SYS Topics for Clients %% $SYS Topics for Clients
-define(SYSTOP_CLIENTS, [ -define(SYSTOP_CLIENTS, [
'clients/count', % clients connected current 'clients/count', % clients connected current
'clients/max' % max clients connected 'clients/max' % max clients connected
]). ]).
%% $SYS Topics for Sessions %% $SYS Topics for Sessions
@ -72,6 +72,7 @@
'queues/max' % ... 'queues/max' % ...
]). ]).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%============================================================================= %%%=============================================================================
@ -102,7 +103,7 @@ statsfun(Stat, MaxStat) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec getstats() -> [{atom(), non_neg_integer()}]. -spec getstats() -> [{atom(), non_neg_integer()}].
getstats() -> getstats() ->
ets:tab2list(?STATS_TAB). lists:sort(ets:tab2list(?STATS_TAB)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Get stats by name %% @doc Get stats by name

View File

@ -24,17 +24,18 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_trace). -module(emqttd_trace).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
-export([start_trace/2, stop_trace/1, all_traces/0]). -export([start_trace/2, stop_trace/1, all_traces/0]).
-behaviour(gen_server).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).

View File

@ -34,7 +34,10 @@
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
%% API Exports %% API Exports
-export([start_link/1, ws_loop/3, subscribe/2]). -export([start_link/1, ws_loop/3, session/1, info/1, kick/1]).
%% SUB/UNSUB Asynchronously
-export([subscribe/2, unsubscribe/2]).
-behaviour(gen_server). -behaviour(gen_server).
@ -61,9 +64,21 @@ start_link(Req) ->
packet_opts = PktOpts, packet_opts = PktOpts,
parser = emqttd_parser:new(PktOpts)}). parser = emqttd_parser:new(PktOpts)}).
session(CPid) ->
gen_server:call(CPid, session, infinity).
info(CPid) ->
gen_server:call(CPid, info, infinity).
kick(CPid) ->
gen_server:call(CPid, kick).
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
gen_server:cast(CPid, {subscribe, TopicTable}). gen_server:cast(CPid, {subscribe, TopicTable}).
unsubscribe(CPid, Topics) ->
gen_server:cast(CPid, {unsubscribe, Topics}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @private %% @private
%% @doc Start WebSocket client. %% @doc Start WebSocket client.
@ -112,17 +127,30 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]), ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]),
{ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State};
handle_call(info, _From, State = #client_state{request = Req,
proto_state = ProtoState}) ->
{reply, [{websocket, true}, {peer, Req:get(peer)}
| emqttd_protocol:info(ProtoState)], State};
handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State};
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
{reply, error, State}. {reply, error, State}.
handle_cast({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) -> handle_cast({subscribe, TopicTable}, State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State);
{noreply, State#client_state{proto_state = ProtoState1}, hibernate};
handle_cast({unsubscribe, Topics}, State) ->
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
case emqttd_protocol:received(Packet, ProtoState) of case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
{noreply, State#client_state{proto_state = ProtoState1}}; noreply(State#client_state{proto_state = ProtoState1});
{error, Error} -> {error, Error} ->
lager:error("MQTT protocol error ~p", [Error]), lager:error("MQTT protocol error ~p", [Error]),
stop({shutdown, Error}, State); stop({shutdown, Error}, State);
@ -137,11 +165,11 @@ handle_cast(_Msg, State) ->
handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
{noreply, State#client_state{proto_state = ProtoState1}}; noreply(State#client_state{proto_state = ProtoState1});
handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) -> handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#client_state{proto_state = ProtoState1}}; noreply(State#client_state{proto_state = ProtoState1});
handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) -> handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) ->
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
@ -149,18 +177,27 @@ handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = P
handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) ->
lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)}, Socket = Req:get(socket),
TimeoutSec, {keepalive, timeout}), StatFun = fun() ->
{noreply, State#client_state{keepalive = KeepAlive}}; case esockd_transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error}
end
end,
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
noreply(State#client_state{keepalive = KeepAlive});
handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) -> handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) ->
case emqttd_keepalive:resume(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
timeout -> {ok, KeepAlive1} ->
lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
noreply(State#client_state{keepalive = KeepAlive1});
{error, timeout} ->
lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
{resumed, KeepAlive1} -> {error, Error} ->
lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]),
{noreply, State#client_state{keepalive = KeepAlive1}} stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined})
end; end;
handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) ->
@ -170,7 +207,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto
handle_info(Info, State = #client_state{request = Req}) -> handle_info(Info, State = #client_state{request = Req}) ->
lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]),
{noreply, State}. noreply(State).
terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
lager:info("WebSocket client terminated: ~p", [Reason]), lager:info("WebSocket client terminated: ~p", [Reason]),
@ -189,6 +226,12 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
noreply(State) ->
{noreply, State, hibernate}.
stop(Reason, State ) -> stop(Reason, State ) ->
{stop, Reason, State}. {stop, Reason, State}.
with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)), noreply(State).

View File

@ -0,0 +1,44 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
-module(emqttd_keepalive_tests).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
keepalive_test() ->
KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
?assertEqual([resumed, timeout], lists:reverse(loop(KA, []))).
loop(KA, Acc) ->
receive
{keepalive, timeout} ->
case emqttd_keepalive:check(KA) of
{ok, KA1} -> loop(KA1, [resumed | Acc]);
{error, timeout} -> [timeout | Acc]
end
after 4000 ->
Acc
end.
-endif.