feat: refactor api_listeners api
This commit is contained in:
parent
1b63f8fcef
commit
63d6682a7d
|
@ -24,6 +24,7 @@
|
|||
|
||||
%% APIs
|
||||
-export([
|
||||
list_raw/0,
|
||||
list/0,
|
||||
start/0,
|
||||
restart/0,
|
||||
|
@ -57,50 +58,64 @@
|
|||
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
|
||||
|
||||
-spec id_example() -> atom().
|
||||
id_example() ->
|
||||
id_example(list()).
|
||||
|
||||
id_example([]) ->
|
||||
{ID, _} = hd(list()),
|
||||
ID;
|
||||
id_example([{'tcp:default', _} | _]) ->
|
||||
'tcp:default';
|
||||
id_example([_ | Listeners]) ->
|
||||
id_example(Listeners).
|
||||
id_example() -> 'tcp:default'.
|
||||
|
||||
%% @doc List configured listeners.
|
||||
-spec list() -> [{ListenerId :: atom(), ListenerConf :: map()}].
|
||||
-spec list_raw() -> [{ListenerId :: atom(), Type :: atom(), ListenerConf :: map()}].
|
||||
list_raw() ->
|
||||
[{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()].
|
||||
|
||||
list() ->
|
||||
[{listener_id(Type, LName), LConf} || {Type, LName, LConf} <- do_list()].
|
||||
|
||||
do_list() ->
|
||||
Listeners = maps:to_list(emqx:get_config([listeners], #{})),
|
||||
lists:append([list(Type, maps:to_list(Conf)) || {Type, Conf} <- Listeners]).
|
||||
lists:flatmap(fun format_list/1, Listeners).
|
||||
|
||||
list(Type, Conf) ->
|
||||
format_list(Listener) ->
|
||||
{Type, Conf} = Listener,
|
||||
[
|
||||
begin
|
||||
Running = is_running(Type, listener_id(Type, LName), LConf),
|
||||
{Type, LName, maps:put(running, Running, LConf)}
|
||||
end
|
||||
|| {LName, LConf} <- Conf, is_map(LConf)
|
||||
|| {LName, LConf} <- maps:to_list(Conf), is_map(LConf)
|
||||
].
|
||||
|
||||
-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
|
||||
do_list_raw() ->
|
||||
Key = <<"listeners">>,
|
||||
Raw = emqx_config:get_raw([Key], #{}),
|
||||
SchemaMod = emqx_config:get_schema_mod(Key),
|
||||
#{Key := RawWithDefault} = emqx_config:fill_defaults(SchemaMod, #{Key => Raw}),
|
||||
Listeners = maps:to_list(RawWithDefault),
|
||||
lists:flatmap(fun format_raw_listeners/1, Listeners).
|
||||
|
||||
format_raw_listeners({Type, Conf}) ->
|
||||
lists:map(
|
||||
fun({LName, LConf0}) when is_map(LConf0) ->
|
||||
Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0),
|
||||
LConf1 = maps:remove(<<"authentication">>, LConf0),
|
||||
LConf2 = maps:put(<<"running">>, Running, LConf1),
|
||||
{Type, LName, LConf2}
|
||||
end, maps:to_list(Conf)).
|
||||
|
||||
-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
|
||||
is_running(ListenerId) ->
|
||||
case
|
||||
lists:filtermap(
|
||||
fun({_Type, Id, #{running := IsRunning}}) ->
|
||||
Id =:= ListenerId andalso {true, IsRunning}
|
||||
end,
|
||||
do_list()
|
||||
)
|
||||
{Type, Name} = parse_listener_id(ListenerId),
|
||||
case [ Running || {Type0, Name0, #{running := Running}} <- list(),
|
||||
Type0 =:= Type, Name0 =:= Name]
|
||||
of
|
||||
[IsRunning] -> IsRunning;
|
||||
[] -> {error, not_found}
|
||||
[] -> {error, not_found};
|
||||
[IsRunning] -> IsRunning
|
||||
end.
|
||||
|
||||
is_running(Type, ListenerId, #{bind := ListenOn}) when Type =:= tcp; Type =:= ssl ->
|
||||
is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
|
||||
ListenOn =
|
||||
case Conf of
|
||||
#{bind := Bind} -> Bind;
|
||||
#{<<"bind">> := Bind} ->
|
||||
case emqx_schema:to_ip_port(binary_to_list(Bind)) of
|
||||
{ok, L} -> L;
|
||||
{error, _} -> binary_to_integer(Bind)
|
||||
end
|
||||
end,
|
||||
try esockd:listener({ListenerId, ListenOn}) of
|
||||
Pid when is_pid(Pid) ->
|
||||
true
|
||||
|
@ -118,7 +133,7 @@ is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss ->
|
|||
end;
|
||||
is_running(quic, _ListenerId, _Conf) ->
|
||||
%% TODO: quic support
|
||||
{error, no_found}.
|
||||
false.
|
||||
|
||||
current_conns(ID, ListenOn) ->
|
||||
{Type, Name} = parse_listener_id(ID),
|
||||
|
@ -164,20 +179,24 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
console_print(
|
||||
"Listener ~ts is NOT started due to: ~p~n.",
|
||||
[listener_id(Type, ListenerName), Reason]
|
||||
);
|
||||
),
|
||||
ok;
|
||||
{ok, _} ->
|
||||
console_print(
|
||||
"Listener ~ts on ~ts started.~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
||||
);
|
||||
),
|
||||
ok;
|
||||
{error, {already_started, Pid}} ->
|
||||
{error, {already_started, Pid}};
|
||||
{error, Reason} ->
|
||||
ListenerId = listener_id(Type, ListenerName),
|
||||
BindStr = format_addr(Bind),
|
||||
?ELOG(
|
||||
"Failed to start listener ~ts on ~ts: ~0p~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]
|
||||
[ListenerId, BindStr, Reason]
|
||||
),
|
||||
error(Reason)
|
||||
error({failed_to_start, ListenerId, BindStr, Reason})
|
||||
end.
|
||||
|
||||
%% @doc Restart all listeners
|
||||
|
@ -316,10 +335,16 @@ delete_authentication(Type, ListenerName, _Conf) ->
|
|||
post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
|
||||
#{added := Added, removed := Removed, changed := Updated} =
|
||||
diff_listeners(NewListeners, OldListeners),
|
||||
perform_listener_changes(fun stop_listener/3, Removed),
|
||||
perform_listener_changes(fun delete_authentication/3, Removed),
|
||||
perform_listener_changes(fun start_listener/3, Added),
|
||||
perform_listener_changes(fun restart_listener/3, Updated).
|
||||
try
|
||||
perform_listener_changes(fun stop_listener/3, Removed),
|
||||
perform_listener_changes(fun delete_authentication/3, Removed),
|
||||
perform_listener_changes(fun start_listener/3, Added),
|
||||
perform_listener_changes(fun restart_listener/3, Updated)
|
||||
catch error : {failed_to_start, ListenerId, Bind, Reason} ->
|
||||
Error = lists:flatten(io_lib:format("~ts(~ts) failed with ~ts",
|
||||
[ListenerId, Bind, element(1, Reason)])),
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
perform_listener_changes(Action, MapConfs) ->
|
||||
lists:foreach(
|
||||
|
@ -483,7 +508,7 @@ foreach_listeners(Do) ->
|
|||
fun({Type, LName, LConf}) ->
|
||||
Do(Type, LName, LConf)
|
||||
end,
|
||||
do_list()
|
||||
list()
|
||||
).
|
||||
|
||||
has_enabled_listener_conf_by_type(Type) ->
|
||||
|
@ -491,7 +516,7 @@ has_enabled_listener_conf_by_type(Type) ->
|
|||
fun({Type0, _LName, LConf}) when is_map(LConf) ->
|
||||
Type =:= Type0 andalso maps:get(enabled, LConf, true)
|
||||
end,
|
||||
do_list()
|
||||
list()
|
||||
).
|
||||
|
||||
apply_on_listener(ListenerId, Do) ->
|
||||
|
|
|
@ -74,11 +74,10 @@ global_chain_config() ->
|
|||
|
||||
listener_chain_configs() ->
|
||||
lists:map(
|
||||
fun({ListenerID, _}) ->
|
||||
{ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])}
|
||||
end,
|
||||
emqx_listeners:list()
|
||||
).
|
||||
fun({ListenerID, _, _}) ->
|
||||
{ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])}
|
||||
end,
|
||||
emqx_listeners:list()).
|
||||
|
||||
auth_config_path(ListenerID) ->
|
||||
[<<"listeners">>] ++
|
||||
|
|
|
@ -83,21 +83,6 @@
|
|||
, do_unsubscribe/2
|
||||
]).
|
||||
|
||||
%% Listeners
|
||||
-export([ do_list_listeners/0
|
||||
, list_listeners/0
|
||||
, list_listeners/1
|
||||
, list_listeners_by_id/1
|
||||
, get_listener/2
|
||||
, manage_listener/2
|
||||
, do_update_listener/2
|
||||
, update_listener/2
|
||||
, update_listener/3
|
||||
, do_remove_listener/1
|
||||
, remove_listener/1
|
||||
, remove_listener/2
|
||||
]).
|
||||
|
||||
%% Alarms
|
||||
-export([ get_alarms/1
|
||||
, get_alarms/2
|
||||
|
@ -434,80 +419,6 @@ do_unsubscribe(ClientId, Topic) ->
|
|||
Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Listeners
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
do_list_listeners() ->
|
||||
[Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()].
|
||||
|
||||
list_listeners() ->
|
||||
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
|
||||
|
||||
list_listeners(Node) ->
|
||||
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
|
||||
|
||||
list_listeners_by_id(Id) ->
|
||||
listener_id_filter(Id, list_listeners()).
|
||||
|
||||
get_listener(Node, Id) ->
|
||||
case listener_id_filter(Id, list_listeners(Node)) of
|
||||
[] ->
|
||||
{error, not_found};
|
||||
[Listener] ->
|
||||
Listener
|
||||
end.
|
||||
|
||||
listener_id_filter(Id, Listeners) ->
|
||||
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
|
||||
lists:filter(Filter, Listeners).
|
||||
|
||||
-spec manage_listener( start_listener | stop_listener | restart_listener
|
||||
, #{id := atom(), node := node()}
|
||||
) -> ok | {error, Reason :: term()}.
|
||||
manage_listener(start_listener, #{id := ID, node := Node}) ->
|
||||
wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID));
|
||||
manage_listener(stop_listener, #{id := ID, node := Node}) ->
|
||||
wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID));
|
||||
manage_listener(restart_listener, #{id := ID, node := Node}) ->
|
||||
wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)).
|
||||
|
||||
-spec do_update_listener(string(), emqx_config:update_request()) ->
|
||||
map() | {error, _}.
|
||||
do_update_listener(Id, Config) ->
|
||||
case emqx_listeners:parse_listener_id(Id) of
|
||||
{error, {invalid_listener_id, Id}} ->
|
||||
{error, {invalid_listener_id, Id}};
|
||||
{Type, Name} ->
|
||||
case emqx:update_config([listeners, Type, Name], Config, #{}) of
|
||||
{ok, #{raw_config := RawConf}} ->
|
||||
RawConf#{node => node(), id => Id, running => true};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end
|
||||
end.
|
||||
|
||||
update_listener(Id, Config) ->
|
||||
[update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
|
||||
|
||||
update_listener(Node, Id, Config) ->
|
||||
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
|
||||
|
||||
remove_listener(Id) ->
|
||||
[remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
|
||||
|
||||
-spec do_remove_listener(string()) -> ok.
|
||||
do_remove_listener(Id) ->
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx:remove_config([listeners, Type, Name], #{}) of
|
||||
{ok, _} -> ok;
|
||||
{error, Reason} ->
|
||||
error(Reason)
|
||||
end.
|
||||
|
||||
remove_listener(Node, Id) ->
|
||||
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Get Alarms
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -18,372 +18,415 @@
|
|||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-export([api_spec/0]).
|
||||
-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
|
||||
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
|
||||
|
||||
-export([ list_listeners/2
|
||||
, crud_listeners_by_id/2
|
||||
, list_listeners_on_node/2
|
||||
, crud_listener_by_id_on_node/2
|
||||
, manage_listeners/2
|
||||
, jsonable_resp/2
|
||||
, action_listeners/2
|
||||
]).
|
||||
|
||||
-export([format/1]).
|
||||
%% for rpc call
|
||||
-export([ do_list_listeners/0
|
||||
, do_update_listener/2
|
||||
, do_remove_listener/1
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
-define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>).
|
||||
-define(NODE_NOT_FOUND_OR_DOWN, <<"Node not found or Down">>).
|
||||
-define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
|
||||
-define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
|
||||
-define(ADDR_PORT_INUSE, <<"Addr port in use">>).
|
||||
-define(CONFIG_SCHEMA_ERROR, <<"Config schema error">>).
|
||||
-define(INVALID_LISTENER_PROTOCOL, <<"Invalid listener type">>).
|
||||
-define(UPDATE_CONFIG_FAILED, <<"Update configuration failed">>).
|
||||
-define(OPERATION_FAILED, <<"Operation failed">>).
|
||||
|
||||
-define(OPTS(_Type_), #{rawconf_with_defaults => true, override_to => _Type_}).
|
||||
|
||||
namespace() -> "listeners".
|
||||
|
||||
api_spec() ->
|
||||
{
|
||||
[
|
||||
api_list_listeners(),
|
||||
api_list_update_listeners_by_id(),
|
||||
api_manage_listeners(),
|
||||
api_list_listeners_on_node(),
|
||||
api_get_update_listener_by_id_on_node(),
|
||||
api_manage_listeners_on_node()
|
||||
],
|
||||
[]
|
||||
}.
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
-define(TYPES_ATOM, [tcp, ssl, ws, wss, quic]).
|
||||
req_schema() ->
|
||||
Schema = [emqx_mgmt_api_configs:gen_schema(
|
||||
emqx:get_raw_config([listeners, T, default], #{}))
|
||||
|| T <- ?TYPES_ATOM],
|
||||
#{'oneOf' => Schema}.
|
||||
paths() ->
|
||||
[
|
||||
"/listeners",
|
||||
"/listeners/:id",
|
||||
"/listeners/:id/:action",
|
||||
"/nodes/:node/listeners",
|
||||
"/nodes/:node/listeners/:id",
|
||||
"/nodes/:node/listeners/:id/:action"
|
||||
].
|
||||
|
||||
resp_schema() ->
|
||||
#{'oneOf' := Schema} = req_schema(),
|
||||
AddMetadata = fun(Prop) ->
|
||||
Prop#{running => #{type => boolean},
|
||||
id => #{type => string},
|
||||
node => #{type => string}}
|
||||
end,
|
||||
Schema1 = [S#{properties => AddMetadata(Prop)}
|
||||
|| S = #{properties := Prop} <- Schema],
|
||||
#{'oneOf' => Schema1}.
|
||||
|
||||
api_list_listeners() ->
|
||||
Metadata = #{
|
||||
schema("/listeners") ->
|
||||
#{
|
||||
'operationId' => list_listeners,
|
||||
get => #{
|
||||
description => <<"List listeners from all nodes in the cluster">>,
|
||||
responses => #{
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:array_schema(resp_schema(),
|
||||
<<"List listeners successfully">>)}}},
|
||||
{"/listeners", Metadata, list_listeners}.
|
||||
|
||||
api_list_update_listeners_by_id() ->
|
||||
Metadata = #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"List all running node's listeners.">>,
|
||||
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))}
|
||||
}
|
||||
};
|
||||
schema("/listeners/:id") ->
|
||||
#{
|
||||
'operationId' => crud_listeners_by_id,
|
||||
get => #{
|
||||
description => <<"List listeners by a given Id from all nodes in the cluster">>,
|
||||
parameters => [param_path_id()],
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"List all running node's listeners for the specified id.">>,
|
||||
parameters => [?R_REF(listener_id)],
|
||||
responses => #{
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}},
|
||||
200 => ?HOCON(?ARRAY(?R_REF(listeners)))
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
description =>
|
||||
<<"Create or update a listener by a given Id to all nodes in the cluster">>,
|
||||
parameters => [param_path_id()],
|
||||
'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Create or update the specified listener on all nodes.">>,
|
||||
parameters => [?R_REF(listener_id)],
|
||||
'requestBody' => ?HOCON(listener_schema(), #{}),
|
||||
responses => #{
|
||||
<<"400">> =>
|
||||
emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED,
|
||||
['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
||||
<<"500">> =>
|
||||
emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:array_schema(resp_schema(),
|
||||
<<"Create or update listener successfully">>)}},
|
||||
200 => ?HOCON(listener_schema(), #{}),
|
||||
400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
|
||||
}
|
||||
},
|
||||
delete => #{
|
||||
description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
|
||||
parameters => [param_path_id()],
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Delete the specified listener on all nodes.">>,
|
||||
parameters => [?R_REF(listener_id)],
|
||||
responses => #{
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
||||
<<"204">> =>
|
||||
emqx_mgmt_util:schema(<<"Delete listener successfully">>)}}
|
||||
},
|
||||
{"/listeners/:id", Metadata, crud_listeners_by_id}.
|
||||
|
||||
api_list_listeners_on_node() ->
|
||||
Metadata = #{
|
||||
get => #{
|
||||
description => <<"List listeners in one node">>,
|
||||
parameters => [param_path_node()],
|
||||
responses => #{
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?NODE_NOT_FOUND_OR_DOWN, ['RESOURCE_NOT_FOUND']),
|
||||
<<"500">> =>
|
||||
emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:schema(resp_schema(), <<"List listeners successfully">>)}}},
|
||||
{"/nodes/:node/listeners", Metadata, list_listeners_on_node}.
|
||||
|
||||
api_get_update_listener_by_id_on_node() ->
|
||||
Metadata = #{
|
||||
get => #{
|
||||
description => <<"Get a listener by a given Id on a specific node">>,
|
||||
parameters => [param_path_node(), param_path_id()],
|
||||
responses => #{
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
|
||||
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}},
|
||||
put => #{
|
||||
description => <<"Create or update a listener by a given Id on a specific node">>,
|
||||
parameters => [param_path_node(), param_path_id()],
|
||||
'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
|
||||
responses => #{
|
||||
<<"400">> =>
|
||||
emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED,
|
||||
['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
|
||||
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
|
||||
<<"500">> =>
|
||||
emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}},
|
||||
delete => #{
|
||||
description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
|
||||
parameters => [param_path_node(), param_path_id()],
|
||||
responses => #{
|
||||
<<"404">> =>
|
||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
||||
<<"204">> =>
|
||||
emqx_mgmt_util:schema(<<"Delete listener successfully">>)}}
|
||||
},
|
||||
{"/nodes/:node/listeners/:id", Metadata, crud_listener_by_id_on_node}.
|
||||
|
||||
api_manage_listeners() ->
|
||||
Metadata = #{
|
||||
204 => <<"Listener deleted">>,
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/listeners/:id/:action") ->
|
||||
#{
|
||||
'operationId' => action_listeners,
|
||||
post => #{
|
||||
description => <<"Restart listeners on all nodes in the cluster">>,
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Start/stop/restart listeners on all nodes.">>,
|
||||
parameters => [
|
||||
param_path_id(),
|
||||
param_path_operation()],
|
||||
?R_REF(listener_id),
|
||||
?R_REF(action)],
|
||||
responses => #{
|
||||
<<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
||||
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
|
||||
{"/listeners/:id/operation/:operation", Metadata, manage_listeners}.
|
||||
|
||||
api_manage_listeners_on_node() ->
|
||||
Metadata = #{
|
||||
200 => <<"Updated">>,
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/nodes/:node/listeners") ->
|
||||
#{
|
||||
'operationId' => list_listeners_on_node,
|
||||
get => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"List all listeners on the specified node.">>,
|
||||
parameters => [?R_REF(node)],
|
||||
responses => #{
|
||||
200 => ?HOCON(?ARRAY(listener_schema())),
|
||||
400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN)
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/nodes/:node/listeners/:id") ->
|
||||
#{
|
||||
'operationId' => crud_listener_by_id_on_node,
|
||||
get => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Get the specified listener on the specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(listener_id),
|
||||
?R_REF(node)],
|
||||
responses => #{
|
||||
200 => ?HOCON(listener_schema()),
|
||||
400 => error_codes(['BAD_REQUEST']),
|
||||
404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND)
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
description => <<"Restart listeners on all nodes in the cluster">>,
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Create or update the specified listener on the specified node.">>,
|
||||
parameters => [
|
||||
param_path_node(),
|
||||
param_path_id(),
|
||||
param_path_operation()],
|
||||
?R_REF(listener_id),
|
||||
?R_REF(node)],
|
||||
'requestBody' => ?HOCON(listener_schema()),
|
||||
responses => #{
|
||||
<<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
||||
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
|
||||
{"/nodes/:node/listeners/:id/operation/:operation", Metadata, manage_listeners}.
|
||||
|
||||
%%%==============================================================================================
|
||||
%% parameters
|
||||
param_path_node() ->
|
||||
200 => ?HOCON(listener_schema()),
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
}},
|
||||
delete => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Delete the specified listener on the specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(listener_id),
|
||||
?R_REF(node)],
|
||||
responses => #{
|
||||
204 => <<"Listener deleted">>,
|
||||
400 => error_codes(['BAD_REQUEST'])}
|
||||
}
|
||||
};
|
||||
schema("/nodes/:node/listeners/:id/:action") ->
|
||||
#{
|
||||
name => node,
|
||||
in => path,
|
||||
schema => #{type => string},
|
||||
required => true,
|
||||
example => node()
|
||||
'operationId' => action_listeners,
|
||||
post => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Start/stop/restart listeners on a specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(node),
|
||||
?R_REF(listener_id),
|
||||
?R_REF(action)],
|
||||
responses => #{
|
||||
200 => <<"Updated">>,
|
||||
400 => error_codes(['BAD_REQUEST'])}
|
||||
}
|
||||
}.
|
||||
|
||||
param_path_id() ->
|
||||
#{
|
||||
name => id,
|
||||
in => path,
|
||||
schema => #{type => string, example => emqx_listeners:id_example()},
|
||||
required => true
|
||||
}.
|
||||
fields(listeners) ->
|
||||
[
|
||||
{"node", ?HOCON(atom(), #{
|
||||
desc => "Node name",
|
||||
example => "emqx@127.0.0.1",
|
||||
required => true})
|
||||
},
|
||||
{"listeners", ?ARRAY(listener_schema())}
|
||||
];
|
||||
fields(listener_id) ->
|
||||
[
|
||||
{id, ?HOCON(atom(), #{
|
||||
desc => "Listener id",
|
||||
example => 'tcp:default',
|
||||
validator => fun validate_id/1,
|
||||
in => path})
|
||||
}
|
||||
];
|
||||
fields(action) ->
|
||||
[
|
||||
{action, ?HOCON(?ENUM([start, stop, restart]), #{
|
||||
desc => "listener action",
|
||||
example => start,
|
||||
in => path})
|
||||
}
|
||||
];
|
||||
fields(node) ->
|
||||
[
|
||||
{"node", ?HOCON(atom(), #{
|
||||
desc => "Node name",
|
||||
example => "emqx@127.0.0.1",
|
||||
in => path})
|
||||
}
|
||||
];
|
||||
fields(Type) ->
|
||||
Listeners = listeners_info(),
|
||||
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
||||
Schema.
|
||||
|
||||
param_path_operation()->
|
||||
#{
|
||||
name => operation,
|
||||
in => path,
|
||||
required => true,
|
||||
schema => #{
|
||||
type => string,
|
||||
enum => [start, stop, restart]},
|
||||
example => restart
|
||||
}.
|
||||
listener_schema() ->
|
||||
?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())).
|
||||
|
||||
listeners_info() ->
|
||||
Listeners = hocon_schema:fields(emqx_schema, "listeners"),
|
||||
lists:map(fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
|
||||
Fields0 = hocon_schema:fields(Mod, Field),
|
||||
Fields1 = lists:keydelete("authentication", 1, Fields0),
|
||||
TypeAtom = list_to_existing_atom(Type),
|
||||
#{ref => ?R_REF(TypeAtom),
|
||||
schema => [
|
||||
{type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})},
|
||||
{running, ?HOCON(boolean(), #{desc => "Listener status", required => false})},
|
||||
{id, ?HOCON(atom(), #{desc => "Listener id", required => true,
|
||||
validator => fun validate_id/1})}
|
||||
| Fields1]
|
||||
}
|
||||
end, Listeners).
|
||||
|
||||
validate_id(Id) ->
|
||||
case emqx_listeners:parse_listener_id(Id) of
|
||||
{error, Reason} -> {error, Reason};
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
%%%==============================================================================================
|
||||
%% api
|
||||
list_listeners(get, _Request) ->
|
||||
{200, format(emqx_mgmt:list_listeners())}.
|
||||
{200, list_listeners()}.
|
||||
|
||||
crud_listeners_by_id(get, #{bindings := #{id := Id}}) ->
|
||||
case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(),
|
||||
atom_to_binary(Id0, latin1) =:= Id] of
|
||||
[] ->
|
||||
{400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}};
|
||||
Listeners ->
|
||||
{200, format(Listeners)}
|
||||
{200, list_listeners_by_id(Id)};
|
||||
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
|
||||
case parse_listener_conf(Body0) of
|
||||
{Id, Type, Name, Conf} ->
|
||||
case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
|
||||
{ok, #{raw_config := _RawConf}} ->
|
||||
crud_listeners_by_id(get, #{bindings => #{id => Id}});
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
_ ->
|
||||
{400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
|
||||
end;
|
||||
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) ->
|
||||
Results = format(emqx_mgmt:update_listener(Id, Conf)),
|
||||
case lists:filter(fun filter_errors/1, Results) of
|
||||
[{error, {invalid_listener_id, Id}} | _] ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}};
|
||||
[{error, {emqx_conf_schema, _}} | _] ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}};
|
||||
[{error, {eaddrinuse, _}} | _] ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
||||
[{error, Reason} | _] ->
|
||||
{500, #{code => 'UNKNOWN_ERROR', message => err_msg(Reason)}};
|
||||
[] ->
|
||||
{200, Results}
|
||||
end;
|
||||
|
||||
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
|
||||
Results = emqx_mgmt:remove_listener(Id),
|
||||
case lists:filter(fun filter_errors/1, Results) of
|
||||
[] -> {204};
|
||||
Errors -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Errors)}}
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of
|
||||
{ok, _} -> {204};
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end.
|
||||
|
||||
parse_listener_conf(Conf0) ->
|
||||
Conf1 = maps:remove(<<"running">>, Conf0),
|
||||
{IdBin, Conf2} = maps:take(<<"id">>, Conf1),
|
||||
{TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(IdBin),
|
||||
TypeAtom = binary_to_existing_atom(TypeBin),
|
||||
case Type =:= TypeAtom of
|
||||
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
|
||||
false -> {error, listener_type_inconsistent}
|
||||
end.
|
||||
|
||||
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
|
||||
case emqx_mgmt:list_listeners(atom(Node)) of
|
||||
case list_listeners(Node) of
|
||||
{error, nodedown} ->
|
||||
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||
{400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
|
||||
Listener ->
|
||||
{200, format(Listener)}
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
#{<<"listeners">> := Listener} ->
|
||||
{200, Listener}
|
||||
end.
|
||||
|
||||
crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
|
||||
case emqx_mgmt:get_listener(atom(Node), atom(Id)) of
|
||||
case get_listener(Node, Id) of
|
||||
{error, not_found} ->
|
||||
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}};
|
||||
{404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
Listener ->
|
||||
{200, format(Listener)}
|
||||
{200, Listener}
|
||||
end;
|
||||
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Conf}) ->
|
||||
case emqx_mgmt:update_listener(atom(Node), Id, Conf) of
|
||||
{error, nodedown} ->
|
||||
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||
{error, {invalid_listener_id, _}} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}};
|
||||
{error, {emqx_conf_schema, _}} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}};
|
||||
{error, {eaddrinuse, _}} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
|
||||
Listener ->
|
||||
{200, format(Listener)}
|
||||
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) ->
|
||||
case parse_listener_conf(Body) of
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
{Id, Type, _Name, Conf} ->
|
||||
case update_listener(Node, Id, Conf) of
|
||||
{error, nodedown} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||
{error, {eaddrinuse, _}} -> %% TODO
|
||||
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
{ok, Listener} ->
|
||||
{200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}}
|
||||
end;
|
||||
_ -> {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}}
|
||||
end;
|
||||
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
|
||||
case emqx_mgmt:remove_listener(atom(Node), Id) of
|
||||
case remove_listener(Node, Id) of
|
||||
ok -> {204};
|
||||
{error, Reason} -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end.
|
||||
|
||||
manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) ->
|
||||
{_, Result} = do_manage_listeners(Node, Id, Oper),
|
||||
action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) ->
|
||||
{_, Result} = action_listeners(Node, Id, Action),
|
||||
Result;
|
||||
|
||||
manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) ->
|
||||
Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()],
|
||||
action_listeners(post, #{bindings := #{id := Id, action := Action}}) ->
|
||||
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
|
||||
case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of
|
||||
[] -> {200};
|
||||
Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}
|
||||
Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}}
|
||||
end.
|
||||
|
||||
%%%==============================================================================================
|
||||
%% util functions
|
||||
|
||||
do_manage_listeners(Node, Id, Oper) ->
|
||||
Param = #{node => atom(Node), id => atom(Id)},
|
||||
{Node, do_manage_listeners2(Oper, Param)}.
|
||||
action_listeners(Node, Id, Action) ->
|
||||
{Node, do_action_listeners(Action, Node, Id)}.
|
||||
|
||||
do_manage_listeners2(<<"start">>, Param) ->
|
||||
case emqx_mgmt:manage_listener(start_listener, Param) of
|
||||
do_action_listeners(start, Node, Id) ->
|
||||
case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of
|
||||
ok -> {200};
|
||||
{error, {already_started, _}} -> {200};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end;
|
||||
do_manage_listeners2(<<"stop">>, Param) ->
|
||||
case emqx_mgmt:manage_listener(stop_listener, Param) of
|
||||
do_action_listeners(stop, Node, Id) ->
|
||||
case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of
|
||||
ok -> {200};
|
||||
{error, not_found} -> {200};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end;
|
||||
do_manage_listeners2(<<"restart">>, Param) ->
|
||||
case emqx_mgmt:manage_listener(restart_listener, Param) of
|
||||
do_action_listeners(restart, Node, Id) ->
|
||||
case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of
|
||||
ok -> {200};
|
||||
{error, not_found} -> do_manage_listeners2(<<"start">>, Param);
|
||||
{error, not_found} -> do_action_listeners(start, Node, Id);
|
||||
{error, Reason} ->
|
||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end.
|
||||
|
||||
manage_listeners_err(Errors) ->
|
||||
action_listeners_err(Errors) ->
|
||||
list_to_binary(lists:foldl(fun({Node, Err}, Str) ->
|
||||
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
|
||||
end, "", Errors)).
|
||||
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
|
||||
end, "", Errors)).
|
||||
|
||||
format(Listeners) when is_list(Listeners) ->
|
||||
[format(Listener) || Listener <- Listeners];
|
||||
|
||||
format({error, Reason}) ->
|
||||
{error, Reason};
|
||||
|
||||
format(#{node := _Node, id := _Id} = Conf) when is_map(Conf) ->
|
||||
emqx_map_lib:jsonable_map(Conf#{
|
||||
running => trans_running(Conf)
|
||||
}, fun ?MODULE:jsonable_resp/2).
|
||||
|
||||
trans_running(Conf) ->
|
||||
case maps:get(running, Conf) of
|
||||
{error, _} ->
|
||||
false;
|
||||
Running ->
|
||||
Running
|
||||
end.
|
||||
|
||||
filter_errors({error, _}) ->
|
||||
true;
|
||||
filter_errors(_) ->
|
||||
false.
|
||||
|
||||
jsonable_resp(bind, Port) when is_integer(Port) ->
|
||||
{bind, Port};
|
||||
jsonable_resp(bind, {Addr, Port}) when is_tuple(Addr); is_integer(Port)->
|
||||
{bind, inet:ntoa(Addr) ++ ":" ++ integer_to_list(Port)};
|
||||
jsonable_resp(user_lookup_fun, _) ->
|
||||
drop;
|
||||
jsonable_resp(K, V) ->
|
||||
{K, V}.
|
||||
|
||||
atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
|
||||
atom(S) when is_list(S) -> list_to_atom(S);
|
||||
atom(A) when is_atom(A) -> A.
|
||||
|
||||
err_msg(Reason) ->
|
||||
list_to_binary(err_msg_str(Reason)).
|
||||
err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
|
||||
err_msg(Reason) -> list_to_binary(err_msg_str(Reason)).
|
||||
|
||||
err_msg_str(Reason) ->
|
||||
io_lib:format("~p", [Reason]).
|
||||
|
||||
list_listeners() ->
|
||||
[list_listeners(Node) || Node <- mria_mnesia:running_nodes()].
|
||||
|
||||
list_listeners(Node) ->
|
||||
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
|
||||
|
||||
list_listeners_by_id(Id) ->
|
||||
listener_id_filter(Id, list_listeners()).
|
||||
|
||||
get_listener(Node, Id) ->
|
||||
case listener_id_filter(Id, [list_listeners(Node)]) of
|
||||
[#{<<"listeners">> := []}] -> {error, not_found};
|
||||
[#{<<"listeners">> := [Listener]}] -> Listener
|
||||
end.
|
||||
|
||||
listener_id_filter(Id, Listeners) ->
|
||||
lists:map(fun(Conf = #{<<"listeners">> := Listeners0}) ->
|
||||
Conf#{<<"listeners">> =>
|
||||
[C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]}
|
||||
end, Listeners).
|
||||
|
||||
update_listener(Node, Id, Config) ->
|
||||
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
|
||||
|
||||
remove_listener(Node, Id) ->
|
||||
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
|
||||
|
||||
-spec do_list_listeners() -> map().
|
||||
do_list_listeners() ->
|
||||
Listeners = [Conf#{<<"id">> => Id, <<"type">> => Type}
|
||||
|| {Id, Type, Conf} <- emqx_listeners:list_raw()],
|
||||
#{
|
||||
<<"node">> => node(),
|
||||
<<"listeners">> => Listeners
|
||||
}.
|
||||
|
||||
-spec do_update_listener(string(), emqx_config:update_request()) ->
|
||||
{ok, map()} | {error, _}.
|
||||
do_update_listener(Id, Config) ->
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of
|
||||
{ok, #{raw_config := RawConf}} -> {ok, RawConf};
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
-spec do_remove_listener(string()) -> ok.
|
||||
do_remove_listener(Id) ->
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of
|
||||
{ok, _} -> ok;
|
||||
{error, Reason} -> error(Reason)
|
||||
end.
|
||||
|
||||
wrap_rpc({badrpc, Reason}) ->
|
||||
{error, Reason};
|
||||
wrap_rpc(Res) ->
|
||||
Res.
|
||||
|
|
|
@ -518,7 +518,7 @@ trace_type(_, _) -> error.
|
|||
%% @doc Listeners Command
|
||||
|
||||
listeners([]) ->
|
||||
lists:foreach(fun({ID, Conf}) ->
|
||||
lists:foreach(fun({ID, _Name, Conf}) ->
|
||||
{Host, Port} = maps:get(bind, Conf),
|
||||
Acceptors = maps:get(acceptors, Conf),
|
||||
ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
|
||||
|
|
|
@ -55,16 +55,16 @@ list_subscriptions(Node) ->
|
|||
|
||||
-spec list_listeners(node()) -> [map()] | {badrpc, _}.
|
||||
list_listeners(Node) ->
|
||||
rpc:call(Node, emqx_mgmt, do_list_listeners, []).
|
||||
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
|
||||
|
||||
-spec remove_listener(node(), string()) -> ok | {badrpc, _}.
|
||||
remove_listener(Node, Id) ->
|
||||
rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]).
|
||||
rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]).
|
||||
|
||||
-spec update_listener(node(), string(), emqx_config:update_request()) ->
|
||||
map() | {error, _} | {badrpc, _}.
|
||||
update_listener(Node, Id, Config) ->
|
||||
rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]).
|
||||
rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]).
|
||||
|
||||
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
||||
|
|
|
@ -24,53 +24,131 @@ all() ->
|
|||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_) ->
|
||||
emqx_mgmt_api_test_util:end_suite().
|
||||
emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}),
|
||||
emqx_conf:remove([listeners, tcp, new1], #{override_to => local}),
|
||||
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
|
||||
|
||||
t_list_listeners(_) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
|
||||
get_api(Path).
|
||||
Res = request(get, Path, [], []),
|
||||
Expect = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||
?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)),
|
||||
ok.
|
||||
|
||||
t_list_node_listeners(_) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]),
|
||||
get_api(Path).
|
||||
t_crud_listeners_by_id(_) ->
|
||||
TcpListenerId = <<"tcp:default">>,
|
||||
NewListenerId = <<"tcp:new">>,
|
||||
TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]),
|
||||
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
|
||||
[#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []),
|
||||
?assertEqual(atom_to_binary(node()), Node),
|
||||
|
||||
t_get_listeners(_) ->
|
||||
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
|
||||
ID = maps:get(id, LocalListener),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(ID)]),
|
||||
get_api(Path).
|
||||
%% create
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||
[#{<<"listeners">> := [Create]}] = request(put, NewPath, [], TcpListener#{
|
||||
<<"id">> => NewListenerId,
|
||||
<<"bind">> => <<"0.0.0.0:2883">>
|
||||
}),
|
||||
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||
[#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []),
|
||||
?assertMatch(Create, Get1),
|
||||
?assert(is_running(NewListenerId)),
|
||||
|
||||
t_get_node_listeners(_) ->
|
||||
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
|
||||
ID = maps:get(id, LocalListener),
|
||||
Path = emqx_mgmt_api_test_util:api_path(
|
||||
["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(ID)]),
|
||||
get_api(Path).
|
||||
%% update
|
||||
#{<<"acceptors">> := Acceptors} = Create,
|
||||
Acceptors1 = Acceptors + 10,
|
||||
[#{<<"listeners">> := [Update]}] =
|
||||
request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
|
||||
[#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
|
||||
|
||||
t_manage_listener(_) ->
|
||||
%% delete
|
||||
?assertEqual([], delete(NewPath)),
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||
?assertEqual([], delete(NewPath)),
|
||||
ok.
|
||||
|
||||
t_list_listeners_on_node(_) ->
|
||||
Node = atom_to_list(node()),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]),
|
||||
Listeners = request(get, Path, [], []),
|
||||
#{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||
?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)),
|
||||
ok.
|
||||
|
||||
t_crud_listener_by_id_on_node(_) ->
|
||||
TcpListenerId = <<"tcp:default">>,
|
||||
NewListenerId = <<"tcp:new1">>,
|
||||
Node = atom_to_list(node()),
|
||||
TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]),
|
||||
NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]),
|
||||
TcpListener = request(get, TcpPath, [], []),
|
||||
|
||||
%% create
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
|
||||
Create = request(put, NewPath, [], TcpListener#{
|
||||
<<"id">> => NewListenerId,
|
||||
<<"bind">> => <<"0.0.0.0:3883">>
|
||||
}),
|
||||
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||
Get1 = request(get, NewPath, [], []),
|
||||
?assertMatch(Create, Get1),
|
||||
?assert(is_running(NewListenerId)),
|
||||
|
||||
%% update
|
||||
#{<<"acceptors">> := Acceptors} = Create,
|
||||
Acceptors1 = Acceptors + 10,
|
||||
Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
|
||||
Get2 = request(get, NewPath, [], []),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
|
||||
|
||||
%% delete
|
||||
?assertEqual([], delete(NewPath)),
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
|
||||
?assertEqual([], delete(NewPath)),
|
||||
ok.
|
||||
|
||||
t_action_listeners(_) ->
|
||||
ID = "tcp:default",
|
||||
manage_listener(ID, "stop", false),
|
||||
manage_listener(ID, "start", true),
|
||||
manage_listener(ID, "restart", true).
|
||||
action_listener(ID, "stop", false),
|
||||
action_listener(ID, "start", true),
|
||||
action_listener(ID, "restart", true).
|
||||
|
||||
manage_listener(ID, Operation, Running) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, "operation", Operation]),
|
||||
action_listener(ID, Action, Running) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, Action]),
|
||||
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path),
|
||||
timer:sleep(500),
|
||||
GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]),
|
||||
{ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath),
|
||||
Listeners = emqx_json:decode(ListenersResponse, [return_maps]),
|
||||
[#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []),
|
||||
[listener_stats(Listener, Running) || Listener <- Listeners].
|
||||
|
||||
request(Method, Url, QueryParams, Body) ->
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
case emqx_mgmt_api_test_util:request_api(Method, Url, QueryParams, AuthHeader, Body) of
|
||||
{ok, Res} -> emqx_json:decode(Res, [return_maps]);
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
delete(Url) ->
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
{ok, Res} = emqx_mgmt_api_test_util:request_api(delete, Url, AuthHeader),
|
||||
Res.
|
||||
|
||||
get_api(Path) ->
|
||||
{ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path),
|
||||
LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()),
|
||||
case emqx_json:decode(ListenersData, [return_maps]) of
|
||||
[Listener] ->
|
||||
[#{<<"node">> := _, <<"listeners">> := [Listener]}] ->
|
||||
ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8),
|
||||
Filter =
|
||||
fun(Local) ->
|
||||
|
@ -78,7 +156,7 @@ get_api(Path) ->
|
|||
end,
|
||||
LocalListener = hd(lists:filter(Filter, LocalListeners)),
|
||||
comparison_listener(LocalListener, Listener);
|
||||
Listeners when is_list(Listeners) ->
|
||||
[#{<<"node">> := _, <<"listeners">> := Listeners}] when is_list(Listeners) ->
|
||||
?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)),
|
||||
Fun =
|
||||
fun(LocalListener) ->
|
||||
|
@ -111,3 +189,6 @@ comparison_listener(Local, Response) ->
|
|||
|
||||
listener_stats(Listener, ExpectedStats) ->
|
||||
?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)).
|
||||
|
||||
is_running(Id) ->
|
||||
emqx_listeners:is_running(Id).
|
||||
|
|
Loading…
Reference in New Issue