diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 29f8de0d3..02bb8662c 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -87,7 +87,6 @@ %% Listeners -export([ list_listeners/0 , list_listeners/1 - , list_listeners/2 , list_listeners_by_id/1 , get_listener/2 , manage_listener/2 @@ -473,9 +472,6 @@ reload_plugin(Node, Plugin) -> list_listeners() -> lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). -list_listeners(Node, Identifier) -> - listener_id_filter(Identifier, list_listeners(Node)). - list_listeners(Node) when Node =:= node() -> [{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()]; diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index fb098634f..e13549a86 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -20,14 +20,15 @@ -export([api_spec/0]). --export([ listeners/2 - , listener/2 +-export([ list_listeners/2 + , list_listeners_by_id/2 + , list_listeners_on_node/2 + , get_listener_by_id_on_node/2 , manage_listeners/2]). -import(emqx_mgmt_util, [ schema/1 , object_schema/2 , object_array_schema/2 - , error_schema/1 , error_schema/2 , properties/1 ]). @@ -36,15 +37,18 @@ -include_lib("emqx/include/emqx.hrl"). +-define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>). +-define(LISTENER_NOT_FOUND, <<"Listener id not found">>). + api_spec() -> { [ - listeners_api(), - listener_api(), - nodes_listeners_api(), - nodes_listener_api(), - manage_listeners_api(), - manage_nodes_listeners_api() + api_list_listeners(), + api_list_listeners_by_id(), + api_manage_listeners(), + api_list_listeners_on_node(), + api_get_listener_by_id_on_node(), + api_manage_listeners_on_node() ], [] }. @@ -61,86 +65,74 @@ properties() -> {auth, boolean, <<"Has auth">>} ]). -listeners_api() -> +api_list_listeners() -> Metadata = #{ get => #{ - description => <<"List listeners in cluster">>, + description => <<"List listeners from all nodes in the cluster">>, responses => #{ <<"200">> => - object_array_schema(properties(), <<"List all listeners">>)}}}, - {"/listeners", Metadata, listeners}. + object_array_schema(properties(), <<"List listeners successfully">>)}}}, + {"/listeners", Metadata, list_listeners}. -listener_api() -> +api_list_listeners_by_id() -> Metadata = #{ get => #{ - description => <<"List listeners by listener ID">>, + description => <<"List listeners by a given Id from all nodes in the cluster">>, parameters => [param_path_id()], responses => #{ <<"404">> => - error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']), + error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), <<"200">> => - object_array_schema(properties(), <<"List listener info ok">>)}}}, - {"/listeners/:id", Metadata, listener}. + object_array_schema(properties(), <<"List listeners successfully">>)}}}, + {"/listeners/:id", Metadata, list_listeners_by_id}. -manage_listeners_api() -> - Metadata = #{ - get => #{ - description => <<"Restart listeners in cluster">>, - parameters => [ - param_path_id(), - param_path_operation()], - responses => #{ - <<"500">> => - error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), - <<"404">> => - error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']), - <<"400">> => - error_schema(<<"Listener id not found">>, ['BAD_REQUEST']), - <<"200">> => schema(<<"Operation success">>)}}}, - {"/listeners/:id/:operation", Metadata, manage_listeners}. - -manage_nodes_listeners_api() -> - Metadata = #{ - put => #{ - description => <<"Restart listeners in cluster">>, - parameters => [ - param_path_node(), - param_path_id(), - param_path_operation()], - responses => #{ - <<"500">> => - error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), - <<"404">> => - error_schema(<<"Bad node or Listener id not found">>, - ['BAD_NODE_NAME','BAD_LISTENER_ID']), - <<"400">> => - error_schema(<<"Listener id not found">>, ['BAD_REQUEST']), - <<"200">> => - schema(<<"Operation success">>)}}}, - {"/node/:node/listeners/:id/:operation", Metadata, manage_listeners}. - -nodes_listeners_api() -> - Metadata = #{ - get => #{ - description => <<"Get listener info in one node">>, - parameters => [param_path_node(), param_path_id()], - responses => #{ - <<"404">> => - error_schema(<<"Node name or listener id not found">>, - ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), - <<"200">> => - object_schema(properties(), <<"Get listener info ok">>)}}}, - {"/nodes/:node/listeners/:id", Metadata, listener}. - -nodes_listener_api() -> +api_list_listeners_on_node() -> Metadata = #{ get => #{ description => <<"List listeners in one node">>, parameters => [param_path_node()], responses => #{ - <<"404">> => error_schema(<<"Listener id not found">>), - <<"200">> => object_schema(properties(), <<"Get listener info ok">>)}}}, - {"/nodes/:node/listeners", Metadata, listeners}. + <<"200">> => object_schema(properties(), <<"List listeners successfully">>)}}}, + {"/nodes/:node/listeners", Metadata, list_listeners_on_node}. + +api_get_listener_by_id_on_node() -> + Metadata = #{ + get => #{ + description => <<"Get a listener by a given Id on a specific node">>, + parameters => [param_path_node(), param_path_id()], + responses => #{ + <<"404">> => + error_schema(?NODE_LISTENER_NOT_FOUND, + ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), + <<"200">> => + object_schema(properties(), <<"Get listener successfully">>)}}}, + {"/nodes/:node/listeners/:id", Metadata, get_listener_by_id_on_node}. + +api_manage_listeners() -> + Metadata = #{ + get => #{ + description => <<"Restart listeners on all nodes in the cluster">>, + parameters => [ + param_path_id(), + param_path_operation()], + responses => #{ + <<"500">> => error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"200">> => schema(<<"Operation success">>)}}}, + {"/listeners/:id/:operation", Metadata, manage_listeners}. + +api_manage_listeners_on_node() -> + Metadata = #{ + put => #{ + description => <<"Restart listeners on all nodes in the cluster">>, + parameters => [ + param_path_node(), + param_path_id(), + param_path_operation()], + responses => #{ + <<"500">> => error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"200">> => schema(<<"Operation success">>)}}}, + {"/nodes/:node/listeners/:id/:operation", Metadata, manage_listeners}. + %%%============================================================================================== %% parameters param_path_node() -> @@ -173,102 +165,80 @@ param_path_operation()-> %%%============================================================================================== %% api -listeners(get, _Request) -> - list(). - -listener(get, #{bindings := Bindings}) -> - get_listeners(Bindings). - -manage_listeners(_, #{bindings := Bindings}) -> - manage(Bindings). - -%%%============================================================================================== - -%% List listeners in the cluster. -list() -> +list_listeners(get, _Request) -> {200, format(emqx_mgmt:list_listeners())}. -get_listeners(Param) -> - case list_listener(Param) of - {error, not_found} -> - ID = b2a(maps:get(id, Param)), - Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])), - {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; - {error, nodedown} -> - Node = b2a(maps:get(node, Param)), - Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])), - Response = #{code => 'BAD_NODE_NAME', message => Reason}, - {404, Response}; +list_listeners_by_id(get, #{bindings := #{id := Id}}) -> + case [L || L = {Id0, _Conf} <- emqx_mgmt:list_listeners(), + atom_to_binary(Id0, latin1) =:= Id] of [] -> - ID = b2a(maps:get(id, Param)), - Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])), - {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; - Data -> - {200, Data} + {400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}}; + Listeners -> + {200, format(Listeners)} end. -manage(Param) -> - OperationMap = #{start => start_listener, - stop => stop_listener, - restart => restart_listener}, - Operation = maps:get(b2a(maps:get(operation, Param)), OperationMap), - case list_listener(Param) of - {error, not_found} -> - ID = b2a(maps:get(id, Param)), - Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])), - {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; - {error, nodedown} -> - Node = b2a(maps:get(node, Param)), - Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])), - Response = #{code => 'BAD_NODE_NAME', message => Reason}, - {404, Response}; - [] -> - ID = b2a(maps:get(id, Param)), - Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])), - {404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}}; - ListenersOrSingleListener -> - manage_(Operation, ListenersOrSingleListener) +list_listeners_on_node(get, #{bindings := #{node := Node}}) -> + case emqx_mgmt:list_listeners(atom(Node)) of + {error, Reason} -> + {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; + Listener -> + {200, format(Listener)} end. -manage_(Operation, Listener) when is_map(Listener) -> - manage_(Operation, [Listener]); -manage_(Operation, Listeners) when is_list(Listeners) -> - Results = [emqx_mgmt:manage_listener(Operation, Listener) || Listener <- Listeners], - case lists:filter(fun(Result) -> Result =/= ok end, Results) of - [] -> - {200}; - Errors -> - case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of - [] -> - ID = maps:get(id, hd(Listeners)), - Message = iolist_to_binary(io_lib:format("Already Started: ~s", [ID])), - {400, #{code => 'BAD_REQUEST', message => Message}}; - _ -> - case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of - [] -> - ID = maps:get(id, hd(Listeners)), - Message = iolist_to_binary(io_lib:format("Already Stopped: ~s", [ID])), - {400, #{code => 'BAD_REQUEST', message => Message}}; - _ -> - Reason = iolist_to_binary(io_lib:format("~p", [Errors])), - {500, #{code => 'UNKNOW_ERROR', message => Reason}} - end - end +get_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> + case emqx_mgmt:get_listener(atom(Node), atom(Id)) of + {error, not_found} -> + {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}}; + {error, Reason} -> + {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; + Listener -> + {200, format(Listener)} + end. + +manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) -> + {_, Result} = do_manage_listeners(Node, Id, Oper), + Result; + +manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> + Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()], + case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of + [] -> {200}; + Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} end. %%%============================================================================================== -%% util function -list_listener(Params) -> - format(list_listener_(Params)). +%% util functions -list_listener_(#{node := Node, id := Identifier}) -> - emqx_mgmt:get_listener(b2a(Node), b2a(Identifier)); -list_listener_(#{id := Identifier}) -> - emqx_mgmt:list_listeners_by_id(b2a(Identifier)); -list_listener_(#{node := Node}) -> - emqx_mgmt:list_listeners(b2a(Node)); -list_listener_(#{}) -> - emqx_mgmt:list_listeners(). +do_manage_listeners(Node, Id, Oper) -> + Param = #{node => atom(Node), id => atom(Id)}, + {Node, do_manage_listeners2(Oper, Param)}. + +do_manage_listeners2(<<"start">>, Param) -> + case emqx_mgmt:manage_listener(start_listener, Param) of + ok -> {200}; + {error, {already_started, _}} -> {200}; + {error, Reason} -> + {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + end; +do_manage_listeners2(<<"stop">>, Param) -> + case emqx_mgmt:manage_listener(stop_listener, Param) of + ok -> {200}; + {error, not_found} -> {200}; + {error, Reason} -> + {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + end; +do_manage_listeners2(<<"restart">>, Param) -> + case emqx_mgmt:manage_listener(restart_listener, Param) of + ok -> {200}; + {error, not_found} -> do_manage_listeners2(<<"start">>, Param); + {error, Reason} -> + {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + end. + +manage_listeners_err(Errors) -> + list_to_binary(lists:foldl(fun({Node, Err}, Str) -> + err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str + end, "", Errors)). format(Listeners) when is_list(Listeners) -> [format(Listener) || Listener <- Listeners]; @@ -295,6 +265,12 @@ trans_running(Conf) -> Running end. +atom(B) when is_binary(B) -> binary_to_atom(B, utf8); +atom(S) when is_list(S) -> list_to_atom(S); +atom(A) when is_atom(A) -> A. -b2a(B) when is_binary(B) -> binary_to_atom(B, utf8); -b2a(Any) -> Any. +err_msg(Reason) -> + list_to_binary(err_msg_str(Reason)). + +err_msg_str(Reason) -> + io_lib:format("~p", [Reason]).