feat(listeners): add DELETE APIs for removing the listeners

This commit is contained in:
Shawn 2021-08-31 14:08:49 +08:00
parent 0d1bc6d689
commit 0af39e88a4
4 changed files with 65 additions and 18 deletions

View File

@ -94,8 +94,8 @@
-type update_stage() :: pre_config_update | post_config_update. -type update_stage() :: pre_config_update | post_config_update.
-type update_error() :: {update_stage(), module(), term()} | {save_configs, term()} | term(). -type update_error() :: {update_stage(), module(), term()} | {save_configs, term()} | term().
-type update_result() :: #{ -type update_result() :: #{
config := emqx_config:config(), config => emqx_config:config(),
raw_config := emqx_config:raw_config(), raw_config => emqx_config:raw_config(),
post_config_update => #{module() => any()} post_config_update => #{module() => any()}
}. }.

View File

@ -217,10 +217,9 @@ call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result)
false -> {ok, Result} false -> {ok, Result}
end. end.
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, {_Cmd, Opts}) -> save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs) ->
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf) of case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf) of
ok -> {ok, #{config => emqx_config:get(ConfKeyPath), ok -> {ok, return_change_result(ConfKeyPath, UpdateArgs)};
raw_config => return_rawconf(ConfKeyPath, Opts)}};
{error, Reason} -> {error, {save_configs, Reason}} {error, Reason} -> {error, {save_configs, Reason}}
end. end.
@ -241,6 +240,12 @@ update_override_config(RawConf) ->
up_req({remove, _Opts}) -> '$remove'; up_req({remove, _Opts}) -> '$remove';
up_req({{update, Req}, _Opts}) -> Req. up_req({{update, Req}, _Opts}) -> Req.
return_change_result(ConfKeyPath, {{update, _Req}, Opts}) ->
#{config => emqx_config:get(ConfKeyPath),
raw_config => return_rawconf(ConfKeyPath, Opts)};
return_change_result(_ConfKeyPath, {remove, _Opts}) ->
#{}.
return_rawconf(ConfKeyPath, #{rawconf_with_defaults := true}) -> return_rawconf(ConfKeyPath, #{rawconf_with_defaults := true}) ->
FullRawConf = emqx_config:fill_defaults(emqx_config:get_raw([])), FullRawConf = emqx_config:fill_defaults(emqx_config:get_raw([])),
emqx_map_lib:deep_get(bin_path(ConfKeyPath), FullRawConf); emqx_map_lib:deep_get(bin_path(ConfKeyPath), FullRawConf);

View File

@ -92,6 +92,8 @@
, manage_listener/2 , manage_listener/2
, update_listener/2 , update_listener/2
, update_listener/3 , update_listener/3
, remove_listener/1
, remove_listener/2
]). ]).
%% Alarms %% Alarms
@ -516,6 +518,19 @@ update_listener(Node, Id, Config) when Node =:= node() ->
update_listener(Node, Id, Config) -> update_listener(Node, Id, Config) ->
rpc_call(Node, update_listener, [Node, Id, Config]). rpc_call(Node, update_listener, [Node, Id, Config]).
remove_listener(Id) ->
[remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()].
remove_listener(Node, Id) when Node =:= node() ->
{Type, Name} = emqx_listeners:parse_listener_id(Id),
case emqx:remove_config([listeners, Type, Name], #{}) of
{ok, _} -> ok;
{error, Reason} ->
error(Reason)
end;
remove_listener(Node, Id) ->
rpc_call(Node, remove_listener, [Node, Id]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Get Alarms %% Get Alarms
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -21,9 +21,9 @@
-export([api_spec/0]). -export([api_spec/0]).
-export([ list_listeners/2 -export([ list_listeners/2
, list_update_listeners_by_id/2 , crud_listeners_by_id/2
, list_listeners_on_node/2 , list_listeners_on_node/2
, get_update_listener_by_id_on_node/2 , crud_listener_by_id_on_node/2
, manage_listeners/2 , manage_listeners/2
, jsonable_resp/2 , jsonable_resp/2
]). ]).
@ -86,16 +86,24 @@ api_list_update_listeners_by_id() ->
<<"200">> => <<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}, emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}},
put => #{ put => #{
description => <<"Create or update listeners by a given Id to all nodes in the cluster">>, description => <<"Create or update a listener by a given Id to all nodes in the cluster">>,
parameters => [param_path_id()], parameters => [param_path_id()],
requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
responses => #{ responses => #{
<<"404">> => <<"404">> =>
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
<<"200">> => <<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}} emqx_mgmt_util:array_schema(resp_schema(), <<"Create or update listener successfully">>)}},
delete => #{
description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
parameters => [param_path_id()],
responses => #{
<<"404">> =>
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:schema(<<"Delete listener successfully">>)}}
}, },
{"/listeners/:id", Metadata, list_update_listeners_by_id}. {"/listeners/:id", Metadata, crud_listeners_by_id}.
api_list_listeners_on_node() -> api_list_listeners_on_node() ->
Metadata = #{ Metadata = #{
@ -126,9 +134,17 @@ api_get_update_listener_by_id_on_node() ->
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']), ['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> => <<"200">> =>
emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}} emqx_mgmt_util:object_schema(resp_schema(), <<"Get listener successfully">>)}},
delete => #{
description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
parameters => [param_path_node(), param_path_id()],
responses => #{
<<"404">> =>
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:schema(<<"Delete listener successfully">>)}}
}, },
{"/nodes/:node/listeners/:id", Metadata, get_update_listener_by_id_on_node}. {"/nodes/:node/listeners/:id", Metadata, crud_listener_by_id_on_node}.
api_manage_listeners() -> api_manage_listeners() ->
Metadata = #{ Metadata = #{
@ -190,7 +206,7 @@ param_path_operation()->
list_listeners(get, _Request) -> list_listeners(get, _Request) ->
{200, format(emqx_mgmt:list_listeners())}. {200, format(emqx_mgmt:list_listeners())}.
list_update_listeners_by_id(get, #{bindings := #{id := Id}}) -> crud_listeners_by_id(get, #{bindings := #{id := Id}}) ->
case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(), case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(),
atom_to_binary(Id0, latin1) =:= Id] of atom_to_binary(Id0, latin1) =:= Id] of
[] -> [] ->
@ -198,8 +214,14 @@ list_update_listeners_by_id(get, #{bindings := #{id := Id}}) ->
Listeners -> Listeners ->
{200, format(Listeners)} {200, format(Listeners)}
end; end;
list_update_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) -> crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) ->
return_listeners(emqx_mgmt:update_listener(Id, Conf)). return_listeners(emqx_mgmt:update_listener(Id, Conf));
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
Results = emqx_mgmt:remove_listener(Id),
case lists:filter(fun({error, _}) -> true; (_) -> false end, Results) of
[] -> {200};
Errors -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Errors)}}
end.
list_listeners_on_node(get, #{bindings := #{node := Node}}) -> list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
case emqx_mgmt:list_listeners(atom(Node)) of case emqx_mgmt:list_listeners(atom(Node)) of
@ -209,7 +231,7 @@ list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
{200, format(Listener)} {200, format(Listener)}
end. end.
get_update_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
case emqx_mgmt:get_listener(atom(Node), atom(Id)) of case emqx_mgmt:get_listener(atom(Node), atom(Id)) of
{error, not_found} -> {error, not_found} ->
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}}; {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}};
@ -218,8 +240,13 @@ get_update_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}})
Listener -> Listener ->
{200, format(Listener)} {200, format(Listener)}
end; end;
get_update_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node, body := Conf}}) -> crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node, body := Conf}}) ->
return_listeners(emqx_mgmt:update_listener(atom(Node), Id, Conf)). return_listeners(emqx_mgmt:update_listener(atom(Node), Id, Conf));
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
case emqx_mgmt:remove_listener(atom(Node), Id) of
ok -> {200};
{error, Reason} -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
end.
manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) -> manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) ->
{_, Result} = do_manage_listeners(Node, Id, Oper), {_, Result} = do_manage_listeners(Node, Id, Oper),