From 3e1c69dff18b5a999995c712ee813c018c2bf557 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 18 May 2017 16:35:49 +0800 Subject: [PATCH] Add cli listeners start|stop --- priv/{emqttd.schema => emq.schema} | 0 src/emqttd_cli2.erl | 165 ++++++++++++++++++++++++----- src/emqttd_session.erl | 5 + 3 files changed, 141 insertions(+), 29 deletions(-) rename priv/{emqttd.schema => emq.schema} (100%) diff --git a/priv/emqttd.schema b/priv/emq.schema similarity index 100% rename from priv/emqttd.schema rename to priv/emq.schema diff --git a/src/emqttd_cli2.erl b/src/emqttd_cli2.erl index 397d063ea..00ffefef5 100644 --- a/src/emqttd_cli2.erl +++ b/src/emqttd_cli2.erl @@ -12,7 +12,7 @@ -behaviour(clique_handler). --import(proplists, [get_value/2]). +-import(proplists, [get_value/2, get_value/3]). -define(APP, emqttd). @@ -35,7 +35,7 @@ run([]) -> All = clique_usage:find_all(), io:format("--------------------------------------------------------------------------------~n"), lists:foreach(fun({Cmd, Usage}) -> - io:format("~p usage:", [Cmd]), + io:format("~p usage:", [cuttlefish_variable:format(Cmd)]), io:format("~ts", [Usage]), io:format("--------------------------------------------------------------------------------~n") end, lists:sort(All)); @@ -58,6 +58,7 @@ register_usage() -> clique:register_usage(["trace"], trace_usage()), clique:register_usage(["status"], status_usage()), clique:register_usage(["listeners"], listeners_usage()), + clique:register_usage(["listeners", "start"], listener_start_usage()), clique:register_usage(["listeners", "stop"],listener_stop_usage()), clique:register_usage(["mnesia"], mnesia_usage()). @@ -118,6 +119,7 @@ register_cmd() -> trace_off(), listeners(), + listeners_start(), listeners_stop(). node_status() -> @@ -837,26 +839,119 @@ listeners() -> end, clique:register_command(Cmd, [], [], Callback). -listeners_stop() -> - Cmd = ["listeners", "stop"], - KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, - {'port', [{typecast, fun parse_port/1}]}, - {'type', [{typecast, fun parse_type/1}]}], - FlagSpecs = [{kill, [{shortname, "k"}, - {longname, "kill_sessions"}]}], +listeners_start() -> + Cmd = ["listeners", "start"], + KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, + {'port', [{typecast, fun parse_port/1}]}, + {'type', [{typecast, fun parse_type/1}]}], + FlagSpecs = [{acceptors, [{longname, "acceptors"}, + {typecast, fun(Acceptors) -> list_to_integer(Acceptors) end}]}, + {max_clients, [{longname, "max_clients"}, + {typecast, fun(MaxClients) -> list_to_integer(MaxClients) end}]}, + {buffer, [{longname, "buffer"}, + {typecast, fun(Buffer) -> list_to_integer(Buffer) end}]}, + {tls_versions, [{longname, "tls_versions"}, + {typecast, fun(TlsVersions) -> list_to_atom(TlsVersions) end}]}, + {handshake_timeout, [{longname, "handshake_timeout"}, + {typecast, fun(HandshakeTimeout) -> list_to_integer(HandshakeTimeout) end}]}, + {reuse_sessions, [{longname, "reuse_sessions"}, + {typecast, fun(ReuseSessions) -> list_to_atom(ReuseSessions) end}]}, + {keyfile, [{longname, "keyfile"}, + {typecast, fun(Keyfile) -> Keyfile end}]}, + {certfile, [{longname, "certfile"}, + {typecast, fun(Certfile) -> Certfile end}]}, + {cacertfile, [{longname, "cacertfile"}, + {typecast, fun(Cacertfile) -> Cacertfile end}]}, + {dhfile, [{longname, "dhfile"}, + {typecast, fun(Dhfile) -> Dhfile end}]}, + {verify, [{longname, "verify"}, + {typecast, fun(Verify) -> list_to_atom(Verify) end}]}, + {fail_if_no_peer_cert, [{longname, "fail_if_no_peer_cert"}, + {typecast, fun(FailIfNoPeerCert) -> list_to_atom(FailIfNoPeerCert) end}]}], Callback = fun (_, Params, Flag) -> Address = get_value('address', Params), Port = get_value('port', Params), Type = get_value('type', Params), - case Address of - undefined -> emqttd_app:stop_listener({Type, Port, []}); - Address -> emqttd_app:stop_listener({Type, {Address, Port}, []}) + Text = case {Type, Port}of + {undefined, _} -> + io_lib:format("Invalid params type: ~p error~n", [Type]); + {_, undefined} -> + io_lib:format("Invalid params port: ~p error~n", [Type]); + {_, _} -> + ListenOn = case Address of + undefined -> Port; + _ -> {Address, Port} + end, + Opts = parse_opts(Type, Flag), + case emqttd_app:start_listener({Type, ListenOn, Opts}) of + {ok, _} -> + io_lib:format("Start mqtt:~p listen on ~p successfully", [Type, ListenOn]); + Error -> + io_lib:format("Start mqtt:~p listen on ~p failed, error:~p~n", [Type, ListenOn, Error]) + end end, - [clique_status:text("aaa")] + [clique_status:text(Text)] end, clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback). +listeners_stop() -> + Cmd = ["listeners", "stop"], + KeySpecs = [{'address', [{typecast, fun parse_addr/1}]}, + {'port', [{typecast, fun parse_port/1}]}, + {'type', [{typecast, fun parse_type/1}]}], + Callback = + fun (_, Params, _) -> + Address = get_value('address', Params), + Port = get_value('port', Params), + Type = get_value('type', Params), + Text = case {Type, Port}of + {undefined, _} -> + io_lib:format("Invalid params type: ~p error~n", [Type]); + {_, undefined} -> + io_lib:format("Invalid params port: ~p error~n", [Type]); + {_, _} -> + case Address of + undefined -> + emqttd_app:stop_listener({Type, Port, []}), + io_lib:format("stopped mqtt:~p on ~p~n", [Type, Port]); + Address -> + emqttd_app:stop_listener({Type, {Address, Port}, []}), + io_lib:format("stopped mqtt:~p on ~p:~p~n", [Type, emqttd_net:ntoa(Address), Port]) + end + end, + [clique_status:text(Text)] + end, + clique:register_command(Cmd, KeySpecs, [], Callback). + +parse_opts(Type, Opts) when Type == ssl + orelse Type == wss + orelse Type == https -> + + OptList = [handshake_timeout, reuse_sessions, keyfile, certfile, + cacertfile, dhfile, verify, fail_if_no_peer_cert], + SslOpts = lists:foldl( + fun(Opt, Acc) -> + case get_value(Opt, Opts) of + undefined -> Acc; + OptVal -> [[{Opt, OptVal}] | Acc] + end + end, [], OptList) ++ + case get_value(tls_versions, Opts) of + undefined -> []; + TlsVersions -> [{versions, [TlsVersions]}] + end, + case SslOpts of + [] -> parse_opts(undefined, Opts); + _ -> [{sslopts, SslOpts}] ++ parse_opts(undefined, Opts) + end; +parse_opts(_Type, Opts) -> + Acceptors = get_value(acceptors, Opts, 4), + MaxClients = get_value(max_clients, Opts, 1024), + Buffer = get_value(buffer, Opts, 4096), + [{acceptors, Acceptors}, {max_clients, MaxClients}, {sockopts, [{buffer, Buffer}]}]. + + parse_port(Port) -> case catch list_to_integer(Port) of P when (P >= 0) and (P=<65535) -> P; @@ -920,11 +1015,11 @@ topics_usage() -> " topics show topic= Show a topic\n"]. subscriptions_usage() -> - ["\n subscriptions list List all subscriptions\n", - " subscriptions show client_id= Show subscriptions of a client\n", - " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", - " subscriptions del client_id= Delete static subscriptions manually\n", - " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. + ["\n subscriptions list List all subscriptions\n", + " subscriptions show client_id= Show subscriptions of a client\n", + " subscriptions subscribe client_id= topic= qos= Add a static subscription manually\n", + " subscriptions del client_id= Delete static subscriptions manually\n", + " subscriptions unsubscribe client_id= topic= Delete a static subscription manually\n"]. plugins_usage() -> ["\n plugins list Show loaded plugins\n", @@ -950,9 +1045,9 @@ vm_usage() -> " vm ports Show Ports of Erlang VM\n"]. trace_usage() -> - ["\n trace list List all traces\n", - " trace type=client|topic client_id= topic= log_file= Start tracing\n", - " trace off type=client|topic client_id= topic= Stop tracing\n"]. + ["\n trace list List all traces\n", + " trace type=client|topic client_id= topic= log_file= Start tracing\n", + " trace off type=client|topic client_id= topic= Stop tracing\n"]. status_usage() -> ["\n status info Show broker status\n"]. @@ -960,16 +1055,28 @@ status_usage() -> listeners_usage() -> ["\n listeners info List listeners\n", " listeners start Create and start a listener\n", - " listeners stop Stop accepting new connections for a running listener\n", - " listeners restart Restart accepting new connections for a stopped listener\n", - " listeners delete Delete a stopped listener\n"]. + " listeners stop Stop accepting new connections for a running listener\n"]. + +listener_start_usage() -> + ["\n listeners start address= port= type=\n", + " Create and start a listener.\n", + "Options:\n", + " --acceptors= Size of acceptor pool\n", + " --max_clients= Maximum number of concurrent clients\n", + " --buffer= TCP Socket Options\n", + " --tls_versions= TLS protocol versions\n", + " --handshake_timeout= TLS handshake timeout\n", + " --reuse_sessions= TLS allows clients to reuse pre-existing sessions\n", + " --keyfile= Path to the file containing the user's private PEM-encoded key\n", + " --certfile= Path to a file containing the user certificate\n", + " --cacertfile= Path to a file containing PEM-encoded CA certificates\n", + " --dhfile= Path to a file containing PEM-encoded Diffie Hellman\n", + " --verify= A server only does x509-path validation in mode\n", + " --fail_if_no_peer_cert= Used together with {verify, verify_peer} by an SSL server\n"]. listener_stop_usage() -> - ["\n listeners stop address=IpAddr port=Port\n", - " Stops accepting new connections on a running listener.\n", - "Options\n", - " -k, --kill_sessions\n" - " kills all sessions accepted with this listener.\n"]. + ["\n listeners stop address= port= type=\n", + " Stops accepting new connections on a running listener.\n"]. mnesia_usage() -> ["\n mnesia info Mnesia system info\n"]. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 02d23567f..f227339dc 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -559,6 +559,11 @@ handle_info({'EXIT', ClientPid, _Reason}, State = #state{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, State}; +%% ClientPid was killed +handle_info({'EXIT', ClientPid, killed}, State) -> + ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, killed], State), + shutdown(killed, State); + handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid,