Merge pull request #4188 from bgrosse-midokura/mgmt_restart_listeners

feature(mgmt) restart a listener
This commit is contained in:
Zaiming Shi 2021-02-19 10:01:06 +01:00 committed by GitHub
commit 0b513c6e3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 7 deletions

View File

@ -102,6 +102,7 @@
%% Listeners %% Listeners
-export([ list_listeners/0 -export([ list_listeners/0
, list_listeners/1 , list_listeners/1
, restart_listener/2
]). ]).
%% Alarms %% Alarms
@ -560,6 +561,12 @@ list_listeners(Node) when Node =:= node() ->
list_listeners(Node) -> list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]). rpc_call(Node, list_listeners, [Node]).
restart_listener(Node, Identifier) when Node =:= node() ->
emqx_listeners:restart_listener(Identifier);
restart_listener(Node, Identifier) ->
rpc_call(Node, restart_listener, [Node, Identifier]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Get Alarms %% Get Alarms
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -978,3 +985,4 @@ action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args}
{name, Name}, {name, Name},
{fallbacks, actions_to_prop_list(FallbackActions)}, {fallbacks, actions_to_prop_list(FallbackActions)},
{args, Args}]. {args, Args}].

View File

@ -30,7 +30,19 @@
func => list, func => list,
descr => "A list of listeners on the node"}). descr => "A list of listeners on the node"}).
-export([list/2]). -rest_api(#{name => restart_listener,
method => 'PUT',
path => "/listeners/:bin:identifier/restart",
func => restart,
descr => "Restart a listener in the cluster"}).
-rest_api(#{name => restart_node_listener,
method => 'PUT',
path => "/nodes/:atom:node/listeners/:bin:identifier/restart",
func => restart,
descr => "Restart a listener on a node"}).
-export([list/2, restart/2]).
%% List listeners on a node. %% List listeners on a node.
list(#{node := Node}, _Params) -> list(#{node := Node}, _Params) ->
@ -41,6 +53,21 @@ list(_Binding, _Params) ->
return({ok, [#{node => Node, listeners => format(Listeners)} return({ok, [#{node => Node, listeners => format(Listeners)}
|| {Node, Listeners} <- emqx_mgmt:list_listeners()]}). || {Node, Listeners} <- emqx_mgmt:list_listeners()]}).
%% Restart listeners on a node.
restart(#{node := Node, identifier := Identifier}, _Params) ->
case emqx_mgmt:restart_listener(Node, Identifier) of
ok -> return({ok, "Listener restarted."});
{error, Error} -> return({error, Error})
end;
%% Restart listeners in the cluster.
restart(#{identifier := Identifier}, _Params) ->
Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of
[] -> return(ok);
Errors -> return({error, Errors})
end.
format(Listeners) when is_list(Listeners) -> format(Listeners) when is_list(Listeners) ->
[ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))} [ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))}
|| Info = #{listen_on := ListenOn} <- Listeners ]; || Info = #{listen_on := ListenOn} <- Listeners ];

View File

@ -550,10 +550,19 @@ listeners(["stop", _Proto, ListenOn]) ->
end, end,
stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1); stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1);
listeners(["restart", Identifier]) ->
case emqx_listeners:restart_listener(Identifier) of
ok ->
emqx_ctl:print("Restarted ~s listener successfully.~n", [Identifier]);
{error, Error} ->
emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [Identifier, Error])
end;
listeners(_) -> listeners(_) ->
emqx_ctl:usage([{"listeners", "List listeners"}, emqx_ctl:usage([{"listeners", "List listeners"},
{"listeners stop <Identifier>", "Stop a listener"}, {"listeners stop <Identifier>", "Stop a listener"},
{"listeners stop <Proto> <Port>", "Stop a listener"} {"listeners stop <Proto> <Port>", "Stop a listener"},
{"listeners restart <Identifier>", "Restart a listener"}
]). ]).
stop_listener(false, Input) -> stop_listener(false, Input) ->

View File

@ -294,6 +294,18 @@ t_listeners_cmd_new(_) ->
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n", "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"]) emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"])
), ),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "mqtt:tcp:external"]),
"Restarted mqtt:tcp:external listener successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "mqtt:ssl:external"]),
"Restarted mqtt:ssl:external listener successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "bad:listener:identifier"]),
"Failed to restart bad:listener:identifier listener: {no_such_listener,\"bad:listener:identifier\"}\n"
),
unmock_print(). unmock_print().
t_plugins_cmd(_) -> t_plugins_cmd(_) ->

View File

@ -173,24 +173,34 @@ with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) ->
restart() -> restart() ->
lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
-spec(restart_listener(listener()) -> any()). -spec(restart_listener(listener() | string() | binary()) -> ok | {error, any()}).
restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) -> restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) ->
restart_listener(Proto, ListenOn, Options). restart_listener(Proto, ListenOn, Options);
restart_listener(Identifier) ->
case emqx_listeners:find_by_id(Identifier) of
false -> {error, {no_such_listener, Identifier}};
Listener -> restart_listener(Listener)
end.
-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()). -spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) ->
ok | {error, any()}).
restart_listener(tcp, ListenOn, _Options) -> restart_listener(tcp, ListenOn, _Options) ->
esockd:reopen('mqtt:tcp', ListenOn); esockd:reopen('mqtt:tcp', ListenOn);
restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls ->
esockd:reopen('mqtt:ssl', ListenOn); esockd:reopen('mqtt:ssl', ListenOn);
restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
_ = cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)), _ = cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)),
start_listener(Proto, ListenOn, Options); ok(start_listener(Proto, ListenOn, Options));
restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
_ = cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)), _ = cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)),
start_listener(Proto, ListenOn, Options); ok(start_listener(Proto, ListenOn, Options));
restart_listener(Proto, ListenOn, _Opts) -> restart_listener(Proto, ListenOn, _Opts) ->
esockd:reopen(Proto, ListenOn). esockd:reopen(Proto, ListenOn).
ok(ok) -> ok;
ok({ok, _}) -> ok;
ok(Error) -> Error.
%% @doc Stop all listeners. %% @doc Stop all listeners.
-spec(stop() -> ok). -spec(stop() -> ok).
stop() -> stop() ->