refactor(listener): rewrite the code for managing listeners

This commit is contained in:
Shawn 2021-08-27 16:30:43 +08:00 committed by turtleDeng
parent 5ba396afb3
commit e6ee8ec140
2 changed files with 135 additions and 163 deletions

View File

@ -87,7 +87,6 @@
%% Listeners
-export([ list_listeners/0
, list_listeners/1
, list_listeners/2
, list_listeners_by_id/1
, get_listener/2
, manage_listener/2
@ -473,9 +472,6 @@ reload_plugin(Node, Plugin) ->
list_listeners() ->
lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]).
list_listeners(Node, Identifier) ->
listener_id_filter(Identifier, list_listeners(Node)).
list_listeners(Node) when Node =:= node() ->
[{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()];

View File

@ -20,14 +20,15 @@
-export([api_spec/0]).
-export([ listeners/2
, listener/2
-export([ list_listeners/2
, list_listeners_by_id/2
, list_listeners_on_node/2
, get_listener_by_id_on_node/2
, manage_listeners/2]).
-import(emqx_mgmt_util, [ schema/1
, object_schema/2
, object_array_schema/2
, error_schema/1
, error_schema/2
, properties/1
]).
@ -36,15 +37,18 @@
-include_lib("emqx/include/emqx.hrl").
-define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>).
-define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
api_spec() ->
{
[
listeners_api(),
listener_api(),
nodes_listeners_api(),
nodes_listener_api(),
manage_listeners_api(),
manage_nodes_listeners_api()
api_list_listeners(),
api_list_listeners_by_id(),
api_manage_listeners(),
api_list_listeners_on_node(),
api_get_listener_by_id_on_node(),
api_manage_listeners_on_node()
],
[]
}.
@ -61,86 +65,74 @@ properties() ->
{auth, boolean, <<"Has auth">>}
]).
listeners_api() ->
api_list_listeners() ->
Metadata = #{
get => #{
description => <<"List listeners in cluster">>,
description => <<"List listeners from all nodes in the cluster">>,
responses => #{
<<"200">> =>
object_array_schema(properties(), <<"List all listeners">>)}}},
{"/listeners", Metadata, listeners}.
object_array_schema(properties(), <<"List listeners successfully">>)}}},
{"/listeners", Metadata, list_listeners}.
listener_api() ->
api_list_listeners_by_id() ->
Metadata = #{
get => #{
description => <<"List listeners by listener ID">>,
description => <<"List listeners by a given Id from all nodes in the cluster">>,
parameters => [param_path_id()],
responses => #{
<<"404">> =>
error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
<<"200">> =>
object_array_schema(properties(), <<"List listener info ok">>)}}},
{"/listeners/:id", Metadata, listener}.
object_array_schema(properties(), <<"List listeners successfully">>)}}},
{"/listeners/:id", Metadata, list_listeners_by_id}.
manage_listeners_api() ->
Metadata = #{
get => #{
description => <<"Restart listeners in cluster">>,
parameters => [
param_path_id(),
param_path_operation()],
responses => #{
<<"500">> =>
error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"404">> =>
error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
<<"400">> =>
error_schema(<<"Listener id not found">>, ['BAD_REQUEST']),
<<"200">> => schema(<<"Operation success">>)}}},
{"/listeners/:id/:operation", Metadata, manage_listeners}.
manage_nodes_listeners_api() ->
Metadata = #{
put => #{
description => <<"Restart listeners in cluster">>,
parameters => [
param_path_node(),
param_path_id(),
param_path_operation()],
responses => #{
<<"500">> =>
error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"404">> =>
error_schema(<<"Bad node or Listener id not found">>,
['BAD_NODE_NAME','BAD_LISTENER_ID']),
<<"400">> =>
error_schema(<<"Listener id not found">>, ['BAD_REQUEST']),
<<"200">> =>
schema(<<"Operation success">>)}}},
{"/node/:node/listeners/:id/:operation", Metadata, manage_listeners}.
nodes_listeners_api() ->
Metadata = #{
get => #{
description => <<"Get listener info in one node">>,
parameters => [param_path_node(), param_path_id()],
responses => #{
<<"404">> =>
error_schema(<<"Node name or listener id not found">>,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
object_schema(properties(), <<"Get listener info ok">>)}}},
{"/nodes/:node/listeners/:id", Metadata, listener}.
nodes_listener_api() ->
api_list_listeners_on_node() ->
Metadata = #{
get => #{
description => <<"List listeners in one node">>,
parameters => [param_path_node()],
responses => #{
<<"404">> => error_schema(<<"Listener id not found">>),
<<"200">> => object_schema(properties(), <<"Get listener info ok">>)}}},
{"/nodes/:node/listeners", Metadata, listeners}.
<<"200">> => object_schema(properties(), <<"List listeners successfully">>)}}},
{"/nodes/:node/listeners", Metadata, list_listeners_on_node}.
api_get_listener_by_id_on_node() ->
Metadata = #{
get => #{
description => <<"Get a listener by a given Id on a specific node">>,
parameters => [param_path_node(), param_path_id()],
responses => #{
<<"404">> =>
error_schema(?NODE_LISTENER_NOT_FOUND,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
object_schema(properties(), <<"Get listener successfully">>)}}},
{"/nodes/:node/listeners/:id", Metadata, get_listener_by_id_on_node}.
api_manage_listeners() ->
Metadata = #{
get => #{
description => <<"Restart listeners on all nodes in the cluster">>,
parameters => [
param_path_id(),
param_path_operation()],
responses => #{
<<"500">> => error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"200">> => schema(<<"Operation success">>)}}},
{"/listeners/:id/:operation", Metadata, manage_listeners}.
api_manage_listeners_on_node() ->
Metadata = #{
put => #{
description => <<"Restart listeners on all nodes in the cluster">>,
parameters => [
param_path_node(),
param_path_id(),
param_path_operation()],
responses => #{
<<"500">> => error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"200">> => schema(<<"Operation success">>)}}},
{"/nodes/:node/listeners/:id/:operation", Metadata, manage_listeners}.
%%%==============================================================================================
%% parameters
param_path_node() ->
@ -173,102 +165,80 @@ param_path_operation()->
%%%==============================================================================================
%% api
listeners(get, _Request) ->
list().
listener(get, #{bindings := Bindings}) ->
get_listeners(Bindings).
manage_listeners(_, #{bindings := Bindings}) ->
manage(Bindings).
%%%==============================================================================================
%% List listeners in the cluster.
list() ->
list_listeners(get, _Request) ->
{200, format(emqx_mgmt:list_listeners())}.
get_listeners(Param) ->
case list_listener(Param) of
{error, not_found} ->
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = b2a(maps:get(node, Param)),
Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
list_listeners_by_id(get, #{bindings := #{id := Id}}) ->
case [L || L = {Id0, _Conf} <- emqx_mgmt:list_listeners(),
atom_to_binary(Id0, latin1) =:= Id] of
[] ->
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
Data ->
{200, Data}
{400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}};
Listeners ->
{200, format(Listeners)}
end.
manage(Param) ->
OperationMap = #{start => start_listener,
stop => stop_listener,
restart => restart_listener},
Operation = maps:get(b2a(maps:get(operation, Param)), OperationMap),
case list_listener(Param) of
{error, not_found} ->
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = b2a(maps:get(node, Param)),
Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}};
ListenersOrSingleListener ->
manage_(Operation, ListenersOrSingleListener)
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
case emqx_mgmt:list_listeners(atom(Node)) of
{error, Reason} ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
Listener ->
{200, format(Listener)}
end.
manage_(Operation, Listener) when is_map(Listener) ->
manage_(Operation, [Listener]);
manage_(Operation, Listeners) when is_list(Listeners) ->
Results = [emqx_mgmt:manage_listener(Operation, Listener) || Listener <- Listeners],
case lists:filter(fun(Result) -> Result =/= ok end, Results) of
[] ->
{200};
Errors ->
case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of
[] ->
ID = maps:get(id, hd(Listeners)),
Message = iolist_to_binary(io_lib:format("Already Started: ~s", [ID])),
{400, #{code => 'BAD_REQUEST', message => Message}};
_ ->
case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of
[] ->
ID = maps:get(id, hd(Listeners)),
Message = iolist_to_binary(io_lib:format("Already Stopped: ~s", [ID])),
{400, #{code => 'BAD_REQUEST', message => Message}};
_ ->
Reason = iolist_to_binary(io_lib:format("~p", [Errors])),
{500, #{code => 'UNKNOW_ERROR', message => Reason}}
end
end
get_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
case emqx_mgmt:get_listener(atom(Node), atom(Id)) of
{error, not_found} ->
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}};
{error, Reason} ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
Listener ->
{200, format(Listener)}
end.
manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) ->
{_, Result} = do_manage_listeners(Node, Id, Oper),
Result;
manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) ->
Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of
[] -> {200};
Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}
end.
%%%==============================================================================================
%% util function
list_listener(Params) ->
format(list_listener_(Params)).
%% util functions
list_listener_(#{node := Node, id := Identifier}) ->
emqx_mgmt:get_listener(b2a(Node), b2a(Identifier));
list_listener_(#{id := Identifier}) ->
emqx_mgmt:list_listeners_by_id(b2a(Identifier));
list_listener_(#{node := Node}) ->
emqx_mgmt:list_listeners(b2a(Node));
list_listener_(#{}) ->
emqx_mgmt:list_listeners().
do_manage_listeners(Node, Id, Oper) ->
Param = #{node => atom(Node), id => atom(Id)},
{Node, do_manage_listeners2(Oper, Param)}.
do_manage_listeners2(<<"start">>, Param) ->
case emqx_mgmt:manage_listener(start_listener, Param) of
ok -> {200};
{error, {already_started, _}} -> {200};
{error, Reason} ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
end;
do_manage_listeners2(<<"stop">>, Param) ->
case emqx_mgmt:manage_listener(stop_listener, Param) of
ok -> {200};
{error, not_found} -> {200};
{error, Reason} ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
end;
do_manage_listeners2(<<"restart">>, Param) ->
case emqx_mgmt:manage_listener(restart_listener, Param) of
ok -> {200};
{error, not_found} -> do_manage_listeners2(<<"start">>, Param);
{error, Reason} ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
end.
manage_listeners_err(Errors) ->
list_to_binary(lists:foldl(fun({Node, Err}, Str) ->
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
end, "", Errors)).
format(Listeners) when is_list(Listeners) ->
[format(Listener) || Listener <- Listeners];
@ -295,6 +265,12 @@ trans_running(Conf) ->
Running
end.
atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
atom(S) when is_list(S) -> list_to_atom(S);
atom(A) when is_atom(A) -> A.
b2a(B) when is_binary(B) -> binary_to_atom(B, utf8);
b2a(Any) -> Any.
err_msg(Reason) ->
list_to_binary(err_msg_str(Reason)).
err_msg_str(Reason) ->
io_lib:format("~p", [Reason]).