feat(listeners): make the APIs and CLIs work with the new listener

This commit is contained in:
Shawn 2021-07-07 16:22:38 +08:00
parent 630b54f6ee
commit 4dd72e59fa
5 changed files with 50 additions and 96 deletions

View File

@ -39,10 +39,8 @@ start() ->
foreach_listeners(fun start_listener/3).
-spec(start_listener(atom()) -> ok).
start_listener(Id) ->
{ZoneName, ListenerName} = decode_listener_id(Id),
start_listener(ZoneName, ListenerName,
emqx_config:get([zones, ZoneName, listeners, ListenerName])).
start_listener(ListenerId) ->
apply_on_listener(ListenerId, fun start_listener/3).
-spec(start_listener(atom(), atom(), map()) -> ok).
start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) ->
@ -133,13 +131,11 @@ esockd_access_rules(StrRules) ->
restart() ->
foreach_listeners(fun restart_listener/3).
-spec(restart_listener(atom()) -> ok | {error, any()}).
restart_listener(ListenerID) ->
{ZoneName, ListenerName} = decode_listener_id(ListenerID),
restart_listener(ZoneName, ListenerName,
emqx_config:get([zones, ZoneName, listeners, ListenerName])).
-spec(restart_listener(atom()) -> ok | {error, term()}).
restart_listener(ListenerId) ->
apply_on_listener(ListenerId, fun restart_listener/3).
-spec(restart_listener(atom(), atom(), map()) -> ok | {error, any()}).
-spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}).
restart_listener(ZoneName, ListenerName, Conf) ->
case stop_listener(ZoneName, ListenerName, Conf) of
ok -> start_listener(ZoneName, ListenerName, Conf);
@ -152,10 +148,8 @@ stop() ->
foreach_listeners(fun stop_listener/3).
-spec(stop_listener(atom()) -> ok | {error, term()}).
stop_listener(ListenerID) ->
{ZoneName, ListenerName} = decode_listener_id(ListenerID),
stop_listener(ZoneName, ListenerName,
emqx_config:get([zones, ZoneName, listeners, ListenerName])).
stop_listener(ListenerId) ->
apply_on_listener(ListenerId, fun stop_listener/3).
-spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}).
stop_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn}) ->
@ -214,3 +208,10 @@ merge_zone_and_listener_confs(ZoneConf, ListenerConf) ->
ConfsInZonesOnly = [listeners, overall_max_connections],
BaseConf = maps:without(ConfsInZonesOnly, ZoneConf),
emqx_map_lib:deep_merge(BaseConf, ListenerConf).
apply_on_listener(ListenerId, Do) ->
{ZoneName, ListenerName} = decode_listener_id(ListenerId),
case emqx_config:find([zones, ZoneName, listeners, ListenerName]) of
{not_found, _, _} -> error({not_found, ListenerId});
{ok, Conf} -> Do(ZoneName, ListenerName, Conf)
end.

View File

@ -417,7 +417,7 @@ list_listeners(Node) when Node =:= node() ->
Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
#{protocol => Protocol,
listen_on => ListenOn,
identifier => emqx_listeners:find_id_by_listen_on(ListenOn),
identifier => Protocol,
acceptors => esockd:get_acceptors({Protocol, ListenOn}),
max_conns => esockd:get_max_connections({Protocol, ListenOn}),
current_conns => esockd:get_current_connections({Protocol, ListenOn}),
@ -436,6 +436,7 @@ list_listeners(Node) when Node =:= node() ->
list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]).
-spec restart_listener(node(), atom()) -> ok | {error, term()}.
restart_listener(Node, Identifier) when Node =:= node() ->
emqx_listeners:restart_listener(Identifier);

View File

@ -30,13 +30,13 @@
-rest_api(#{name => restart_listener,
method => 'PUT',
path => "/listeners/:bin:identifier/restart",
path => "/listeners/:atom:identifier/restart",
func => restart,
descr => "Restart a listener in the cluster"}).
-rest_api(#{name => restart_node_listener,
method => 'PUT',
path => "/nodes/:atom:node/listeners/:bin:identifier/restart",
path => "/nodes/:atom:node/listeners/:atom:identifier/restart",
func => restart,
descr => "Restart a listener on a node"}).
@ -57,10 +57,7 @@ restart(#{node := Node, identifier := Identifier}, _Params) ->
ok -> minirest:return({ok, "Listener restarted."});
{error, Error} -> minirest:return({error, Error})
end;
%% Restart listeners in the cluster.
restart(#{identifier := <<"http", _/binary>>}, _Params) ->
{403, <<"http_listener_restart_unsupported">>};
%% Restart listeners on all nodes in the cluster.
restart(#{identifier := Identifier}, _Params) ->
Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of

View File

@ -464,86 +464,57 @@ trace_off(Who, Name) ->
listeners([]) ->
lists:foreach(fun({{Protocol, ListenOn}, _Pid}) ->
Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}},
Info = [{listen_on, {string, format_listen_on(ListenOn)}},
{acceptors, esockd:get_acceptors({Protocol, ListenOn})},
{max_conns, esockd:get_max_connections({Protocol, ListenOn})},
{current_conn, esockd:get_current_connections({Protocol, ListenOn})},
{shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
],
emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]),
emqx_ctl:print("~s~n", [Protocol]),
lists:foreach(fun indent_print/1, Info)
end, esockd:listeners()),
lists:foreach(fun({Protocol, Opts}) ->
Port = proplists:get_value(port, Opts),
Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}},
Info = [{listen_on, {string, format_listen_on(Port)}},
{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
{max_conns, proplists:get_value(max_connections, Opts)},
{current_conn, proplists:get_value(all_connections, Opts)},
{shutdown_count, []}],
emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]),
emqx_ctl:print("~s~n", [Protocol]),
lists:foreach(fun indent_print/1, Info)
end, ranch:info());
listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
%% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number
case minirest:stop_http(list_to_atom(Name)) of
listeners(["stop", ListenerId]) ->
case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of
ok ->
emqx_ctl:print("Stop ~s listener successfully.~n", [Name]);
emqx_ctl:print("Stop ~s listener successfully.~n", [ListenerId]);
{error, Error} ->
emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [Name, Error])
emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [ListenerId, Error])
end;
listeners(["stop", "mqtt:" ++ _ = Identifier]) ->
stop_listener(emqx_listeners:find_by_id(Identifier), Identifier);
listeners(["stop", _Proto, ListenOn]) ->
%% this clause is kept to be backward compatible
ListenOn1 = case string:tokens(ListenOn, ":") of
[Port] -> list_to_integer(Port);
[IP, Port] -> {IP, list_to_integer(Port)}
end,
stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1);
listeners(["restart", "http:management"]) ->
restart_http_listener(http, emqx_management);
listeners(["restart", "https:management"]) ->
restart_http_listener(https, emqx_management);
listeners(["restart", "http:dashboard"]) ->
restart_http_listener(http, emqx_dashboard);
listeners(["restart", "https:dashboard"]) ->
restart_http_listener(https, emqx_dashboard);
listeners(["restart", Identifier]) ->
case emqx_listeners:restart_listener(Identifier) of
listeners(["start", ListenerId]) ->
case emqx_listeners:start_listener(list_to_atom(ListenerId)) of
ok ->
emqx_ctl:print("Restarted ~s listener successfully.~n", [Identifier]);
emqx_ctl:print("Started ~s listener successfully.~n", [ListenerId]);
{error, Error} ->
emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [Identifier, Error])
emqx_ctl:print("Failed to start ~s listener: ~0p~n", [ListenerId, Error])
end;
listeners(["restart", ListenerId]) ->
case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of
ok ->
emqx_ctl:print("Restarted ~s listener successfully.~n", [ListenerId]);
{error, Error} ->
emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [ListenerId, Error])
end;
listeners(_) ->
emqx_ctl:usage([{"listeners", "List listeners"},
{"listeners stop <Identifier>", "Stop a listener"},
{"listeners stop <Proto> <Port>", "Stop a listener"},
{"listeners start <Identifier>", "Start a listener"},
{"listeners restart <Identifier>", "Restart a listener"}
]).
stop_listener(false, Input) ->
emqx_ctl:print("No such listener ~p~n", [Input]);
stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
ID = emqx_listeners:identifier(Listener),
ListenOnStr = emqx_listeners:format_listen_on(ListenOn),
case emqx_listeners:stop_listener(Listener) of
ok ->
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]);
{error, Reason} ->
emqx_ctl:print("Failed to stop ~s listener on ~s: ~0p~n",
[ID, ListenOnStr, Reason])
end.
%%--------------------------------------------------------------------
%% @doc data Command
@ -692,24 +663,9 @@ indent_print({Key, {string, Val}}) ->
indent_print({Key, Val}) ->
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
listener_identifier(Protocol, ListenOn) ->
case emqx_listeners:find_id_by_listen_on(ListenOn) of
false ->
atom_to_list(Protocol);
ID ->
ID
end.
restart_http_listener(Scheme, AppName) ->
Listeners = application:get_env(AppName, listeners, []),
case lists:keyfind(Scheme, 1, Listeners) of
false ->
emqx_ctl:print("Listener ~s not exists!~n", [AppName]);
{Scheme, Port, Options} ->
ModName = http_mod_name(AppName),
ModName:stop_listener({Scheme, Port, Options}),
ModName:start_listener({Scheme, Port, Options})
end.
http_mod_name(emqx_management) -> emqx_mgmt_http;
http_mod_name(Name) -> Name.
format_listen_on(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listen_on({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format_listen_on({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).

View File

@ -50,10 +50,9 @@ unload(_Env) ->
emqx_hooks:del('client.check_acl', {?MODULE, check_acl}).
reload(Env) ->
emqx_acl_cache:is_enabled() andalso (
lists:foreach(
fun(Pid) -> erlang:send(Pid, clean_acl_cache) end,
emqx_cm:all_channels())),
lists:foreach(
fun(Pid) -> erlang:send(Pid, clean_acl_cache) end,
emqx_cm:all_channels()),
unload(Env), load(Env).
description() ->