From 470d5644efcbb3881d5437975b881abd26d63637 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 29 Sep 2015 17:31:10 +0800 Subject: [PATCH 01/31] priority, high --- src/emqttd_pubsub.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 74da5026f..8bd4534e0 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -231,6 +231,7 @@ match(Topic) when is_binary(Topic) -> %%%============================================================================= init([Id, _Opts]) -> + process_flag(priority, high), %%process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. From e09148b0409f61538cd018a96c5049a97e91dabe Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 30 Sep 2015 03:06:17 +0800 Subject: [PATCH 02/31] cli --- src/emqttd_app.erl | 2 + src/emqttd_auth_username.erl | 22 ++++ src/emqttd_broker.erl | 21 +++- src/emqttd_ctl.erl | 194 ++++++++++++++++++----------------- src/emqttd_metrics.erl | 18 +++- src/emqttd_plugins.erl | 34 ++++++ src/emqttd_trace.erl | 52 +++++++++- 7 files changed, 243 insertions(+), 100 deletions(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 446bff8a9..7a9449315 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_app). -author("Feng Lee "). @@ -49,6 +50,7 @@ start(_StartType, _StartArgs) -> print_banner(), emqttd_mnesia:start(), + emqttd_ctl:init(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 789313e3a..8f7331f35 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -29,6 +29,10 @@ -author('feng@emqtt.io'). -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +%% CLI callbacks +-export([cli_useradd/1, cli_userdel/1]). -behaviour(emqttd_auth_mod). @@ -42,6 +46,22 @@ -record(?AUTH_USERNAME_TAB, {username, password}). +%%%============================================================================= +%%% CLI +%%%============================================================================= + +cli_useradd([Username, Password]) -> + ?PRINT("~p~n", [add_user(list_to_binary(Username), list_to_binary(Password))]); + +cli_useradd(_) -> + ?PRINT_CMD("useradd ", "#add user"). + +cli_userdel([Username]) -> + ?PRINT("~p~n", [remove_user(list_to_binary(Username))]); + +cli_userdel(_) -> + ?PRINT_CMD("userdel ", "#delete user"). + %%%============================================================================= %%% API %%%============================================================================= @@ -67,6 +87,8 @@ init(Opts) -> {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), + emqttd_ctl:register_cmd(useradd, {?MODULE, cli_useradd}, []), + emqttd_ctl:register_cmd(userdel, {?MODULE, cli_userdel}, []), {ok, Opts}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 2b1210221..fe77df056 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -28,7 +28,10 @@ -author("Feng Lee "). --include_lib("emqttd.hrl"). +-include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +-export([cli/1]). %% API Function Exports -export([start_link/0]). @@ -68,6 +71,16 @@ sysdescr % Broker description ]). +%%%============================================================================= +%%% CLI callback +%%%============================================================================= +cli([]) -> + Funs = [sysdescr, version, uptime, datetime], + [?PRINT("~-20s~s~n", [Fun, ?MODULE:Fun()]) || Fun <- Funs]; + +cli(_) -> + ?PRINT_CMD("broker", "#query broker version, uptime and description"). + %%%============================================================================= %%% API %%%============================================================================= @@ -223,6 +236,8 @@ init([]) -> % Create $SYS Topics emqttd_pubsub:create(<<"$SYS/brokers">>), [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], + %% CLI + emqttd_ctl:register_cmd(broker, {?MODULE, cli}, []), % Tick {ok, #state{started_at = os:timestamp(), heartbeat = start_tick(1000, heartbeat), @@ -279,7 +294,9 @@ handle_info(_Info, State) -> terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> stop_tick(Hb), - stop_tick(TRef). + stop_tick(TRef), + emqttd_ctl:unregister_cmd(broker), + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 48863c084..036dbffa2 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -24,32 +24,86 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_ctl). -author("Feng Lee "). -include("emqttd.hrl"). --define(PRINT_MSG(Msg), - io:format(Msg)). +-include("emqttd_cli.hrl"). --define(PRINT(Format, Args), - io:format(Format, Args)). +-export([init/0, + register_cmd/3, + unregister_cmd/1, + run/1]). -export([status/1, vm/1, - broker/1, stats/1, metrics/1, cluster/1, clients/1, sessions/1, listeners/1, - bridges/1, - plugins/1, - trace/1, - useradd/1, - userdel/1]). + bridges/1]). + +-define(CMD_TAB, mqttd_ctl_cmd). + +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Init cmd table. +%% @end +%%------------------------------------------------------------------------------ +init() -> + ets:new(?CMD_TAB, [ordered_set, named_table, public]), + register_cmd(status, {?MODULE, status}, []), + register_cmd(vm, {?MODULE, vm}, []), + register_cmd(cluster, {?MODULE, cluster}, []). + +%%------------------------------------------------------------------------------ +%% @doc Register a command +%% @end +%%------------------------------------------------------------------------------ +-spec register_cmd(atom(), {module(), atom()}, list()) -> true. +register_cmd(Cmd, MF, Opts) -> + ets:insert(?CMD_TAB, {Cmd, MF, Opts}). + +%%------------------------------------------------------------------------------ +%% @doc Unregister a command +%% @end +%%------------------------------------------------------------------------------ +-spec unregister_cmd(atom()) -> true. +unregister_cmd(Cmd) -> + ets:delete(?CMD_TAB, Cmd). + +%%------------------------------------------------------------------------------ +%% @doc Run a command +%% @end +%%------------------------------------------------------------------------------ + +run([]) -> usage(); + +run([CmdS|Args]) -> + case ets:lookup(?CMD_TAB, list_to_atom(CmdS)) of + [{_, {Mod, Fun}, _}] -> Mod:Fun(Args); + [] -> usage() + end. + +%%------------------------------------------------------------------------------ +%% @doc Usage +%% @end +%%------------------------------------------------------------------------------ +usage() -> + ?PRINT("Usage: ~s~n", [?MODULE]), + [Mod:Cmd(["help"]) || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)]. + +%%%============================================================================= +%%% Commands +%%%============================================================================= %%------------------------------------------------------------------------------ %% @doc Query node status @@ -63,7 +117,36 @@ status([]) -> ?PRINT_MSG("emqttd is not running~n"); {value,_Version} -> ?PRINT_MSG("emqttd is running~n") - end. + end; +status(_) -> + ?PRINT_CMD("status", "#query broker status"). + +vm([]) -> + vm(["all"]); + +vm(["all"]) -> + [begin vm([Name]), ?PRINT_MSG("~n") end || Name <- ["load", "memory", "process", "io"]]; + +vm(["load"]) -> + [?PRINT("cpu/~-20s~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; + +vm(["memory"]) -> + [?PRINT("memory/~-17s~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; + +vm(["process"]) -> + lists:foreach(fun({Name, Key}) -> + ?PRINT("process/~-16s~w~n", [Name, erlang:system_info(Key)]) + end, [{limit, process_limit}, {count, process_count}]); + +vm(["io"]) -> + ?PRINT("io/~-21s~w~n", [max_fds, proplists:get_value(max_fds, erlang:system_info(check_io))]); + +vm(_) -> + ?PRINT_CMD("vm all", "#query info of erlang vm"), + ?PRINT_CMD("vm load", "#query load of erlang vm"), + ?PRINT_CMD("vm memory", "#query memory of erlang vm"), + ?PRINT_CMD("vm process", "#query process of erlang vm"), + ?PRINT_CMD("vm io", "#queue io of erlang vm"). %%------------------------------------------------------------------------------ %% @doc Cluster with other node @@ -93,45 +176,11 @@ cluster([SNode]) -> end; pang -> ?PRINT("failed to connect to ~p~n", [Node]) - end. + end; -%%------------------------------------------------------------------------------ -%% @doc Add user -%% @end -%%------------------------------------------------------------------------------ -useradd([Username, Password]) -> - ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]). +cluster(_) -> + ?PRINT_CMD("cluster []", "#cluster with node, query cluster info "). -%%------------------------------------------------------------------------------ -%% @doc Delete user -%% @end -%%------------------------------------------------------------------------------ -userdel([Username]) -> - ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]). - -vm([]) -> - [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; - -vm(["load"]) -> - ?PRINT_MSG("Load: ~n"), - [?PRINT(" ~s:~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; - -vm(["memory"]) -> - ?PRINT_MSG("Memory: ~n"), - [?PRINT(" ~s:~p~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; - -vm(["process"]) -> - ?PRINT_MSG("Process: ~n"), - ?PRINT(" process_limit:~p~n", [erlang:system_info(process_limit)]), - ?PRINT(" process_count:~p~n", [erlang:system_info(process_count)]); - -vm(["io"]) -> - ?PRINT_MSG("IO: ~n"), - ?PRINT(" max_fds:~p~n", [proplists:get_value(max_fds, erlang:system_info(check_io))]). - -broker([]) -> - Funs = [sysdescr, version, uptime, datetime], - [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]. stats([]) -> [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()]. @@ -216,50 +265,6 @@ bridges(["stop", SNode, Topic]) -> {error, Error} -> ?PRINT("error: ~p~n", [Error]) end. -plugins(["list"]) -> - lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list()); - -plugins(["load", Name]) -> - case emqttd_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) - end; - -plugins(["unload", Name]) -> - case emqttd_plugins:unload(list_to_atom(Name)) of - ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason]) - end. - -trace(["list"]) -> - lists:foreach(fun({{Who, Name}, LogFile}) -> - ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) - end, emqttd_trace:all_traces()); - -trace(["client", ClientId, "off"]) -> - stop_trace(client, ClientId); -trace(["client", ClientId, LogFile]) -> - start_trace(client, ClientId, LogFile); -trace(["topic", Topic, "off"]) -> - stop_trace(topic, Topic); -trace(["topic", Topic, LogFile]) -> - start_trace(topic, Topic, LogFile). - -start_trace(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of - ok -> - ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) - end. -stop_trace(Who, Name) -> - case emqttd_trace:stop_trace({Who, bin(Name)}) of - ok -> - ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) - end. - node_name(SNode) -> SNode1 = @@ -333,11 +338,8 @@ print(session, {{ClientId, _ClientPid}, SessInfo}) -> "message_queue=~w, message_dropped=~w, " "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " "created_at=~w, subscriptions=~s)~n", - [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]); + [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). -print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", - [Name, Ver, Descr, Active]). format(created_at, Val) -> emqttd_util:now_to_secs(Val); diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index b6752df29..c712cc019 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -24,18 +24,21 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_metrics). -author("Feng Lee "). -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0]). +-export([start_link/0, cli/1]). -export([all/0, value/1, inc/1, inc/2, inc/3, @@ -93,6 +96,17 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%%------------------------------------------------------------------------------ +%% @doc CLI command callback +%% @end +%%------------------------------------------------------------------------------ + +cli([]) -> + [?PRINT("~-32s ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqttd_metrics:all())]; + +cli(_) -> + ?PRINT_CMD("metrics", "#query broker metrics"). + %%------------------------------------------------------------------------------ %% @doc Get all metrics %% @end @@ -193,6 +207,8 @@ init([]) -> [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics], + %% Register CLI commands + emqttd_ctl:register_cmd(metrics, {?MODULE, cli}, []), % Tick to publish metrics {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 8e23f41e1..ba1c1b408 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -30,6 +30,9 @@ -author("Feng Lee "). -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +-export([cli/1]). -export([load/0, unload/0]). @@ -37,6 +40,36 @@ -export([list/0]). +%%------------------------------------------------------------------------------ +%% CLI callback +%%------------------------------------------------------------------------------ + +cli(["list"]) -> + lists:foreach(fun(Plugin) -> print(Plugin) end, list()); + +cli(["load", Name]) -> + case load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) + end; + +cli(["unload", Name]) -> + case emqttd_plugins:unload(list_to_atom(Name)) of + ok -> + ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); + {error, Reason} -> + ?PRINT("unload plugin error: ~p~n", [Reason]) + end; + +cli(_) -> + ?PRINT_CMD("plugins list", "#query loaded plugins"), + ?PRINT_CMD("plugins load ", "#load plugin"), + ?PRINT_CMD("plugins unload ", "#unload plugin"). + +print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", + [Name, Ver, Descr, Active]). + %%------------------------------------------------------------------------------ %% @doc Load all plugins when the broker started. %% @end @@ -44,6 +77,7 @@ -spec load() -> list() | {error, any()}. load() -> + emqttd_ctl:register_cmd(plugins, {?MODULE, cli}, []), case env(loaded_file) of {ok, File} -> with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 82cd4d8a7..c79bdcbae 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -24,17 +24,23 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_trace). -author("Feng Lee "). --behaviour(gen_server). +-include("emqttd_cli.hrl"). + +%% CLI +-export([cli/1]). %% API Function Exports -export([start_link/0]). -export([start_trace/2, stop_trace/1, all_traces/0]). +-behaviour(gen_server). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -45,6 +51,48 @@ -define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). + +%%%============================================================================= +%%% CLI +%%%============================================================================= + +cli(["list"]) -> + lists:foreach(fun({{Who, Name}, LogFile}) -> + ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) + end, all_traces()); + +cli(["client", ClientId, "off"]) -> + cli(trace_off, client, ClientId); +cli(["client", ClientId, LogFile]) -> + cli(trace_on, client, ClientId, LogFile); +cli(["topic", Topic, "off"]) -> + cli(trace_off, topic, Topic); +cli(["topic", Topic, LogFile]) -> + cli(trace_on, topic, Topic, LogFile); + +cli(_) -> + ?PRINT_CMD("trace list", "#query all traces"), + ?PRINT_CMD("trace client ","#trace client with ClientId"), + ?PRINT_CMD("trace client off", "#stop to trace client"), + ?PRINT_CMD("trace topic ", "#trace topic with Topic"), + ?PRINT_CMD("trace topic off", "#stop to trace Topic"). + +cli(trace_on, Who, Name, LogFile) -> + case start_trace({Who, list_to_binary(Name)}, LogFile) of + ok -> + ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. + +cli(trace_off, Who, Name) -> + case stop_trace({Who, list_to_binary(Name)}) of + ok -> + ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + %%%============================================================================= %%% API %%%============================================================================= @@ -85,6 +133,7 @@ all_traces() -> gen_server:call(?MODULE, all_traces). init([]) -> + emqttd_ctl:register_cmd(trace, {?MODULE, cli}, []), {ok, #state{level = info, trace_map = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) -> @@ -122,6 +171,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> + emqttd_ctl:unregister_cmd(trace), ok. code_change(_OldVsn, State, _Extra) -> From 9e5478bdbbdeb7e1df95f933b67e95525a6ccb44 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 30 Sep 2015 03:06:25 +0800 Subject: [PATCH 03/31] cli --- include/emqttd_cli.hrl | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 include/emqttd_cli.hrl diff --git a/include/emqttd_cli.hrl b/include/emqttd_cli.hrl new file mode 100644 index 000000000..3b8956a96 --- /dev/null +++ b/include/emqttd_cli.hrl @@ -0,0 +1,31 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- + +-define(PRINT(Format, Args), + io:format(Format, Args)). + +-define(PRINT_MSG(Msg), + io:format(Msg)). + +-define(PRINT_CMD(Cmd, Descr), + io:format("~-40s~s~n", [Cmd, Descr])). + From 26c371dff85caa05ef2e044bd88d02ed96cae5aa Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 30 Sep 2015 03:06:45 +0800 Subject: [PATCH 04/31] cli --- rel/files/emqttd_ctl | 304 +--------------------------------------- rel/files/emqttd_ctl.bk | 300 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 306 insertions(+), 298 deletions(-) create mode 100644 rel/files/emqttd_ctl.bk diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index ccba93740..8a65bd6d3 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -79,303 +79,11 @@ ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin # Setup command to control the node NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" -# Check the first argument for instructions -case "$1" in - status) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT status" - exit 1 - fi +RES=`$NODETOOL ping` +if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 +fi - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "Node is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl status $@ - ;; - - cluster) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT cluster []" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl cluster $@ - ;; - - useradd) - if [ $# -ne 3 ]; then - echo "Usage: $SCRIPT useradd " - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl useradd $@ - ;; - - userdel) - if [ $# -ne 2 ]; then - echo "Usage: $SCRIPT userdel " - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl userdel $@ - ;; - - vm) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT vm [ load | memory | process | io ]" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl vm $@ - ;; - - broker) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT broker" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl broker $@ - ;; - - stats) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT stats" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl stats $@ - ;; - - metrics) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT metrics" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl metrics $@ - ;; - - bridges) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then - $NODETOOL rpc emqttd_ctl bridges list - elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then - $NODETOOL rpc emqttd_ctl bridges options - elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then - shift - $NODETOOL rpc emqttd_ctl bridges $@ - elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then - shift - $NODETOOL rpc emqttd_ctl bridges $@ - else - echo "Usage: " - echo "$SCRIPT bridges list" - echo "$SCRIPT bridges start " - echo "$SCRIPT bridges start " - echo "$SCRIPT bridges stop " - exit 1 - fi - ;; - clients) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl clients list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl clients $@ - else - echo "Usage: " - echo "$SCRIPT clients list" - echo "$SCRIPT clients show " - echo "$SCRIPT clients kick " - exit 1 - fi - ;; - sessions) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl sessions list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl sessions $@ - else - echo "Usage: " - echo "$SCRIPT sessions list" - echo "$SCRIPT sessions show " - exit 1 - fi - ;; - plugins) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl plugins list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl plugins $@ - else - echo "Usage: " - echo "$SCRIPT plugins list" - echo "$SCRIPT plugins load " - echo "$SCRIPT plugins unload " - exit 1 - fi - ;; - listeners) - if [ $# -gt 1 ]; then - echo "Usage: $SCRIPT listeners" - exit 1 - fi - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl listeners $@ - ;; - trace) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl trace list - elif [ $# -eq 4 ]; then - shift - $NODETOOL rpc emqttd_ctl trace $@ - else - echo "Usage: " - echo "$SCRIPT trace list" - echo "$SCRIPT trace client " - echo "$SCRIPT trace client off" - echo "$SCRIPT trace topic " - echo "$SCRIPT trace topic off" - exit 1 - fi - ;; - - *) - echo "Usage: $SCRIPT" - echo " status #query broker status" - echo " vm [ load | memory | process | io ] #query load, memory, process and io of erlang vm" - echo " broker #query broker version, uptime and description" - echo " stats #query broker statistics of clients, topics, subscribers" - echo " metrics #query broker metrics" - echo " cluster [] #query or cluster nodes" - echo " ----------------------------------------------------------------" - echo " clients list #list all clients" - echo " clients show #show a client" - echo " clients kick #kick a client" - echo " sessions list #list all sessions" - echo " sessions show #show a sessions" - echo " ----------------------------------------------------------------" - echo " plugins list #query loaded plugins" - echo " plugins load #load plugin" - echo " plugins unload #unload plugin" - echo " ----------------------------------------------------------------" - echo " bridges list #query bridges" - echo " bridges options #bridge options" - echo " bridges start #start bridge" - echo " bridges start #start bridge with options" - echo " bridges stop #stop bridge" - echo " ----------------------------------------------------------------" - echo " useradd #add user" - echo " userdel #delete user" - echo " ----------------------------------------------------------------" - echo " listeners #query broker listeners" - echo " ----------------------------------------------------------------" - echo " trace list #query all traces" - echo " trace client #trace client with ClientId" - echo " trace client off #stop to trace client" - echo " trace topic #trace topic with Topic" - echo " trace topic off #stop to trace Topic" - exit 1 - ;; - -esac +$NODETOOL rpc emqttd_ctl run $@ diff --git a/rel/files/emqttd_ctl.bk b/rel/files/emqttd_ctl.bk new file mode 100644 index 000000000..606562e88 --- /dev/null +++ b/rel/files/emqttd_ctl.bk @@ -0,0 +1,300 @@ +# Check the first argument for instructions +case "$1" in + status) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT status" + exit 1 + fi + + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl status $@ + ;; + + cluster) + if [ $# -gt 2 ]; then + echo "Usage: $SCRIPT cluster []" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl cluster $@ + ;; + + useradd) + if [ $# -ne 3 ]; then + echo "Usage: $SCRIPT useradd " + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl useradd $@ + ;; + + userdel) + if [ $# -ne 2 ]; then + echo "Usage: $SCRIPT userdel " + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl userdel $@ + ;; + + vm) + if [ $# -gt 2 ]; then + echo "Usage: $SCRIPT vm [ load | memory | process | io ]" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl vm $@ + ;; + + broker) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT broker" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl broker $@ + ;; + + stats) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT stats" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl stats $@ + ;; + + metrics) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT metrics" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl metrics $@ + ;; + + bridges) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then + $NODETOOL rpc emqttd_ctl bridges list + elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then + $NODETOOL rpc emqttd_ctl bridges options + elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then + shift + $NODETOOL rpc emqttd_ctl bridges $@ + elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then + shift + $NODETOOL rpc emqttd_ctl bridges $@ + else + echo "Usage: " + echo "$SCRIPT bridges list" + echo "$SCRIPT bridges start " + echo "$SCRIPT bridges start " + echo "$SCRIPT bridges stop " + exit 1 + fi + ;; + clients) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl clients list + elif [ $# -eq 3 ]; then + shift + $NODETOOL rpc emqttd_ctl clients $@ + else + echo "Usage: " + echo "$SCRIPT clients list" + echo "$SCRIPT clients show " + echo "$SCRIPT clients kick " + exit 1 + fi + ;; + sessions) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl sessions list + elif [ $# -eq 3 ]; then + shift + $NODETOOL rpc emqttd_ctl sessions $@ + else + echo "Usage: " + echo "$SCRIPT sessions list" + echo "$SCRIPT sessions show " + exit 1 + fi + ;; + plugins) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl plugins list + elif [ $# -eq 3 ]; then + shift + $NODETOOL rpc emqttd_ctl plugins $@ + else + echo "Usage: " + echo "$SCRIPT plugins list" + echo "$SCRIPT plugins load " + echo "$SCRIPT plugins unload " + exit 1 + fi + ;; + listeners) + if [ $# -gt 1 ]; then + echo "Usage: $SCRIPT listeners" + exit 1 + fi + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl listeners $@ + ;; + trace) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl trace list + elif [ $# -eq 4 ]; then + shift + $NODETOOL rpc emqttd_ctl trace $@ + else + echo "Usage: " + echo "$SCRIPT trace list" + echo "$SCRIPT trace client " + echo "$SCRIPT trace client off" + echo "$SCRIPT trace topic " + echo "$SCRIPT trace topic off" + exit 1 + fi + ;; + + *) + echo "Usage: $SCRIPT" + echo " status #query broker status" + echo " vm [ load | memory | process | io ] #query load, memory, process and io of erlang vm" + echo " broker #query broker version, uptime and description" + echo " stats #query broker statistics of clients, topics, subscribers" + echo " metrics #query broker metrics" + echo " cluster [] #query or cluster nodes" + echo " ----------------------------------------------------------------" + echo " clients list #list all clients" + echo " clients show #show a client" + echo " clients kick #kick a client" + echo " sessions list #list all sessions" + echo " sessions show #show a sessions" + echo " ----------------------------------------------------------------" + echo " plugins list #query loaded plugins" + echo " plugins load #load plugin" + echo " plugins unload #unload plugin" + echo " ----------------------------------------------------------------" + echo " bridges list #query bridges" + echo " bridges options #bridge options" + echo " bridges start #start bridge" + echo " bridges start #start bridge with options" + echo " bridges stop #stop bridge" + echo " ----------------------------------------------------------------" + echo " useradd #add user" + echo " userdel #delete user" + echo " ----------------------------------------------------------------" + echo " listeners #query broker listeners" + echo " ----------------------------------------------------------------" + echo " trace list #query all traces" + echo " trace client #trace client with ClientId" + echo " trace client off #stop to trace client" + echo " trace topic #trace topic with Topic" + echo " trace topic off #stop to trace Topic" + exit 1 + ;; + +esac + From 91bd0c654caeafc6ab7ce81b9ba6ee7b85910cf7 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:17:35 +0800 Subject: [PATCH 05/31] cli --- src/emqttd_cli.erl | 455 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 455 insertions(+) create mode 100644 src/emqttd_cli.erl diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl new file mode 100644 index 000000000..9c72d8da4 --- /dev/null +++ b/src/emqttd_cli.erl @@ -0,0 +1,455 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd cli. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_cli). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-include("emqttd_cli.hrl"). + +-import(lists, [foreach/2]). + +-import(proplists, [get_value/2]). + +-export([load/0]). + +-export([status/1, broker/1, cluster/1, bridges/1, + clients/1, sessions/1, plugins/1, listeners/1, + vm/1, trace/1]). + +-define(PROC_INFOKEYS, [status, + memory, + message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions]). + +load() -> + Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], + [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds]. + +is_cmd(Fun) -> + not lists:member(Fun, [init, load, module_info]). + +%%%============================================================================= +%%% Commands +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Node status +%% @end +%%------------------------------------------------------------------------------ +status([]) -> + {InternalStatus, _ProvidedStatus} = init:get_status(), + ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), + case lists:keysearch(emqttd, 1, application:which_applications()) of + false -> + ?PRINT_MSG("emqttd is not running~n"); + {value,_Version} -> + ?PRINT_MSG("emqttd is running~n") + end; +status(_) -> + ?PRINT_CMD("status", "query broker status"). + +%%------------------------------------------------------------------------------ +%% @doc Query broker +%% @end +%%------------------------------------------------------------------------------ +broker([]) -> + Funs = [sysdescr, version, uptime, datetime], + foreach(fun(Fun) -> + ?PRINT("~-10s: ~s~n", [Fun, emqttd_broker:Fun()]) + end, Funs); + +broker(["stats"]) -> + foreach(fun({Stat, Val}) -> + ?PRINT("~-20s: ~w~n", [Stat, Val]) + end, emqttd_stats:getstats()); + +broker(["metrics"]) -> + foreach(fun({Metric, Val}) -> + ?PRINT("~-24s: ~w~n", [Metric, Val]) + end, lists:sort(emqttd_metrics:all())); + +broker(["pubsub"]) -> + Pubsubs = supervisor:which_children(emqttd_pubsub_sup), + foreach(fun({{_, Id}, Pid, _, _}) -> + ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), + ?PRINT("pubsub: ~w~n", [Id]), + foreach(fun({Key, Val}) -> + ?PRINT(" ~-18s: ~w~n", [Key, Val]) + end, ProcInfo) + end, lists:reverse(Pubsubs)); + +broker(_) -> + ?USAGE([{"broker", "query broker version, uptime and description"}, + {"broker pubsub", "query process_info of pubsub"}, + {"borker stats", "query broker statistics of clients, topics, subscribers"}, + {"broker metrics", "query broker metrics"}]). + +%%------------------------------------------------------------------------------ +%% @doc Cluster with other node +%% @end +%%------------------------------------------------------------------------------ +cluster([]) -> + Nodes = emqttd_broker:running_nodes(), + ?PRINT("cluster nodes: ~p~n", [Nodes]); + +cluster(usage) -> + ?PRINT_CMD("cluster []", "cluster with node, query cluster info "); + +cluster([SNode]) -> + Node = node_name(SNode), + case lists:member(Node, emqttd_broker:running_nodes()) of + true -> + ?PRINT("~s is already clustered~n", [Node]); + false -> + cluster(Node, fun() -> + emqttd_plugins:unload(), + application:stop(emqttd), + application:stop(esockd), + application:stop(gproc), + emqttd_mnesia:cluster(Node), + application:start(gproc), + application:start(esockd), + application:start(emqttd) + end) + end; + +cluster(_) -> + cluster(usage). + +cluster(Node, DoCluster) -> + cluster(net_adm:ping(Node), Node, DoCluster). + +cluster(pong, Node, DoCluster) -> + case emqttd:is_running(Node) of + true -> + DoCluster(), + ?PRINT("cluster with ~s successfully.~n", [Node]); + false -> + ?PRINT("emqttd is not running on ~s~n", [Node]) + end; + +cluster(pang, Node, _DoCluster) -> + ?PRINT("Failed to connect ~s~n", [Node]). + +%%------------------------------------------------------------------------------ +%% @doc Query clients +%% @end +%%------------------------------------------------------------------------------ +clients(["list"]) -> + emqttd_mnesia:dump(ets, mqtt_client, fun print/1); + +clients(["show", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> ?PRINT_MSG("Not Found.~n"); + Client -> print(Client) + end; + +clients(["kick", ClientId]) -> + case emqttd_cm:lookup(list_to_binary(ClientId)) of + undefined -> + ?PRINT_MSG("Not Found.~n"); + #mqtt_client{client_pid = Pid} -> + emqttd_client:kick(Pid) + end; + +clients(_) -> + ?USAGE([{"clients list", "list all clients"}, + {"clients show ", "show a client"}, + {"clients kick ", "kick a client"}]). + +%%------------------------------------------------------------------------------ +%% @doc Sessions Command +%% @end +%%------------------------------------------------------------------------------ +sessions(["list"]) -> + [sessions(["list", Type]) || Type <- ["persistent", "transient"]]; + +sessions(["list", "persistent"]) -> + emqttd_mnesia:dump(ets, mqtt_persistent_session, fun print/1); + +sessions(["list", "transient"]) -> + emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1); + +sessions(["show", ClientId]) -> + MP = {{list_to_binary(ClientId), '_'}, '_'}, + case {ets:match_object(mqtt_transient_session, MP), + ets:match_object(mqtt_persistent_session, MP)} of + {[], []} -> + ?PRINT_MSG("Not Found.~n"); + {[SessInfo], _} -> + print(SessInfo); + {_, [SessInfo]} -> + print(SessInfo) + end; + +sessions(_) -> + ?USAGE([{"sessions list", "list all sessions"}, + {"sessions list persistent", "list all persistent sessions"}, + {"sessions list transient", "list all transient sessions"}, + {"sessions show ", "show a session"}]). + +plugins(["list"]) -> + foreach(fun print/1, emqttd_plugins:list()); + +plugins(["load", Name]) -> + case emqttd_plugins:load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) + end; + +plugins(["unload", Name]) -> + case emqttd_plugins:unload(list_to_atom(Name)) of + ok -> + ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); + {error, Reason} -> + ?PRINT("unload plugin error: ~p~n", [Reason]) + end; + +plugins(_) -> + ?USAGE([{"plugins list", "query loaded plugins"}, + {"plugins load ", "load plugin"}, + {"plugins unload ", "unload plugin"}]). + +%%------------------------------------------------------------------------------ +%% @doc Bridges command +%% @end +%%------------------------------------------------------------------------------ + +bridges(["list"]) -> + foreach(fun({{Node, Topic}, _Pid}) -> + ?PRINT("bridge: ~s ~s~n", [Node, Topic]) + end, emqttd_bridge_sup:bridges()); + +bridges(["options"]) -> + ?PRINT_MSG("Options:~n"), + ?PRINT_MSG(" qos = 0 | 1 | 2~n"), + ?PRINT_MSG(" prefix = string~n"), + ?PRINT_MSG(" suffix = string~n"), + ?PRINT_MSG(" queue = integer~n"), + ?PRINT_MSG("Example:~n"), + ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); + +bridges(["start", SNode, Topic]) -> + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(["start", SNode, Topic, OptStr]) -> + Opts = parse_opts(bridge, OptStr), + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(["stop", SNode, Topic]) -> + case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + ok -> ?PRINT_MSG("bridge is stopped.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(_) -> + ?USAGE([{"bridges list", "query bridges"}, + {"bridges options", "bridge options"}, + {"bridges start ", "start bridge"}, + {"bridges start ", "start bridge with options"}, + {"bridges stop ", "stop bridge"}]). + +parse_opts(Cmd, OptStr) -> + Tokens = string:tokens(OptStr, ","), + [parse_opt(Cmd, list_to_atom(Opt), Val) + || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. +parse_opt(bridge, qos, Qos) -> + {qos, list_to_integer(Qos)}; +parse_opt(bridge, suffix, Suffix) -> + {topic_suffix, list_to_binary(Suffix)}; +parse_opt(bridge, prefix, Prefix) -> + {topic_prefix, list_to_binary(Prefix)}; +parse_opt(bridge, queue, Len) -> + {max_queue_len, list_to_integer(Len)}; +parse_opt(_Cmd, Opt, _Val) -> + ?PRINT("Bad Option: ~s~n", [Opt]). + +%%------------------------------------------------------------------------------ +%% @doc vm command +%% @end +%%------------------------------------------------------------------------------ +vm([]) -> + vm(["all"]); + +vm(["all"]) -> + [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; + +vm(["load"]) -> + [?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; + +vm(["memory"]) -> + [?PRINT("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; + +vm(["process"]) -> + foreach(fun({Name, Key}) -> + ?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{limit, process_limit}, {count, process_count}]); + +vm(["io"]) -> + IoInfo = erlang:system_info(check_io), + foreach(fun(Key) -> + ?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) + end, [max_fds, active_fds]); + +vm(_) -> + ?USAGE([{"vm all", "query info of erlang vm"}, + {"vm load", "query load of erlang vm"}, + {"vm memory", "query memory of erlang vm"}, + {"vm process", "query process of erlang vm"}, + {"vm io", "queue io of erlang vm"}]). +%%------------------------------------------------------------------------------ +%% @doc Trace Command +%% @end +%%------------------------------------------------------------------------------ + +trace(["list"]) -> + foreach(fun({{Who, Name}, LogFile}) -> + ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) + end, emqttd_trace:all_traces()); + +trace(["client", ClientId, "off"]) -> + trace_off(client, ClientId); + +trace(["client", ClientId, LogFile]) -> + trace_on(client, ClientId, LogFile); + +trace(["topic", Topic, "off"]) -> + trace_off(topic, Topic); + +trace(["topic", Topic, LogFile]) -> + trace_on(topic, Topic, LogFile); + +trace(_) -> + ?USAGE([{"trace list", "query all traces"}, + {"trace client ","trace client with ClientId"}, + {"trace client off", "stop to trace client"}, + {"trace topic ", "trace topic with Topic"}, + {"trace topic off", "stop to trace Topic"}]). + +trace_on(Who, Name, LogFile) -> + case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of + ok -> + ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. + +trace_off(Who, Name) -> + case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of + ok -> + ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + +%%------------------------------------------------------------------------------ +%% @doc Listeners Command +%% @end +%%------------------------------------------------------------------------------ +listeners([]) -> + foreach(fun({{Protocol, Port}, Pid}) -> + Info = [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients,esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}], + ?PRINT("listener on ~s:~w~n", [Protocol, Port]), + foreach(fun({Key, Val}) -> + ?PRINT(" ~-16s:~w~n", [Key, Val]) + end, Info) + end, esockd:listeners()); + +listeners(_) -> + ?PRINT_CMD("listeners", "query broker listeners"). + +node_name(SNode) -> + SNode1 = + case string:tokens(SNode, "@") of + [_Node, _Server] -> + SNode; + _ -> + case net_kernel:longnames() of + true -> + SNode ++ "@" ++ inet_db:gethostname() ++ + "." ++ inet_db:res_option(domain); + false -> + SNode ++ "@" ++ inet_db:gethostname(); + _ -> + SNode + end + end, + list_to_atom(SNode1). + +print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", + [Name, Ver, Descr, Active]); + +print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, + username = Username, peername = Peername, + connected_at = ConnectedAt}) -> + ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", + [ClientId, CleanSess, Username, + emqttd_net:format(Peername), + emqttd_util:now_to_secs(ConnectedAt)]); + +print({{ClientId, _ClientPid}, SessInfo}) -> + InfoKeys = [clean_sess, + max_inflight, + inflight_queue, + message_queue, + message_dropped, + awaiting_rel, + awaiting_ack, + awaiting_comp, + created_at, + subscriptions], + ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " + "message_queue=~w, message_dropped=~w, " + "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " + "created_at=~w, subscriptions=~s)~n", + [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). + +format(created_at, Val) -> + emqttd_util:now_to_secs(Val); + +format(subscriptions, List) -> + string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); + +format(_, Val) -> + Val. + From 1a8056df2242131c290a5dfb3b547ee00c8ec633 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:18:00 +0800 Subject: [PATCH 06/31] USAGE --- include/emqttd_cli.hrl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/include/emqttd_cli.hrl b/include/emqttd_cli.hrl index 3b8956a96..c2dc1ddba 100644 --- a/include/emqttd_cli.hrl +++ b/include/emqttd_cli.hrl @@ -27,5 +27,9 @@ io:format(Msg)). -define(PRINT_CMD(Cmd, Descr), - io:format("~-40s~s~n", [Cmd, Descr])). + io:format("~-40s#~s~n", [Cmd, Descr])). + +-define(USAGE(CmdList), + [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]). + From d3ce7dcae1598e372f85343b86e717d0bceec0a5 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:19:24 +0800 Subject: [PATCH 07/31] emqttd_ctl --- src/emqttd_app.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 7a9449315..c5fd94bdf 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -50,9 +50,9 @@ start(_StartType, _StartArgs) -> print_banner(), emqttd_mnesia:start(), - emqttd_ctl:init(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), + emqttd_cli:load(), emqttd:load_all_mods(), emqttd_plugins:load(), start_listeners(), @@ -73,7 +73,8 @@ start_listeners() -> emqttd:open_listeners(Listeners). start_servers(Sup) -> - Servers = [{"emqttd trace", emqttd_trace}, + Servers = [{"emqttd ctl", emqttd_ctl}, + {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, From bafff792fa906e6ca92c959c67748c9952f4512e Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:20:01 +0800 Subject: [PATCH 08/31] useradd, userdel cli --- src/emqttd_auth_username.erl | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 8f7331f35..1c05d4e59 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -29,10 +29,11 @@ -author('feng@emqtt.io'). -include("emqttd.hrl"). + -include("emqttd_cli.hrl"). %% CLI callbacks --export([cli_useradd/1, cli_userdel/1]). +-export([useradd/1, userdel/1]). -behaviour(emqttd_auth_mod). @@ -50,17 +51,17 @@ %%% CLI %%%============================================================================= -cli_useradd([Username, Password]) -> +useradd([Username, Password]) -> ?PRINT("~p~n", [add_user(list_to_binary(Username), list_to_binary(Password))]); -cli_useradd(_) -> - ?PRINT_CMD("useradd ", "#add user"). +useradd(_) -> + ?PRINT_CMD("useradd ", "add user"). -cli_userdel([Username]) -> +userdel([Username]) -> ?PRINT("~p~n", [remove_user(list_to_binary(Username))]); -cli_userdel(_) -> - ?PRINT_CMD("userdel ", "#delete user"). +userdel(_) -> + ?PRINT_CMD("userdel ", "delete user"). %%%============================================================================= %%% API @@ -87,8 +88,8 @@ init(Opts) -> {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), - emqttd_ctl:register_cmd(useradd, {?MODULE, cli_useradd}, []), - emqttd_ctl:register_cmd(userdel, {?MODULE, cli_userdel}, []), + emqttd_ctl:register_cmd(useradd, {?MODULE, useradd}, []), + emqttd_ctl:register_cmd(userdel, {?MODULE, userdel}, []), {ok, Opts}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> From b539a719a8de583d0a466e4f0b07d02bade53e51 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:20:22 +0800 Subject: [PATCH 09/31] rm cli --- src/emqttd_bridge_sup.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 0df4b1343..132eca200 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -38,6 +38,12 @@ -export([init/1]). +%%%============================================================================= +%%% CLI +%%%============================================================================= + + + %%%============================================================================= %%% API %%%============================================================================= From 95928f6f754a60296fe1e8498a4f337735c2a88e Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:20:41 +0800 Subject: [PATCH 10/31] rm cli --- src/emqttd_broker.erl | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index fe77df056..5a21eb451 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -29,9 +29,6 @@ -author("Feng Lee "). -include("emqttd.hrl"). --include("emqttd_cli.hrl"). - --export([cli/1]). %% API Function Exports -export([start_link/0]). @@ -71,16 +68,6 @@ sysdescr % Broker description ]). -%%%============================================================================= -%%% CLI callback -%%%============================================================================= -cli([]) -> - Funs = [sysdescr, version, uptime, datetime], - [?PRINT("~-20s~s~n", [Fun, ?MODULE:Fun()]) || Fun <- Funs]; - -cli(_) -> - ?PRINT_CMD("broker", "#query broker version, uptime and description"). - %%%============================================================================= %%% API %%%============================================================================= @@ -236,8 +223,6 @@ init([]) -> % Create $SYS Topics emqttd_pubsub:create(<<"$SYS/brokers">>), [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], - %% CLI - emqttd_ctl:register_cmd(broker, {?MODULE, cli}, []), % Tick {ok, #state{started_at = os:timestamp(), heartbeat = start_tick(1000, heartbeat), @@ -295,7 +280,6 @@ handle_info(_Info, State) -> terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> stop_tick(Hb), stop_tick(TRef), - emqttd_ctl:unregister_cmd(broker), ok. code_change(_OldVsn, State, _Extra) -> From 30126e3a7217af9950be0895f879aca1151efa4f Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:20:58 +0800 Subject: [PATCH 11/31] src/emqttd_cm.erl --- rel/files/emqttd_ctl.bk | 300 ---------------------------------------- 1 file changed, 300 deletions(-) delete mode 100644 rel/files/emqttd_ctl.bk diff --git a/rel/files/emqttd_ctl.bk b/rel/files/emqttd_ctl.bk deleted file mode 100644 index 606562e88..000000000 --- a/rel/files/emqttd_ctl.bk +++ /dev/null @@ -1,300 +0,0 @@ -# Check the first argument for instructions -case "$1" in - status) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT status" - exit 1 - fi - - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "Node is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl status $@ - ;; - - cluster) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT cluster []" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl cluster $@ - ;; - - useradd) - if [ $# -ne 3 ]; then - echo "Usage: $SCRIPT useradd " - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl useradd $@ - ;; - - userdel) - if [ $# -ne 2 ]; then - echo "Usage: $SCRIPT userdel " - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl userdel $@ - ;; - - vm) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT vm [ load | memory | process | io ]" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl vm $@ - ;; - - broker) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT broker" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl broker $@ - ;; - - stats) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT stats" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl stats $@ - ;; - - metrics) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT metrics" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl metrics $@ - ;; - - bridges) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then - $NODETOOL rpc emqttd_ctl bridges list - elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then - $NODETOOL rpc emqttd_ctl bridges options - elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then - shift - $NODETOOL rpc emqttd_ctl bridges $@ - elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then - shift - $NODETOOL rpc emqttd_ctl bridges $@ - else - echo "Usage: " - echo "$SCRIPT bridges list" - echo "$SCRIPT bridges start " - echo "$SCRIPT bridges start " - echo "$SCRIPT bridges stop " - exit 1 - fi - ;; - clients) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl clients list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl clients $@ - else - echo "Usage: " - echo "$SCRIPT clients list" - echo "$SCRIPT clients show " - echo "$SCRIPT clients kick " - exit 1 - fi - ;; - sessions) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl sessions list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl sessions $@ - else - echo "Usage: " - echo "$SCRIPT sessions list" - echo "$SCRIPT sessions show " - exit 1 - fi - ;; - plugins) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl plugins list - elif [ $# -eq 3 ]; then - shift - $NODETOOL rpc emqttd_ctl plugins $@ - else - echo "Usage: " - echo "$SCRIPT plugins list" - echo "$SCRIPT plugins load " - echo "$SCRIPT plugins unload " - exit 1 - fi - ;; - listeners) - if [ $# -gt 1 ]; then - echo "Usage: $SCRIPT listeners" - exit 1 - fi - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqttd_ctl listeners $@ - ;; - trace) - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "emqttd is not running!" - exit 1 - fi - if [ $# -eq 2 -a $2 = "list" ]; then - $NODETOOL rpc emqttd_ctl trace list - elif [ $# -eq 4 ]; then - shift - $NODETOOL rpc emqttd_ctl trace $@ - else - echo "Usage: " - echo "$SCRIPT trace list" - echo "$SCRIPT trace client " - echo "$SCRIPT trace client off" - echo "$SCRIPT trace topic " - echo "$SCRIPT trace topic off" - exit 1 - fi - ;; - - *) - echo "Usage: $SCRIPT" - echo " status #query broker status" - echo " vm [ load | memory | process | io ] #query load, memory, process and io of erlang vm" - echo " broker #query broker version, uptime and description" - echo " stats #query broker statistics of clients, topics, subscribers" - echo " metrics #query broker metrics" - echo " cluster [] #query or cluster nodes" - echo " ----------------------------------------------------------------" - echo " clients list #list all clients" - echo " clients show #show a client" - echo " clients kick #kick a client" - echo " sessions list #list all sessions" - echo " sessions show #show a sessions" - echo " ----------------------------------------------------------------" - echo " plugins list #query loaded plugins" - echo " plugins load #load plugin" - echo " plugins unload #unload plugin" - echo " ----------------------------------------------------------------" - echo " bridges list #query bridges" - echo " bridges options #bridge options" - echo " bridges start #start bridge" - echo " bridges start #start bridge with options" - echo " bridges stop #stop bridge" - echo " ----------------------------------------------------------------" - echo " useradd #add user" - echo " userdel #delete user" - echo " ----------------------------------------------------------------" - echo " listeners #query broker listeners" - echo " ----------------------------------------------------------------" - echo " trace list #query all traces" - echo " trace client #trace client with ClientId" - echo " trace client off #stop to trace client" - echo " trace topic #trace topic with Topic" - echo " trace topic off #stop to trace Topic" - exit 1 - ;; - -esac - From dcca96b6aeae129d0351b313b28e7aeab8cb9a3d Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:21:27 +0800 Subject: [PATCH 12/31] MODULE as pool name --- src/emqttd_cm.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 04902c9ab..d60ddc78d 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -30,22 +30,22 @@ -include("emqttd.hrl"). --behaviour(gen_server2). - --define(SERVER, ?MODULE). - %% API Exports -export([start_link/2, pool/0]). -export([lookup/1, register/1, unregister/1]). +-behaviour(gen_server2). + +-define(SERVER, ?MODULE). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, {id, statsfun}). --define(CM_POOL, cm_pool). +-define(CM_POOL, ?MODULE). %%%============================================================================= %%% API From b946f3df9b2f54a567ae5a887939321cbcb1f928 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:21:43 +0800 Subject: [PATCH 13/31] new ctl --- src/emqttd_ctl.erl | 309 +++++++-------------------------------------- 1 file changed, 48 insertions(+), 261 deletions(-) diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 036dbffa2..86f7d8b3b 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd control commands. +%%% emqttd control. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -33,20 +33,21 @@ -include("emqttd_cli.hrl"). --export([init/0, +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% API Function Exports +-export([start_link/0, register_cmd/3, unregister_cmd/1, run/1]). --export([status/1, - vm/1, - stats/1, - metrics/1, - cluster/1, - clients/1, - sessions/1, - listeners/1, - bridges/1]). +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {seq = 0}). -define(CMD_TAB, mqttd_ctl_cmd). @@ -54,15 +55,8 @@ %%% API %%%============================================================================= -%%------------------------------------------------------------------------------ -%% @doc Init cmd table. -%% @end -%%------------------------------------------------------------------------------ -init() -> - ets:new(?CMD_TAB, [ordered_set, named_table, public]), - register_cmd(status, {?MODULE, status}, []), - register_cmd(vm, {?MODULE, vm}, []), - register_cmd(cluster, {?MODULE, cluster}, []). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%------------------------------------------------------------------------------ %% @doc Register a command @@ -70,7 +64,7 @@ init() -> %%------------------------------------------------------------------------------ -spec register_cmd(atom(), {module(), atom()}, list()) -> true. register_cmd(Cmd, MF, Opts) -> - ets:insert(?CMD_TAB, {Cmd, MF, Opts}). + gen_server:cast(?SERVER, {register_cmd, Cmd, MF, Opts}). %%------------------------------------------------------------------------------ %% @doc Unregister a command @@ -78,18 +72,20 @@ register_cmd(Cmd, MF, Opts) -> %%------------------------------------------------------------------------------ -spec unregister_cmd(atom()) -> true. unregister_cmd(Cmd) -> - ets:delete(?CMD_TAB, Cmd). + gen_server:cast(?SERVER, {unregister_cmd, Cmd}). %%------------------------------------------------------------------------------ %% @doc Run a command %% @end %%------------------------------------------------------------------------------ - run([]) -> usage(); +run(["help"]) -> usage(); + run([CmdS|Args]) -> - case ets:lookup(?CMD_TAB, list_to_atom(CmdS)) of - [{_, {Mod, Fun}, _}] -> Mod:Fun(Args); + Cmd = list_to_atom(CmdS), + case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of + [[{Mod, Fun}]] -> Mod:Fun(Args); [] -> usage() end. @@ -99,254 +95,45 @@ run([CmdS|Args]) -> %%------------------------------------------------------------------------------ usage() -> ?PRINT("Usage: ~s~n", [?MODULE]), - [Mod:Cmd(["help"]) || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)]. + [begin ?PRINT("~80..-s~n", [""]), Mod:Cmd(usage) end + || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)]. %%%============================================================================= -%%% Commands +%%% gen_server callbacks %%%============================================================================= -%%------------------------------------------------------------------------------ -%% @doc Query node status -%% @end -%%------------------------------------------------------------------------------ -status([]) -> - {InternalStatus, _ProvidedStatus} = init:get_status(), - ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), - case lists:keysearch(emqttd, 1, application:which_applications()) of - false -> - ?PRINT_MSG("emqttd is not running~n"); - {value,_Version} -> - ?PRINT_MSG("emqttd is running~n") - end; -status(_) -> - ?PRINT_CMD("status", "#query broker status"). +init([]) -> + ets:new(?CMD_TAB, [ordered_set, named_table, protected]), + {ok, #state{seq = 0}}. -vm([]) -> - vm(["all"]); +handle_call(_Request, _From, State) -> + {reply, ok, State}. -vm(["all"]) -> - [begin vm([Name]), ?PRINT_MSG("~n") end || Name <- ["load", "memory", "process", "io"]]; +handle_cast({register_cmd, Cmd, MF, Opts}, State = #state{seq = Seq}) -> + ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}), + noreply(next_seq(State)); -vm(["load"]) -> - [?PRINT("cpu/~-20s~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; +handle_cast({unregister_cmd, Cmd}, State) -> + ets:match_delete(?CMD_TAB, {{'_', Cmd}, '_', '_'}), + noreply(State); -vm(["memory"]) -> - [?PRINT("memory/~-17s~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; +handle_cast(_Msg, State) -> + noreply(State). -vm(["process"]) -> - lists:foreach(fun({Name, Key}) -> - ?PRINT("process/~-16s~w~n", [Name, erlang:system_info(Key)]) - end, [{limit, process_limit}, {count, process_count}]); +handle_info(_Info, State) -> + noreply(State). -vm(["io"]) -> - ?PRINT("io/~-21s~w~n", [max_fds, proplists:get_value(max_fds, erlang:system_info(check_io))]); +terminate(_Reason, _State) -> + ok. -vm(_) -> - ?PRINT_CMD("vm all", "#query info of erlang vm"), - ?PRINT_CMD("vm load", "#query load of erlang vm"), - ?PRINT_CMD("vm memory", "#query memory of erlang vm"), - ?PRINT_CMD("vm process", "#query process of erlang vm"), - ?PRINT_CMD("vm io", "#queue io of erlang vm"). +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -%%------------------------------------------------------------------------------ -%% @doc Cluster with other node -%% @end -%%------------------------------------------------------------------------------ -cluster([]) -> - Nodes = emqttd_broker:running_nodes(), - ?PRINT("cluster nodes: ~p~n", [Nodes]); +%%%============================================================================= +%%% Internal Function Definitions +%%%============================================================================= -cluster([SNode]) -> - Node = node_name(SNode), - case net_adm:ping(Node) of - pong -> - case emqttd:is_running(Node) of - true -> - emqttd_plugins:unload(), - application:stop(emqttd), - application:stop(esockd), - application:stop(gproc), - emqttd_mnesia:cluster(Node), - application:start(gproc), - application:start(esockd), - application:start(emqttd), - ?PRINT("cluster with ~p successfully.~n", [Node]); - false -> - ?PRINT("emqttd is not running on ~p~n", [Node]) - end; - pang -> - ?PRINT("failed to connect to ~p~n", [Node]) - end; +noreply(State) -> {noreply, State, hibernate}. -cluster(_) -> - ?PRINT_CMD("cluster []", "#cluster with node, query cluster info "). - - -stats([]) -> - [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()]. - -metrics([]) -> - [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. - -clients(["list"]) -> - dump(client, mqtt_client); - -clients(["show", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> - ?PRINT_MSG("Not Found.~n"); - Client -> - print(client, Client) - end; - -clients(["kick", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> - ?PRINT_MSG("Not Found.~n"); - #mqtt_client{client_pid = Pid} -> - emqttd_client:kick(Pid) - end. - -sessions(["list"]) -> - dump(session, mqtt_transient_session), - dump(session, mqtt_persistent_session); - -sessions(["show", ClientId]) -> - MP = {{list_to_binary(ClientId), '_'}, '_'}, - case {ets:match_object(mqtt_transient_session, MP), - ets:match_object(mqtt_persistent_session, MP)} of - {[], []} -> - ?PRINT_MSG("Not Found.~n"); - {[SessInfo], _} -> - print(session, SessInfo); - {_, [SessInfo]} -> - print(session, SessInfo) - end. - -listeners([]) -> - lists:foreach(fun({{Protocol, Port}, Pid}) -> - ?PRINT("listener ~s:~w~n", [Protocol, Port]), - ?PRINT(" acceptors: ~w~n", [esockd:get_acceptors(Pid)]), - ?PRINT(" max_clients: ~w~n", [esockd:get_max_clients(Pid)]), - ?PRINT(" current_clients: ~w~n", [esockd:get_current_clients(Pid)]), - ?PRINT(" shutdown_count: ~p~n", [esockd:get_shutdown_count(Pid)]) - end, esockd:listeners()). - -bridges(["list"]) -> - lists:foreach(fun({{Node, Topic}, _Pid}) -> - ?PRINT("bridge: ~s ~s~n", [Node, Topic]) - end, emqttd_bridge_sup:bridges()); - -bridges(["options"]) -> - ?PRINT_MSG("Options:~n"), - ?PRINT_MSG(" qos = 0 | 1 | 2~n"), - ?PRINT_MSG(" prefix = string~n"), - ?PRINT_MSG(" suffix = string~n"), - ?PRINT_MSG(" queue = integer~n"), - ?PRINT_MSG("Example:~n"), - ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); - -bridges(["start", SNode, Topic]) -> - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; - -bridges(["start", SNode, Topic, OptStr]) -> - Opts = parse_opts(bridge, OptStr), - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; - -bridges(["stop", SNode, Topic]) -> - case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), bin(Topic)) of - ok -> ?PRINT_MSG("bridge is stopped.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end. - - -node_name(SNode) -> - SNode1 = - case string:tokens(SNode, "@") of - [_Node, _Server] -> - SNode; - _ -> - case net_kernel:longnames() of - true -> - SNode ++ "@" ++ inet_db:gethostname() ++ - "." ++ inet_db:res_option(domain); - false -> - SNode ++ "@" ++ inet_db:gethostname(); - _ -> - SNode - end - end, - list_to_atom(SNode1). - -bin(S) when is_list(S) -> list_to_binary(S); -bin(B) when is_binary(B) -> B. - -parse_opts(Cmd, OptStr) -> - Tokens = string:tokens(OptStr, ","), - [parse_opt(Cmd, list_to_atom(Opt), Val) - || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. - -parse_opt(bridge, qos, Qos) -> - {qos, list_to_integer(Qos)}; -parse_opt(bridge, suffix, Suffix) -> - {topic_suffix, list_to_binary(Suffix)}; -parse_opt(bridge, prefix, Prefix) -> - {topic_prefix, list_to_binary(Prefix)}; -parse_opt(bridge, queue, Len) -> - {max_queue_len, list_to_integer(Len)}; -parse_opt(_Cmd, Opt, _Val) -> - ?PRINT("Bad Option: ~s~n", [Opt]). - -dump(Type, Table) -> - dump(Type, Table, ets:first(Table)). - -dump(_Type, _Table, '$end_of_table') -> - ok; -dump(Type, Table, Key) -> - case ets:lookup(Table, Key) of - [Record] -> print(Type, Record); - [] -> ignore - end, - dump(Type, Table, ets:next(Table, Key)). - -print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess, - username = Username, peername = Peername, - connected_at = ConnectedAt}) -> - ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", - [ClientId, CleanSess, Username, - emqttd_net:format(Peername), - emqttd_util:now_to_secs(ConnectedAt)]); - -print(session, {{ClientId, _ClientPid}, SessInfo}) -> - InfoKeys = [clean_sess, - max_inflight, - inflight_queue, - message_queue, - message_dropped, - awaiting_rel, - awaiting_ack, - awaiting_comp, - created_at, - subscriptions], - ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " - "message_queue=~w, message_dropped=~w, " - "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " - "created_at=~w, subscriptions=~s)~n", - [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). - - -format(created_at, Val) -> - emqttd_util:now_to_secs(Val); - -format(subscriptions, List) -> - string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); - -format(_, Val) -> - Val. +next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}. From af14bf93297eb73d3be886b5a239ea02a646b531 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:22:01 +0800 Subject: [PATCH 14/31] rm cli --- src/emqttd_metrics.erl | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index c712cc019..707c2a9f9 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -31,14 +31,12 @@ -include("emqttd.hrl"). --include("emqttd_cli.hrl"). - -behaviour(gen_server). -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0, cli/1]). +-export([start_link/0]). -export([all/0, value/1, inc/1, inc/2, inc/3, @@ -96,17 +94,6 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%%------------------------------------------------------------------------------ -%% @doc CLI command callback -%% @end -%%------------------------------------------------------------------------------ - -cli([]) -> - [?PRINT("~-32s ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqttd_metrics:all())]; - -cli(_) -> - ?PRINT_CMD("metrics", "#query broker metrics"). - %%------------------------------------------------------------------------------ %% @doc Get all metrics %% @end @@ -207,8 +194,6 @@ init([]) -> [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics], - %% Register CLI commands - emqttd_ctl:register_cmd(metrics, {?MODULE, cli}, []), % Tick to publish metrics {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. From d7ca17b3367e9ed7720e775001e6c8b2a75543e4 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:22:18 +0800 Subject: [PATCH 15/31] dump --- src/emqttd_mnesia.erl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 86fa118f8..355f3d547 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -35,6 +35,8 @@ -export([create_table/2, copy_table/1]). +-export([dump/3]). + start() -> case init_schema() of ok -> @@ -168,3 +170,16 @@ wait_for_mnesia(stop) -> {error, mnesia_unexpectedly_starting} end. +dump(ets, Table, Fun) -> + dump(ets, Table, ets:first(Table), Fun). + +dump(ets, _Table, '$end_of_table', _Fun) -> + ok; + +dump(ets, Table, Key, Fun) -> + case ets:lookup(Table, Key) of + [Record] -> Fun(Record); + [] -> ignore + end, + dump(ets, Table, ets:next(Table, Key), Fun). + From e7b6778a43a7b120f8c9db56c348448b11fbdc2e Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:22:33 +0800 Subject: [PATCH 16/31] rm cli --- src/emqttd_plugins.erl | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index ba1c1b408..8e23f41e1 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -30,9 +30,6 @@ -author("Feng Lee "). -include("emqttd.hrl"). --include("emqttd_cli.hrl"). - --export([cli/1]). -export([load/0, unload/0]). @@ -40,36 +37,6 @@ -export([list/0]). -%%------------------------------------------------------------------------------ -%% CLI callback -%%------------------------------------------------------------------------------ - -cli(["list"]) -> - lists:foreach(fun(Plugin) -> print(Plugin) end, list()); - -cli(["load", Name]) -> - case load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) - end; - -cli(["unload", Name]) -> - case emqttd_plugins:unload(list_to_atom(Name)) of - ok -> - ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); - {error, Reason} -> - ?PRINT("unload plugin error: ~p~n", [Reason]) - end; - -cli(_) -> - ?PRINT_CMD("plugins list", "#query loaded plugins"), - ?PRINT_CMD("plugins load ", "#load plugin"), - ?PRINT_CMD("plugins unload ", "#unload plugin"). - -print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", - [Name, Ver, Descr, Active]). - %%------------------------------------------------------------------------------ %% @doc Load all plugins when the broker started. %% @end @@ -77,7 +44,6 @@ print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -spec load() -> list() | {error, any()}. load() -> - emqttd_ctl:register_cmd(plugins, {?MODULE, cli}, []), case env(loaded_file) of {ok, File} -> with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); From 14d2de3b055bf19e52aaf6768bbaae203e34ff1f Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:22:57 +0800 Subject: [PATCH 17/31] MODUEL as pool name --- src/emqttd_sm.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index c17203f80..99c99c83f 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -55,9 +55,9 @@ -record(state, {id}). --define(SM_POOL, sm_pool). +-define(SM_POOL, ?MODULE). -%% todo... +%% TODO... -define(SESSION_TIMEOUT, 60000). %%%============================================================================= From 354ae6f398a251c9064522f0c3c8e222bae674c2 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:23:24 +0800 Subject: [PATCH 18/31] sort --- src/emqttd_stats.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index bfdf5209d..345dc2d26 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -31,12 +31,12 @@ -include("emqttd.hrl"). --export([start_link/0]). - -behaviour(gen_server). -define(SERVER, ?MODULE). +-export([start_link/0]). + %% statistics API. -export([statsfun/1, statsfun/2, getstats/0, getstat/1, @@ -52,8 +52,8 @@ %% $SYS Topics for Clients -define(SYSTOP_CLIENTS, [ - 'clients/count', % clients connected current - 'clients/max' % max clients connected + 'clients/count', % clients connected current + 'clients/max' % max clients connected ]). %% $SYS Topics for Sessions @@ -72,6 +72,7 @@ 'queues/max' % ... ]). + %%%============================================================================= %%% API %%%============================================================================= @@ -102,7 +103,7 @@ statsfun(Stat, MaxStat) -> %%------------------------------------------------------------------------------ -spec getstats() -> [{atom(), non_neg_integer()}]. getstats() -> - ets:tab2list(?STATS_TAB). + lists:sort(ets:tab2list(?STATS_TAB)). %%------------------------------------------------------------------------------ %% @doc Get stats by name From 1fab8084e1226acc1cefa475be3bbd52cd2dfb86 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 11:23:37 +0800 Subject: [PATCH 19/31] rm cli --- src/emqttd_trace.erl | 49 -------------------------------------------- 1 file changed, 49 deletions(-) diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index c79bdcbae..8edcebad0 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -29,11 +29,6 @@ -author("Feng Lee "). --include("emqttd_cli.hrl"). - -%% CLI --export([cli/1]). - %% API Function Exports -export([start_link/0]). @@ -51,48 +46,6 @@ -define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). - -%%%============================================================================= -%%% CLI -%%%============================================================================= - -cli(["list"]) -> - lists:foreach(fun({{Who, Name}, LogFile}) -> - ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) - end, all_traces()); - -cli(["client", ClientId, "off"]) -> - cli(trace_off, client, ClientId); -cli(["client", ClientId, LogFile]) -> - cli(trace_on, client, ClientId, LogFile); -cli(["topic", Topic, "off"]) -> - cli(trace_off, topic, Topic); -cli(["topic", Topic, LogFile]) -> - cli(trace_on, topic, Topic, LogFile); - -cli(_) -> - ?PRINT_CMD("trace list", "#query all traces"), - ?PRINT_CMD("trace client ","#trace client with ClientId"), - ?PRINT_CMD("trace client off", "#stop to trace client"), - ?PRINT_CMD("trace topic ", "#trace topic with Topic"), - ?PRINT_CMD("trace topic off", "#stop to trace Topic"). - -cli(trace_on, Who, Name, LogFile) -> - case start_trace({Who, list_to_binary(Name)}, LogFile) of - ok -> - ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) - end. - -cli(trace_off, Who, Name) -> - case stop_trace({Who, list_to_binary(Name)}) of - ok -> - ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) - end. - %%%============================================================================= %%% API %%%============================================================================= @@ -133,7 +86,6 @@ all_traces() -> gen_server:call(?MODULE, all_traces). init([]) -> - emqttd_ctl:register_cmd(trace, {?MODULE, cli}, []), {ok, #state{level = info, trace_map = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) -> @@ -171,7 +123,6 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - emqttd_ctl:unregister_cmd(trace), ok. code_change(_OldVsn, State, _Extra) -> From 7494b60c63bd166fc634b31b01abe6539bbc666f Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 15:06:45 +0800 Subject: [PATCH 20/31] recon --- .gitmodules | 3 +++ plugins/emqttd_recon | 1 + plugins/emqttd_sockjs | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) create mode 160000 plugins/emqttd_recon diff --git a/.gitmodules b/.gitmodules index 7db3aaa3c..3e84cb96d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -16,3 +16,6 @@ [submodule "plugins/emqttd_stomp"] path = plugins/emqttd_stomp url = https://github.com/emqtt/emqttd_stomp.git +[submodule "plugins/emqttd_recon"] + path = plugins/emqttd_recon + url = https://github.com/emqtt/emqttd_recon.git diff --git a/plugins/emqttd_recon b/plugins/emqttd_recon new file mode 160000 index 000000000..cbbfc417f --- /dev/null +++ b/plugins/emqttd_recon @@ -0,0 +1 @@ +Subproject commit cbbfc417f5c9505750a6e9a309c69d251ed71141 diff --git a/plugins/emqttd_sockjs b/plugins/emqttd_sockjs index 9caeefc42..3677170d3 160000 --- a/plugins/emqttd_sockjs +++ b/plugins/emqttd_sockjs @@ -1 +1 @@ -Subproject commit 9caeefc425e2119be754be6342d7b6481217bbf8 +Subproject commit 3677170d316cbbba3f6d19632427eca17d14d64f From ae3e8b963e955b81b723e0d4fecf6d35dcc1831e Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 15:16:55 +0800 Subject: [PATCH 21/31] up recon --- plugins/emqttd_recon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/emqttd_recon b/plugins/emqttd_recon index cbbfc417f..66443bf8d 160000 --- a/plugins/emqttd_recon +++ b/plugins/emqttd_recon @@ -1 +1 @@ -Subproject commit cbbfc417f5c9505750a6e9a309c69d251ed71141 +Subproject commit 66443bf8dd956b66f7ee5079b0212a1fb3aeeb73 From ecc588dc6c01687018bb97f46744e7db3d494a9c Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 15:37:27 +0800 Subject: [PATCH 22/31] 0.11.1 --- plugins/emqttd_dashboard | 2 +- plugins/emqttd_plugin_mysql | 2 +- plugins/emqttd_plugin_pgsql | 2 +- plugins/emqttd_recon | 2 +- plugins/emqttd_sockjs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index 07a0b3c8f..8c8fab9bb 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit 07a0b3c8fab4a6e77f12552667617d8732bf86a7 +Subproject commit 8c8fab9bbb7a4de36ddf81dab7858f628efc5511 diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql index 01cb44bed..6323f8a54 160000 --- a/plugins/emqttd_plugin_mysql +++ b/plugins/emqttd_plugin_mysql @@ -1 +1 @@ -Subproject commit 01cb44bed2cec5a8d667d1342bf6f452c1bd335a +Subproject commit 6323f8a54c2c21c60c38d3065659c7c13a2afe26 diff --git a/plugins/emqttd_plugin_pgsql b/plugins/emqttd_plugin_pgsql index fd610be85..80f0b866d 160000 --- a/plugins/emqttd_plugin_pgsql +++ b/plugins/emqttd_plugin_pgsql @@ -1 +1 @@ -Subproject commit fd610be85d0466ddcac661e0733b621abfb15b91 +Subproject commit 80f0b866d99a02ba89de94ccdaa9ee1d687566ce diff --git a/plugins/emqttd_recon b/plugins/emqttd_recon index 66443bf8d..7f725bc34 160000 --- a/plugins/emqttd_recon +++ b/plugins/emqttd_recon @@ -1 +1 @@ -Subproject commit 66443bf8dd956b66f7ee5079b0212a1fb3aeeb73 +Subproject commit 7f725bc3438d4c25a1f10e90286095271bf7a0f9 diff --git a/plugins/emqttd_sockjs b/plugins/emqttd_sockjs index 3677170d3..6d5ba0dfe 160000 --- a/plugins/emqttd_sockjs +++ b/plugins/emqttd_sockjs @@ -1 +1 @@ -Subproject commit 3677170d316cbbba3f6d19632427eca17d14d64f +Subproject commit 6d5ba0dfe62d375da09f1d53823b8aa54046aa11 From a6770e3727d56fd663f204f20e4bd494d85c377b Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 1 Oct 2015 16:10:21 +0800 Subject: [PATCH 23/31] mnesia --- src/emqttd_cli.erl | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 9c72d8da4..6f78d4745 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -41,7 +41,7 @@ -export([status/1, broker/1, cluster/1, bridges/1, clients/1, sessions/1, plugins/1, listeners/1, - vm/1, trace/1]). + vm/1, mnesia/1, trace/1]). -define(PROC_INFOKEYS, [status, memory, @@ -333,11 +333,21 @@ vm(_) -> {"vm memory", "query memory of erlang vm"}, {"vm process", "query process of erlang vm"}, {"vm io", "queue io of erlang vm"}]). + +%%------------------------------------------------------------------------------ +%% @doc mnesia Command +%% @end +%%------------------------------------------------------------------------------ +mnesia([]) -> + mnesia:system_info(); + +mnesia(_) -> + ?PRINT_CMD("mnesia", "mnesia system info"). + %%------------------------------------------------------------------------------ %% @doc Trace Command %% @end %%------------------------------------------------------------------------------ - trace(["list"]) -> foreach(fun({{Who, Name}, LogFile}) -> ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) @@ -390,7 +400,7 @@ listeners([]) -> {shutdown_count, esockd:get_shutdown_count(Pid)}], ?PRINT("listener on ~s:~w~n", [Protocol, Port]), foreach(fun({Key, Val}) -> - ?PRINT(" ~-16s:~w~n", [Key, Val]) + ?PRINT(" ~-16s: ~w~n", [Key, Val]) end, Info) end, esockd:listeners()); From aec571a0b03d1c9d1d55ca30e199f3bc87e7fe91 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 3 Oct 2015 15:43:28 +0800 Subject: [PATCH 24/31] etop --- rel/files/emqttd_top | 117 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100755 rel/files/emqttd_top diff --git a/rel/files/emqttd_top b/rel/files/emqttd_top new file mode 100755 index 000000000..714542a2a --- /dev/null +++ b/rel/files/emqttd_top @@ -0,0 +1,117 @@ +#!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et + +# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. +if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then + POSIX_SHELL="true" + export POSIX_SHELL + # To support 'whoami' add /usr/ucb to path + PATH=/usr/ucb:$PATH + export PATH + exec /usr/bin/ksh $0 "$@" +fi +unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well + +RUNNER_SCRIPT_DIR={{runner_script_dir}} +RUNNER_SCRIPT=${0##*/} + +RUNNER_BASE_DIR={{runner_base_dir}} +RUNNER_ETC_DIR={{runner_etc_dir}} +RUNNER_LIB_DIR={{platform_lib_dir}} +RUNNER_USER={{runner_user}} + +WHOAMI=$(whoami) + +# Make sure this script is running as the appropriate user +if ([ "$RUNNER_USER" ] && [ "x$WHOAMI" != "x$RUNNER_USER" ]); then + type sudo > /dev/null 2>&1 + if [ $? -ne 0 ]; then + echo "sudo doesn't appear to be installed and your EUID isn't $RUNNER_USER" 1>&2 + exit 1 + fi + echo "Attempting to restart script through sudo -H -u $RUNNER_USER" >&2 + exec sudo -H -u $RUNNER_USER -i $RUNNER_SCRIPT_DIR/$RUNNER_SCRIPT $@ +fi + +# Make sure CWD is set to runner base dir +cd $RUNNER_BASE_DIR + +# Extract the target node name from node.args +NAME_ARG=`egrep "^ *-s?name" $RUNNER_ETC_DIR/vm.args` +if [ -z "$NAME_ARG" ]; then + echo "vm.args needs to have either -name or -sname parameter." + exit 1 +fi + +# Learn how to specify node name for connection from remote nodes +echo "$NAME_ARG" | grep '^-sname' > /dev/null 2>&1 +if [ "X$?" = "X0" ]; then + NAME_PARAM="-sname" + NAME_HOST="" +else + NAME_PARAM="-name" + echo "$NAME_ARG" | grep '@.*' > /dev/null 2>&1 + if [ "X$?" = "X0" ]; then + NAME_HOST=`echo "${NAME_ARG}" | sed -e 's/.*\(@.*\)$/\1/'` + else + NAME_HOST="" + fi +fi + +# Extract the target cookie +COOKIE_ARG=`grep '\-setcookie' $RUNNER_ETC_DIR/vm.args` +if [ -z "$COOKIE_ARG" ]; then + echo "vm.args needs to have a -setcookie parameter." + exit 1 +fi + +# Identify the script name +SCRIPT=`basename $0` + +# Parse out release and erts info +START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data` +ERTS_VSN=${START_ERL% *} +APP_VSN=${START_ERL#* } + +# Add ERTS bin dir to our path +ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin + +NODE_NAME=${NAME_ARG#* } + +# Setup command to control the node +NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" + +RES=`$NODETOOL ping` +if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 +fi + +case "$1" in + runtime) + SORTBY="runtime" + ;; + reductions) + SORTBY="reductions" + ;; + memory) + SORTBY="memory" + ;; + msg_q) + SORTBY="msg_q" + ;; + *) + echo "Usage: $SCRIPT {runtime | reductions | memory | msg_q}" + exit 1 + ;; +esac + +MYPID=$$ +ETOP_ARGS="-sort $SORTBY -interval 10 -lines 100 -tracing off" +$ERTS_PATH/erl -noshell -noinput \ + -pa $RUNNER_LIB_DIR/basho-patches \ + -hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \ + -s etop -s erlang halt -output text \ + -node $NODE_NAME $ETOP_ARGS + From 1fc79bd9a1acbbad40f7115869218909f1b230e8 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 3 Oct 2015 15:44:23 +0800 Subject: [PATCH 25/31] runtime_tools --- rel/reltool.config | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rel/reltool.config b/rel/reltool.config index 53ce02f86..f8cda1e7b 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -19,6 +19,7 @@ inets, goldrush, compiler, + runtime_tools, lager, {gen_logger, load}, gproc, @@ -54,6 +55,8 @@ {app, eldap, [{incl_cond, include}]}, {app, inets, [{incl_cond, include}]}, {app, compiler, [{incl_cond, include}]}, + {app, runtime_tools, [{incl_cond, include}]}, + {app, observer, [{incl_cond, include}]}, {app, goldrush, [{incl_cond, include}]}, {app, gen_logger, [{incl_cond, include}]}, {app, lager, [{incl_cond, include}]}, @@ -78,6 +81,7 @@ {template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"}, {template, "files/emqttd", "bin/emqttd"}, {template, "files/emqttd_ctl", "bin/emqttd_ctl"}, + {template, "files/emqttd_top", "bin/emqttd_top"}, {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, From 4e5b499aa799a1025e9d11f32a18b1fb27013020 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 3 Oct 2015 15:45:04 +0800 Subject: [PATCH 26/31] tab --- rel/files/emqttd_ctl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index 8a65bd6d3..4292b893e 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -1,4 +1,6 @@ #!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then From 273149f633560491bb24498b6072812424368800 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 11:23:24 +0800 Subject: [PATCH 27/31] keepalive tests --- test/emqttd_keepalive_tests.erl | 44 +++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 test/emqttd_keepalive_tests.erl diff --git a/test/emqttd_keepalive_tests.erl b/test/emqttd_keepalive_tests.erl new file mode 100644 index 000000000..96f84450a --- /dev/null +++ b/test/emqttd_keepalive_tests.erl @@ -0,0 +1,44 @@ + +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +-module(emqttd_keepalive_tests). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +keepalive_test() -> + KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), + ?assertEqual([resumed, timeout], lists:reverse(loop(KA, []))). + +loop(KA, Acc) -> + receive + {keepalive, timeout} -> + case emqttd_keepalive:check(KA) of + {ok, KA1} -> loop(KA1, [resumed | Acc]); + {error, timeout} -> [timeout | Acc] + end + after 4000 -> + Acc + end. + +-endif. From 78288e80886466ac2f96c6bfef06e47b747c2de1 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:43:58 +0800 Subject: [PATCH 28/31] improve keepalie --- src/emqttd_keepalive.erl | 75 ++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index f5c7f2ac7..e06382207 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -23,62 +23,61 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_keepalive). -author("Feng Lee "). --export([new/3, resume/1, cancel/1]). +-export([start/3, check/1, cancel/1]). --record(keepalive, {transport, - socket, - recv_oct, - timeout_sec, - timeout_msg, - timer_ref}). +-record(keepalive, {statfun, statval, + tsec, tmsg, tref, + repeat = 0}). %%------------------------------------------------------------------------------ -%% @doc Create a keepalive +%% @doc Start a keepalive %% @end %%------------------------------------------------------------------------------ -new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> - {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), - Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref}. +start(_, 0, _) -> + undefined; +start(StatFun, TimeoutSec, TimeoutMsg) -> + {ok, StatVal} = StatFun(), + #keepalive{statfun = StatFun, statval = StatVal, + tsec = TimeoutSec, tmsg = TimeoutMsg, + tref = timer(TimeoutSec, TimeoutMsg)}. %%------------------------------------------------------------------------------ -%% @doc Try to resume keepalive, called when timeout +%% @doc Check keepalive, called when timeout. %% @end %%------------------------------------------------------------------------------ -resume(KeepAlive = #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }) -> - {ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), - if - NewRecvOct =:= RecvOct -> - timeout; - true -> - %need? - cancel(Ref), - NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - {resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}} +check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> + case StatFun() of + {ok, NewVal} -> + if NewVal =/= LastVal -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; + Repeat < 1 -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; + true -> + {error, timeout} + end; + {error, Error} -> + {error, Error} end. +resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> + KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. + %%------------------------------------------------------------------------------ %% @doc Cancel Keepalive %% @end %%------------------------------------------------------------------------------ -cancel(#keepalive{timer_ref = Ref}) -> - cancel(Ref); +cancel(#keepalive{tref = TRef}) -> + cancel(TRef); cancel(undefined) -> - undefined; -cancel(Ref) -> - catch erlang:cancel_timer(Ref). + ok; +cancel(TRef) -> + catch erlang:cancel_timer(TRef). + +timer(Sec, Msg) -> + erlang:send_after(timer:seconds(Sec), self(), Msg). From 9f643ea267336eed71c84b49aa739d9e9a7dfbc7 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:44:14 +0800 Subject: [PATCH 29/31] 50 lines --- rel/files/emqttd_top | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/emqttd_top b/rel/files/emqttd_top index 714542a2a..24533c436 100755 --- a/rel/files/emqttd_top +++ b/rel/files/emqttd_top @@ -108,7 +108,7 @@ case "$1" in esac MYPID=$$ -ETOP_ARGS="-sort $SORTBY -interval 10 -lines 100 -tracing off" +ETOP_ARGS="-sort $SORTBY -interval 10 -lines 50 -tracing off" $ERTS_PATH/erl -noshell -noinput \ -pa $RUNNER_LIB_DIR/basho-patches \ -hidden $NAME_PARAM emqttd_top$MYPID$NAME_HOST $COOKIE_ARG \ From d5a400c308b17c21ff7361d38ba3a75bf238a5b8 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:48:50 +0800 Subject: [PATCH 30/31] fix issue #292 - async sub/unsub --- src/emqttd_client.erl | 63 +++++++++++++++++--------- src/emqttd_protocol.erl | 17 +++---- src/emqttd_session.erl | 96 +++++++++++++++++++++------------------- src/emqttd_ws_client.erl | 77 +++++++++++++++++++++++++------- 4 files changed, 159 insertions(+), 94 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index da38d9d8d..7eb4be8f4 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,10 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, session/1, info/1, kick/1, subscribe/2]). +-export([start_link/2, session/1, info/1, kick/1]). + +%% SUB/UNSUB Asynchronously +-export([subscribe/2, unsubscribe/2]). -behaviour(gen_server). @@ -59,7 +62,7 @@ start_link(SockArgs, MqttEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. session(CPid) -> - gen_server:call(CPid, session). + gen_server:call(CPid, session, infinity). info(CPid) -> gen_server:call(CPid, info, infinity). @@ -70,6 +73,9 @@ kick(CPid) -> subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). +unsubscribe(CPid, Topics) -> + gen_server:cast(CPid, {unsubscribe, Topics}). + init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), @@ -107,9 +113,11 @@ handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. -handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - noreply(State#state{proto_state = ProtoState1}); +handle_cast({subscribe, TopicTable}, State) -> + with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + +handle_cast({unsubscribe, Topics}, State) -> + with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast(Msg, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), @@ -149,17 +157,26 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), - KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), + StatFun = fun() -> + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), noreply(State#state{keepalive = KeepAlive}); -handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> +handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) -> + case emqttd_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), + noreply(State#state{keepalive = KeepAlive1}); + {error, timeout} -> lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); - {resumed, KeepAlive1} -> - lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), - noreply(State#state{keepalive = KeepAlive1}) + {error, Error} -> + lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), + stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; handle_info(Info, State = #state{peername = Peername}) -> @@ -188,12 +205,20 @@ terminate(Reason, #state{peername = Peername, code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + noreply(State) -> {noreply, State, hibernate}. - -%------------------------------------------------------- -% receive and parse tcp data -%------------------------------------------------------- + +stop(Reason, State) -> + {stop, Reason, State}. + +with_session(Fun, State = #state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + +%% receive and parse tcp data received(<<>>, State) -> {noreply, State, hibernate}; @@ -244,12 +269,8 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -stop(Reason, State) -> - {stop, Reason, State}. - received_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/received'), - inc(Type). + emqttd_metrics:inc('packets/received'), inc(Type). inc(?CONNECT) -> emqttd_metrics:inc('packets/connect'); inc(?PUBLISH) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index dcd120035..840339819 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -239,16 +239,11 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = case lists:member(deny, AllowDenies) of true -> %%TODO: return 128 QoS when deny... no need to SUBACK? - lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), - {ok, State}; + lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]); false -> - %%TODO: GrantedQos should be renamed. - {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), - send(?SUBACK_PACKET(PacketId, GrantedQos), State) - end; - -handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> - {ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end, + emqttd_session:subscribe(Session, TopicTable, Callback) + end, {ok, State}; %% protect from empty topic list @@ -256,7 +251,7 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - ok = emqttd_session:unsubscribe(Session, Topics), + emqttd_session:unsubscribe(Session, Topics), send(?UNSUBACK_PACKET(PacketId), State); handle(?PACKET(?PINGREQ), State) -> @@ -349,7 +344,7 @@ send_willmsg(ClientId, WillMsg) -> start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> - self() ! {keepalive, start, round(Sec * 1.5)}. + self() ! {keepalive, start, round(Sec * 1.2)}. %%---------------------------------------------------------------------------- %% Validate Packets diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 1d3caa98f..a23f76f7d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -59,7 +59,7 @@ %% PubSub APIs -export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2, - subscribe/2, unsubscribe/2]). + subscribe/2, subscribe/3, unsubscribe/2]). -behaviour(gen_server2). @@ -166,9 +166,13 @@ destroy(SessPid, ClientId) -> %% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ --spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. +-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok. subscribe(SessPid, TopicTable) -> - gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT). + subscribe(SessPid, TopicTable, fun(_) -> ok end). + +-spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok. +subscribe(SessPid, TopicTable, Callback) -> + gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}). %%------------------------------------------------------------------------------ %% @doc Publish message @@ -213,7 +217,7 @@ pubcomp(SessPid, PktId) -> %%------------------------------------------------------------------------------ -spec unsubscribe(pid(), [binary()]) -> ok. unsubscribe(SessPid, Topics) -> - gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT). + gen_server2:cast(SessPid, {unsubscribe, Topics}). %%%============================================================================= %%% gen_server callbacks @@ -247,26 +251,24 @@ init([CleanSess, ClientId, ClientPid]) -> {ok, start_collector(Session#session{client_mon = MRef}), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> - case Msg of - {unsubscribe, _} -> 2; - {subscribe, _} -> 1; - _ -> 0 - end. + case Msg of _ -> 0 end. prioritise_cast(Msg, _Len, _State) -> case Msg of - {destroy, _} -> 10; - {resume, _, _} -> 9; - {pubrel, _PktId} -> 8; - {pubcomp, _PktId} -> 8; - {pubrec, _PktId} -> 8; - {puback, _PktId} -> 7; - _ -> 0 + {destroy, _} -> 10; + {resume, _, _} -> 9; + {pubrel, _PktId} -> 8; + {pubcomp, _PktId} -> 8; + {pubrec, _PktId} -> 8; + {puback, _PktId} -> 7; + {unsubscribe, _, _} -> 6; + {subscribe, _, _} -> 5; + _ -> 0 end. prioritise_info(Msg, _Len, _State) -> case Msg of - {'DOWN', _, process, _, _} -> 10; + {'DOWN', _, _, _, _} -> 10; {'EXIT', _, _} -> 10; session_expired -> 10; {timeout, _, _} -> 5; @@ -275,17 +277,40 @@ prioritise_info(Msg, _Len, _State) -> _ -> 0 end. -handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, + Session = #session{client_id = ClientId, + awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> + case check_awaiting_rel(Session) of + true -> + TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), + AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), + {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; + false -> + lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " + "for too many awaiting_rel: ~p", [ClientId, Msg]), + {reply, {error, dropped}, Session} + end; - case TopicTable0 -- Subscriptions of +handle_call(Req, _From, State) -> + lager:critical("Unexpected Request: ~p", [Req]), + {reply, ok, State}. + +handle_cast({subscribe, TopicTable0, Callback}, Session = #session{ + client_id = ClientId, subscriptions = Subscriptions}) -> + + TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), + + case TopicTable -- Subscriptions of [] -> - {reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session}; + catch Callback([Qos || {_, Qos} <- TopicTable]), + noreply(Session); _ -> - TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), + catch Callback(GrantedQos), + emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", @@ -310,11 +335,11 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie [{Topic, Qos} | Acc] end end, Subscriptions, TopicTable), - {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}} + noreply(Session#session{subscriptions = Subscriptions1}) end; -handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), @@ -333,26 +358,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client end end, Subscriptions, Topics), - {reply, ok, Session#session{subscriptions = Subscriptions1}}; - -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, - Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> - case check_awaiting_rel(Session) of - true -> - TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), - AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), - {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; - false -> - lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " - "for too many awaiting_rel: ~p", [ClientId, Msg]), - {reply, {error, dropped}, Session} - end; - -handle_call(Req, _From, State) -> - lager:critical("Unexpected Request: ~p", [Req]), - {reply, ok, State}. + noreply(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 6b4a001b2..827d7de18 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -34,7 +34,10 @@ -include("emqttd_protocol.hrl"). %% API Exports --export([start_link/1, ws_loop/3, subscribe/2]). +-export([start_link/1, ws_loop/3, session/1, info/1, kick/1]). + +%% SUB/UNSUB Asynchronously +-export([subscribe/2, unsubscribe/2]). -behaviour(gen_server). @@ -61,9 +64,21 @@ start_link(Req) -> packet_opts = PktOpts, parser = emqttd_parser:new(PktOpts)}). +session(CPid) -> + gen_server:call(CPid, session, infinity). + +info(CPid) -> + gen_server:call(CPid, info, infinity). + +kick(CPid) -> + gen_server:call(CPid, kick). + subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). +unsubscribe(CPid, Topics) -> + gen_server:cast(CPid, {unsubscribe, Topics}). + %%------------------------------------------------------------------------------ %% @private %% @doc Start WebSocket client. @@ -112,17 +127,30 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]), {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. +handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> + {reply, emqttd_protocol:session(ProtoState), State}; + +handle_call(info, _From, State = #client_state{request = Req, + proto_state = ProtoState}) -> + {reply, [{websocket, true}, {peer, Req:get(peer)} + | emqttd_protocol:info(ProtoState)], State}; + +handle_call(kick, _From, State) -> + {stop, {shutdown, kick}, ok, State}; + handle_call(_Req, _From, State) -> {reply, error, State}. -handle_cast({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}, hibernate}; +handle_cast({subscribe, TopicTable}, State) -> + with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + +handle_cast({unsubscribe, Topics}, State) -> + with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p", [Error]), stop({shutdown, Error}, State); @@ -137,11 +165,11 @@ handle_cast(_Msg, State) -> handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; + noreply(State#client_state{proto_state = ProtoState1}); handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) -> lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), @@ -149,18 +177,27 @@ handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = P handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), - KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)}, - TimeoutSec, {keepalive, timeout}), - {noreply, State#client_state{keepalive = KeepAlive}}; + Socket = Req:get(socket), + StatFun = fun() -> + case esockd_transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), + noreply(State#client_state{keepalive = KeepAlive}); -handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> +handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> + case emqttd_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), + noreply(State#client_state{keepalive = KeepAlive1}); + {error, timeout} -> lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); - {resumed, KeepAlive1} -> - lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), - {noreply, State#client_state{keepalive = KeepAlive1}} + {error, Error} -> + lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), + stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) end; handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> @@ -170,7 +207,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto handle_info(Info, State = #client_state{request = Req}) -> lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), - {noreply, State}. + noreply(State). terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> lager:info("WebSocket client terminated: ~p", [Reason]), @@ -189,6 +226,12 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +noreply(State) -> + {noreply, State, hibernate}. + stop(Reason, State ) -> {stop, Reason, State}. +with_session(Fun, State = #client_state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + From b270399ec759138c694b30d081e53648e628234e Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 7 Oct 2015 22:54:25 +0800 Subject: [PATCH 31/31] production config as default --- rel/files/emqttd.config.production | 4 ++-- rel/reltool.config | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index ff0df40c9..f34e2f9d6 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -16,7 +16,7 @@ {error_logger_redirect, false}, {crash_log, "log/emqttd_crash.log"}, {handlers, [ - {lager_console_backend, info}, + %%{lager_console_backend, info}, {lager_file_backend, [ {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, {file, "log/emqttd_error.log"}, @@ -95,7 +95,7 @@ {max_awaiting_rel, 0}, %% Statistics Collection Interval(seconds) - {collect_interval, 20}, + {collect_interval, 0}, %% Expired after 2 days {expired_after, 48} diff --git a/rel/reltool.config b/rel/reltool.config index f8cda1e7b..33d985545 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -87,8 +87,8 @@ {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, - {template, "files/emqttd.config.development", "etc/emqttd.config"}, - {template, "files/emqttd.config.production", "etc/emqttd.config.production"}, + {template, "files/emqttd.config.production", "etc/emqttd.config"}, + {template, "files/emqttd.config.development", "etc/emqttd.config.development"}, {template, "files/acl.config", "etc/acl.config"}, {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"},