diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 446bff8a9..7a9449315 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_app). -author("Feng Lee "). @@ -49,6 +50,7 @@ start(_StartType, _StartArgs) -> print_banner(), emqttd_mnesia:start(), + emqttd_ctl:init(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 789313e3a..8f7331f35 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -29,6 +29,10 @@ -author('feng@emqtt.io'). -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +%% CLI callbacks +-export([cli_useradd/1, cli_userdel/1]). -behaviour(emqttd_auth_mod). @@ -42,6 +46,22 @@ -record(?AUTH_USERNAME_TAB, {username, password}). +%%%============================================================================= +%%% CLI +%%%============================================================================= + +cli_useradd([Username, Password]) -> + ?PRINT("~p~n", [add_user(list_to_binary(Username), list_to_binary(Password))]); + +cli_useradd(_) -> + ?PRINT_CMD("useradd ", "#add user"). + +cli_userdel([Username]) -> + ?PRINT("~p~n", [remove_user(list_to_binary(Username))]); + +cli_userdel(_) -> + ?PRINT_CMD("userdel ", "#delete user"). + %%%============================================================================= %%% API %%%============================================================================= @@ -67,6 +87,8 @@ init(Opts) -> {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), + emqttd_ctl:register_cmd(useradd, {?MODULE, cli_useradd}, []), + emqttd_ctl:register_cmd(userdel, {?MODULE, cli_userdel}, []), {ok, Opts}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 2b1210221..fe77df056 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -28,7 +28,10 @@ -author("Feng Lee "). --include_lib("emqttd.hrl"). +-include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +-export([cli/1]). %% API Function Exports -export([start_link/0]). @@ -68,6 +71,16 @@ sysdescr % Broker description ]). +%%%============================================================================= +%%% CLI callback +%%%============================================================================= +cli([]) -> + Funs = [sysdescr, version, uptime, datetime], + [?PRINT("~-20s~s~n", [Fun, ?MODULE:Fun()]) || Fun <- Funs]; + +cli(_) -> + ?PRINT_CMD("broker", "#query broker version, uptime and description"). + %%%============================================================================= %%% API %%%============================================================================= @@ -223,6 +236,8 @@ init([]) -> % Create $SYS Topics emqttd_pubsub:create(<<"$SYS/brokers">>), [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], + %% CLI + emqttd_ctl:register_cmd(broker, {?MODULE, cli}, []), % Tick {ok, #state{started_at = os:timestamp(), heartbeat = start_tick(1000, heartbeat), @@ -279,7 +294,9 @@ handle_info(_Info, State) -> terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> stop_tick(Hb), - stop_tick(TRef). + stop_tick(TRef), + emqttd_ctl:unregister_cmd(broker), + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 48863c084..036dbffa2 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -24,32 +24,86 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_ctl). -author("Feng Lee "). -include("emqttd.hrl"). --define(PRINT_MSG(Msg), - io:format(Msg)). +-include("emqttd_cli.hrl"). --define(PRINT(Format, Args), - io:format(Format, Args)). +-export([init/0, + register_cmd/3, + unregister_cmd/1, + run/1]). -export([status/1, vm/1, - broker/1, stats/1, metrics/1, cluster/1, clients/1, sessions/1, listeners/1, - bridges/1, - plugins/1, - trace/1, - useradd/1, - userdel/1]). + bridges/1]). + +-define(CMD_TAB, mqttd_ctl_cmd). + +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Init cmd table. +%% @end +%%------------------------------------------------------------------------------ +init() -> + ets:new(?CMD_TAB, [ordered_set, named_table, public]), + register_cmd(status, {?MODULE, status}, []), + register_cmd(vm, {?MODULE, vm}, []), + register_cmd(cluster, {?MODULE, cluster}, []). + +%%------------------------------------------------------------------------------ +%% @doc Register a command +%% @end +%%------------------------------------------------------------------------------ +-spec register_cmd(atom(), {module(), atom()}, list()) -> true. +register_cmd(Cmd, MF, Opts) -> + ets:insert(?CMD_TAB, {Cmd, MF, Opts}). + +%%------------------------------------------------------------------------------ +%% @doc Unregister a command +%% @end +%%------------------------------------------------------------------------------ +-spec unregister_cmd(atom()) -> true. +unregister_cmd(Cmd) -> + ets:delete(?CMD_TAB, Cmd). + +%%------------------------------------------------------------------------------ +%% @doc Run a command +%% @end +%%------------------------------------------------------------------------------ + +run([]) -> usage(); + +run([CmdS|Args]) -> + case ets:lookup(?CMD_TAB, list_to_atom(CmdS)) of + [{_, {Mod, Fun}, _}] -> Mod:Fun(Args); + [] -> usage() + end. + +%%------------------------------------------------------------------------------ +%% @doc Usage +%% @end +%%------------------------------------------------------------------------------ +usage() -> + ?PRINT("Usage: ~s~n", [?MODULE]), + [Mod:Cmd(["help"]) || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)]. + +%%%============================================================================= +%%% Commands +%%%============================================================================= %%------------------------------------------------------------------------------ %% @doc Query node status @@ -63,7 +117,36 @@ status([]) -> ?PRINT_MSG("emqttd is not running~n"); {value,_Version} -> ?PRINT_MSG("emqttd is running~n") - end. + end; +status(_) -> + ?PRINT_CMD("status", "#query broker status"). + +vm([]) -> + vm(["all"]); + +vm(["all"]) -> + [begin vm([Name]), ?PRINT_MSG("~n") end || Name <- ["load", "memory", "process", "io"]]; + +vm(["load"]) -> + [?PRINT("cpu/~-20s~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; + +vm(["memory"]) -> + [?PRINT("memory/~-17s~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; + +vm(["process"]) -> + lists:foreach(fun({Name, Key}) -> + ?PRINT("process/~-16s~w~n", [Name, erlang:system_info(Key)]) + end, [{limit, process_limit}, {count, process_count}]); + +vm(["io"]) -> + ?PRINT("io/~-21s~w~n", [max_fds, proplists:get_value(max_fds, erlang:system_info(check_io))]); + +vm(_) -> + ?PRINT_CMD("vm all", "#query info of erlang vm"), + ?PRINT_CMD("vm load", "#query load of erlang vm"), + ?PRINT_CMD("vm memory", "#query memory of erlang vm"), + ?PRINT_CMD("vm process", "#query process of erlang vm"), + ?PRINT_CMD("vm io", "#queue io of erlang vm"). %%------------------------------------------------------------------------------ %% @doc Cluster with other node @@ -93,45 +176,11 @@ cluster([SNode]) -> end; pang -> ?PRINT("failed to connect to ~p~n", [Node]) - end. + end; -%%------------------------------------------------------------------------------ -%% @doc Add user -%% @end -%%------------------------------------------------------------------------------ -useradd([Username, Password]) -> - ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]). +cluster(_) -> + ?PRINT_CMD("cluster []", "#cluster with node, query cluster info "). -%%------------------------------------------------------------------------------ -%% @doc Delete user -%% @end -%%------------------------------------------------------------------------------ -userdel([Username]) -> - ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]). - -vm([]) -> - [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; - -vm(["load"]) -> - ?PRINT_MSG("Load: ~n"), - [?PRINT(" ~s:~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; - -vm(["memory"]) -> - ?PRINT_MSG("Memory: ~n"), - [?PRINT(" ~s:~p~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; - -vm(["process"]) -> - ?PRINT_MSG("Process: ~n"), - ?PRINT(" process_limit:~p~n", [erlang:system_info(process_limit)]), - ?PRINT(" process_count:~p~n", [erlang:system_info(process_count)]); - -vm(["io"]) -> - ?PRINT_MSG("IO: ~n"), - ?PRINT(" max_fds:~p~n", [proplists:get_value(max_fds, erlang:system_info(check_io))]). - -broker([]) -> - Funs = [sysdescr, version, uptime, datetime], - [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]. stats([]) -> [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()]. @@ -216,50 +265,6 @@ bridges(["stop", SNode, Topic]) -> {error, Error} -> ?PRINT("error: ~p~n", [Error]) end. -plugins(["list"]) -> - lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list()); - -plugins(["load", Name]) -> - case emqttd_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) - end; - -plugins(["unload", Name]) -> - case emqttd_plugins:unload(list_to_atom(Name)) of - ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason]) - end. - -trace(["list"]) -> - lists:foreach(fun({{Who, Name}, LogFile}) -> - ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) - end, emqttd_trace:all_traces()); - -trace(["client", ClientId, "off"]) -> - stop_trace(client, ClientId); -trace(["client", ClientId, LogFile]) -> - start_trace(client, ClientId, LogFile); -trace(["topic", Topic, "off"]) -> - stop_trace(topic, Topic); -trace(["topic", Topic, LogFile]) -> - start_trace(topic, Topic, LogFile). - -start_trace(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of - ok -> - ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) - end. -stop_trace(Who, Name) -> - case emqttd_trace:stop_trace({Who, bin(Name)}) of - ok -> - ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) - end. - node_name(SNode) -> SNode1 = @@ -333,11 +338,8 @@ print(session, {{ClientId, _ClientPid}, SessInfo}) -> "message_queue=~w, message_dropped=~w, " "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " "created_at=~w, subscriptions=~s)~n", - [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]); + [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). -print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", - [Name, Ver, Descr, Active]). format(created_at, Val) -> emqttd_util:now_to_secs(Val); diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index b6752df29..c712cc019 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -24,18 +24,21 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_metrics). -author("Feng Lee "). -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0]). +-export([start_link/0, cli/1]). -export([all/0, value/1, inc/1, inc/2, inc/3, @@ -93,6 +96,17 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%%------------------------------------------------------------------------------ +%% @doc CLI command callback +%% @end +%%------------------------------------------------------------------------------ + +cli([]) -> + [?PRINT("~-32s ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqttd_metrics:all())]; + +cli(_) -> + ?PRINT_CMD("metrics", "#query broker metrics"). + %%------------------------------------------------------------------------------ %% @doc Get all metrics %% @end @@ -193,6 +207,8 @@ init([]) -> [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics], + %% Register CLI commands + emqttd_ctl:register_cmd(metrics, {?MODULE, cli}, []), % Tick to publish metrics {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 8e23f41e1..ba1c1b408 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -30,6 +30,9 @@ -author("Feng Lee "). -include("emqttd.hrl"). +-include("emqttd_cli.hrl"). + +-export([cli/1]). -export([load/0, unload/0]). @@ -37,6 +40,36 @@ -export([list/0]). +%%------------------------------------------------------------------------------ +%% CLI callback +%%------------------------------------------------------------------------------ + +cli(["list"]) -> + lists:foreach(fun(Plugin) -> print(Plugin) end, list()); + +cli(["load", Name]) -> + case load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) + end; + +cli(["unload", Name]) -> + case emqttd_plugins:unload(list_to_atom(Name)) of + ok -> + ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); + {error, Reason} -> + ?PRINT("unload plugin error: ~p~n", [Reason]) + end; + +cli(_) -> + ?PRINT_CMD("plugins list", "#query loaded plugins"), + ?PRINT_CMD("plugins load ", "#load plugin"), + ?PRINT_CMD("plugins unload ", "#unload plugin"). + +print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", + [Name, Ver, Descr, Active]). + %%------------------------------------------------------------------------------ %% @doc Load all plugins when the broker started. %% @end @@ -44,6 +77,7 @@ -spec load() -> list() | {error, any()}. load() -> + emqttd_ctl:register_cmd(plugins, {?MODULE, cli}, []), case env(loaded_file) of {ok, File} -> with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 82cd4d8a7..c79bdcbae 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -24,17 +24,23 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_trace). -author("Feng Lee "). --behaviour(gen_server). +-include("emqttd_cli.hrl"). + +%% CLI +-export([cli/1]). %% API Function Exports -export([start_link/0]). -export([start_trace/2, stop_trace/1, all_traces/0]). +-behaviour(gen_server). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -45,6 +51,48 @@ -define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). + +%%%============================================================================= +%%% CLI +%%%============================================================================= + +cli(["list"]) -> + lists:foreach(fun({{Who, Name}, LogFile}) -> + ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) + end, all_traces()); + +cli(["client", ClientId, "off"]) -> + cli(trace_off, client, ClientId); +cli(["client", ClientId, LogFile]) -> + cli(trace_on, client, ClientId, LogFile); +cli(["topic", Topic, "off"]) -> + cli(trace_off, topic, Topic); +cli(["topic", Topic, LogFile]) -> + cli(trace_on, topic, Topic, LogFile); + +cli(_) -> + ?PRINT_CMD("trace list", "#query all traces"), + ?PRINT_CMD("trace client ","#trace client with ClientId"), + ?PRINT_CMD("trace client off", "#stop to trace client"), + ?PRINT_CMD("trace topic ", "#trace topic with Topic"), + ?PRINT_CMD("trace topic off", "#stop to trace Topic"). + +cli(trace_on, Who, Name, LogFile) -> + case start_trace({Who, list_to_binary(Name)}, LogFile) of + ok -> + ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. + +cli(trace_off, Who, Name) -> + case stop_trace({Who, list_to_binary(Name)}) of + ok -> + ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + %%%============================================================================= %%% API %%%============================================================================= @@ -85,6 +133,7 @@ all_traces() -> gen_server:call(?MODULE, all_traces). init([]) -> + emqttd_ctl:register_cmd(trace, {?MODULE, cli}, []), {ok, #state{level = info, trace_map = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) -> @@ -122,6 +171,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> + emqttd_ctl:unregister_cmd(trace), ok. code_change(_OldVsn, State, _Extra) ->