diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index 4174f3224..63a2b22a3 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -535,6 +535,7 @@ list_listeners(Node) when Node =:= node() -> Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> #{protocol => Protocol, listen_on => ListenOn, + identifier => emqx_listeners:find_id_by_listen_on(ListenOn), acceptors => esockd:get_acceptors({Protocol, ListenOn}), max_conns => esockd:get_max_connections({Protocol, ListenOn}), current_conns => esockd:get_current_connections({Protocol, ListenOn}), diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl b/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl index 8a9f57f67..6d2315acc 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl @@ -510,14 +510,14 @@ trace_off(Who, Name) -> listeners([]) -> foreach(fun({{Protocol, ListenOn}, _Pid}) -> - Info = [{acceptors, esockd:get_acceptors({Protocol, ListenOn})}, + Info = [{identifier, {string, emqx_listeners:find_id_by_listen_on(ListenOn)}}, + {acceptors, esockd:get_acceptors({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})}, {current_conn, esockd:get_current_connections({Protocol, ListenOn})}, - {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}], + {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})} + ], emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), - foreach(fun({Key, Val}) -> - emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) - end, Info) + foreach(fun indent_print/1, Info) end, esockd:listeners()), foreach(fun({Protocol, Opts}) -> Info = [{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, @@ -525,9 +525,7 @@ listeners([]) -> {current_conn, proplists:get_value(all_connections, Opts)}, {shutdown_count, []}], emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]), - foreach(fun({Key, Val}) -> - emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) - end, Info) + foreach(fun indent_print/1, Info) end, ranch:info()); listeners(["stop", Name = "http" ++ _N, ListenOn]) -> @@ -538,22 +536,32 @@ listeners(["stop", Name = "http" ++ _N, ListenOn]) -> emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error]) end; -listeners(["stop", Proto, ListenOn]) -> +listeners(["stop", "mqtt:" ++ _ = Identifier]) -> + stop_listener(emqx_listeners:find_by_id(Identifier), Identifier); + +listeners(["stop", _Proto, ListenOn]) -> + %% this clause is kept to be backward compatible ListenOn1 = case string:tokens(ListenOn, ":") of [Port] -> list_to_integer(Port); [IP, Port] -> {IP, list_to_integer(Port)} end, - case emqx_listeners:stop_listener({list_to_atom(Proto), ListenOn1, []}) of - ok -> - emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]); - {error, Error} -> - emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) - end; + stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1); listeners(_) -> emqx_ctl:usage([{"listeners", "List listeners"}, {"listeners stop ", "Stop a listener"}]). +stop_listener(false, Input) -> + emqx_ctl:print("No such listener ~p~n", [Input]); +stop_listener(#{listen_on := ListenOn} = Listener, _Input) -> + ID = emqx_listeners:identifier(Listener), + case emqx_listeners:stop_listener(Listener) of + ok -> + emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOn]); + {error, Error} -> + emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [ID, ListenOn, Error]) + end. + %%-------------------------------------------------------------------- %% @doc data Command @@ -707,3 +715,8 @@ format(_, Val) -> Val. bin(S) -> iolist_to_binary(S). + +indent_print({Key, {string, Val}}) -> + emqx_ctl:print(" ~-16s: ~s~n", [Key, Val]); +indent_print({Key, Val}) -> + emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]). diff --git a/priv/emqx.schema b/priv/emqx.schema index bdf8a053f..26713ce2a 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1996,8 +1996,15 @@ end}. Other -> Other end end, - [{Atom(Type), ListenOnN, [{deflate_options, DeflateOpts(Prefix)}, - {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOnN + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] end, SslListeners = fun(Type, Name) -> Prefix = string:join(["listener", Type, Name], "."), @@ -2005,9 +2012,16 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, - {tcp_options, TcpOpts(Prefix)}, - {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOn + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + , {ssl_options, SslOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] end end, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 3b642d55f..f7912a7fd 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -33,11 +33,43 @@ , restart_listener/3 ]). --type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). +-export([ find_id_by_listen_on/1 + , find_by_listen_on/1 + , find_by_id/1 + , identifier/1 + ]). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- +-type(listener() :: #{ name := binary() + , proto := esockd:proto() + , listen_on := esockd:listen_on() + , opts := [esockd:option()] + }). + +%% @doc Find listener identifier by listen-on. +%% Return empty string (binary) if listener is not found in config. +-spec(find_id_by_listen_on(esockd:listen_on()) -> binary()). +find_id_by_listen_on(ListenOn) -> + case find_by_listen_on(ListenOn) of + false -> <<>>; + L -> identifier(L) + end. + +%% @doc Find listener by listen-on. +%% Return 'false' if not found. +-spec(find_by_listen_on(esockd:listen_on()) -> listener() | false). +find_by_listen_on(ListenOn) -> + find_by_listen_on(ListenOn, emqx:get_env(listeners, [])). + +%% @doc Find listener by identifier. +%% Return 'false' if not found. +-spec(find_by_id(string() | binary()) -> listener() | false). +find_by_id(Id) -> + find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])). + +%% @doc Return the ID of the given listener. +-spec identifier(listener()) -> binary(). +identifier(#{proto := Proto, name := Name}) -> + identifier(Proto, Name). %% @doc Start all listeners. -spec(start() -> ok). @@ -45,13 +77,14 @@ start() -> lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])). -spec(start_listener(listener()) -> ok). -start_listener({Proto, ListenOn, Options}) -> +start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) -> + ID = identifier(Proto, Name), case start_listener(Proto, ListenOn, Options) of - {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n", - [Proto, format(ListenOn)]); + {ok, _} -> io:format("Start ~s listener on ~s successfully.~n", + [ID, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!", - [Proto, format(ListenOn), Reason]), + io:format(standard_error, "Failed to start mqtt listener ~s on ~s - ~0p~n!", + [ID, format(ListenOn), Reason]), error(Reason) end. @@ -115,7 +148,7 @@ restart() -> lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). -spec(restart_listener(listener()) -> any()). -restart_listener({Proto, ListenOn, Options}) -> +restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) -> restart_listener(Proto, ListenOn, Options). -spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()). @@ -138,14 +171,14 @@ stop() -> lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])). -spec(stop_listener(listener()) -> ok | {error, term()}). -stop_listener({Proto, ListenOn, Opts}) -> +stop_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Opts}) -> + ID = identifier(Proto, Name), StopRet = stop_listener(Proto, ListenOn, Opts), case StopRet of - ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n", - [Proto, format(ListenOn)]); + ok -> io:format("Stop ~s listener on ~s successfully.~n", [ID, format(ListenOn)]); {error, Reason} -> io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.", - [Proto, format(ListenOn), Reason]) + [ID, format(ListenOn), Reason]) end, StopRet. @@ -181,3 +214,19 @@ ws_name(Name, {_Addr, Port}) -> ws_name(Name, Port); ws_name(Name, Port) -> list_to_atom(lists:concat([Name, ":", Port])). + +identifier(Proto, Name) when is_atom(Proto) -> + identifier(atom_to_list(Proto), Name); +identifier(Proto, Name) -> + iolist_to_binary(["mqtt", ":", Proto, ":", Name]). + +find_by_listen_on(ListenOn, []) -> error({unknown_listener, ListenOn}); +find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L; +find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest). + +find_by_id(_Id, []) -> false; +find_by_id(Id, [L | Rest]) -> + case identifier(L) =:= Id of + true -> L; + false -> find_by_id(Id, Rest) + end.