From 153977609e6c73b5600a24e0f5c75a44164e96b7 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sat, 13 Feb 2021 21:07:31 +0100 Subject: [PATCH] feat(listeners): Add identifier to listeners Listeners are internally identifiered by the listen-on tuple which is not UI friendly when we have to find a listener by this 'signature'. The listeners are actually named in configs, but the names are discarded in the parsing functions. This commit is to keep the name and provide an API to find listener by name (identifier). --- .../emqx_management/src/emqx_mgmt.erl | 1 + .../emqx_management/src/emqx_mgmt_cli.erl | 43 +++++++---- priv/emqx.schema | 24 ++++-- src/emqx_listeners.erl | 77 +++++++++++++++---- 4 files changed, 111 insertions(+), 34 deletions(-) diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index 4174f3224..63a2b22a3 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -535,6 +535,7 @@ list_listeners(Node) when Node =:= node() -> Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> #{protocol => Protocol, listen_on => ListenOn, + identifier => emqx_listeners:find_id_by_listen_on(ListenOn), acceptors => esockd:get_acceptors({Protocol, ListenOn}), max_conns => esockd:get_max_connections({Protocol, ListenOn}), current_conns => esockd:get_current_connections({Protocol, ListenOn}), diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl b/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl index 8a9f57f67..6d2315acc 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl @@ -510,14 +510,14 @@ trace_off(Who, Name) -> listeners([]) -> foreach(fun({{Protocol, ListenOn}, _Pid}) -> - Info = [{acceptors, esockd:get_acceptors({Protocol, ListenOn})}, + Info = [{identifier, {string, emqx_listeners:find_id_by_listen_on(ListenOn)}}, + {acceptors, esockd:get_acceptors({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})}, {current_conn, esockd:get_current_connections({Protocol, ListenOn})}, - {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}], + {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})} + ], emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), - foreach(fun({Key, Val}) -> - emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) - end, Info) + foreach(fun indent_print/1, Info) end, esockd:listeners()), foreach(fun({Protocol, Opts}) -> Info = [{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, @@ -525,9 +525,7 @@ listeners([]) -> {current_conn, proplists:get_value(all_connections, Opts)}, {shutdown_count, []}], emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]), - foreach(fun({Key, Val}) -> - emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) - end, Info) + foreach(fun indent_print/1, Info) end, ranch:info()); listeners(["stop", Name = "http" ++ _N, ListenOn]) -> @@ -538,22 +536,32 @@ listeners(["stop", Name = "http" ++ _N, ListenOn]) -> emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error]) end; -listeners(["stop", Proto, ListenOn]) -> +listeners(["stop", "mqtt:" ++ _ = Identifier]) -> + stop_listener(emqx_listeners:find_by_id(Identifier), Identifier); + +listeners(["stop", _Proto, ListenOn]) -> + %% this clause is kept to be backward compatible ListenOn1 = case string:tokens(ListenOn, ":") of [Port] -> list_to_integer(Port); [IP, Port] -> {IP, list_to_integer(Port)} end, - case emqx_listeners:stop_listener({list_to_atom(Proto), ListenOn1, []}) of - ok -> - emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]); - {error, Error} -> - emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) - end; + stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1); listeners(_) -> emqx_ctl:usage([{"listeners", "List listeners"}, {"listeners stop ", "Stop a listener"}]). +stop_listener(false, Input) -> + emqx_ctl:print("No such listener ~p~n", [Input]); +stop_listener(#{listen_on := ListenOn} = Listener, _Input) -> + ID = emqx_listeners:identifier(Listener), + case emqx_listeners:stop_listener(Listener) of + ok -> + emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOn]); + {error, Error} -> + emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [ID, ListenOn, Error]) + end. + %%-------------------------------------------------------------------- %% @doc data Command @@ -707,3 +715,8 @@ format(_, Val) -> Val. bin(S) -> iolist_to_binary(S). + +indent_print({Key, {string, Val}}) -> + emqx_ctl:print(" ~-16s: ~s~n", [Key, Val]); +indent_print({Key, Val}) -> + emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]). diff --git a/priv/emqx.schema b/priv/emqx.schema index bdf8a053f..26713ce2a 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1996,8 +1996,15 @@ end}. Other -> Other end end, - [{Atom(Type), ListenOnN, [{deflate_options, DeflateOpts(Prefix)}, - {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOnN + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] end, SslListeners = fun(Type, Name) -> Prefix = string:join(["listener", Type, Name], "."), @@ -2005,9 +2012,16 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, - {tcp_options, TcpOpts(Prefix)}, - {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOn + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + , {ssl_options, SslOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] end end, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 3b642d55f..f7912a7fd 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -33,11 +33,43 @@ , restart_listener/3 ]). --type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). +-export([ find_id_by_listen_on/1 + , find_by_listen_on/1 + , find_by_id/1 + , identifier/1 + ]). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- +-type(listener() :: #{ name := binary() + , proto := esockd:proto() + , listen_on := esockd:listen_on() + , opts := [esockd:option()] + }). + +%% @doc Find listener identifier by listen-on. +%% Return empty string (binary) if listener is not found in config. +-spec(find_id_by_listen_on(esockd:listen_on()) -> binary()). +find_id_by_listen_on(ListenOn) -> + case find_by_listen_on(ListenOn) of + false -> <<>>; + L -> identifier(L) + end. + +%% @doc Find listener by listen-on. +%% Return 'false' if not found. +-spec(find_by_listen_on(esockd:listen_on()) -> listener() | false). +find_by_listen_on(ListenOn) -> + find_by_listen_on(ListenOn, emqx:get_env(listeners, [])). + +%% @doc Find listener by identifier. +%% Return 'false' if not found. +-spec(find_by_id(string() | binary()) -> listener() | false). +find_by_id(Id) -> + find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])). + +%% @doc Return the ID of the given listener. +-spec identifier(listener()) -> binary(). +identifier(#{proto := Proto, name := Name}) -> + identifier(Proto, Name). %% @doc Start all listeners. -spec(start() -> ok). @@ -45,13 +77,14 @@ start() -> lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])). -spec(start_listener(listener()) -> ok). -start_listener({Proto, ListenOn, Options}) -> +start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) -> + ID = identifier(Proto, Name), case start_listener(Proto, ListenOn, Options) of - {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n", - [Proto, format(ListenOn)]); + {ok, _} -> io:format("Start ~s listener on ~s successfully.~n", + [ID, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!", - [Proto, format(ListenOn), Reason]), + io:format(standard_error, "Failed to start mqtt listener ~s on ~s - ~0p~n!", + [ID, format(ListenOn), Reason]), error(Reason) end. @@ -115,7 +148,7 @@ restart() -> lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). -spec(restart_listener(listener()) -> any()). -restart_listener({Proto, ListenOn, Options}) -> +restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) -> restart_listener(Proto, ListenOn, Options). -spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()). @@ -138,14 +171,14 @@ stop() -> lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])). -spec(stop_listener(listener()) -> ok | {error, term()}). -stop_listener({Proto, ListenOn, Opts}) -> +stop_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Opts}) -> + ID = identifier(Proto, Name), StopRet = stop_listener(Proto, ListenOn, Opts), case StopRet of - ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n", - [Proto, format(ListenOn)]); + ok -> io:format("Stop ~s listener on ~s successfully.~n", [ID, format(ListenOn)]); {error, Reason} -> io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.", - [Proto, format(ListenOn), Reason]) + [ID, format(ListenOn), Reason]) end, StopRet. @@ -181,3 +214,19 @@ ws_name(Name, {_Addr, Port}) -> ws_name(Name, Port); ws_name(Name, Port) -> list_to_atom(lists:concat([Name, ":", Port])). + +identifier(Proto, Name) when is_atom(Proto) -> + identifier(atom_to_list(Proto), Name); +identifier(Proto, Name) -> + iolist_to_binary(["mqtt", ":", Proto, ":", Name]). + +find_by_listen_on(ListenOn, []) -> error({unknown_listener, ListenOn}); +find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L; +find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest). + +find_by_id(_Id, []) -> false; +find_by_id(Id, [L | Rest]) -> + case identifier(L) =:= Id of + true -> L; + false -> find_by_id(Id, Rest) + end.