From 161e5a2b5b31321756423cba9759b6503133d5d4 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 8 May 2017 18:13:52 +0800 Subject: [PATCH 01/16] Http publish mochiweb_request:parse_post(Req) -> Req:recv_body --- src/emqttd_http.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 25b7c3d23..5da607319 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -88,7 +88,7 @@ handle_request(Method, Path, Req) -> %%-------------------------------------------------------------------- http_publish(Req) -> - Params = mochiweb_request:parse_post(Req), + Params = Req:recv_body(), lager:info("HTTP Publish: ~p", [Params]), Topics = topics(Params), ClientId = get_value("client", Params, http), From 2d6104fbd90809c9074fb707454b58ffe74cee0d Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 11 May 2017 16:02:38 +0800 Subject: [PATCH 02/16] Ctl use clique --- Makefile | 4 +- priv/{emq.schema => emqttd.schema} | 0 src/emqttd_app.erl | 1 + src/emqttd_cli2.erl | 891 +++++++++++++++++++++++++++++ 4 files changed, 894 insertions(+), 2 deletions(-) rename priv/{emq.schema => emqttd.schema} (100%) create mode 100644 src/emqttd_cli2.erl diff --git a/Makefile b/Makefile index abd416178..5aa16fe04 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.2 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt +DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt clique dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc @@ -13,7 +13,7 @@ dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master - +dep_clique = git https://github.com/turtleDeng/clique ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish diff --git a/priv/emq.schema b/priv/emqttd.schema similarity index 100% rename from priv/emq.schema rename to priv/emqttd.schema diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 992e22da6..3b8a9c85b 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -43,6 +43,7 @@ start(_Type, _Args) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), + emqttd_cli2:register_cli(), register_acl_mod(), emqttd_plugins:init(), emqttd_plugins:load(), diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl new file mode 100644 index 000000000..3f9bdeba2 --- /dev/null +++ b/src/emqttd_cli2.erl @@ -0,0 +1,891 @@ +-module (emqttd_cli2). + +-export([register_cli/0]). + +-include("emqttd.hrl"). + +-include("emqttd_cli.hrl"). + +-include("emqttd_protocol.hrl"). + +-export([run/1]). + +-behaviour(clique_handler). + +-import(proplists, [get_value/2]). + +-define(APP, emqttd). + +-define(PROC_INFOKEYS, [status, + memory, + message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions]). + +register_cli() -> + F = fun() -> emqttd_mnesia:running_nodes() end, + clique:register_node_finder(F), + register_usage(), + register_cmd(). + +run(Cmd) -> + clique:run(Cmd). + +register_usage() -> + clique:register_usage(["broker"], broker_usage()), + clique:register_usage(["cluster"], cluster_usage()), + clique:register_usage(["acl"], acl_usage()), + clique:register_usage(["clients"], clients_usage()), + clique:register_usage(["sessions"], sessions_usage()), + clique:register_usage(["routes"], routes_usage()), + clique:register_usage(["topics"], topics_usage()), + clique:register_usage(["subscriptions"], subscriptions_usage()), + clique:register_usage(["plugins"], plugins_usage()), + clique:register_usage(["bridges"], bridges_usage()), + clique:register_usage(["vm"], vm_usage()), + clique:register_usage(["trace"], trace_usage()), + clique:register_usage(["status"], status_usage()), + clique:register_usage(["listeners"], listeners_usage()), + clique:register_usage(["mnesia"], mnesia_usage()). + +register_cmd() -> + node_status(), + broker_status(), + broker_stats(), + broker_metrics(), + broker_pubsub(), + cluster_join(), + cluster_leave(), + cluster_remove(), + acl_reload(), + clients_list(), + clients_show(), + clients_kick(), + sessions_list(), + sessions_list_persistent(), + sessions_list_transient(), + sessions_query(), + routes_list(), + routes_query(), + topics_list(), + topics_query(), + subscriptions_list(), + subscriptions_query(), + subscriptions_subscribe(), + subscriptions_del(), + subscriptions_unsubscribe(), + plugins_list(), + plugins_load(), + plugins_unload(), + bridges_list(), + bridges_start(), + bridges_stop(), + vm_all(), + vm_load(), + vm_memory(), + vm_process(), + vm_io(), + vm_ports(), + mnesia_info(), + trace_list(), + trace_on(), + trace_off(), + listeners(). + +node_status() -> + Cmd = ["status"], + Callback = + fun (_, _, _) -> + {Status, Vsn} = case lists:keysearch(?APP, 1, application:which_applications()) of + false -> + {"not running", undefined}; + {value, {?APP, _Desc, Vsn0}} -> + {"running", Vsn0} + end, + [clique_status:table([[{node, node()}, {status, Status}, {version, Vsn}]])] + end, + clique:register_command(Cmd, [], [], Callback). + +%%-------------------------------------------------------------------- +%% @doc Query broker + +broker_status() -> + Cmd = ["broker", "info"], + Callback = + fun (_, _, _) -> + Funs = [sysdescr, version, uptime, datetime], + Table = lists:map(fun(Fun) -> + {Fun, emqttd_broker:Fun()} + end, Funs), + [clique_status:table([Table])] + end, + clique:register_command(Cmd, [], [], Callback). + +broker_stats() -> + Cmd = ["broker", "stats"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Key, Val}) -> + io_lib:format("~-20s: ~w~n", [Key, Val]) + end, emqttd_stats:getstats()), + [clique_status:list(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +broker_metrics() -> + Cmd = ["broker", "metrics"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Key, Val}) -> + io_lib:format("~-24s: ~w~n", [Key, Val]) + end, lists:sort(emqttd_metrics:all())), + [clique_status:list(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +broker_pubsub() -> + Cmd = ["broker", "pubsub"], + Callback = + fun (_, _, _) -> + Pubsubs = supervisor:which_children(emqttd_pubsub_sup:pubsub_pool()), + Table = lists:map( + fun({{_, Id}, Pid, _, _}) -> + ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), + [{id, Id}] ++ ProcInfo + end, lists:reverse(Pubsubs)), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + + +%%-------------------------------------------------------------------- +%% @doc Cluster with other nodes + +cluster_join() -> + Cmd = ["cluster", "join"], + KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Node}], _) -> + Text = case emqttd_cluster:join(Node) of + ok -> + ["Join the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to join the cluster: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +cluster_leave() -> + Cmd = ["cluster", "leave"], + Callback = + fun(_, _, _) -> + Text = case emqttd_cluster:leave() of + ok -> + ["Leave the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to leave the cluster: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, [], [], Callback). + +cluster_remove() -> + Cmd = ["cluster", "remove"], + KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Node}], _) -> + Text = case emqttd_cluster:remove(Node) of + ok -> + ["Remove the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to remove the cluster: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +cluster(["status"]) -> + io_lib:format("Cluster status: ~p~n", [emqttd_cluster:status()]). + +%%-------------------------------------------------------------------- +%% @doc acl + +acl_reload() -> + Cmd = ["acl", "reload"], + Callback = + fun (_, _, _) -> + emqttd_access_control:reload_acl(), + [clique_status:text("")] + end, + clique:register_command(Cmd, [], [], Callback). + +%%-------------------------------------------------------------------- +%% @doc Query clients + +clients_list() -> + Cmd = ["clients", "list"], + Callback = + fun (_, _, _) -> + [clique_status:table(dump(mqtt_client))] + end, + clique:register_command(Cmd, [], [], Callback). + +clients_show() -> + Cmd = ["clients", "query"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + [clique_status:table(if_client(ClientId, fun print/1))] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +clients_kick() -> + Cmd = ["clients", "kick"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + Result = if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end), + case Result of + [ok] -> [clique_status:text(io_lib:format("Kick client_id: ~p successfully~n", [ClientId]))]; + _ -> [clique_status:text("")] + end + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +if_client(ClientId, Fun) -> + case emqttd_cm:lookup(ClientId) of + undefined -> ?PRINT_MSG("Not Found.~n"), []; + Client -> [Fun(Client)] + end. + +%%-------------------------------------------------------------------- +%% @doc Sessions Command + +sessions_list() -> + Cmd = ["sessions", "list"], + Callback = + fun (_, _, _) -> + [clique_status:table(dump(mqtt_local_session))] + end, + clique:register_command(Cmd, [], [], Callback). + +%% performance issue? + +sessions_list_persistent() -> + Cmd = ["sessions", "list", "persistent"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', false, '_'})), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +%% performance issue? + +sessions_list_transient() -> + Cmd = ["sessions", "list", "transient"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', true, '_'})), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +sessions_query() -> + Cmd = ["sessions", "query"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + + case ets:lookup(mqtt_local_session, ClientId) of + [] -> + ?PRINT_MSG("Not Found.~n"), + [clique_status:table([])]; + [SessInfo] -> + [clique_status:table([print(SessInfo)])] + end + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Routes Command + +routes_list() -> + Cmd = ["routes", "list"], + Callback = + fun (_, _, _) -> + Table = lists:flatten(lists:map(fun print/1, emqttd_router:dump())), + [clique_status:table([Table])] + end, + clique:register_command(Cmd, [], [], Callback). + +routes_query() -> + Cmd = ["routes", "query"], + KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Topic}], _) -> + [clique_status:table([print(mnesia:dirty_read(mqtt_route, Topic))])] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Topics Command + +topics_list() -> + Cmd = ["topics", "list"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun(Topic) -> [{topic, Topic}] end, emqttd:topics()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +topics_query() -> + Cmd = ["topics", "query"], + KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, Topic}], _) -> + Table = print(mnesia:dirty_read(mqtt_route, Topic)), + [clique_status:table([Table])] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Subscriptions Command +subscriptions_list() -> + Cmd = ["subscriptions", "list"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun(Subscription) -> + print(subscription, Subscription) + end, ets:tab2list(mqtt_subscription)), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +subscriptions_query() -> + Cmd = ["subscriptions", "query"], + KeySpecs = [{'client_id', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + case ets:lookup(mqtt_subscription, ClientId) of + [] -> + ?PRINT_MSG("Not Found.~n"), + [clique_status:table([])]; + Records -> + Table = [print(subscription, Subscription) || Subscription <- Records], + [clique_status:table(Table)] + end + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +subscriptions_subscribe() -> + Cmd = ["subscriptions", "subscribe"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, + {'qos', [{typecast, fun(QoS) -> list_to_integer(QoS) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}, {_, Topic}, {_, QoS}], _) -> + Text = case emqttd:subscribe(Topic, ClientId, [{qos, QoS}]) of + ok -> + io_lib:format("Client_id: ~p subscribe topic: ~p qos: ~p successfully~n", [ClientId, Topic, QoS]); + {error, already_existed} -> + io_lib:format("Error: client_id: ~p subscribe topic: ~p already existed~n", [ClientId, Topic]); + {error, Reason} -> + io_lib:format("Error: ~p~n", [Reason]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +subscriptions_del() -> + Cmd = ["subscriptions", "del"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}], _) -> + emqttd:subscriber_down(ClientId), + Text = io_lib:format("Client_id del subscriptions:~p successfully~n", [ClientId]), + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +subscriptions_unsubscribe() -> + Cmd = ["subscriptions", "unsubscribe"], + KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, ClientId}, {_, Topic}], _) -> + emqttd:unsubscribe(Topic, ClientId), + Text = io_lib:format("Client_id: ~p unsubscribe topic: ~p successfully~n", [ClientId, Topic]), + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc Plugins Command +plugins_list() -> + Cmd = ["plugins", "list"], + Callback = + fun (_, _, _) -> + Text = lists:map(fun(Plugin) -> print(Plugin) end, emqttd_plugins:list()), + [clique_status:table(Text)] + end, + clique:register_command(Cmd, [], [], Callback). + +plugins_load() -> + Cmd = ["plugins", "load"], + KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, PluginName}], _) -> + Text = case emqttd_plugins:load(PluginName) of + {ok, StartedApps} -> + io_lib:format("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, PluginName]); + {error, Reason} -> + io_lib:format("load plugin error: ~p~n", [Reason]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +plugins_unload() -> + Cmd = ["plugins", "unload"], + KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, PluginName}], _) -> + Text = case emqttd_plugins:unload(PluginName) of + ok -> + io_lib:format("Plugin ~s unloaded successfully.~n", [PluginName]); + {error, Reason} -> + io_lib:format("unload plugin error: ~p~n", [Reason]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + + +%%-------------------------------------------------------------------- +%% @doc Bridges command + +bridges_list() -> + Cmd = ["bridges", "list"], + Callback = + fun (_, _, _) -> + Text = lists:map( + fun({Node, Topic, _Pid}) -> + [{bridge, node()}, {topic, Topic}, {node, Node}] + end, emqttd_bridge_sup_sup:bridges()), + [clique_status:table(Text)] + end, + clique:register_command(Cmd, [], [], Callback). + +bridges_start() -> + Cmd = ["bridges", "start"], + KeySpecs = [{'snode', [{typecast, fun(SNode) -> list_to_atom(SNode) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, + {'qos', [{typecast, fun(Qos) -> list_to_integer(Qos) end}]}, + {'topic_suffix', [{typecast, fun(Prefix) -> list_to_binary(Prefix) end}]}, + {'topic_prefix', [{typecast, fun(Suffix) -> list_to_binary(Suffix) end}]}, + {'max_queue_len', [{typecast, fun(Queue) -> list_to_integer(Queue) end}]}], + FlagSpecs = [], + Callback = + fun (_, Params, _) -> + Text = case {get_value('snode', Params), get_value('topic', Params)} of + {undefined, undefined} -> + io_lib:format("Invalid snode and topic error~n", []); + {undefined, _} -> + io_lib:format("Invalid snode error~n", []); + {_, undefined} -> + io_lib:format("Invalid topic error~n", []); + {SNode, Topic} -> + Opts = Params -- [{'snode', SNode}, {'topic', Topic}], + case emqttd_bridge_sup_sup:start_bridge(SNode, Topic, Opts) of + {ok, _} -> + io_lib:format("bridge is started.~n", []); + {error, Error} -> + io_lib:format("error: ~p~n", [Error]) + end + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +bridges_stop() -> + Cmd = ["bridges", "stop"], + KeySpecs = [{'snode', [{typecast, fun(SNode) -> list_to_atom(SNode) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, [{_, SNode},{_, Topic}], _) -> + Text = case emqttd_bridge_sup_sup:stop_bridge(SNode, Topic) of + ok -> io_lib:format("bridge is stopped.~n", []); + {error, Error} -> io_lib:format("error: ~p~n", [Error]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +%%-------------------------------------------------------------------- +%% @doc vm command + +vm_all() -> + Cmd = ["vm","info"], + Callback = + fun (_, _, _) -> + + Load = [io_lib:format("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()], + + Memory = [io_lib:format("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()], + + Process = lists:map(fun({Name, Key}) -> + io_lib:format("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{limit, process_limit}, {count, process_count}]), + + IoInfo = erlang:system_info(check_io), + IO = lists:map(fun(Key) -> + io_lib:format("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) + end, [max_fds, active_fds]), + + Ports = lists:map(fun({Name, Key}) -> + io_lib:format("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{count, port_count}, {limit, port_limit}]), + [clique_status:text([Load, Memory, Process, IO, Ports])] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_load() -> + Cmd = ["vm","load"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, Val}] + end, emqttd_vm:loads()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_memory() -> + Cmd = ["vm","memory"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, Val}] + end, erlang:memory()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_process() -> + Cmd = ["vm","process"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, erlang:system_info(Val)}] + end, [{limit, process_limit}, {count, process_count}]), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_io() -> + Cmd = ["vm","io"], + Callback = + fun (_, _, _) -> + IoInfo = erlang:system_info(check_io), + Table = lists:map( + fun(Key) -> + [{name, Key}, {val, get_value(Key, IoInfo)}] + end, [max_fds, active_fds]), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +vm_ports() -> + Cmd = ["vm","ports"], + Callback = + fun (_, _, _) -> + Table = lists:map( + fun({Name, Val}) -> + [{name, Name}, {val, erlang:system_info(Val)}] + end, [{count, port_count}, {limit, port_limit}]), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +% %%-------------------------------------------------------------------- +%% @doc mnesia Command + +mnesia_info() -> + Cmd = ["mnesia", "info"], + Callback = + fun (_, _, _) -> + mnesia:system_info(), + [clique_status:text("")] + end, + clique:register_command(Cmd, [], [], Callback). + +%%-------------------------------------------------------------------- +%% @doc Trace Command + +trace_list() -> + Cmd = ["trace", "list"], + Callback = + fun (_, _, _) -> + Table = lists:map(fun({{Who, Name}, LogFile}) -> + [{trace, Who}, {name, Name}, {log_file, LogFile}] + end, emqttd_trace:all_traces()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +trace_on() -> + Cmd = ["trace"], + KeySpecs = [{'type', [{typecast, fun(Type) -> list_to_atom(Type) end}]}, + {'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, + {'log_file', [{typecast, fun(LogFile) -> list_to_binary(LogFile) end}]}], + FlagSpecs = [], + Callback = + fun (_, Params, _) -> + Text = case get_value('type', Params) of + client -> + trace_on(client, get_value('client_id', Params), get_value('log_file', Params)); + topic -> + trace_on(topic, get_value('topic', Params), get_value('log_file', Params)); + Type -> + io_lib:format("Invalid type: ~p error~n", [Type]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +trace_off() -> + Cmd = ["trace", "off"], + KeySpecs = [{'type', [{typecast, fun(Type) -> list_to_atom(Type) end}]}, + {'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, + {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], + FlagSpecs = [], + Callback = + fun (_, Params, _) -> + Text = case get_value('type', Params) of + client -> + trace_off(client, get_value('client_id', Params)); + topic -> + trace_off(topic, get_value('topic', Params)); + Type -> + io_lib:format("Invalid type: ~p error~n", [Type]) + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +trace_on(Who, Name, LogFile) -> + case emqttd_trace:start_trace({Who, Name}, LogFile) of + ok -> + io_lib:format("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + io_lib:format("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. + +trace_off(Who, Name) -> + case emqttd_trace:stop_trace({Who, Name}) of + ok -> + io_lib:format("stop tracing ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + io_lib:format("stop tracing ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + +%%-------------------------------------------------------------------- +%% @doc Listeners Command + +listeners() -> + Cmd = ["listeners"], + Callback = + fun (_, _, _) -> + Table = + lists:map(fun({{Protocol, ListenOn}, Pid}) -> + Info = [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients,esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}], + Listener = io_lib:format("~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), + [{listener, Listener}| Info] + end, esockd:listeners()), + [clique_status:table(Table)] + end, + clique:register_command(Cmd, [], [], Callback). + +%%------------------------------------------------------------- +%% usage +%%------------------------------------------------------------- +broker_usage() -> + ["\n broker info Show broker version, uptime and description\n", + " broker pubsub Show process_info of pubsub\n", + " broker stats Show broker statistics of clients, topics, subscribers\n", + " broker metrics Show broker metrics\n"]. + +cluster_usage() -> + ["\n cluster join node= Join the cluster\n", + " cluster leave Leave the cluster\n", + " cluster remove node= Remove the node from cluster\n", + " cluster status Cluster status\n"]. + +acl_usage() -> + ["\n acl reload reload etc/acl.conf\n"]. + +clients_usage() -> + ["\n clients list List all clients\n", + " clients show client_id= Show a client\n", + " clients kick client_id= Kick out a client\n"]. + +sessions_usage() -> + ["\n sessions list List all sessions\n", + " sessions list persistent List all persistent sessions\n", + " sessions list transient List all transient sessions\n", + " sessions show client_id= Show a session\n"]. + +routes_usage() -> + ["\n routes list List all routes\n", + " routes show topic= Show a route\n"]. + +topics_usage() -> + ["\n topics list List all topics\n", + " topics show topic= Show a topic\n"]. + +subscriptions_usage() -> + ["\n subscriptions list List all subscriptions\n", + " subscriptions show client_id= Show subscriptions of a client\n", + " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", + " subscriptions del client_id= Delete static subscriptions manually\n", + " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. + +plugins_usage() -> + ["\n plugins list Show loaded plugins\n", + " plugins load plugin_name= Load plugin\n", + " plugins unload plugin_name= Unload plugin\n"]. + +bridges_usage() -> + ["\n bridges list List bridges\n", + " bridges start snode= topic= Start a bridge + options: + qos= + topic_prefix= + topic_suffix= + queue=\n", + " bridges stop snode= topic= Stop a bridge\n"]. + +vm_usage() -> + ["\n vm info Show info of Erlang VM\n", + " vm load Show load of Erlang VM\n", + " vm memory Show memory of Erlang VM\n", + " vm process Show process of Erlang VM\n", + " vm io Show IO of Erlang VM\n", + " vm ports Show Ports of Erlang VM\n"]. + +trace_usage() -> + ["\n trace list List all traces\n", + " trace type=client|topic client_id= topic= log_file= Start tracing\n", + " trace off type=client|topic client_id= topic= Stop tracing\n"]. + +status_usage() -> + ["\n status Show broker status\n"]. + +listeners_usage() -> + ["\n listeners List listeners\n"]. + +mnesia_usage() -> + ["\n mnesia info Mnesia system info\n"]. + +%%-------------------------------------------------------------------- +%% Dump ETS +%%-------------------------------------------------------------------- + +dump(Table) -> + dump(Table, []). + +dump(Table, Acc) -> + dump(Table, ets:first(Table), Acc). + +dump(_Table, '$end_of_table', Acc) -> + lists:reverse(Acc); + +dump(Table, Key, Acc) -> + case ets:lookup(Table, Key) of + [Record] -> dump(Table, ets:next(Table, Key), [print(Record)|Acc]); + [] -> dump(Table, ets:next(Table, Key), Acc) + end. + +print([]) -> + []; + +print(Routes = [#mqtt_route{topic = Topic} | _]) -> + Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], + [{topic, Topic}, {routes, string:join(Nodes, ",")}]; + +print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + [{plugin, Name}, {version, Ver}, {description, Descr}, {active, Active}]; + +print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username, + peername = Peername, connected_at = ConnectedAt}) -> + [{client_id, ClientId}, + {clean_sess, CleanSess}, + {username, Username}, + {ip, emqttd_net:format(Peername)}, + {connected_at, emqttd_time:now_secs(ConnectedAt)}]; + +print({route, Routes}) -> + lists:map(fun print/1, Routes); +print({local_route, Routes}) -> + lists:map(fun print/1, Routes); +print(#mqtt_route{topic = Topic, node = Node}) -> + [{topic, Topic}, {node, Node}]; +print({Topic, Node}) -> + [{topic, Topic}, {node, Node}]; + +print({ClientId, _ClientPid, _Persistent, SessInfo}) -> + Data = lists:append(SessInfo, emqttd_stats:get_session_stats(ClientId)), + InfoKeys = [clean_sess, + subscriptions, + max_inflight, + inflight_len, + mqueue_len, + mqueue_dropped, + awaiting_rel_len, + deliver_msg, + enqueue_msg, + created_at], + [{client_id, ClientId} | [{Key, format(Key, get_value(Key, Data))} || Key <- InfoKeys]]. + +print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) -> + [{subscription, Sub}, {topic, Topic}]; +print(subscription, {Sub, Topic}) when is_pid(Sub) -> + [{subscription, Sub}, {topic, Topic}]; +print(subscription, {Sub, {_Share, Topic}}) -> + [{subscription, Sub}, {topic, Topic}]; +print(subscription, {Sub, Topic}) -> + [{subscription, Sub}, {topic, Topic}]. + +format(created_at, Val) -> + emqttd_time:now_secs(Val); + +format(_, Val) -> + Val. From 06d291e354ec2131b023a16a9714c2d72ac91580 Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 17 May 2017 10:18:09 +0800 Subject: [PATCH 03/16] Update Cli command --- src/emqttd_cli2.erl | 73 +++++++++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index 3f9bdeba2..0aa89c65d 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -95,7 +95,7 @@ register_cmd() -> listeners(). node_status() -> - Cmd = ["status"], + Cmd = ["status", "info"], Callback = fun (_, _, _) -> {Status, Vsn} = case lists:keysearch(?APP, 1, application:which_applications()) of @@ -399,14 +399,26 @@ subscriptions_subscribe() -> {'qos', [{typecast, fun(QoS) -> list_to_integer(QoS) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}, {_, Topic}, {_, QoS}], _) -> - Text = case emqttd:subscribe(Topic, ClientId, [{qos, QoS}]) of - ok -> - io_lib:format("Client_id: ~p subscribe topic: ~p qos: ~p successfully~n", [ClientId, Topic, QoS]); - {error, already_existed} -> - io_lib:format("Error: client_id: ~p subscribe topic: ~p already existed~n", [ClientId, Topic]); - {error, Reason} -> - io_lib:format("Error: ~p~n", [Reason]) + fun (_, Params, _) -> + Topic = get_value('topic', Params), + ClientId = get_value('client_id', Params), + QoS = get_value('qos', Params), + Text = case {Topic, ClientId, QoS} of + {undefined, _, _} -> + io_lib:format("Invalid topic is undefined~n", []); + {_, undefined, _} -> + io_lib:format("Invalid client_id is undefined~n", []); + {_, _, undefined} -> + io_lib:format("Invalid qos is undefined~n", []); + {_, _, _} -> + case emqttd:subscribe(Topic, ClientId, [{qos, QoS}]) of + ok -> + io_lib:format("Client_id: ~p subscribe topic: ~p qos: ~p successfully~n", [ClientId, Topic, QoS]); + {error, already_existed} -> + io_lib:format("Error: client_id: ~p subscribe topic: ~p already existed~n", [ClientId, Topic]); + {error, Reason} -> + io_lib:format("Error: ~p~n", [Reason]) + end end, [clique_status:text(Text)] end, @@ -430,9 +442,19 @@ subscriptions_unsubscribe() -> {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}, {_, Topic}], _) -> - emqttd:unsubscribe(Topic, ClientId), - Text = io_lib:format("Client_id: ~p unsubscribe topic: ~p successfully~n", [ClientId, Topic]), + fun (_, Params, _) -> + Topic = get_value('topic', Params), + ClientId = get_value('client_id', Params), + QoS = get_value('qos', Params), + Text = case {Topic, ClientId, QoS} of + {undefined, _} -> + io_lib:format("Invalid topic is undefined~n", []); + {_, undefined} -> + io_lib:format("Invalid client_id is undefined~n", []); + {_, _} -> + emqttd:unsubscribe(Topic, ClientId), + io_lib:format("Client_id: ~p unsubscribe topic: ~p successfully~n", [ClientId, Topic]) + end, [clique_status:text(Text)] end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -508,12 +530,10 @@ bridges_start() -> Callback = fun (_, Params, _) -> Text = case {get_value('snode', Params), get_value('topic', Params)} of - {undefined, undefined} -> - io_lib:format("Invalid snode and topic error~n", []); {undefined, _} -> - io_lib:format("Invalid snode error~n", []); + io_lib:format("Invalid snode is undefined~n", []); {_, undefined} -> - io_lib:format("Invalid topic error~n", []); + io_lib:format("Invalid topic is undefined~n", []); {SNode, Topic} -> Opts = Params -- [{'snode', SNode}, {'topic', Topic}], case emqttd_bridge_sup_sup:start_bridge(SNode, Topic, Opts) of @@ -533,10 +553,17 @@ bridges_stop() -> {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, SNode},{_, Topic}], _) -> - Text = case emqttd_bridge_sup_sup:stop_bridge(SNode, Topic) of - ok -> io_lib:format("bridge is stopped.~n", []); - {error, Error} -> io_lib:format("error: ~p~n", [Error]) + fun (_, Params, _) -> + Text = case {get_value('snode', Params), get_value('topic', Params)} of + {undefined, _} -> + io_lib:format("Invalid snode is undefined~n", []); + {_, undefined} -> + io_lib:format("Invalid topic is undefined~n", []); + {SNode, Topic} -> + case emqttd_bridge_sup_sup:stop_bridge(SNode, Topic) of + ok -> io_lib:format("bridge is stopped.~n", []); + {error, Error} -> io_lib:format("error: ~p~n", [Error]) + end end, [clique_status:text(Text)] end, @@ -718,7 +745,7 @@ trace_off(Who, Name) -> %% @doc Listeners Command listeners() -> - Cmd = ["listeners"], + Cmd = ["listeners", "info"], Callback = fun (_, _, _) -> Table = @@ -807,10 +834,10 @@ trace_usage() -> " trace off type=client|topic client_id= topic= Stop tracing\n"]. status_usage() -> - ["\n status Show broker status\n"]. + ["\n status info Show broker status\n"]. listeners_usage() -> - ["\n listeners List listeners\n"]. + ["\n listeners info List listeners\n"]. mnesia_usage() -> ["\n mnesia info Mnesia system info\n"]. From d5ccb9f92f4cccdcde164727bc867d61bbe115b9 Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 17 May 2017 16:27:18 +0800 Subject: [PATCH 04/16] Cli command support --format=json output --- Makefile | 3 +- src/emqttd.app.src | 2 +- src/emqttd_cli2.erl | 75 ++++++++++++++++++++++++--------------- src/emqttd_cli_format.erl | 27 ++++++++++++++ 4 files changed, 77 insertions(+), 30 deletions(-) create mode 100644 src/emqttd_cli_format.erl diff --git a/Makefile b/Makefile index 5aa16fe04..88bb4c53a 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.2 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt clique +DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt clique jsx dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc @@ -14,6 +14,7 @@ dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master dep_clique = git https://github.com/turtleDeng/clique +dep_jsx = git https://github.com/talentdeficit/jsx ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 3a7ed3482..3e8c7e1bd 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -4,7 +4,7 @@ {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, - lager_syslog,pbkdf2,bcrypt]}, + lager_syslog,pbkdf2,bcrypt,jsx]}, {env,[]}, {mod,{emqttd_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index 0aa89c65d..fefc117f9 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -27,9 +27,32 @@ register_cli() -> F = fun() -> emqttd_mnesia:running_nodes() end, clique:register_node_finder(F), + clique:register_writer("json", emqttd_cli_format), register_usage(), register_cmd(). +run([]) -> + AllUsage = [["broker"], + ["cluster"], + ["acl"], + ["clients"], + ["sessions"], + ["routes"], + ["topics"], + ["subscriptions"], + ["plugins"], + ["bridges"], + ["vm"], + ["trace"], + ["status"], + ["listeners"], + ["mnesia"]], + io:format("--------------------------------------------------------------------------------~n"), + lists:foreach(fun(Item) -> + io:format("~ts", [clique_usage:find(Item)]), + io:format("--------------------------------------------------------------------------------~n") + end, AllUsage); + run(Cmd) -> clique:run(Cmd). @@ -127,11 +150,10 @@ broker_stats() -> Cmd = ["broker", "stats"], Callback = fun (_, _, _) -> - Table = lists:map( + lists:map( fun({Key, Val}) -> - io_lib:format("~-20s: ~w~n", [Key, Val]) - end, emqttd_stats:getstats()), - [clique_status:list(Table)] + clique_status:list(Key, io_lib:format("~p", [Val])) + end, emqttd_stats:getstats()) end, clique:register_command(Cmd, [], [], Callback). @@ -139,11 +161,10 @@ broker_metrics() -> Cmd = ["broker", "metrics"], Callback = fun (_, _, _) -> - Table = lists:map( + lists:map( fun({Key, Val}) -> - io_lib:format("~-24s: ~w~n", [Key, Val]) - end, lists:sort(emqttd_metrics:all())), - [clique_status:list(Table)] + clique_status:list(Key, io_lib:format("~p", [Val])) + end, lists:sort(emqttd_metrics:all())) end, clique:register_command(Cmd, [], [], Callback). @@ -447,11 +468,14 @@ subscriptions_unsubscribe() -> ClientId = get_value('client_id', Params), QoS = get_value('qos', Params), Text = case {Topic, ClientId, QoS} of - {undefined, _} -> + {undefined, _, _} -> io_lib:format("Invalid topic is undefined~n", []); - {_, undefined} -> + {_, undefined, _} -> io_lib:format("Invalid client_id is undefined~n", []); - {_, _} -> + {_, _, undefined} -> + io_lib:format("Invalid qos is undefined~n", []); + + {_, _, _} -> emqttd:unsubscribe(Topic, ClientId), io_lib:format("Client_id: ~p unsubscribe topic: ~p successfully~n", [ClientId, Topic]) end, @@ -576,27 +600,22 @@ vm_all() -> Cmd = ["vm","info"], Callback = fun (_, _, _) -> - - Load = [io_lib:format("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()], - - Memory = [io_lib:format("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()], - - Process = lists:map(fun({Name, Key}) -> - io_lib:format("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) - end, [{limit, process_limit}, {count, process_count}]), - + Cpu = [vm_info("cpu", K, list_to_float(V)) || {K, V} <- emqttd_vm:loads()], + Memory = [vm_info("memory", K, V) || {K, V} <- erlang:memory()], + Process = [vm_info("process", K, erlang:system_info(V)) || {K, V} <- [{limit, process_limit}, {count, process_count}]], IoInfo = erlang:system_info(check_io), - IO = lists:map(fun(Key) -> - io_lib:format("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) - end, [max_fds, active_fds]), - - Ports = lists:map(fun({Name, Key}) -> - io_lib:format("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) - end, [{count, port_count}, {limit, port_limit}]), - [clique_status:text([Load, Memory, Process, IO, Ports])] + Io = [vm_info("io", K, get_value(K, IoInfo)) || K <- [max_fds, active_fds]], + Ports = [vm_info("ports", K, erlang:system_info(V)) || {K, V} <- [{count, port_count}, {limit, port_limit}]], + lists:flatten([Cpu, Memory, Process, Io, Ports]) end, clique:register_command(Cmd, [], [], Callback). +vm_info(Item, K, V) -> + clique_status:list(format_key(Item, K), io_lib:format("~p", [V])). + +format_key(Item, K) -> + list_to_atom(lists:concat([Item, "/", K])). + vm_load() -> Cmd = ["vm","load"], Callback = diff --git a/src/emqttd_cli_format.erl b/src/emqttd_cli_format.erl new file mode 100644 index 000000000..cd1adca4b --- /dev/null +++ b/src/emqttd_cli_format.erl @@ -0,0 +1,27 @@ +-module (emqttd_cli_format). + +-behavior(clique_writer). + +%% API +-export([write/1]). + +write([{text, Text}]) -> + Json = jsx:encode([{text, lists:flatten(Text)}]), + {io_lib:format("~p~n", [Json]), []}; + +write([{table, Table}]) -> + Json = jsx:encode(Table), + {io_lib:format("~p~n", [Json]), []}; + +write([{list, Key, [Value]}| Tail]) -> + Table = lists:reverse(write(Tail, [{Key, Value}])), + Json = jsx:encode(Table), + {io_lib:format("~p~n", [Json]), []}; + +write(_) -> + {io_lib:format("error~n", []), []}. + +write([], Acc) -> + Acc; +write([{list, Key, [Value]}| Tail], Acc) -> + write(Tail, [{Key, Value}| Acc]). \ No newline at end of file From 8853e43275fe55dac4dd463e12180292b6794fad Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 17 May 2017 16:37:55 +0800 Subject: [PATCH 05/16] cli query -> show --- src/emqttd_cli2.erl | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index fefc117f9..6a784977d 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -74,28 +74,34 @@ register_usage() -> clique:register_usage(["mnesia"], mnesia_usage()). register_cmd() -> + node_status(), + broker_status(), broker_stats(), broker_metrics(), broker_pubsub(), + cluster_join(), cluster_leave(), cluster_remove(), + acl_reload(), + clients_list(), clients_show(), clients_kick(), + sessions_list(), sessions_list_persistent(), sessions_list_transient(), - sessions_query(), + sessions_show(), routes_list(), - routes_query(), + routes_show(), topics_list(), - topics_query(), + topics_show(), subscriptions_list(), - subscriptions_query(), + subscriptions_show(), subscriptions_subscribe(), subscriptions_del(), subscriptions_unsubscribe(), @@ -259,7 +265,7 @@ clients_list() -> clique:register_command(Cmd, [], [], Callback). clients_show() -> - Cmd = ["clients", "query"], + Cmd = ["clients", "show"], KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], FlagSpecs = [], Callback = @@ -321,8 +327,8 @@ sessions_list_transient() -> end, clique:register_command(Cmd, [], [], Callback). -sessions_query() -> - Cmd = ["sessions", "query"], +sessions_show() -> + Cmd = ["sessions", "show"], KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], FlagSpecs = [], Callback = @@ -350,8 +356,8 @@ routes_list() -> end, clique:register_command(Cmd, [], [], Callback). -routes_query() -> - Cmd = ["routes", "query"], +routes_show() -> + Cmd = ["routes", "show"], KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = @@ -372,8 +378,8 @@ topics_list() -> end, clique:register_command(Cmd, [], [], Callback). -topics_query() -> - Cmd = ["topics", "query"], +topics_show() -> + Cmd = ["topics", "show"], KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = @@ -396,8 +402,8 @@ subscriptions_list() -> end, clique:register_command(Cmd, [], [], Callback). -subscriptions_query() -> - Cmd = ["subscriptions", "query"], +subscriptions_show() -> + Cmd = ["subscriptions", "show"], KeySpecs = [{'client_id', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = From 1f2cd4023739c226837de828ea61923f72bdf3cc Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 17 May 2017 17:12:19 +0800 Subject: [PATCH 06/16] Invalid params --- src/emqttd_cli2.erl | 187 ++++++++++++++++++++++++++++---------------- 1 file changed, 120 insertions(+), 67 deletions(-) diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index 6a784977d..efae3e62f 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -197,12 +197,17 @@ cluster_join() -> KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, Node}], _) -> - Text = case emqttd_cluster:join(Node) of - ok -> - ["Join the cluster successfully.\n", cluster(["status"])]; - {error, Error} -> - io_lib:format("Failed to join the cluster: ~p~n", [Error]) + fun (_, Params, _) -> + Text = case get_value('node', Params) of + undefined -> + io_lib:format("Invalid params node is undefined~n", []); + Node -> + case emqttd_cluster:join(Node) of + ok -> + ["Join the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to join the cluster: ~p~n", [Error]) + end end, [clique_status:text(Text)] end, @@ -227,12 +232,17 @@ cluster_remove() -> KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, Node}], _) -> - Text = case emqttd_cluster:remove(Node) of - ok -> - ["Remove the cluster successfully.\n", cluster(["status"])]; - {error, Error} -> - io_lib:format("Failed to remove the cluster: ~p~n", [Error]) + fun (_, Params, _) -> + Text = case get_value('node', Params) of + undefined -> + io_lib:format("Invalid params node is undefined~n", []); + Node -> + case emqttd_cluster:remove(Node) of + ok -> + ["Remove the cluster successfully.\n", cluster(["status"])]; + {error, Error} -> + io_lib:format("Failed to remove the cluster: ~p~n", [Error]) + end end, [clique_status:text(Text)] end, @@ -279,11 +289,16 @@ clients_kick() -> KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}], _) -> - Result = if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end), - case Result of - [ok] -> [clique_status:text(io_lib:format("Kick client_id: ~p successfully~n", [ClientId]))]; - _ -> [clique_status:text("")] + fun (_, Params, _) -> + case get_value('client_id', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; + ClientId -> + Result = if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end), + case Result of + [ok] -> [clique_status:text(io_lib:format("Kick client_id: ~p successfully~n", [ClientId]))]; + _ -> [clique_status:text("")] + end end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -332,14 +347,18 @@ sessions_show() -> KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}], _) -> - - case ets:lookup(mqtt_local_session, ClientId) of - [] -> - ?PRINT_MSG("Not Found.~n"), - [clique_status:table([])]; - [SessInfo] -> - [clique_status:table([print(SessInfo)])] + fun (_, Params, _) -> + case get_value('client_id', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; + ClientId -> + case ets:lookup(mqtt_local_session, ClientId) of + [] -> + ?PRINT_MSG("Not Found.~n"), + [clique_status:table([])]; + [SessInfo] -> + [clique_status:table([print(SessInfo)])] + end end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -361,8 +380,13 @@ routes_show() -> KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, Topic}], _) -> - [clique_status:table([print(mnesia:dirty_read(mqtt_route, Topic))])] + fun (_, Params, _) -> + case get_value('topic', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params topic is undefined~n", []))]; + Topic -> + [clique_status:table([print(mnesia:dirty_read(mqtt_route, Topic))])] + end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -383,9 +407,14 @@ topics_show() -> KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, Topic}], _) -> - Table = print(mnesia:dirty_read(mqtt_route, Topic)), - [clique_status:table([Table])] + fun (_, Params, _) -> + case get_value('client_id', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params topic is undefined~n", []))]; + Topic -> + Table = print(mnesia:dirty_read(mqtt_route, Topic)), + [clique_status:table([Table])] + end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -407,14 +436,19 @@ subscriptions_show() -> KeySpecs = [{'client_id', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}], _) -> - case ets:lookup(mqtt_subscription, ClientId) of - [] -> - ?PRINT_MSG("Not Found.~n"), - [clique_status:table([])]; - Records -> - Table = [print(subscription, Subscription) || Subscription <- Records], - [clique_status:table(Table)] + fun (_, Params, _) -> + case get_value('client_id', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; + ClientId -> + case ets:lookup(mqtt_subscription, ClientId) of + [] -> + ?PRINT_MSG("Not Found.~n"), + [clique_status:table([])]; + Records -> + Table = [print(subscription, Subscription) || Subscription <- Records], + [clique_status:table(Table)] + end end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -432,11 +466,11 @@ subscriptions_subscribe() -> QoS = get_value('qos', Params), Text = case {Topic, ClientId, QoS} of {undefined, _, _} -> - io_lib:format("Invalid topic is undefined~n", []); + io_lib:format("Invalid params topic is undefined~n", []); {_, undefined, _} -> - io_lib:format("Invalid client_id is undefined~n", []); + io_lib:format("Invalid params client_id is undefined~n", []); {_, _, undefined} -> - io_lib:format("Invalid qos is undefined~n", []); + io_lib:format("Invalid params qos is undefined~n", []); {_, _, _} -> case emqttd:subscribe(Topic, ClientId, [{qos, QoS}]) of ok -> @@ -456,10 +490,15 @@ subscriptions_del() -> KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}], _) -> - emqttd:subscriber_down(ClientId), - Text = io_lib:format("Client_id del subscriptions:~p successfully~n", [ClientId]), - [clique_status:text(Text)] + fun (_, Params, _) -> + case get_value('client_id', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; + ClientId -> + emqttd:subscriber_down(ClientId), + Text = io_lib:format("Client_id del subscriptions:~p successfully~n", [ClientId]), + [clique_status:text(Text)] + end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). @@ -475,11 +514,11 @@ subscriptions_unsubscribe() -> QoS = get_value('qos', Params), Text = case {Topic, ClientId, QoS} of {undefined, _, _} -> - io_lib:format("Invalid topic is undefined~n", []); + io_lib:format("Invalid params topic is undefined~n", []); {_, undefined, _} -> - io_lib:format("Invalid client_id is undefined~n", []); + io_lib:format("Invalid params client_id is undefined~n", []); {_, _, undefined} -> - io_lib:format("Invalid qos is undefined~n", []); + io_lib:format("Invalid params qos is undefined~n", []); {_, _, _} -> emqttd:unsubscribe(Topic, ClientId), @@ -505,12 +544,17 @@ plugins_load() -> KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, PluginName}], _) -> - Text = case emqttd_plugins:load(PluginName) of - {ok, StartedApps} -> - io_lib:format("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, PluginName]); - {error, Reason} -> - io_lib:format("load plugin error: ~p~n", [Reason]) + fun (_, Params, _) -> + Text = case get_value('plugin_name', Params) of + undefined -> + io_lib:format("Invalid params plugin_name is undefined~n", []); + PluginName -> + case emqttd_plugins:load(PluginName) of + {ok, StartedApps} -> + io_lib:format("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, PluginName]); + {error, Reason} -> + io_lib:format("load plugin error: ~p~n", [Reason]) + end end, [clique_status:text(Text)] end, @@ -521,12 +565,17 @@ plugins_unload() -> KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, PluginName}], _) -> - Text = case emqttd_plugins:unload(PluginName) of - ok -> - io_lib:format("Plugin ~s unloaded successfully.~n", [PluginName]); - {error, Reason} -> - io_lib:format("unload plugin error: ~p~n", [Reason]) + fun (_, Params, _) -> + Text = case get_value('plugin_name', Params) of + undefined -> + io_lib:format("Invalid params plugin_name is undefined~n", []); + PluginName -> + case emqttd_plugins:unload(PluginName) of + ok -> + io_lib:format("Plugin ~s unloaded successfully.~n", [PluginName]); + {error, Reason} -> + io_lib:format("unload plugin error: ~p~n", [Reason]) + end end, [clique_status:text(Text)] end, @@ -561,9 +610,9 @@ bridges_start() -> fun (_, Params, _) -> Text = case {get_value('snode', Params), get_value('topic', Params)} of {undefined, _} -> - io_lib:format("Invalid snode is undefined~n", []); + io_lib:format("Invalid params snode is undefined~n", []); {_, undefined} -> - io_lib:format("Invalid topic is undefined~n", []); + io_lib:format("Invalid params topic is undefined~n", []); {SNode, Topic} -> Opts = Params -- [{'snode', SNode}, {'topic', Topic}], case emqttd_bridge_sup_sup:start_bridge(SNode, Topic, Opts) of @@ -586,9 +635,9 @@ bridges_stop() -> fun (_, Params, _) -> Text = case {get_value('snode', Params), get_value('topic', Params)} of {undefined, _} -> - io_lib:format("Invalid snode is undefined~n", []); + io_lib:format("Invalid params snode is undefined~n", []); {_, undefined} -> - io_lib:format("Invalid topic is undefined~n", []); + io_lib:format("Invalid params topic is undefined~n", []); {SNode, Topic} -> case emqttd_bridge_sup_sup:stop_bridge(SNode, Topic) of ok -> io_lib:format("bridge is stopped.~n", []); @@ -724,7 +773,7 @@ trace_on() -> topic -> trace_on(topic, get_value('topic', Params), get_value('log_file', Params)); Type -> - io_lib:format("Invalid type: ~p error~n", [Type]) + io_lib:format("Invalid params type: ~p error~n", [Type]) end, [clique_status:text(Text)] end, @@ -744,7 +793,7 @@ trace_off() -> topic -> trace_off(topic, get_value('topic', Params)); Type -> - io_lib:format("Invalid type: ~p error~n", [Type]) + io_lib:format("Invalid params type: ~p error~n", [Type]) end, [clique_status:text(Text)] end, @@ -862,7 +911,11 @@ status_usage() -> ["\n status info Show broker status\n"]. listeners_usage() -> - ["\n listeners info List listeners\n"]. + ["\n listeners info List listeners\n", + " listeners start Create and start a listener\n", + " listeners stop Stop accepting new connections for a running listener\n", + " listeners restart Restart accepting new connections for a stopped listener\n", + " listeners delete Delete a stopped listener"]. mnesia_usage() -> ["\n mnesia info Mnesia system info\n"]. From 95232189cdb8c11d7c9ac228164f17164f9bffa5 Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 17 May 2017 17:15:21 +0800 Subject: [PATCH 07/16] Invalid params --- src/emqttd_cli2.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index efae3e62f..e5fec6d20 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -279,8 +279,13 @@ clients_show() -> KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], FlagSpecs = [], Callback = - fun (_, [{_, ClientId}], _) -> - [clique_status:table(if_client(ClientId, fun print/1))] + fun (_, Params, _) -> + case get_value('client_id', Params) of + undefined -> + [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; + ClientId -> + [clique_status:table(if_client(ClientId, fun print/1))] + end end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). From caaf66311d23442a3a5c149b7ffb3460114b1ab2 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 18 May 2017 09:24:15 +0800 Subject: [PATCH 08/16] Cli show all usage --- src/emqttd_cli2.erl | 91 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 21 deletions(-) diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index e5fec6d20..397d063ea 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -32,27 +32,14 @@ register_cli() -> register_cmd(). run([]) -> - AllUsage = [["broker"], - ["cluster"], - ["acl"], - ["clients"], - ["sessions"], - ["routes"], - ["topics"], - ["subscriptions"], - ["plugins"], - ["bridges"], - ["vm"], - ["trace"], - ["status"], - ["listeners"], - ["mnesia"]], + All = clique_usage:find_all(), io:format("--------------------------------------------------------------------------------~n"), - lists:foreach(fun(Item) -> - io:format("~ts", [clique_usage:find(Item)]), + lists:foreach(fun({Cmd, Usage}) -> + io:format("~p usage:", [Cmd]), + io:format("~ts", [Usage]), io:format("--------------------------------------------------------------------------------~n") - end, AllUsage); - + end, lists:sort(All)); + run(Cmd) -> clique:run(Cmd). @@ -71,6 +58,7 @@ register_usage() -> clique:register_usage(["trace"], trace_usage()), clique:register_usage(["status"], status_usage()), clique:register_usage(["listeners"], listeners_usage()), + clique:register_usage(["listeners", "stop"],listener_stop_usage()), clique:register_usage(["mnesia"], mnesia_usage()). register_cmd() -> @@ -96,32 +84,41 @@ register_cmd() -> sessions_list_persistent(), sessions_list_transient(), sessions_show(), + routes_list(), routes_show(), topics_list(), topics_show(), + subscriptions_list(), subscriptions_show(), subscriptions_subscribe(), subscriptions_del(), subscriptions_unsubscribe(), + plugins_list(), plugins_load(), plugins_unload(), + bridges_list(), bridges_start(), bridges_stop(), + vm_all(), vm_load(), vm_memory(), vm_process(), vm_io(), vm_ports(), + mnesia_info(), + trace_list(), trace_on(), trace_off(), - listeners(). + + listeners(), + listeners_stop(). node_status() -> Cmd = ["status", "info"], @@ -840,6 +837,51 @@ listeners() -> end, clique:register_command(Cmd, [], [], Callback). +listeners_stop() -> + Cmd = ["listeners", "stop"], + KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, + {'port', [{typecast, fun parse_port/1}]}, + {'type', [{typecast, fun parse_type/1}]}], + FlagSpecs = [{kill, [{shortname, "k"}, + {longname, "kill_sessions"}]}], + Callback = + fun (_, Params, Flag) -> + Address = get_value('address', Params), + Port = get_value('port', Params), + Type = get_value('type', Params), + case Address of + undefined -> emqttd_app:stop_listener({Type, Port, []}); + Address -> emqttd_app:stop_listener({Type, {Address, Port}, []}) + end, + [clique_status:text("aaa")] + end, + clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). + +parse_port(Port) -> + case catch list_to_integer(Port) of + P when (P >= 0) and (P=<65535) -> P; + _ -> {error, {invalid_args,[{port, Port}]}} + end. + +parse_addr(Addr) -> + case inet:parse_address(Addr) of + {ok, Ip} -> Ip; + {error, einval} -> + {error, {invalid_args,[{address, Addr}]}} + end. + +parse_type(Type) -> + case catch list_to_atom(Type) of + T when (T=:=tcp) orelse + (T=:=ssl) orelse + (T=:=ws) orelse + (T=:=wss) orelse + (T=:=http) orelse + (T=:=https) -> T; + _ -> {error, {invalid_args,[{type, Type}]}} + end. + + %%------------------------------------------------------------- %% usage %%------------------------------------------------------------- @@ -920,7 +962,14 @@ listeners_usage() -> " listeners start Create and start a listener\n", " listeners stop Stop accepting new connections for a running listener\n", " listeners restart Restart accepting new connections for a stopped listener\n", - " listeners delete Delete a stopped listener"]. + " listeners delete Delete a stopped listener\n"]. + +listener_stop_usage() -> + ["\n listeners stop address=IpAddr port=Port\n", + " Stops accepting new connections on a running listener.\n", + "Options\n", + " -k, --kill_sessions\n" + " kills all sessions accepted with this listener.\n"]. mnesia_usage() -> ["\n mnesia info Mnesia system info\n"]. From 3e1c69dff18b5a999995c712ee813c018c2bf557 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 18 May 2017 16:35:49 +0800 Subject: [PATCH 09/16] Add cli listeners start|stop --- priv/{emqttd.schema => emq.schema} | 0 src/emqttd_cli2.erl | 165 ++++++++++++++++++++++++----- src/emqttd_session.erl | 5 + 3 files changed, 141 insertions(+), 29 deletions(-) rename priv/{emqttd.schema => emq.schema} (100%) diff --git a/priv/emqttd.schema b/priv/emq.schema similarity index 100% rename from priv/emqttd.schema rename to priv/emq.schema diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index 397d063ea..00ffefef5 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -12,7 +12,7 @@ -behaviour(clique_handler). --import(proplists, [get_value/2]). +-import(proplists, [get_value/2, get_value/3]). -define(APP, emqttd). @@ -35,7 +35,7 @@ run([]) -> All = clique_usage:find_all(), io:format("--------------------------------------------------------------------------------~n"), lists:foreach(fun({Cmd, Usage}) -> - io:format("~p usage:", [Cmd]), + io:format("~p usage:", [cuttlefish_variable:format(Cmd)]), io:format("~ts", [Usage]), io:format("--------------------------------------------------------------------------------~n") end, lists:sort(All)); @@ -58,6 +58,7 @@ register_usage() -> clique:register_usage(["trace"], trace_usage()), clique:register_usage(["status"], status_usage()), clique:register_usage(["listeners"], listeners_usage()), + clique:register_usage(["listeners", "start"], listener_start_usage()), clique:register_usage(["listeners", "stop"],listener_stop_usage()), clique:register_usage(["mnesia"], mnesia_usage()). @@ -118,6 +119,7 @@ register_cmd() -> trace_off(), listeners(), + listeners_start(), listeners_stop(). node_status() -> @@ -837,26 +839,119 @@ listeners() -> end, clique:register_command(Cmd, [], [], Callback). -listeners_stop() -> - Cmd = ["listeners", "stop"], - KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, - {'port', [{typecast, fun parse_port/1}]}, - {'type', [{typecast, fun parse_type/1}]}], - FlagSpecs = [{kill, [{shortname, "k"}, - {longname, "kill_sessions"}]}], +listeners_start() -> + Cmd = ["listeners", "start"], + KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, + {'port', [{typecast, fun parse_port/1}]}, + {'type', [{typecast, fun parse_type/1}]}], + FlagSpecs = [{acceptors, [{longname, "acceptors"}, + {typecast, fun(Acceptors) -> list_to_integer(Acceptors) end}]}, + {max_clients, [{longname, "max_clients"}, + {typecast, fun(MaxClients) -> list_to_integer(MaxClients) end}]}, + {buffer, [{longname, "buffer"}, + {typecast, fun(Buffer) -> list_to_integer(Buffer) end}]}, + {tls_versions, [{longname, "tls_versions"}, + {typecast, fun(TlsVersions) -> list_to_atom(TlsVersions) end}]}, + {handshake_timeout, [{longname, "handshake_timeout"}, + {typecast, fun(HandshakeTimeout) -> list_to_integer(HandshakeTimeout) end}]}, + {reuse_sessions, [{longname, "reuse_sessions"}, + {typecast, fun(ReuseSessions) -> list_to_atom(ReuseSessions) end}]}, + {keyfile, [{longname, "keyfile"}, + {typecast, fun(Keyfile) -> Keyfile end}]}, + {certfile, [{longname, "certfile"}, + {typecast, fun(Certfile) -> Certfile end}]}, + {cacertfile, [{longname, "cacertfile"}, + {typecast, fun(Cacertfile) -> Cacertfile end}]}, + {dhfile, [{longname, "dhfile"}, + {typecast, fun(Dhfile) -> Dhfile end}]}, + {verify, [{longname, "verify"}, + {typecast, fun(Verify) -> list_to_atom(Verify) end}]}, + {fail_if_no_peer_cert, [{longname, "fail_if_no_peer_cert"}, + {typecast, fun(FailIfNoPeerCert) -> list_to_atom(FailIfNoPeerCert) end}]}], Callback = fun (_, Params, Flag) -> Address = get_value('address', Params), Port = get_value('port', Params), Type = get_value('type', Params), - case Address of - undefined -> emqttd_app:stop_listener({Type, Port, []}); - Address -> emqttd_app:stop_listener({Type, {Address, Port}, []}) + Text = case {Type, Port}of + {undefined, _} -> + io_lib:format("Invalid params type: ~p error~n", [Type]); + {_, undefined} -> + io_lib:format("Invalid params port: ~p error~n", [Type]); + {_, _} -> + ListenOn = case Address of + undefined -> Port; + _ -> {Address, Port} + end, + Opts = parse_opts(Type, Flag), + case emqttd_app:start_listener({Type, ListenOn, Opts}) of + {ok, _} -> + io_lib:format("Start mqtt:~p listen on ~p successfully", [Type, ListenOn]); + Error -> + io_lib:format("Start mqtt:~p listen on ~p failed, error:~p~n", [Type, ListenOn, Error]) + end end, - [clique_status:text("aaa")] + [clique_status:text(Text)] end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). +listeners_stop() -> + Cmd = ["listeners", "stop"], + KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, + {'port', [{typecast, fun parse_port/1}]}, + {'type', [{typecast, fun parse_type/1}]}], + Callback = + fun (_, Params, _) -> + Address = get_value('address', Params), + Port = get_value('port', Params), + Type = get_value('type', Params), + Text = case {Type, Port}of + {undefined, _} -> + io_lib:format("Invalid params type: ~p error~n", [Type]); + {_, undefined} -> + io_lib:format("Invalid params port: ~p error~n", [Type]); + {_, _} -> + case Address of + undefined -> + emqttd_app:stop_listener({Type, Port, []}), + io_lib:format("stopped mqtt:~p on ~p~n", [Type, Port]); + Address -> + emqttd_app:stop_listener({Type, {Address, Port}, []}), + io_lib:format("stopped mqtt:~p on ~p:~p~n", [Type, emqttd_net:ntoa(Address), Port]) + end + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, [], Callback). + +parse_opts(Type, Opts) when Type == ssl + orelse Type == wss + orelse Type == https -> + + OptList = [handshake_timeout, reuse_sessions, keyfile, certfile, + cacertfile, dhfile, verify, fail_if_no_peer_cert], + SslOpts = lists:foldl( + fun(Opt, Acc) -> + case get_value(Opt, Opts) of + undefined -> Acc; + OptVal -> [[{Opt, OptVal}] | Acc] + end + end, [], OptList) ++ + case get_value(tls_versions, Opts) of + undefined -> []; + TlsVersions -> [{versions, [TlsVersions]}] + end, + case SslOpts of + [] -> parse_opts(undefined, Opts); + _ -> [{sslopts, SslOpts}] ++ parse_opts(undefined, Opts) + end; +parse_opts(_Type, Opts) -> + Acceptors = get_value(acceptors, Opts, 4), + MaxClients = get_value(max_clients, Opts, 1024), + Buffer = get_value(buffer, Opts, 4096), + [{acceptors, Acceptors}, {max_clients, MaxClients}, {sockopts, [{buffer, Buffer}]}]. + + parse_port(Port) -> case catch list_to_integer(Port) of P when (P >= 0) and (P=<65535) -> P; @@ -920,11 +1015,11 @@ topics_usage() -> " topics show topic= Show a topic\n"]. subscriptions_usage() -> - ["\n subscriptions list List all subscriptions\n", - " subscriptions show client_id= Show subscriptions of a client\n", - " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", - " subscriptions del client_id= Delete static subscriptions manually\n", - " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. + ["\n subscriptions list List all subscriptions\n", + " subscriptions show client_id= Show subscriptions of a client\n", + " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", + " subscriptions del client_id= Delete static subscriptions manually\n", + " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. plugins_usage() -> ["\n plugins list Show loaded plugins\n", @@ -950,9 +1045,9 @@ vm_usage() -> " vm ports Show Ports of Erlang VM\n"]. trace_usage() -> - ["\n trace list List all traces\n", - " trace type=client|topic client_id= topic= log_file= Start tracing\n", - " trace off type=client|topic client_id= topic= Stop tracing\n"]. + ["\n trace list List all traces\n", + " trace type=client|topic client_id= topic= log_file= Start tracing\n", + " trace off type=client|topic client_id= topic= Stop tracing\n"]. status_usage() -> ["\n status info Show broker status\n"]. @@ -960,16 +1055,28 @@ status_usage() -> listeners_usage() -> ["\n listeners info List listeners\n", " listeners start Create and start a listener\n", - " listeners stop Stop accepting new connections for a running listener\n", - " listeners restart Restart accepting new connections for a stopped listener\n", - " listeners delete Delete a stopped listener\n"]. + " listeners stop Stop accepting new connections for a running listener\n"]. + +listener_start_usage() -> + ["\n listeners start address= port= type=\n", + " Create and start a listener.\n", + "Options:\n", + " --acceptors= Size of acceptor pool\n", + " --max_clients= Maximum number of concurrent clients\n", + " --buffer= TCP Socket Options\n", + " --tls_versions= TLS protocol versions\n", + " --handshake_timeout= TLS handshake timeout\n", + " --reuse_sessions= TLS allows clients to reuse pre-existing sessions\n", + " --keyfile= Path to the file containing the user's private PEM-encoded key\n", + " --certfile= Path to a file containing the user certificate\n", + " --cacertfile= Path to a file containing PEM-encoded CA certificates\n", + " --dhfile= Path to a file containing PEM-encoded Diffie Hellman\n", + " --verify= A server only does x509-path validation in mode\n", + " --fail_if_no_peer_cert= Used together with {verify, verify_peer} by an SSL server\n"]. listener_stop_usage() -> - ["\n listeners stop address=IpAddr port=Port\n", - " Stops accepting new connections on a running listener.\n", - "Options\n", - " -k, --kill_sessions\n" - " kills all sessions accepted with this listener.\n"]. + ["\n listeners stop address= port= type=\n", + " Stops accepting new connections on a running listener.\n"]. mnesia_usage() -> ["\n mnesia info Mnesia system info\n"]. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 02d23567f..f227339dc 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -559,6 +559,11 @@ handle_info({'EXIT', ClientPid, _Reason}, State = #state{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, State}; +%% ClientPid was killed +handle_info({'EXIT', ClientPid, killed}, State) -> + ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, killed], State), + shutdown(killed, State); + handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid, From 61fa9f3f89f7ac80268f00813fad5a7d02dbdba5 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 18 May 2017 16:44:46 +0800 Subject: [PATCH 10/16] Cli listeners start add flag --backlog --- src/emqttd_cli2.erl | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index 00ffefef5..81ef701d3 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -848,6 +848,8 @@ listeners_start() -> {typecast, fun(Acceptors) -> list_to_integer(Acceptors) end}]}, {max_clients, [{longname, "max_clients"}, {typecast, fun(MaxClients) -> list_to_integer(MaxClients) end}]}, + {backlog, [{longname, "backlog"}, + {typecast, fun(Backlog) -> list_to_integer(Backlog) end}]}, {buffer, [{longname, "buffer"}, {typecast, fun(Buffer) -> list_to_integer(Buffer) end}]}, {tls_versions, [{longname, "tls_versions"}, @@ -949,7 +951,10 @@ parse_opts(_Type, Opts) -> Acceptors = get_value(acceptors, Opts, 4), MaxClients = get_value(max_clients, Opts, 1024), Buffer = get_value(buffer, Opts, 4096), - [{acceptors, Acceptors}, {max_clients, MaxClients}, {sockopts, [{buffer, Buffer}]}]. + Backlog = get_value(backlog, Opts, 1024), + [{acceptors, Acceptors}, + {max_clients, MaxClients}, + {sockopts, [{buffer, Buffer}, {backlog, Backlog}]}]. parse_port(Port) -> @@ -1063,9 +1068,10 @@ listener_start_usage() -> "Options:\n", " --acceptors= Size of acceptor pool\n", " --max_clients= Maximum number of concurrent clients\n", + " --backlog= TCP Socket Options\n", " --buffer= TCP Socket Options\n", " --tls_versions= TLS protocol versions\n", - " --handshake_timeout= TLS handshake timeout\n", + " --handshake_timeout= TLS handshake timeout(ms)\n", " --reuse_sessions= TLS allows clients to reuse pre-existing sessions\n", " --keyfile= Path to the file containing the user's private PEM-encoded key\n", " --certfile= Path to a file containing the user certificate\n", From 798c60f269d3eeb2b2583bcd202b0e841e5843b1 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 19 May 2017 09:26:52 +0800 Subject: [PATCH 11/16] Rollback code --- Makefile | 4 +- src/emqttd.app.src | 2 +- src/emqttd_app.erl | 1 - src/emqttd_cli2.erl | 1163 ------------------------------------- src/emqttd_cli_format.erl | 27 - src/emqttd_session.erl | 5 - 6 files changed, 2 insertions(+), 1200 deletions(-) delete mode 100644 src/emqttd_cli2.erl delete mode 100644 src/emqttd_cli_format.erl diff --git a/Makefile b/Makefile index 88bb4c53a..272cc6f2e 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.2 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt clique jsx +DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc @@ -13,8 +13,6 @@ dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master -dep_clique = git https://github.com/turtleDeng/clique -dep_jsx = git https://github.com/talentdeficit/jsx ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 3e8c7e1bd..3a7ed3482 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -4,7 +4,7 @@ {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, - lager_syslog,pbkdf2,bcrypt,jsx]}, + lager_syslog,pbkdf2,bcrypt]}, {env,[]}, {mod,{emqttd_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 3b8a9c85b..992e22da6 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -43,7 +43,6 @@ start(_Type, _Args) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), - emqttd_cli2:register_cli(), register_acl_mod(), emqttd_plugins:init(), emqttd_plugins:load(), diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl deleted file mode 100644 index 81ef701d3..000000000 --- a/src/emqttd_cli2.erl +++ /dev/null @@ -1,1163 +0,0 @@ --module (emqttd_cli2). - --export([register_cli/0]). - --include("emqttd.hrl"). - --include("emqttd_cli.hrl"). - --include("emqttd_protocol.hrl"). - --export([run/1]). - --behaviour(clique_handler). - --import(proplists, [get_value/2, get_value/3]). - --define(APP, emqttd). - --define(PROC_INFOKEYS, [status, - memory, - message_queue_len, - total_heap_size, - heap_size, - stack_size, - reductions]). - -register_cli() -> - F = fun() -> emqttd_mnesia:running_nodes() end, - clique:register_node_finder(F), - clique:register_writer("json", emqttd_cli_format), - register_usage(), - register_cmd(). - -run([]) -> - All = clique_usage:find_all(), - io:format("--------------------------------------------------------------------------------~n"), - lists:foreach(fun({Cmd, Usage}) -> - io:format("~p usage:", [cuttlefish_variable:format(Cmd)]), - io:format("~ts", [Usage]), - io:format("--------------------------------------------------------------------------------~n") - end, lists:sort(All)); - -run(Cmd) -> - clique:run(Cmd). - -register_usage() -> - clique:register_usage(["broker"], broker_usage()), - clique:register_usage(["cluster"], cluster_usage()), - clique:register_usage(["acl"], acl_usage()), - clique:register_usage(["clients"], clients_usage()), - clique:register_usage(["sessions"], sessions_usage()), - clique:register_usage(["routes"], routes_usage()), - clique:register_usage(["topics"], topics_usage()), - clique:register_usage(["subscriptions"], subscriptions_usage()), - clique:register_usage(["plugins"], plugins_usage()), - clique:register_usage(["bridges"], bridges_usage()), - clique:register_usage(["vm"], vm_usage()), - clique:register_usage(["trace"], trace_usage()), - clique:register_usage(["status"], status_usage()), - clique:register_usage(["listeners"], listeners_usage()), - clique:register_usage(["listeners", "start"], listener_start_usage()), - clique:register_usage(["listeners", "stop"],listener_stop_usage()), - clique:register_usage(["mnesia"], mnesia_usage()). - -register_cmd() -> - - node_status(), - - broker_status(), - broker_stats(), - broker_metrics(), - broker_pubsub(), - - cluster_join(), - cluster_leave(), - cluster_remove(), - - acl_reload(), - - clients_list(), - clients_show(), - clients_kick(), - - sessions_list(), - sessions_list_persistent(), - sessions_list_transient(), - sessions_show(), - - routes_list(), - routes_show(), - topics_list(), - topics_show(), - - subscriptions_list(), - subscriptions_show(), - subscriptions_subscribe(), - subscriptions_del(), - subscriptions_unsubscribe(), - - plugins_list(), - plugins_load(), - plugins_unload(), - - bridges_list(), - bridges_start(), - bridges_stop(), - - vm_all(), - vm_load(), - vm_memory(), - vm_process(), - vm_io(), - vm_ports(), - - mnesia_info(), - - trace_list(), - trace_on(), - trace_off(), - - listeners(), - listeners_start(), - listeners_stop(). - -node_status() -> - Cmd = ["status", "info"], - Callback = - fun (_, _, _) -> - {Status, Vsn} = case lists:keysearch(?APP, 1, application:which_applications()) of - false -> - {"not running", undefined}; - {value, {?APP, _Desc, Vsn0}} -> - {"running", Vsn0} - end, - [clique_status:table([[{node, node()}, {status, Status}, {version, Vsn}]])] - end, - clique:register_command(Cmd, [], [], Callback). - -%%-------------------------------------------------------------------- -%% @doc Query broker - -broker_status() -> - Cmd = ["broker", "info"], - Callback = - fun (_, _, _) -> - Funs = [sysdescr, version, uptime, datetime], - Table = lists:map(fun(Fun) -> - {Fun, emqttd_broker:Fun()} - end, Funs), - [clique_status:table([Table])] - end, - clique:register_command(Cmd, [], [], Callback). - -broker_stats() -> - Cmd = ["broker", "stats"], - Callback = - fun (_, _, _) -> - lists:map( - fun({Key, Val}) -> - clique_status:list(Key, io_lib:format("~p", [Val])) - end, emqttd_stats:getstats()) - end, - clique:register_command(Cmd, [], [], Callback). - -broker_metrics() -> - Cmd = ["broker", "metrics"], - Callback = - fun (_, _, _) -> - lists:map( - fun({Key, Val}) -> - clique_status:list(Key, io_lib:format("~p", [Val])) - end, lists:sort(emqttd_metrics:all())) - end, - clique:register_command(Cmd, [], [], Callback). - -broker_pubsub() -> - Cmd = ["broker", "pubsub"], - Callback = - fun (_, _, _) -> - Pubsubs = supervisor:which_children(emqttd_pubsub_sup:pubsub_pool()), - Table = lists:map( - fun({{_, Id}, Pid, _, _}) -> - ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), - [{id, Id}] ++ ProcInfo - end, lists:reverse(Pubsubs)), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - - -%%-------------------------------------------------------------------- -%% @doc Cluster with other nodes - -cluster_join() -> - Cmd = ["cluster", "join"], - KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case get_value('node', Params) of - undefined -> - io_lib:format("Invalid params node is undefined~n", []); - Node -> - case emqttd_cluster:join(Node) of - ok -> - ["Join the cluster successfully.\n", cluster(["status"])]; - {error, Error} -> - io_lib:format("Failed to join the cluster: ~p~n", [Error]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -cluster_leave() -> - Cmd = ["cluster", "leave"], - Callback = - fun(_, _, _) -> - Text = case emqttd_cluster:leave() of - ok -> - ["Leave the cluster successfully.\n", cluster(["status"])]; - {error, Error} -> - io_lib:format("Failed to leave the cluster: ~p~n", [Error]) - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, [], [], Callback). - -cluster_remove() -> - Cmd = ["cluster", "remove"], - KeySpecs = [{'node', [{typecast, fun(Node) -> list_to_atom(Node) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case get_value('node', Params) of - undefined -> - io_lib:format("Invalid params node is undefined~n", []); - Node -> - case emqttd_cluster:remove(Node) of - ok -> - ["Remove the cluster successfully.\n", cluster(["status"])]; - {error, Error} -> - io_lib:format("Failed to remove the cluster: ~p~n", [Error]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -cluster(["status"]) -> - io_lib:format("Cluster status: ~p~n", [emqttd_cluster:status()]). - -%%-------------------------------------------------------------------- -%% @doc acl - -acl_reload() -> - Cmd = ["acl", "reload"], - Callback = - fun (_, _, _) -> - emqttd_access_control:reload_acl(), - [clique_status:text("")] - end, - clique:register_command(Cmd, [], [], Callback). - -%%-------------------------------------------------------------------- -%% @doc Query clients - -clients_list() -> - Cmd = ["clients", "list"], - Callback = - fun (_, _, _) -> - [clique_status:table(dump(mqtt_client))] - end, - clique:register_command(Cmd, [], [], Callback). - -clients_show() -> - Cmd = ["clients", "show"], - KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('client_id', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; - ClientId -> - [clique_status:table(if_client(ClientId, fun print/1))] - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -clients_kick() -> - Cmd = ["clients", "kick"], - KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('client_id', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; - ClientId -> - Result = if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end), - case Result of - [ok] -> [clique_status:text(io_lib:format("Kick client_id: ~p successfully~n", [ClientId]))]; - _ -> [clique_status:text("")] - end - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -if_client(ClientId, Fun) -> - case emqttd_cm:lookup(ClientId) of - undefined -> ?PRINT_MSG("Not Found.~n"), []; - Client -> [Fun(Client)] - end. - -%%-------------------------------------------------------------------- -%% @doc Sessions Command - -sessions_list() -> - Cmd = ["sessions", "list"], - Callback = - fun (_, _, _) -> - [clique_status:table(dump(mqtt_local_session))] - end, - clique:register_command(Cmd, [], [], Callback). - -%% performance issue? - -sessions_list_persistent() -> - Cmd = ["sessions", "list", "persistent"], - Callback = - fun (_, _, _) -> - Table = lists:map(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', false, '_'})), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -%% performance issue? - -sessions_list_transient() -> - Cmd = ["sessions", "list", "transient"], - Callback = - fun (_, _, _) -> - Table = lists:map(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', true, '_'})), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -sessions_show() -> - Cmd = ["sessions", "show"], - KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('client_id', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; - ClientId -> - case ets:lookup(mqtt_local_session, ClientId) of - [] -> - ?PRINT_MSG("Not Found.~n"), - [clique_status:table([])]; - [SessInfo] -> - [clique_status:table([print(SessInfo)])] - end - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -%%-------------------------------------------------------------------- -%% @doc Routes Command - -routes_list() -> - Cmd = ["routes", "list"], - Callback = - fun (_, _, _) -> - Table = lists:flatten(lists:map(fun print/1, emqttd_router:dump())), - [clique_status:table([Table])] - end, - clique:register_command(Cmd, [], [], Callback). - -routes_show() -> - Cmd = ["routes", "show"], - KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('topic', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params topic is undefined~n", []))]; - Topic -> - [clique_status:table([print(mnesia:dirty_read(mqtt_route, Topic))])] - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -%%-------------------------------------------------------------------- -%% @doc Topics Command - -topics_list() -> - Cmd = ["topics", "list"], - Callback = - fun (_, _, _) -> - Table = lists:map(fun(Topic) -> [{topic, Topic}] end, emqttd:topics()), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -topics_show() -> - Cmd = ["topics", "show"], - KeySpecs = [{'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('client_id', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params topic is undefined~n", []))]; - Topic -> - Table = print(mnesia:dirty_read(mqtt_route, Topic)), - [clique_status:table([Table])] - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -%%-------------------------------------------------------------------- -%% @doc Subscriptions Command -subscriptions_list() -> - Cmd = ["subscriptions", "list"], - Callback = - fun (_, _, _) -> - Table = lists:map(fun(Subscription) -> - print(subscription, Subscription) - end, ets:tab2list(mqtt_subscription)), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -subscriptions_show() -> - Cmd = ["subscriptions", "show"], - KeySpecs = [{'client_id', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('client_id', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; - ClientId -> - case ets:lookup(mqtt_subscription, ClientId) of - [] -> - ?PRINT_MSG("Not Found.~n"), - [clique_status:table([])]; - Records -> - Table = [print(subscription, Subscription) || Subscription <- Records], - [clique_status:table(Table)] - end - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -subscriptions_subscribe() -> - Cmd = ["subscriptions", "subscribe"], - KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, - {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, - {'qos', [{typecast, fun(QoS) -> list_to_integer(QoS) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Topic = get_value('topic', Params), - ClientId = get_value('client_id', Params), - QoS = get_value('qos', Params), - Text = case {Topic, ClientId, QoS} of - {undefined, _, _} -> - io_lib:format("Invalid params topic is undefined~n", []); - {_, undefined, _} -> - io_lib:format("Invalid params client_id is undefined~n", []); - {_, _, undefined} -> - io_lib:format("Invalid params qos is undefined~n", []); - {_, _, _} -> - case emqttd:subscribe(Topic, ClientId, [{qos, QoS}]) of - ok -> - io_lib:format("Client_id: ~p subscribe topic: ~p qos: ~p successfully~n", [ClientId, Topic, QoS]); - {error, already_existed} -> - io_lib:format("Error: client_id: ~p subscribe topic: ~p already existed~n", [ClientId, Topic]); - {error, Reason} -> - io_lib:format("Error: ~p~n", [Reason]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -subscriptions_del() -> - Cmd = ["subscriptions", "del"], - KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - case get_value('client_id', Params) of - undefined -> - [clique_status:text(io_lib:format("Invalid params client_id is undefined~n", []))]; - ClientId -> - emqttd:subscriber_down(ClientId), - Text = io_lib:format("Client_id del subscriptions:~p successfully~n", [ClientId]), - [clique_status:text(Text)] - end - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -subscriptions_unsubscribe() -> - Cmd = ["subscriptions", "unsubscribe"], - KeySpecs = [{'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, - {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Topic = get_value('topic', Params), - ClientId = get_value('client_id', Params), - QoS = get_value('qos', Params), - Text = case {Topic, ClientId, QoS} of - {undefined, _, _} -> - io_lib:format("Invalid params topic is undefined~n", []); - {_, undefined, _} -> - io_lib:format("Invalid params client_id is undefined~n", []); - {_, _, undefined} -> - io_lib:format("Invalid params qos is undefined~n", []); - - {_, _, _} -> - emqttd:unsubscribe(Topic, ClientId), - io_lib:format("Client_id: ~p unsubscribe topic: ~p successfully~n", [ClientId, Topic]) - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -%%-------------------------------------------------------------------- -%% @doc Plugins Command -plugins_list() -> - Cmd = ["plugins", "list"], - Callback = - fun (_, _, _) -> - Text = lists:map(fun(Plugin) -> print(Plugin) end, emqttd_plugins:list()), - [clique_status:table(Text)] - end, - clique:register_command(Cmd, [], [], Callback). - -plugins_load() -> - Cmd = ["plugins", "load"], - KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case get_value('plugin_name', Params) of - undefined -> - io_lib:format("Invalid params plugin_name is undefined~n", []); - PluginName -> - case emqttd_plugins:load(PluginName) of - {ok, StartedApps} -> - io_lib:format("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, PluginName]); - {error, Reason} -> - io_lib:format("load plugin error: ~p~n", [Reason]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -plugins_unload() -> - Cmd = ["plugins", "unload"], - KeySpecs = [{'plugin_name', [{typecast, fun(PluginName) -> list_to_atom(PluginName) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case get_value('plugin_name', Params) of - undefined -> - io_lib:format("Invalid params plugin_name is undefined~n", []); - PluginName -> - case emqttd_plugins:unload(PluginName) of - ok -> - io_lib:format("Plugin ~s unloaded successfully.~n", [PluginName]); - {error, Reason} -> - io_lib:format("unload plugin error: ~p~n", [Reason]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - - -%%-------------------------------------------------------------------- -%% @doc Bridges command - -bridges_list() -> - Cmd = ["bridges", "list"], - Callback = - fun (_, _, _) -> - Text = lists:map( - fun({Node, Topic, _Pid}) -> - [{bridge, node()}, {topic, Topic}, {node, Node}] - end, emqttd_bridge_sup_sup:bridges()), - [clique_status:table(Text)] - end, - clique:register_command(Cmd, [], [], Callback). - -bridges_start() -> - Cmd = ["bridges", "start"], - KeySpecs = [{'snode', [{typecast, fun(SNode) -> list_to_atom(SNode) end}]}, - {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, - {'qos', [{typecast, fun(Qos) -> list_to_integer(Qos) end}]}, - {'topic_suffix', [{typecast, fun(Prefix) -> list_to_binary(Prefix) end}]}, - {'topic_prefix', [{typecast, fun(Suffix) -> list_to_binary(Suffix) end}]}, - {'max_queue_len', [{typecast, fun(Queue) -> list_to_integer(Queue) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case {get_value('snode', Params), get_value('topic', Params)} of - {undefined, _} -> - io_lib:format("Invalid params snode is undefined~n", []); - {_, undefined} -> - io_lib:format("Invalid params topic is undefined~n", []); - {SNode, Topic} -> - Opts = Params -- [{'snode', SNode}, {'topic', Topic}], - case emqttd_bridge_sup_sup:start_bridge(SNode, Topic, Opts) of - {ok, _} -> - io_lib:format("bridge is started.~n", []); - {error, Error} -> - io_lib:format("error: ~p~n", [Error]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -bridges_stop() -> - Cmd = ["bridges", "stop"], - KeySpecs = [{'snode', [{typecast, fun(SNode) -> list_to_atom(SNode) end}]}, - {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case {get_value('snode', Params), get_value('topic', Params)} of - {undefined, _} -> - io_lib:format("Invalid params snode is undefined~n", []); - {_, undefined} -> - io_lib:format("Invalid params topic is undefined~n", []); - {SNode, Topic} -> - case emqttd_bridge_sup_sup:stop_bridge(SNode, Topic) of - ok -> io_lib:format("bridge is stopped.~n", []); - {error, Error} -> io_lib:format("error: ~p~n", [Error]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -%%-------------------------------------------------------------------- -%% @doc vm command - -vm_all() -> - Cmd = ["vm","info"], - Callback = - fun (_, _, _) -> - Cpu = [vm_info("cpu", K, list_to_float(V)) || {K, V} <- emqttd_vm:loads()], - Memory = [vm_info("memory", K, V) || {K, V} <- erlang:memory()], - Process = [vm_info("process", K, erlang:system_info(V)) || {K, V} <- [{limit, process_limit}, {count, process_count}]], - IoInfo = erlang:system_info(check_io), - Io = [vm_info("io", K, get_value(K, IoInfo)) || K <- [max_fds, active_fds]], - Ports = [vm_info("ports", K, erlang:system_info(V)) || {K, V} <- [{count, port_count}, {limit, port_limit}]], - lists:flatten([Cpu, Memory, Process, Io, Ports]) - end, - clique:register_command(Cmd, [], [], Callback). - -vm_info(Item, K, V) -> - clique_status:list(format_key(Item, K), io_lib:format("~p", [V])). - -format_key(Item, K) -> - list_to_atom(lists:concat([Item, "/", K])). - -vm_load() -> - Cmd = ["vm","load"], - Callback = - fun (_, _, _) -> - Table = lists:map( - fun({Name, Val}) -> - [{name, Name}, {val, Val}] - end, emqttd_vm:loads()), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -vm_memory() -> - Cmd = ["vm","memory"], - Callback = - fun (_, _, _) -> - Table = lists:map( - fun({Name, Val}) -> - [{name, Name}, {val, Val}] - end, erlang:memory()), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -vm_process() -> - Cmd = ["vm","process"], - Callback = - fun (_, _, _) -> - Table = lists:map( - fun({Name, Val}) -> - [{name, Name}, {val, erlang:system_info(Val)}] - end, [{limit, process_limit}, {count, process_count}]), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -vm_io() -> - Cmd = ["vm","io"], - Callback = - fun (_, _, _) -> - IoInfo = erlang:system_info(check_io), - Table = lists:map( - fun(Key) -> - [{name, Key}, {val, get_value(Key, IoInfo)}] - end, [max_fds, active_fds]), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -vm_ports() -> - Cmd = ["vm","ports"], - Callback = - fun (_, _, _) -> - Table = lists:map( - fun({Name, Val}) -> - [{name, Name}, {val, erlang:system_info(Val)}] - end, [{count, port_count}, {limit, port_limit}]), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -% %%-------------------------------------------------------------------- -%% @doc mnesia Command - -mnesia_info() -> - Cmd = ["mnesia", "info"], - Callback = - fun (_, _, _) -> - mnesia:system_info(), - [clique_status:text("")] - end, - clique:register_command(Cmd, [], [], Callback). - -%%-------------------------------------------------------------------- -%% @doc Trace Command - -trace_list() -> - Cmd = ["trace", "list"], - Callback = - fun (_, _, _) -> - Table = lists:map(fun({{Who, Name}, LogFile}) -> - [{trace, Who}, {name, Name}, {log_file, LogFile}] - end, emqttd_trace:all_traces()), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -trace_on() -> - Cmd = ["trace"], - KeySpecs = [{'type', [{typecast, fun(Type) -> list_to_atom(Type) end}]}, - {'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, - {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}, - {'log_file', [{typecast, fun(LogFile) -> list_to_binary(LogFile) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case get_value('type', Params) of - client -> - trace_on(client, get_value('client_id', Params), get_value('log_file', Params)); - topic -> - trace_on(topic, get_value('topic', Params), get_value('log_file', Params)); - Type -> - io_lib:format("Invalid params type: ~p error~n", [Type]) - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -trace_off() -> - Cmd = ["trace", "off"], - KeySpecs = [{'type', [{typecast, fun(Type) -> list_to_atom(Type) end}]}, - {'client_id', [{typecast, fun(ClientId) -> list_to_binary(ClientId) end}]}, - {'topic', [{typecast, fun(Topic) -> list_to_binary(Topic) end}]}], - FlagSpecs = [], - Callback = - fun (_, Params, _) -> - Text = case get_value('type', Params) of - client -> - trace_off(client, get_value('client_id', Params)); - topic -> - trace_off(topic, get_value('topic', Params)); - Type -> - io_lib:format("Invalid params type: ~p error~n", [Type]) - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -trace_on(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, Name}, LogFile) of - ok -> - io_lib:format("trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - io_lib:format("trace ~s ~s error: ~p~n", [Who, Name, Error]) - end. - -trace_off(Who, Name) -> - case emqttd_trace:stop_trace({Who, Name}) of - ok -> - io_lib:format("stop tracing ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - io_lib:format("stop tracing ~s ~s error: ~p.~n", [Who, Name, Error]) - end. - -%%-------------------------------------------------------------------- -%% @doc Listeners Command - -listeners() -> - Cmd = ["listeners", "info"], - Callback = - fun (_, _, _) -> - Table = - lists:map(fun({{Protocol, ListenOn}, Pid}) -> - Info = [{acceptors, esockd:get_acceptors(Pid)}, - {max_clients, esockd:get_max_clients(Pid)}, - {current_clients,esockd:get_current_clients(Pid)}, - {shutdown_count, esockd:get_shutdown_count(Pid)}], - Listener = io_lib:format("~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), - [{listener, Listener}| Info] - end, esockd:listeners()), - [clique_status:table(Table)] - end, - clique:register_command(Cmd, [], [], Callback). - -listeners_start() -> - Cmd = ["listeners", "start"], - KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, - {'port', [{typecast, fun parse_port/1}]}, - {'type', [{typecast, fun parse_type/1}]}], - FlagSpecs = [{acceptors, [{longname, "acceptors"}, - {typecast, fun(Acceptors) -> list_to_integer(Acceptors) end}]}, - {max_clients, [{longname, "max_clients"}, - {typecast, fun(MaxClients) -> list_to_integer(MaxClients) end}]}, - {backlog, [{longname, "backlog"}, - {typecast, fun(Backlog) -> list_to_integer(Backlog) end}]}, - {buffer, [{longname, "buffer"}, - {typecast, fun(Buffer) -> list_to_integer(Buffer) end}]}, - {tls_versions, [{longname, "tls_versions"}, - {typecast, fun(TlsVersions) -> list_to_atom(TlsVersions) end}]}, - {handshake_timeout, [{longname, "handshake_timeout"}, - {typecast, fun(HandshakeTimeout) -> list_to_integer(HandshakeTimeout) end}]}, - {reuse_sessions, [{longname, "reuse_sessions"}, - {typecast, fun(ReuseSessions) -> list_to_atom(ReuseSessions) end}]}, - {keyfile, [{longname, "keyfile"}, - {typecast, fun(Keyfile) -> Keyfile end}]}, - {certfile, [{longname, "certfile"}, - {typecast, fun(Certfile) -> Certfile end}]}, - {cacertfile, [{longname, "cacertfile"}, - {typecast, fun(Cacertfile) -> Cacertfile end}]}, - {dhfile, [{longname, "dhfile"}, - {typecast, fun(Dhfile) -> Dhfile end}]}, - {verify, [{longname, "verify"}, - {typecast, fun(Verify) -> list_to_atom(Verify) end}]}, - {fail_if_no_peer_cert, [{longname, "fail_if_no_peer_cert"}, - {typecast, fun(FailIfNoPeerCert) -> list_to_atom(FailIfNoPeerCert) end}]}], - Callback = - fun (_, Params, Flag) -> - Address = get_value('address', Params), - Port = get_value('port', Params), - Type = get_value('type', Params), - Text = case {Type, Port}of - {undefined, _} -> - io_lib:format("Invalid params type: ~p error~n", [Type]); - {_, undefined} -> - io_lib:format("Invalid params port: ~p error~n", [Type]); - {_, _} -> - ListenOn = case Address of - undefined -> Port; - _ -> {Address, Port} - end, - Opts = parse_opts(Type, Flag), - case emqttd_app:start_listener({Type, ListenOn, Opts}) of - {ok, _} -> - io_lib:format("Start mqtt:~p listen on ~p successfully", [Type, ListenOn]); - Error -> - io_lib:format("Start mqtt:~p listen on ~p failed, error:~p~n", [Type, ListenOn, Error]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). - -listeners_stop() -> - Cmd = ["listeners", "stop"], - KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, - {'port', [{typecast, fun parse_port/1}]}, - {'type', [{typecast, fun parse_type/1}]}], - Callback = - fun (_, Params, _) -> - Address = get_value('address', Params), - Port = get_value('port', Params), - Type = get_value('type', Params), - Text = case {Type, Port}of - {undefined, _} -> - io_lib:format("Invalid params type: ~p error~n", [Type]); - {_, undefined} -> - io_lib:format("Invalid params port: ~p error~n", [Type]); - {_, _} -> - case Address of - undefined -> - emqttd_app:stop_listener({Type, Port, []}), - io_lib:format("stopped mqtt:~p on ~p~n", [Type, Port]); - Address -> - emqttd_app:stop_listener({Type, {Address, Port}, []}), - io_lib:format("stopped mqtt:~p on ~p:~p~n", [Type, emqttd_net:ntoa(Address), Port]) - end - end, - [clique_status:text(Text)] - end, - clique:register_command(Cmd, KeySpecs, [], Callback). - -parse_opts(Type, Opts) when Type == ssl - orelse Type == wss - orelse Type == https -> - - OptList = [handshake_timeout, reuse_sessions, keyfile, certfile, - cacertfile, dhfile, verify, fail_if_no_peer_cert], - SslOpts = lists:foldl( - fun(Opt, Acc) -> - case get_value(Opt, Opts) of - undefined -> Acc; - OptVal -> [[{Opt, OptVal}] | Acc] - end - end, [], OptList) ++ - case get_value(tls_versions, Opts) of - undefined -> []; - TlsVersions -> [{versions, [TlsVersions]}] - end, - case SslOpts of - [] -> parse_opts(undefined, Opts); - _ -> [{sslopts, SslOpts}] ++ parse_opts(undefined, Opts) - end; -parse_opts(_Type, Opts) -> - Acceptors = get_value(acceptors, Opts, 4), - MaxClients = get_value(max_clients, Opts, 1024), - Buffer = get_value(buffer, Opts, 4096), - Backlog = get_value(backlog, Opts, 1024), - [{acceptors, Acceptors}, - {max_clients, MaxClients}, - {sockopts, [{buffer, Buffer}, {backlog, Backlog}]}]. - - -parse_port(Port) -> - case catch list_to_integer(Port) of - P when (P >= 0) and (P=<65535) -> P; - _ -> {error, {invalid_args,[{port, Port}]}} - end. - -parse_addr(Addr) -> - case inet:parse_address(Addr) of - {ok, Ip} -> Ip; - {error, einval} -> - {error, {invalid_args,[{address, Addr}]}} - end. - -parse_type(Type) -> - case catch list_to_atom(Type) of - T when (T=:=tcp) orelse - (T=:=ssl) orelse - (T=:=ws) orelse - (T=:=wss) orelse - (T=:=http) orelse - (T=:=https) -> T; - _ -> {error, {invalid_args,[{type, Type}]}} - end. - - -%%------------------------------------------------------------- -%% usage -%%------------------------------------------------------------- -broker_usage() -> - ["\n broker info Show broker version, uptime and description\n", - " broker pubsub Show process_info of pubsub\n", - " broker stats Show broker statistics of clients, topics, subscribers\n", - " broker metrics Show broker metrics\n"]. - -cluster_usage() -> - ["\n cluster join node= Join the cluster\n", - " cluster leave Leave the cluster\n", - " cluster remove node= Remove the node from cluster\n", - " cluster status Cluster status\n"]. - -acl_usage() -> - ["\n acl reload reload etc/acl.conf\n"]. - -clients_usage() -> - ["\n clients list List all clients\n", - " clients show client_id= Show a client\n", - " clients kick client_id= Kick out a client\n"]. - -sessions_usage() -> - ["\n sessions list List all sessions\n", - " sessions list persistent List all persistent sessions\n", - " sessions list transient List all transient sessions\n", - " sessions show client_id= Show a session\n"]. - -routes_usage() -> - ["\n routes list List all routes\n", - " routes show topic= Show a route\n"]. - -topics_usage() -> - ["\n topics list List all topics\n", - " topics show topic= Show a topic\n"]. - -subscriptions_usage() -> - ["\n subscriptions list List all subscriptions\n", - " subscriptions show client_id= Show subscriptions of a client\n", - " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", - " subscriptions del client_id= Delete static subscriptions manually\n", - " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. - -plugins_usage() -> - ["\n plugins list Show loaded plugins\n", - " plugins load plugin_name= Load plugin\n", - " plugins unload plugin_name= Unload plugin\n"]. - -bridges_usage() -> - ["\n bridges list List bridges\n", - " bridges start snode= topic= Start a bridge - options: - qos= - topic_prefix= - topic_suffix= - queue=\n", - " bridges stop snode= topic= Stop a bridge\n"]. - -vm_usage() -> - ["\n vm info Show info of Erlang VM\n", - " vm load Show load of Erlang VM\n", - " vm memory Show memory of Erlang VM\n", - " vm process Show process of Erlang VM\n", - " vm io Show IO of Erlang VM\n", - " vm ports Show Ports of Erlang VM\n"]. - -trace_usage() -> - ["\n trace list List all traces\n", - " trace type=client|topic client_id= topic= log_file= Start tracing\n", - " trace off type=client|topic client_id= topic= Stop tracing\n"]. - -status_usage() -> - ["\n status info Show broker status\n"]. - -listeners_usage() -> - ["\n listeners info List listeners\n", - " listeners start Create and start a listener\n", - " listeners stop Stop accepting new connections for a running listener\n"]. - -listener_start_usage() -> - ["\n listeners start address= port= type=\n", - " Create and start a listener.\n", - "Options:\n", - " --acceptors= Size of acceptor pool\n", - " --max_clients= Maximum number of concurrent clients\n", - " --backlog= TCP Socket Options\n", - " --buffer= TCP Socket Options\n", - " --tls_versions= TLS protocol versions\n", - " --handshake_timeout= TLS handshake timeout(ms)\n", - " --reuse_sessions= TLS allows clients to reuse pre-existing sessions\n", - " --keyfile= Path to the file containing the user's private PEM-encoded key\n", - " --certfile= Path to a file containing the user certificate\n", - " --cacertfile= Path to a file containing PEM-encoded CA certificates\n", - " --dhfile= Path to a file containing PEM-encoded Diffie Hellman\n", - " --verify= A server only does x509-path validation in mode\n", - " --fail_if_no_peer_cert= Used together with {verify, verify_peer} by an SSL server\n"]. - -listener_stop_usage() -> - ["\n listeners stop address= port= type=\n", - " Stops accepting new connections on a running listener.\n"]. - -mnesia_usage() -> - ["\n mnesia info Mnesia system info\n"]. - -%%-------------------------------------------------------------------- -%% Dump ETS -%%-------------------------------------------------------------------- - -dump(Table) -> - dump(Table, []). - -dump(Table, Acc) -> - dump(Table, ets:first(Table), Acc). - -dump(_Table, '$end_of_table', Acc) -> - lists:reverse(Acc); - -dump(Table, Key, Acc) -> - case ets:lookup(Table, Key) of - [Record] -> dump(Table, ets:next(Table, Key), [print(Record)|Acc]); - [] -> dump(Table, ets:next(Table, Key), Acc) - end. - -print([]) -> - []; - -print(Routes = [#mqtt_route{topic = Topic} | _]) -> - Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], - [{topic, Topic}, {routes, string:join(Nodes, ",")}]; - -print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - [{plugin, Name}, {version, Ver}, {description, Descr}, {active, Active}]; - -print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username, - peername = Peername, connected_at = ConnectedAt}) -> - [{client_id, ClientId}, - {clean_sess, CleanSess}, - {username, Username}, - {ip, emqttd_net:format(Peername)}, - {connected_at, emqttd_time:now_secs(ConnectedAt)}]; - -print({route, Routes}) -> - lists:map(fun print/1, Routes); -print({local_route, Routes}) -> - lists:map(fun print/1, Routes); -print(#mqtt_route{topic = Topic, node = Node}) -> - [{topic, Topic}, {node, Node}]; -print({Topic, Node}) -> - [{topic, Topic}, {node, Node}]; - -print({ClientId, _ClientPid, _Persistent, SessInfo}) -> - Data = lists:append(SessInfo, emqttd_stats:get_session_stats(ClientId)), - InfoKeys = [clean_sess, - subscriptions, - max_inflight, - inflight_len, - mqueue_len, - mqueue_dropped, - awaiting_rel_len, - deliver_msg, - enqueue_msg, - created_at], - [{client_id, ClientId} | [{Key, format(Key, get_value(Key, Data))} || Key <- InfoKeys]]. - -print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) -> - [{subscription, Sub}, {topic, Topic}]; -print(subscription, {Sub, Topic}) when is_pid(Sub) -> - [{subscription, Sub}, {topic, Topic}]; -print(subscription, {Sub, {_Share, Topic}}) -> - [{subscription, Sub}, {topic, Topic}]; -print(subscription, {Sub, Topic}) -> - [{subscription, Sub}, {topic, Topic}]. - -format(created_at, Val) -> - emqttd_time:now_secs(Val); - -format(_, Val) -> - Val. diff --git a/src/emqttd_cli_format.erl b/src/emqttd_cli_format.erl deleted file mode 100644 index cd1adca4b..000000000 --- a/src/emqttd_cli_format.erl +++ /dev/null @@ -1,27 +0,0 @@ --module (emqttd_cli_format). - --behavior(clique_writer). - -%% API --export([write/1]). - -write([{text, Text}]) -> - Json = jsx:encode([{text, lists:flatten(Text)}]), - {io_lib:format("~p~n", [Json]), []}; - -write([{table, Table}]) -> - Json = jsx:encode(Table), - {io_lib:format("~p~n", [Json]), []}; - -write([{list, Key, [Value]}| Tail]) -> - Table = lists:reverse(write(Tail, [{Key, Value}])), - Json = jsx:encode(Table), - {io_lib:format("~p~n", [Json]), []}; - -write(_) -> - {io_lib:format("error~n", []), []}. - -write([], Acc) -> - Acc; -write([{list, Key, [Value]}| Tail], Acc) -> - write(Tail, [{Key, Value}| Acc]). \ No newline at end of file diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f227339dc..02d23567f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -559,11 +559,6 @@ handle_info({'EXIT', ClientPid, _Reason}, State = #state{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, State}; -%% ClientPid was killed -handle_info({'EXIT', ClientPid, killed}, State) -> - ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, killed], State), - shutdown(killed, State); - handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid, From 3074a4b0ab1f025792052b97996c1e5758ff5469 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 19 May 2017 09:27:48 +0800 Subject: [PATCH 12/16] Rollback code --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 272cc6f2e..abd416178 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master + ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish From 8d4ec32d1aed117222be7cee655bf789220c2de6 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 19 May 2017 10:09:34 +0800 Subject: [PATCH 13/16] Fixed http publish API missing params --- src/emqttd_http.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 5da607319..883952a5c 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -88,13 +88,14 @@ handle_request(Method, Path, Req) -> %%-------------------------------------------------------------------- http_publish(Req) -> - Params = Req:recv_body(), + Params0 = mochiweb_request:parse_post(Req), + Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- Params0], lager:info("HTTP Publish: ~p", [Params]), Topics = topics(Params), - ClientId = get_value("client", Params, http), - Qos = int(get_value("qos", Params, "0")), - Retain = bool(get_value("retain", Params, "0")), - Payload = list_to_binary(get_value("message", Params)), + ClientId = get_value(<<"client">>, Params, http), + Qos = int(get_value(<<"qos">>, Params, "0")), + Retain = bool(get_value(<<"retain">>, Params, "0")), + Payload = iolist_to_binary(get_value(<<"message">>, Params)), case {validate(qos, Qos), validate(topics, Topics)} of {true, true} -> lists:foreach(fun(Topic) -> @@ -151,8 +152,11 @@ authorized(Req) -> user_passwd(BasicAuth) -> list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). +int(S) when is_integer(S)-> S; int(S) -> list_to_integer(S). +bool(0) -> false; +bool(1) -> true; bool("0") -> false; bool("1") -> true. From 9c5ce15f46b63e73283b7305d99d8c2c8275a0d6 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 19 May 2017 10:13:01 +0800 Subject: [PATCH 14/16] Fixed http publish API missing params --- src/emqttd_http.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 883952a5c..c9edca39f 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -110,8 +110,8 @@ http_publish(Req) -> end. topics(Params) -> - Tokens = [get_value("topic", Params) | string:tokens(get_value("topics", Params, ""), ",")], - [list_to_binary(Token) || Token <- Tokens, Token =/= undefined]. + Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")], + [iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined]. validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); From 96562dfc592c56d31151036d49c529d795df43fb Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 19 May 2017 10:22:07 +0800 Subject: [PATCH 15/16] Format code --- src/emqttd_http.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index c9edca39f..389751ea2 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -88,8 +88,7 @@ handle_request(Method, Path, Req) -> %%-------------------------------------------------------------------- http_publish(Req) -> - Params0 = mochiweb_request:parse_post(Req), - Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- Params0], + Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)], lager:info("HTTP Publish: ~p", [Params]), Topics = topics(Params), ClientId = get_value(<<"client">>, Params, http), @@ -152,13 +151,15 @@ authorized(Req) -> user_passwd(BasicAuth) -> list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). -int(S) when is_integer(S)-> S; +int(I) when is_integer(I)-> I; int(S) -> list_to_integer(S). bool(0) -> false; bool(1) -> true; bool("0") -> false; -bool("1") -> true. +bool("1") -> true; +bool(<<"0">>) -> false; +bool(<<"1">>) -> true. is_websocket(Upgrade) -> Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". From 94402f42977f1733a5336143f014cfc72585816a Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 19 May 2017 10:24:36 +0800 Subject: [PATCH 16/16] Format code --- src/emqttd_http.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 389751ea2..b2828a853 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -152,6 +152,7 @@ user_passwd(BasicAuth) -> list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). int(I) when is_integer(I)-> I; +int(B) when is_binary(B)-> binary_to_integer(B); int(S) -> list_to_integer(S). bool(0) -> false;