From 4dd72e59fa11be25d75d4a2cb4dd6d0c23454296 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 7 Jul 2021 16:22:38 +0800 Subject: [PATCH] feat(listeners): make the APIs and CLIs work with the new listener --- apps/emqx/src/emqx_listeners.erl | 29 +++--- apps/emqx_management/src/emqx_mgmt.erl | 3 +- .../src/emqx_mgmt_api_listeners.erl | 9 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 98 +++++-------------- .../src/emqx_mod_acl_internal.erl | 7 +- 5 files changed, 50 insertions(+), 96 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 6e3bc69be..1d2592fff 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -39,10 +39,8 @@ start() -> foreach_listeners(fun start_listener/3). -spec(start_listener(atom()) -> ok). -start_listener(Id) -> - {ZoneName, ListenerName} = decode_listener_id(Id), - start_listener(ZoneName, ListenerName, - emqx_config:get([zones, ZoneName, listeners, ListenerName])). +start_listener(ListenerId) -> + apply_on_listener(ListenerId, fun start_listener/3). -spec(start_listener(atom(), atom(), map()) -> ok). start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) -> @@ -133,13 +131,11 @@ esockd_access_rules(StrRules) -> restart() -> foreach_listeners(fun restart_listener/3). --spec(restart_listener(atom()) -> ok | {error, any()}). -restart_listener(ListenerID) -> - {ZoneName, ListenerName} = decode_listener_id(ListenerID), - restart_listener(ZoneName, ListenerName, - emqx_config:get([zones, ZoneName, listeners, ListenerName])). +-spec(restart_listener(atom()) -> ok | {error, term()}). +restart_listener(ListenerId) -> + apply_on_listener(ListenerId, fun restart_listener/3). --spec(restart_listener(atom(), atom(), map()) -> ok | {error, any()}). +-spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}). restart_listener(ZoneName, ListenerName, Conf) -> case stop_listener(ZoneName, ListenerName, Conf) of ok -> start_listener(ZoneName, ListenerName, Conf); @@ -152,10 +148,8 @@ stop() -> foreach_listeners(fun stop_listener/3). -spec(stop_listener(atom()) -> ok | {error, term()}). -stop_listener(ListenerID) -> - {ZoneName, ListenerName} = decode_listener_id(ListenerID), - stop_listener(ZoneName, ListenerName, - emqx_config:get([zones, ZoneName, listeners, ListenerName])). +stop_listener(ListenerId) -> + apply_on_listener(ListenerId, fun stop_listener/3). -spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}). stop_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn}) -> @@ -214,3 +208,10 @@ merge_zone_and_listener_confs(ZoneConf, ListenerConf) -> ConfsInZonesOnly = [listeners, overall_max_connections], BaseConf = maps:without(ConfsInZonesOnly, ZoneConf), emqx_map_lib:deep_merge(BaseConf, ListenerConf). + +apply_on_listener(ListenerId, Do) -> + {ZoneName, ListenerName} = decode_listener_id(ListenerId), + case emqx_config:find([zones, ZoneName, listeners, ListenerName]) of + {not_found, _, _} -> error({not_found, ListenerId}); + {ok, Conf} -> Do(ZoneName, ListenerName, Conf) + end. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index ce83b9b71..e04a7d9bc 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -417,7 +417,7 @@ list_listeners(Node) when Node =:= node() -> Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> #{protocol => Protocol, listen_on => ListenOn, - identifier => emqx_listeners:find_id_by_listen_on(ListenOn), + identifier => Protocol, acceptors => esockd:get_acceptors({Protocol, ListenOn}), max_conns => esockd:get_max_connections({Protocol, ListenOn}), current_conns => esockd:get_current_connections({Protocol, ListenOn}), @@ -436,6 +436,7 @@ list_listeners(Node) when Node =:= node() -> list_listeners(Node) -> rpc_call(Node, list_listeners, [Node]). +-spec restart_listener(node(), atom()) -> ok | {error, term()}. restart_listener(Node, Identifier) when Node =:= node() -> emqx_listeners:restart_listener(Identifier); diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 7cccbd2ac..1b0c90033 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -30,13 +30,13 @@ -rest_api(#{name => restart_listener, method => 'PUT', - path => "/listeners/:bin:identifier/restart", + path => "/listeners/:atom:identifier/restart", func => restart, descr => "Restart a listener in the cluster"}). -rest_api(#{name => restart_node_listener, method => 'PUT', - path => "/nodes/:atom:node/listeners/:bin:identifier/restart", + path => "/nodes/:atom:node/listeners/:atom:identifier/restart", func => restart, descr => "Restart a listener on a node"}). @@ -57,10 +57,7 @@ restart(#{node := Node, identifier := Identifier}, _Params) -> ok -> minirest:return({ok, "Listener restarted."}); {error, Error} -> minirest:return({error, Error}) end; - -%% Restart listeners in the cluster. -restart(#{identifier := <<"http", _/binary>>}, _Params) -> - {403, <<"http_listener_restart_unsupported">>}; +%% Restart listeners on all nodes in the cluster. restart(#{identifier := Identifier}, _Params) -> Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()], case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 77fe96182..fb0f64ec9 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -464,86 +464,57 @@ trace_off(Who, Name) -> listeners([]) -> lists:foreach(fun({{Protocol, ListenOn}, _Pid}) -> - Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}}, + Info = [{listen_on, {string, format_listen_on(ListenOn)}}, {acceptors, esockd:get_acceptors({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})}, {current_conn, esockd:get_current_connections({Protocol, ListenOn})}, {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})} ], - emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]), + emqx_ctl:print("~s~n", [Protocol]), lists:foreach(fun indent_print/1, Info) end, esockd:listeners()), lists:foreach(fun({Protocol, Opts}) -> Port = proplists:get_value(port, Opts), - Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}}, + Info = [{listen_on, {string, format_listen_on(Port)}}, {acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, {max_conns, proplists:get_value(max_connections, Opts)}, {current_conn, proplists:get_value(all_connections, Opts)}, {shutdown_count, []}], - emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]), + emqx_ctl:print("~s~n", [Protocol]), lists:foreach(fun indent_print/1, Info) end, ranch:info()); -listeners(["stop", Name = "http" ++ _N | _MaybePort]) -> - %% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number - case minirest:stop_http(list_to_atom(Name)) of +listeners(["stop", ListenerId]) -> + case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of ok -> - emqx_ctl:print("Stop ~s listener successfully.~n", [Name]); + emqx_ctl:print("Stop ~s listener successfully.~n", [ListenerId]); {error, Error} -> - emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [Name, Error]) + emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [ListenerId, Error]) end; -listeners(["stop", "mqtt:" ++ _ = Identifier]) -> - stop_listener(emqx_listeners:find_by_id(Identifier), Identifier); - -listeners(["stop", _Proto, ListenOn]) -> - %% this clause is kept to be backward compatible - ListenOn1 = case string:tokens(ListenOn, ":") of - [Port] -> list_to_integer(Port); - [IP, Port] -> {IP, list_to_integer(Port)} - end, - stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1); - -listeners(["restart", "http:management"]) -> - restart_http_listener(http, emqx_management); - -listeners(["restart", "https:management"]) -> - restart_http_listener(https, emqx_management); - -listeners(["restart", "http:dashboard"]) -> - restart_http_listener(http, emqx_dashboard); - -listeners(["restart", "https:dashboard"]) -> - restart_http_listener(https, emqx_dashboard); - -listeners(["restart", Identifier]) -> - case emqx_listeners:restart_listener(Identifier) of +listeners(["start", ListenerId]) -> + case emqx_listeners:start_listener(list_to_atom(ListenerId)) of ok -> - emqx_ctl:print("Restarted ~s listener successfully.~n", [Identifier]); + emqx_ctl:print("Started ~s listener successfully.~n", [ListenerId]); {error, Error} -> - emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [Identifier, Error]) + emqx_ctl:print("Failed to start ~s listener: ~0p~n", [ListenerId, Error]) + end; + +listeners(["restart", ListenerId]) -> + case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of + ok -> + emqx_ctl:print("Restarted ~s listener successfully.~n", [ListenerId]); + {error, Error} -> + emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [ListenerId, Error]) end; listeners(_) -> emqx_ctl:usage([{"listeners", "List listeners"}, {"listeners stop ", "Stop a listener"}, - {"listeners stop ", "Stop a listener"}, + {"listeners start ", "Start a listener"}, {"listeners restart ", "Restart a listener"} ]). -stop_listener(false, Input) -> - emqx_ctl:print("No such listener ~p~n", [Input]); -stop_listener(#{listen_on := ListenOn} = Listener, _Input) -> - ID = emqx_listeners:identifier(Listener), - ListenOnStr = emqx_listeners:format_listen_on(ListenOn), - case emqx_listeners:stop_listener(Listener) of - ok -> - emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]); - {error, Reason} -> - emqx_ctl:print("Failed to stop ~s listener on ~s: ~0p~n", - [ID, ListenOnStr, Reason]) - end. - %%-------------------------------------------------------------------- %% @doc data Command @@ -692,24 +663,9 @@ indent_print({Key, {string, Val}}) -> indent_print({Key, Val}) -> emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]). -listener_identifier(Protocol, ListenOn) -> - case emqx_listeners:find_id_by_listen_on(ListenOn) of - false -> - atom_to_list(Protocol); - ID -> - ID - end. - -restart_http_listener(Scheme, AppName) -> - Listeners = application:get_env(AppName, listeners, []), - case lists:keyfind(Scheme, 1, Listeners) of - false -> - emqx_ctl:print("Listener ~s not exists!~n", [AppName]); - {Scheme, Port, Options} -> - ModName = http_mod_name(AppName), - ModName:stop_listener({Scheme, Port, Options}), - ModName:start_listener({Scheme, Port, Options}) - end. - -http_mod_name(emqx_management) -> emqx_mgmt_http; -http_mod_name(Name) -> Name. +format_listen_on(Port) when is_integer(Port) -> + io_lib:format("0.0.0.0:~w", [Port]); +format_listen_on({Addr, Port}) when is_list(Addr) -> + io_lib:format("~s:~w", [Addr, Port]); +format_listen_on({Addr, Port}) when is_tuple(Addr) -> + io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). \ No newline at end of file diff --git a/apps/emqx_modules/src/emqx_mod_acl_internal.erl b/apps/emqx_modules/src/emqx_mod_acl_internal.erl index 8956229ea..5fa459c5c 100644 --- a/apps/emqx_modules/src/emqx_mod_acl_internal.erl +++ b/apps/emqx_modules/src/emqx_mod_acl_internal.erl @@ -50,10 +50,9 @@ unload(_Env) -> emqx_hooks:del('client.check_acl', {?MODULE, check_acl}). reload(Env) -> - emqx_acl_cache:is_enabled() andalso ( - lists:foreach( - fun(Pid) -> erlang:send(Pid, clean_acl_cache) end, - emqx_cm:all_channels())), + lists:foreach( + fun(Pid) -> erlang:send(Pid, clean_acl_cache) end, + emqx_cm:all_channels()), unload(Env), load(Env). description() ->