From 2d6104fbd90809c9074fb707454b58ffe74cee0d Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 11 May 2017 16:02:38 +0800 Subject: [PATCH] Ctl use clique --- Makefile | 4 +- priv/{emq.schema => emqttd.schema} | 0 src/emqttd_app.erl | 1 + src/emqttd_cli2.erl | 891 +++++++++++++++++++++++++++++ 4 files changed, 894 insertions(+), 2 deletions(-) rename priv/{emq.schema => emqttd.schema} (100%) create mode 100644 src/emqttd_cli2.erl diff --git a/Makefile b/Makefile index abd416178..5aa16fe04 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.2 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt +DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt clique dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc @@ -13,7 +13,7 @@ dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master - +dep_clique = git https://github.com/turtleDeng/clique ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish diff --git a/priv/emq.schema b/priv/emqttd.schema similarity index 100% rename from priv/emq.schema rename to priv/emqttd.schema diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 992e22da6..3b8a9c85b 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -43,6 +43,7 @@ start(_Type, _Args) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), + emqttd_cli2:register_cli(), register_acl_mod(), emqttd_plugins:init(), emqttd_plugins:load(), diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl new file mode 100644 index 000000000..3f9bdeba2 --- /dev/null +++ b/src/emqttd_cli2.erl @@ -0,0 +1,891 @@ +-module (emqttd_cli2). + +-export([register_cli/0]). + +-include("emqttd.hrl"). + +-include("emqttd_cli.hrl"). + +-include("emqttd_protocol.hrl"). + +-export([run/1]). + +-behaviour(clique_handler). + +-import(proplists, [get_value/2]). + +-define(APP, emqttd). + +-define(PROC_INFOKEYS, [status, + memory, + message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions]). + +register_cli() -> + F = fun() -> emqttd_mnesia:running_nodes() end, + clique:register_node_finder(F), + register_usage(), + register_cmd(). + +run(Cmd) -> + clique:run(Cmd). + +register_usage() -> + clique:register_usage(["broker"], broker_usage()), + clique:register_usage(["cluster"], cluster_usage()), + clique:register_usage(["acl"], acl_usage()), + clique:register_usage(["clients"], clients_usage()), + clique:register_usage(["sessions"], sessions_usage()), + clique:register_usage(["routes"], routes_usage()), + clique:register_usage(["topics"], topics_usage()), + clique:register_usage(["subscriptions"], subscriptions_usage()), + clique:register_usage(["plugins"], plugins_usage()), + clique:register_usage(["bridges"], bridges_usage()), + clique:register_usage(["vm"], vm_usage()), + clique:register_usage(["trace"], trace_usage()), + clique:register_usage(["status"], status_usage()), + clique:register_usage(["listeners"], listeners_usage()), + clique:register_usage(["mnesia"], mnesia_usage()). + +register_cmd() -> + node_status(), + broker_status(), + broker_stats(), + broker_metrics(), + broker_pubsub(), + cluster_join(), + cluster_leave(), + cluster_remove(), + acl_reload(), + clients_list(), + clients_show(), + clients_kick(), + sessions_list(), + sessions_list_persistent(), + sessions_list_transient(), + sessions_query(), + routes_list(), + routes_query(), + topics_list(), + topics_query(), + subscriptions_list(), + subscriptions_query(), + subscriptions_subscribe(), + subscriptions_del(), + subscriptions_unsubscribe(), + plugins_list(), + plugins_load(), + plugins_unload(), + bridges_list(), + bridges_start(), + bridges_stop(), + vm_all(), + vm_load(), + vm_memory(), + vm_process(), + vm_io(), + vm_ports(), + mnesia_info(), + trace_list(), + trace_on(), + trace_off(), + listeners(). + +node_status() -> + Cmd = ["status"], + Callback = + fun (_, _, _) -> + {Status, Vsn} = case lists:keysearch(?APP, 1, application:which_applications()) of + false -> + {"not running", undefined}; + {value, {?APP, _Desc, Vsn0}} -> + {"running", Vsn0} + end, + [clique_status:table([[{node, node()}, {status, Status}, {version, Vsn}]])] + end, + clique:register_command(Cmd, [], [], Callback). + +%%-------------------------------------------------------------------- +%% @doc Query broker + +broker_status() -> + Cmd = ["broker", "info"], + Callback = + fun (_, _, _) -> + Funs = [sysdescr, version, uptime, datetime], + Table = lists:map(fun(Fun) -> + {Fun, emqttd_broker:Fun()} + end, Funs), + [clique_status:table([Table])] + end, + clique:register_command(Cmd, [], [], Callback). + +broker_stats() -> + Cmd = ["broker", "stats"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Key, Val}) -> + io_lib:format("~-20s: ~w~n", [Key, Val]) + end, emqttd_stats:getstats()), + [clique_status:list(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +broker_metrics() -> + Cmd = ["broker", "metrics"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Key, Val}) -> + io_lib:format("~-24s: ~w~n", [Key, Val]) + end, lists:sort(emqttd_metrics:all())), + [clique_status:list(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +broker_pubsub() -> + Cmd = ["broker", "pubsub"], + Callback = + fun (_, _, _) -> + Pubsubs = supervisor:which_children(emqttd_pubsub_sup:pubsub_pool()), + Table = lists:map( + fun({{_, Id}, Pid, _, _}) -> + ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), + [{id, Id}] ++ ProcInfo + end, lists:reverse(Pubsubs)), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + + +%%-------------------------------------------------------------------- +%% @doc Cluster with other nodes + +cluster_join() -> + Cmd = ["cluster", "join"], + KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Node}], _) -> + Text = case emqttd_cluster:join(Node) of + ok -> + ["Join the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to join the cluster: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +cluster_leave() -> + Cmd = ["cluster", "leave"], + Callback = + fun(_, _, _) -> + Text = case emqttd_cluster:leave() of + ok -> + ["Leave the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to leave the cluster: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, [], [], Callback). + +cluster_remove() -> + Cmd = ["cluster", "remove"], + KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Node}], _) -> + Text = case emqttd_cluster:remove(Node) of + ok -> + ["Remove the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to remove the cluster: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +cluster(["status"]) -> + io_lib:format("Cluster status: ~p~n", [emqttd_cluster:status()]). + +%%-------------------------------------------------------------------- +%% @doc acl + +acl_reload() -> + Cmd = ["acl", "reload"], + Callback = + fun (_, _, _) -> + emqttd_access_control:reload_acl(), + [clique_status:text("")] + end, + clique:register_command(Cmd, [], [], Callback). + +%%-------------------------------------------------------------------- +%% @doc Query clients + +clients_list() -> + Cmd = ["clients", "list"], + Callback = + fun (_, _, _) -> + [clique_status:table(dump(mqtt_client))] + end, + clique:register_command(Cmd, [], [], Callback). + +clients_show() -> + Cmd = ["clients", "query"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + [clique_status:table(if_client(ClientId, fun print/1))] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +clients_kick() -> + Cmd = ["clients", "kick"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + Result = if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end), + case Result of + [ok] -> [clique_status:text(io_lib:format("Kick client_id: ~p successfully~n", [ClientId]))]; + _ -> [clique_status:text("")] + end + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +if_client(ClientId, Fun) -> + case emqttd_cm:lookup(ClientId) of + undefined -> ?PRINT_MSG("Not Found.~n"), []; + Client -> [Fun(Client)] + end. + +%%-------------------------------------------------------------------- +%% @doc Sessions Command + +sessions_list() -> + Cmd = ["sessions", "list"], + Callback = + fun (_, _, _) -> + [clique_status:table(dump(mqtt_local_session))] + end, + clique:register_command(Cmd, [], [], Callback). + +%% performance issue? + +sessions_list_persistent() -> + Cmd = ["sessions", "list", "persistent"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', false, '_'})), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +%% performance issue? + +sessions_list_transient() -> + Cmd = ["sessions", "list", "transient"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', true, '_'})), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +sessions_query() -> + Cmd = ["sessions", "query"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + + case ets:lookup(mqtt_local_session, ClientId) of + [] -> + ?PRINT_MSG("Not Found.~n"), + [clique_status:table([])]; + [SessInfo] -> + [clique_status:table([print(SessInfo)])] + end + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Routes Command + +routes_list() -> + Cmd = ["routes", "list"], + Callback = + fun (_, _, _) -> + Table = lists:flatten(lists:map(fun print/1, emqttd_router:dump())), + [clique_status:table([Table])] + end, + clique:register_command(Cmd, [], [], Callback). + +routes_query() -> + Cmd = ["routes", "query"], + KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Topic}], _) -> + [clique_status:table([print(mnesia:dirty_read(mqtt_route, Topic))])] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Topics Command + +topics_list() -> + Cmd = ["topics", "list"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun(Topic) -> [{topic, Topic}] end, emqttd:topics()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +topics_query() -> + Cmd = ["topics", "query"], + KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Topic}], _) -> + Table = print(mnesia:dirty_read(mqtt_route, Topic)), + [clique_status:table([Table])] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Subscriptions Command +subscriptions_list() -> + Cmd = ["subscriptions", "list"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun(Subscription) -> + print(subscription, Subscription) + end, ets:tab2list(mqtt_subscription)), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +subscriptions_query() -> + Cmd = ["subscriptions", "query"], + KeySpecs = [{'client_id', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + case ets:lookup(mqtt_subscription, ClientId) of + [] -> + ?PRINT_MSG("Not Found.~n"), + [clique_status:table([])]; + Records -> + Table = [print(subscription, Subscription) || Subscription <- Records], + [clique_status:table(Table)] + end + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +subscriptions_subscribe() -> + Cmd = ["subscriptions", "subscribe"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, + {'qos', [{typecast, fun(QoS) -> list_to_integer(QoS) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}, {_, Topic}, {_, QoS}], _) -> + Text = case emqttd:subscribe(Topic, ClientId, [{qos, QoS}]) of + ok -> + io_lib:format("Client_id: ~p subscribe topic: ~p qos: ~p successfully~n", [ClientId, Topic, QoS]); + {error, already_existed} -> + io_lib:format("Error: client_id: ~p subscribe topic: ~p already existed~n", [ClientId, Topic]); + {error, Reason} -> + io_lib:format("Error: ~p~n", [Reason]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +subscriptions_del() -> + Cmd = ["subscriptions", "del"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + emqttd:subscriber_down(ClientId), + Text = io_lib:format("Client_id del subscriptions:~p successfully~n", [ClientId]), + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +subscriptions_unsubscribe() -> + Cmd = ["subscriptions", "unsubscribe"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}, {_, Topic}], _) -> + emqttd:unsubscribe(Topic, ClientId), + Text = io_lib:format("Client_id: ~p unsubscribe topic: ~p successfully~n", [ClientId, Topic]), + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Plugins Command +plugins_list() -> + Cmd = ["plugins", "list"], + Callback = + fun (_, _, _) -> + Text = lists:map(fun(Plugin) -> print(Plugin) end, emqttd_plugins:list()), + [clique_status:table(Text)] + end, + clique:register_command(Cmd, [], [], Callback). + +plugins_load() -> + Cmd = ["plugins", "load"], + KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, PluginName}], _) -> + Text = case emqttd_plugins:load(PluginName) of + {ok, StartedApps} -> + io_lib:format("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, PluginName]); + {error, Reason} -> + io_lib:format("load plugin error: ~p~n", [Reason]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +plugins_unload() -> + Cmd = ["plugins", "unload"], + KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, PluginName}], _) -> + Text = case emqttd_plugins:unload(PluginName) of + ok -> + io_lib:format("Plugin ~s unloaded successfully.~n", [PluginName]); + {error, Reason} -> + io_lib:format("unload plugin error: ~p~n", [Reason]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + + +%%-------------------------------------------------------------------- +%% @doc Bridges command + +bridges_list() -> + Cmd = ["bridges", "list"], + Callback = + fun (_, _, _) -> + Text = lists:map( + fun({Node, Topic, _Pid}) -> + [{bridge, node()}, {topic, Topic}, {node, Node}] + end, emqttd_bridge_sup_sup:bridges()), + [clique_status:table(Text)] + end, + clique:register_command(Cmd, [], [], Callback). + +bridges_start() -> + Cmd = ["bridges", "start"], + KeySpecs = [{'snode', [{typecast, fun(SNode) -> list_to_atom(SNode) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, + {'qos', [{typecast, fun(Qos) -> list_to_integer(Qos) end}]}, + {'topic_suffix', [{typecast, fun(Prefix) -> list_to_binary(Prefix) end}]}, + {'topic_prefix', [{typecast, fun(Suffix) -> list_to_binary(Suffix) end}]}, + {'max_queue_len', [{typecast, fun(Queue) -> list_to_integer(Queue) end}]}], + FlagSpecs = [], + Callback = + fun (_, Params, _) -> + Text = case {get_value('snode', Params), get_value('topic', Params)} of + {undefined, undefined} -> + io_lib:format("Invalid snode and topic error~n", []); + {undefined, _} -> + io_lib:format("Invalid snode error~n", []); + {_, undefined} -> + io_lib:format("Invalid topic error~n", []); + {SNode, Topic} -> + Opts = Params -- [{'snode', SNode}, {'topic', Topic}], + case emqttd_bridge_sup_sup:start_bridge(SNode, Topic, Opts) of + {ok, _} -> + io_lib:format("bridge is started.~n", []); + {error, Error} -> + io_lib:format("error: ~p~n", [Error]) + end + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +bridges_stop() -> + Cmd = ["bridges", "stop"], + KeySpecs = [{'snode', [{typecast, fun(SNode) -> list_to_atom(SNode) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, SNode},{_, Topic}], _) -> + Text = case emqttd_bridge_sup_sup:stop_bridge(SNode, Topic) of + ok -> io_lib:format("bridge is stopped.~n", []); + {error, Error} -> io_lib:format("error: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc vm command + +vm_all() -> + Cmd = ["vm","info"], + Callback = + fun (_, _, _) -> + + Load = [io_lib:format("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()], + + Memory = [io_lib:format("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()], + + Process = lists:map(fun({Name, Key}) -> + io_lib:format("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{limit, process_limit}, {count, process_count}]), + + IoInfo = erlang:system_info(check_io), + IO = lists:map(fun(Key) -> + io_lib:format("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) + end, [max_fds, active_fds]), + + Ports = lists:map(fun({Name, Key}) -> + io_lib:format("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{count, port_count}, {limit, port_limit}]), + [clique_status:text([Load, Memory, Process, IO, Ports])] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_load() -> + Cmd = ["vm","load"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, Val}] + end, emqttd_vm:loads()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_memory() -> + Cmd = ["vm","memory"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, Val}] + end, erlang:memory()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_process() -> + Cmd = ["vm","process"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, erlang:system_info(Val)}] + end, [{limit, process_limit}, {count, process_count}]), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_io() -> + Cmd = ["vm","io"], + Callback = + fun (_, _, _) -> + IoInfo = erlang:system_info(check_io), + Table = lists:map( + fun(Key) -> + [{name, Key}, {val, get_value(Key, IoInfo)}] + end, [max_fds, active_fds]), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_ports() -> + Cmd = ["vm","ports"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, erlang:system_info(Val)}] + end, [{count, port_count}, {limit, port_limit}]), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +% %%-------------------------------------------------------------------- +%% @doc mnesia Command + +mnesia_info() -> + Cmd = ["mnesia", "info"], + Callback = + fun (_, _, _) -> + mnesia:system_info(), + [clique_status:text("")] + end, + clique:register_command(Cmd, [], [], Callback). + +%%-------------------------------------------------------------------- +%% @doc Trace Command + +trace_list() -> + Cmd = ["trace", "list"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun({{Who, Name}, LogFile}) -> + [{trace, Who}, {name, Name}, {log_file, LogFile}] + end, emqttd_trace:all_traces()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +trace_on() -> + Cmd = ["trace"], + KeySpecs = [{'type', [{typecast, fun(Type) -> list_to_atom(Type) end}]}, + {'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, + {'log_file', [{typecast, fun(LogFile) -> list_to_binary(LogFile) end}]}], + FlagSpecs = [], + Callback = + fun (_, Params, _) -> + Text = case get_value('type', Params) of + client -> + trace_on(client, get_value('client_id', Params), get_value('log_file', Params)); + topic -> + trace_on(topic, get_value('topic', Params), get_value('log_file', Params)); + Type -> + io_lib:format("Invalid type: ~p error~n", [Type]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +trace_off() -> + Cmd = ["trace", "off"], + KeySpecs = [{'type', [{typecast, fun(Type) -> list_to_atom(Type) end}]}, + {'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, Params, _) -> + Text = case get_value('type', Params) of + client -> + trace_off(client, get_value('client_id', Params)); + topic -> + trace_off(topic, get_value('topic', Params)); + Type -> + io_lib:format("Invalid type: ~p error~n", [Type]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +trace_on(Who, Name, LogFile) -> + case emqttd_trace:start_trace({Who, Name}, LogFile) of + ok -> + io_lib:format("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + io_lib:format("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. + +trace_off(Who, Name) -> + case emqttd_trace:stop_trace({Who, Name}) of + ok -> + io_lib:format("stop tracing ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + io_lib:format("stop tracing ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + +%%-------------------------------------------------------------------- +%% @doc Listeners Command + +listeners() -> + Cmd = ["listeners"], + Callback = + fun (_, _, _) -> + Table = + lists:map(fun({{Protocol, ListenOn}, Pid}) -> + Info = [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients,esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}], + Listener = io_lib:format("~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), + [{listener, Listener}| Info] + end, esockd:listeners()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +%%------------------------------------------------------------- +%% usage +%%------------------------------------------------------------- +broker_usage() -> + ["\n broker info Show broker version, uptime and description\n", + " broker pubsub Show process_info of pubsub\n", + " broker stats Show broker statistics of clients, topics, subscribers\n", + " broker metrics Show broker metrics\n"]. + +cluster_usage() -> + ["\n cluster join node= Join the cluster\n", + " cluster leave Leave the cluster\n", + " cluster remove node= Remove the node from cluster\n", + " cluster status Cluster status\n"]. + +acl_usage() -> + ["\n acl reload reload etc/acl.conf\n"]. + +clients_usage() -> + ["\n clients list List all clients\n", + " clients show client_id= Show a client\n", + " clients kick client_id= Kick out a client\n"]. + +sessions_usage() -> + ["\n sessions list List all sessions\n", + " sessions list persistent List all persistent sessions\n", + " sessions list transient List all transient sessions\n", + " sessions show client_id= Show a session\n"]. + +routes_usage() -> + ["\n routes list List all routes\n", + " routes show topic= Show a route\n"]. + +topics_usage() -> + ["\n topics list List all topics\n", + " topics show topic= Show a topic\n"]. + +subscriptions_usage() -> + ["\n subscriptions list List all subscriptions\n", + " subscriptions show client_id= Show subscriptions of a client\n", + " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", + " subscriptions del client_id= Delete static subscriptions manually\n", + " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. + +plugins_usage() -> + ["\n plugins list Show loaded plugins\n", + " plugins load plugin_name= Load plugin\n", + " plugins unload plugin_name= Unload plugin\n"]. + +bridges_usage() -> + ["\n bridges list List bridges\n", + " bridges start snode= topic= Start a bridge + options: + qos= + topic_prefix= + topic_suffix= + queue=\n", + " bridges stop snode= topic= Stop a bridge\n"]. + +vm_usage() -> + ["\n vm info Show info of Erlang VM\n", + " vm load Show load of Erlang VM\n", + " vm memory Show memory of Erlang VM\n", + " vm process Show process of Erlang VM\n", + " vm io Show IO of Erlang VM\n", + " vm ports Show Ports of Erlang VM\n"]. + +trace_usage() -> + ["\n trace list List all traces\n", + " trace type=client|topic client_id= topic= log_file= Start tracing\n", + " trace off type=client|topic client_id= topic= Stop tracing\n"]. + +status_usage() -> + ["\n status Show broker status\n"]. + +listeners_usage() -> + ["\n listeners List listeners\n"]. + +mnesia_usage() -> + ["\n mnesia info Mnesia system info\n"]. + +%%-------------------------------------------------------------------- +%% Dump ETS +%%-------------------------------------------------------------------- + +dump(Table) -> + dump(Table, []). + +dump(Table, Acc) -> + dump(Table, ets:first(Table), Acc). + +dump(_Table, '$end_of_table', Acc) -> + lists:reverse(Acc); + +dump(Table, Key, Acc) -> + case ets:lookup(Table, Key) of + [Record] -> dump(Table, ets:next(Table, Key), [print(Record)|Acc]); + [] -> dump(Table, ets:next(Table, Key), Acc) + end. + +print([]) -> + []; + +print(Routes = [#mqtt_route{topic = Topic} | _]) -> + Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], + [{topic, Topic}, {routes, string:join(Nodes, ",")}]; + +print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + [{plugin, Name}, {version, Ver}, {description, Descr}, {active, Active}]; + +print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username, + peername = Peername, connected_at = ConnectedAt}) -> + [{client_id, ClientId}, + {clean_sess, CleanSess}, + {username, Username}, + {ip, emqttd_net:format(Peername)}, + {connected_at, emqttd_time:now_secs(ConnectedAt)}]; + +print({route, Routes}) -> + lists:map(fun print/1, Routes); +print({local_route, Routes}) -> + lists:map(fun print/1, Routes); +print(#mqtt_route{topic = Topic, node = Node}) -> + [{topic, Topic}, {node, Node}]; +print({Topic, Node}) -> + [{topic, Topic}, {node, Node}]; + +print({ClientId, _ClientPid, _Persistent, SessInfo}) -> + Data = lists:append(SessInfo, emqttd_stats:get_session_stats(ClientId)), + InfoKeys = [clean_sess, + subscriptions, + max_inflight, + inflight_len, + mqueue_len, + mqueue_dropped, + awaiting_rel_len, + deliver_msg, + enqueue_msg, + created_at], + [{client_id, ClientId} | [{Key, format(Key, get_value(Key, Data))} || Key <- InfoKeys]]. + +print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) -> + [{subscription, Sub}, {topic, Topic}]; +print(subscription, {Sub, Topic}) when is_pid(Sub) -> + [{subscription, Sub}, {topic, Topic}]; +print(subscription, {Sub, {_Share, Topic}}) -> + [{subscription, Sub}, {topic, Topic}]; +print(subscription, {Sub, Topic}) -> + [{subscription, Sub}, {topic, Topic}]. + +format(created_at, Val) -> + emqttd_time:now_secs(Val); + +format(_, Val) -> + Val.