feat(listeners): APIs for updating the listener

This commit is contained in:
Shawn 2021-08-29 16:03:17 +08:00
parent 05fc6d9e45
commit 8c36b7879f
3 changed files with 75 additions and 26 deletions

View File

@ -265,12 +265,12 @@ format_addr({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
listener_id(Type, ListenerName) ->
list_to_atom(lists:append([atom_to_list(Type), ":", atom_to_list(ListenerName)])).
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
parse_listener_id(Id) ->
try
[Zone, Listen] = string:split(atom_to_list(Id), ":", leading),
{list_to_existing_atom(Zone), list_to_existing_atom(Listen)}
[Type, Name] = string:split(str(Id), ":", leading),
{list_to_existing_atom(Type), list_to_atom(Name)}
catch
_ : _ -> error({invalid_listener_id, Id})
end.
@ -291,8 +291,8 @@ tcp_opts(Opts) ->
foreach_listeners(Do) ->
lists:foreach(
fun({ZoneName, LName, LConf}) ->
Do(ZoneName, LName, LConf)
fun({Type, LName, LConf}) ->
Do(Type, LName, LConf)
end, do_list()).
has_enabled_listener_conf_by_type(Type) ->
@ -307,3 +307,10 @@ apply_on_listener(ListenerId, Do) ->
{not_found, _, _} -> error({listener_config_not_found, Type, ListenerName});
{ok, Conf} -> Do(Type, ListenerName, Conf)
end.
str(A) when is_atom(A) ->
atom_to_list(A);
str(B) when is_binary(B) ->
binary_to_list(B);
str(S) when is_list(S) ->
S.

View File

@ -90,6 +90,8 @@
, list_listeners_by_id/1
, get_listener/2
, manage_listener/2
, update_listener/2
, update_listener/3
]).
%% Alarms
@ -473,7 +475,7 @@ list_listeners() ->
lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]).
list_listeners(Node) when Node =:= node() ->
[{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()];
[Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]).
@ -501,6 +503,16 @@ manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
manage_listener(Operation, Param = #{node := Node}) ->
rpc_call(Node, manage_listener, [Operation, Param]).
update_listener(Id, Config) ->
[update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()].
update_listener(Node, Id, Config) when Node =:= node() ->
{Type, Name} = emqx_listeners:parse_listener_id(Id),
{ok, #{raw_config := RawConf}} = emqx:update_config([listeners, Type, Name], Config, #{}),
RawConf#{node => Node, id => Id, running => true};
update_listener(Node, Id, Config) ->
rpc_call(Node, update_listener, [Node, Id, Config]).
%%--------------------------------------------------------------------
%% Get Alarms
%%--------------------------------------------------------------------

View File

@ -21,9 +21,9 @@
-export([api_spec/0]).
-export([ list_listeners/2
, list_listeners_by_id/2
, list_update_listeners_by_id/2
, list_listeners_on_node/2
, get_listener_by_id_on_node/2
, get_update_listener_by_id_on_node/2
, manage_listeners/2
, jsonable_resp/2
]).
@ -39,10 +39,10 @@ api_spec() ->
{
[
api_list_listeners(),
api_list_listeners_by_id(),
api_list_update_listeners_by_id(),
api_manage_listeners(),
api_list_listeners_on_node(),
api_get_listener_by_id_on_node(),
api_get_update_listener_by_id_on_node(),
api_manage_listeners_on_node()
],
[]
@ -75,7 +75,7 @@ api_list_listeners() ->
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}},
{"/listeners", Metadata, list_listeners}.
api_list_listeners_by_id() ->
api_list_update_listeners_by_id() ->
Metadata = #{
get => #{
description => <<"List listeners by a given Id from all nodes in the cluster">>,
@ -84,8 +84,18 @@ api_list_listeners_by_id() ->
<<"404">> =>
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}},
{"/listeners/:id", Metadata, list_listeners_by_id}.
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}},
put => #{
description => <<"Create or update listeners by a given Id to all nodes in the cluster">>,
parameters => [param_path_id()],
requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
responses => #{
<<"404">> =>
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}
},
{"/listeners/:id", Metadata, list_update_listeners_by_id}.
api_list_listeners_on_node() ->
Metadata = #{
@ -96,7 +106,7 @@ api_list_listeners_on_node() ->
<<"200">> => emqx_mgmt_util:object_schema(resp_schema(), <<"List listeners successfully">>)}}},
{"/nodes/:node/listeners", Metadata, list_listeners_on_node}.
api_get_listener_by_id_on_node() ->
api_get_update_listener_by_id_on_node() ->
Metadata = #{
get => #{
description => <<"Get a listener by a given Id on a specific node">>,
@ -106,8 +116,19 @@ api_get_listener_by_id_on_node() ->
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}}},
{"/nodes/:node/listeners/:id", Metadata, get_listener_by_id_on_node}.
emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}},
put => #{
description => <<"Create or update a listener by a given Id on a specific node">>,
parameters => [param_path_node(), param_path_id()],
requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
responses => #{
<<"404">> =>
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}}
},
{"/nodes/:node/listeners/:id", Metadata, get_update_listener_by_id_on_node}.
api_manage_listeners() ->
Metadata = #{
@ -169,14 +190,16 @@ param_path_operation()->
list_listeners(get, _Request) ->
{200, format(emqx_mgmt:list_listeners())}.
list_listeners_by_id(get, #{bindings := #{id := Id}}) ->
case [L || L = {Id0, _Conf} <- emqx_mgmt:list_listeners(),
list_update_listeners_by_id(get, #{bindings := #{id := Id}}) ->
case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(),
atom_to_binary(Id0, latin1) =:= Id] of
[] ->
{400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}};
Listeners ->
{200, format(Listeners)}
end.
end;
list_update_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) ->
return_listeners(emqx_mgmt:update_listener(Id, Conf)).
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
case emqx_mgmt:list_listeners(atom(Node)) of
@ -186,7 +209,7 @@ list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
{200, format(Listener)}
end.
get_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
get_update_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
case emqx_mgmt:get_listener(atom(Node), atom(Id)) of
{error, not_found} ->
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}};
@ -194,7 +217,9 @@ get_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
Listener ->
{200, format(Listener)}
end.
end;
get_update_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node, body := Conf}}) ->
return_listeners(emqx_mgmt:update_listener(atom(Node), Id, Conf)).
manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) ->
{_, Result} = do_manage_listeners(Node, Id, Oper),
@ -236,6 +261,13 @@ do_manage_listeners2(<<"restart">>, Param) ->
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
end.
return_listeners(Listeners) ->
Results = format(Listeners),
case lists:filter(fun({error, _}) -> true; (_) -> false end, Results) of
[] -> {200, Results};
Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}
end.
manage_listeners_err(Errors) ->
list_to_binary(lists:foldl(fun({Node, Err}, Str) ->
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
@ -247,12 +279,10 @@ format(Listeners) when is_list(Listeners) ->
format({error, Reason}) ->
{error, Reason};
format({ID, Conf}) ->
format(#{node := _Node, id := _Id} = Conf) when is_map(Conf) ->
emqx_map_lib:jsonable_map(Conf#{
id => ID,
node => maps:get(node, Conf),
running => trans_running(Conf)
}, fun ?MODULE:jsonable_resp/2).
running => trans_running(Conf)
}, fun ?MODULE:jsonable_resp/2).
trans_running(Conf) ->
case maps:get(running, Conf) of