diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 2d3357f37..d9670b858 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -265,12 +265,12 @@ format_addr({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). listener_id(Type, ListenerName) -> - list_to_atom(lists:append([atom_to_list(Type), ":", atom_to_list(ListenerName)])). + list_to_atom(lists:append([str(Type), ":", str(ListenerName)])). parse_listener_id(Id) -> try - [Zone, Listen] = string:split(atom_to_list(Id), ":", leading), - {list_to_existing_atom(Zone), list_to_existing_atom(Listen)} + [Type, Name] = string:split(str(Id), ":", leading), + {list_to_existing_atom(Type), list_to_atom(Name)} catch _ : _ -> error({invalid_listener_id, Id}) end. @@ -291,8 +291,8 @@ tcp_opts(Opts) -> foreach_listeners(Do) -> lists:foreach( - fun({ZoneName, LName, LConf}) -> - Do(ZoneName, LName, LConf) + fun({Type, LName, LConf}) -> + Do(Type, LName, LConf) end, do_list()). has_enabled_listener_conf_by_type(Type) -> @@ -307,3 +307,10 @@ apply_on_listener(ListenerId, Do) -> {not_found, _, _} -> error({listener_config_not_found, Type, ListenerName}); {ok, Conf} -> Do(Type, ListenerName, Conf) end. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 02bb8662c..4a628d994 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -90,6 +90,8 @@ , list_listeners_by_id/1 , get_listener/2 , manage_listener/2 + , update_listener/2 + , update_listener/3 ]). %% Alarms @@ -473,7 +475,7 @@ list_listeners() -> lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). list_listeners(Node) when Node =:= node() -> - [{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()]; + [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()]; list_listeners(Node) -> rpc_call(Node, list_listeners, [Node]). @@ -501,6 +503,16 @@ manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()-> manage_listener(Operation, Param = #{node := Node}) -> rpc_call(Node, manage_listener, [Operation, Param]). +update_listener(Id, Config) -> + [update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()]. + +update_listener(Node, Id, Config) when Node =:= node() -> + {Type, Name} = emqx_listeners:parse_listener_id(Id), + {ok, #{raw_config := RawConf}} = emqx:update_config([listeners, Type, Name], Config, #{}), + RawConf#{node => Node, id => Id, running => true}; +update_listener(Node, Id, Config) -> + rpc_call(Node, update_listener, [Node, Id, Config]). + %%-------------------------------------------------------------------- %% Get Alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 4bad6a26b..f1749b1d9 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -21,9 +21,9 @@ -export([api_spec/0]). -export([ list_listeners/2 - , list_listeners_by_id/2 + , list_update_listeners_by_id/2 , list_listeners_on_node/2 - , get_listener_by_id_on_node/2 + , get_update_listener_by_id_on_node/2 , manage_listeners/2 , jsonable_resp/2 ]). @@ -39,10 +39,10 @@ api_spec() -> { [ api_list_listeners(), - api_list_listeners_by_id(), + api_list_update_listeners_by_id(), api_manage_listeners(), api_list_listeners_on_node(), - api_get_listener_by_id_on_node(), + api_get_update_listener_by_id_on_node(), api_manage_listeners_on_node() ], [] @@ -75,7 +75,7 @@ api_list_listeners() -> emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}}, {"/listeners", Metadata, list_listeners}. -api_list_listeners_by_id() -> +api_list_update_listeners_by_id() -> Metadata = #{ get => #{ description => <<"List listeners by a given Id from all nodes in the cluster">>, @@ -84,8 +84,18 @@ api_list_listeners_by_id() -> <<"404">> => emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}}, - {"/listeners/:id", Metadata, list_listeners_by_id}. + emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}, + put => #{ + description => <<"Create or update listeners by a given Id to all nodes in the cluster">>, + parameters => [param_path_id()], + requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), + responses => #{ + <<"404">> => + emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), + <<"200">> => + emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}} + }, + {"/listeners/:id", Metadata, list_update_listeners_by_id}. api_list_listeners_on_node() -> Metadata = #{ @@ -96,7 +106,7 @@ api_list_listeners_on_node() -> <<"200">> => emqx_mgmt_util:object_schema(resp_schema(), <<"List listeners successfully">>)}}}, {"/nodes/:node/listeners", Metadata, list_listeners_on_node}. -api_get_listener_by_id_on_node() -> +api_get_update_listener_by_id_on_node() -> Metadata = #{ get => #{ description => <<"Get a listener by a given Id on a specific node">>, @@ -106,8 +116,19 @@ api_get_listener_by_id_on_node() -> emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), <<"200">> => - emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}}}, - {"/nodes/:node/listeners/:id", Metadata, get_listener_by_id_on_node}. + emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}}, + put => #{ + description => <<"Create or update a listener by a given Id on a specific node">>, + parameters => [param_path_node(), param_path_id()], + requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), + responses => #{ + <<"404">> => + emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, + ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), + <<"200">> => + emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}} + }, + {"/nodes/:node/listeners/:id", Metadata, get_update_listener_by_id_on_node}. api_manage_listeners() -> Metadata = #{ @@ -169,14 +190,16 @@ param_path_operation()-> list_listeners(get, _Request) -> {200, format(emqx_mgmt:list_listeners())}. -list_listeners_by_id(get, #{bindings := #{id := Id}}) -> - case [L || L = {Id0, _Conf} <- emqx_mgmt:list_listeners(), +list_update_listeners_by_id(get, #{bindings := #{id := Id}}) -> + case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(), atom_to_binary(Id0, latin1) =:= Id] of [] -> {400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}}; Listeners -> {200, format(Listeners)} - end. + end; +list_update_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) -> + return_listeners(emqx_mgmt:update_listener(Id, Conf)). list_listeners_on_node(get, #{bindings := #{node := Node}}) -> case emqx_mgmt:list_listeners(atom(Node)) of @@ -186,7 +209,7 @@ list_listeners_on_node(get, #{bindings := #{node := Node}}) -> {200, format(Listener)} end. -get_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> +get_update_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> case emqx_mgmt:get_listener(atom(Node), atom(Id)) of {error, not_found} -> {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}}; @@ -194,7 +217,9 @@ get_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; Listener -> {200, format(Listener)} - end. + end; +get_update_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node, body := Conf}}) -> + return_listeners(emqx_mgmt:update_listener(atom(Node), Id, Conf)). manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) -> {_, Result} = do_manage_listeners(Node, Id, Oper), @@ -236,6 +261,13 @@ do_manage_listeners2(<<"restart">>, Param) -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} end. +return_listeners(Listeners) -> + Results = format(Listeners), + case lists:filter(fun({error, _}) -> true; (_) -> false end, Results) of + [] -> {200, Results}; + Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} + end. + manage_listeners_err(Errors) -> list_to_binary(lists:foldl(fun({Node, Err}, Str) -> err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str @@ -247,12 +279,10 @@ format(Listeners) when is_list(Listeners) -> format({error, Reason}) -> {error, Reason}; -format({ID, Conf}) -> +format(#{node := _Node, id := _Id} = Conf) when is_map(Conf) -> emqx_map_lib:jsonable_map(Conf#{ - id => ID, - node => maps:get(node, Conf), - running => trans_running(Conf) - }, fun ?MODULE:jsonable_resp/2). + running => trans_running(Conf) + }, fun ?MODULE:jsonable_resp/2). trans_running(Conf) -> case maps:get(running, Conf) of