diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 6e1c1e151..b86dd0b48 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -24,6 +24,7 @@ %% APIs -export([ + list_raw/0, list/0, start/0, restart/0, @@ -57,50 +58,64 @@ -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). -spec id_example() -> atom(). -id_example() -> - id_example(list()). - -id_example([]) -> - {ID, _} = hd(list()), - ID; -id_example([{'tcp:default', _} | _]) -> - 'tcp:default'; -id_example([_ | Listeners]) -> - id_example(Listeners). +id_example() -> 'tcp:default'. %% @doc List configured listeners. --spec list() -> [{ListenerId :: atom(), ListenerConf :: map()}]. +-spec list_raw() -> [{ListenerId :: atom(), Type :: atom(), ListenerConf :: map()}]. +list_raw() -> + [{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()]. + list() -> - [{listener_id(Type, LName), LConf} || {Type, LName, LConf} <- do_list()]. - -do_list() -> Listeners = maps:to_list(emqx:get_config([listeners], #{})), - lists:append([list(Type, maps:to_list(Conf)) || {Type, Conf} <- Listeners]). + lists:flatmap(fun format_list/1, Listeners). -list(Type, Conf) -> +format_list(Listener) -> + {Type, Conf} = Listener, [ begin Running = is_running(Type, listener_id(Type, LName), LConf), {Type, LName, maps:put(running, Running, LConf)} end - || {LName, LConf} <- Conf, is_map(LConf) + || {LName, LConf} <- maps:to_list(Conf), is_map(LConf) ]. --spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}. +do_list_raw() -> + Key = <<"listeners">>, + Raw = emqx_config:get_raw([Key], #{}), + SchemaMod = emqx_config:get_schema_mod(Key), + #{Key := RawWithDefault} = emqx_config:fill_defaults(SchemaMod, #{Key => Raw}), + Listeners = maps:to_list(RawWithDefault), + lists:flatmap(fun format_raw_listeners/1, Listeners). + +format_raw_listeners({Type, Conf}) -> + lists:map( + fun({LName, LConf0}) when is_map(LConf0) -> + Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0), + LConf1 = maps:remove(<<"authentication">>, LConf0), + LConf2 = maps:put(<<"running">>, Running, LConf1), + {Type, LName, LConf2} + end, maps:to_list(Conf)). + +-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> - case - lists:filtermap( - fun({_Type, Id, #{running := IsRunning}}) -> - Id =:= ListenerId andalso {true, IsRunning} - end, - do_list() - ) + {Type, Name} = parse_listener_id(ListenerId), + case [ Running || {Type0, Name0, #{running := Running}} <- list(), + Type0 =:= Type, Name0 =:= Name] of - [IsRunning] -> IsRunning; - [] -> {error, not_found} + [] -> {error, not_found}; + [IsRunning] -> IsRunning end. -is_running(Type, ListenerId, #{bind := ListenOn}) when Type =:= tcp; Type =:= ssl -> +is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> + ListenOn = + case Conf of + #{bind := Bind} -> Bind; + #{<<"bind">> := Bind} -> + case emqx_schema:to_ip_port(binary_to_list(Bind)) of + {ok, L} -> L; + {error, _} -> binary_to_integer(Bind) + end + end, try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> true @@ -118,7 +133,7 @@ is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss -> end; is_running(quic, _ListenerId, _Conf) -> %% TODO: quic support - {error, no_found}. + false. current_conns(ID, ListenOn) -> {Type, Name} = parse_listener_id(ID), @@ -164,20 +179,24 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> console_print( "Listener ~ts is NOT started due to: ~p~n.", [listener_id(Type, ListenerName), Reason] - ); + ), + ok; {ok, _} -> console_print( "Listener ~ts on ~ts started.~n", [listener_id(Type, ListenerName), format_addr(Bind)] - ); + ), + ok; {error, {already_started, Pid}} -> {error, {already_started, Pid}}; {error, Reason} -> + ListenerId = listener_id(Type, ListenerName), + BindStr = format_addr(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p~n", - [listener_id(Type, ListenerName), format_addr(Bind), Reason] + [ListenerId, BindStr, Reason] ), - error(Reason) + error({failed_to_start, ListenerId, BindStr, Reason}) end. %% @doc Restart all listeners @@ -316,10 +335,16 @@ delete_authentication(Type, ListenerName, _Conf) -> post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = diff_listeners(NewListeners, OldListeners), - perform_listener_changes(fun stop_listener/3, Removed), - perform_listener_changes(fun delete_authentication/3, Removed), - perform_listener_changes(fun start_listener/3, Added), - perform_listener_changes(fun restart_listener/3, Updated). + try + perform_listener_changes(fun stop_listener/3, Removed), + perform_listener_changes(fun delete_authentication/3, Removed), + perform_listener_changes(fun start_listener/3, Added), + perform_listener_changes(fun restart_listener/3, Updated) + catch error : {failed_to_start, ListenerId, Bind, Reason} -> + Error = lists:flatten(io_lib:format("~ts(~ts) failed with ~ts", + [ListenerId, Bind, element(1, Reason)])), + {error, Error} + end. perform_listener_changes(Action, MapConfs) -> lists:foreach( @@ -483,7 +508,7 @@ foreach_listeners(Do) -> fun({Type, LName, LConf}) -> Do(Type, LName, LConf) end, - do_list() + list() ). has_enabled_listener_conf_by_type(Type) -> @@ -491,7 +516,7 @@ has_enabled_listener_conf_by_type(Type) -> fun({Type0, _LName, LConf}) when is_map(LConf) -> Type =:= Type0 andalso maps:get(enabled, LConf, true) end, - do_list() + list() ). apply_on_listener(ListenerId, Do) -> diff --git a/apps/emqx_authn/src/emqx_authn_app.erl b/apps/emqx_authn/src/emqx_authn_app.erl index f761bfe33..89b634d02 100644 --- a/apps/emqx_authn/src/emqx_authn_app.erl +++ b/apps/emqx_authn/src/emqx_authn_app.erl @@ -74,11 +74,10 @@ global_chain_config() -> listener_chain_configs() -> lists:map( - fun({ListenerID, _}) -> - {ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])} - end, - emqx_listeners:list() - ). + fun({ListenerID, _, _}) -> + {ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])} + end, + emqx_listeners:list()). auth_config_path(ListenerID) -> [<<"listeners">>] ++ diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index e9a154423..8261808da 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -83,21 +83,6 @@ , do_unsubscribe/2 ]). -%% Listeners --export([ do_list_listeners/0 - , list_listeners/0 - , list_listeners/1 - , list_listeners_by_id/1 - , get_listener/2 - , manage_listener/2 - , do_update_listener/2 - , update_listener/2 - , update_listener/3 - , do_remove_listener/1 - , remove_listener/1 - , remove_listener/2 - ]). - %% Alarms -export([ get_alarms/1 , get_alarms/2 @@ -434,80 +419,6 @@ do_unsubscribe(ClientId, Topic) -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} end. -%%-------------------------------------------------------------------- -%% Listeners -%%-------------------------------------------------------------------- - -do_list_listeners() -> - [Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()]. - -list_listeners() -> - lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). - -list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). - -list_listeners_by_id(Id) -> - listener_id_filter(Id, list_listeners()). - -get_listener(Node, Id) -> - case listener_id_filter(Id, list_listeners(Node)) of - [] -> - {error, not_found}; - [Listener] -> - Listener - end. - -listener_id_filter(Id, Listeners) -> - Filter = fun(#{id := Id0}) -> Id0 =:= Id end, - lists:filter(Filter, Listeners). - --spec manage_listener( start_listener | stop_listener | restart_listener - , #{id := atom(), node := node()} - ) -> ok | {error, Reason :: term()}. -manage_listener(start_listener, #{id := ID, node := Node}) -> - wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID)); -manage_listener(stop_listener, #{id := ID, node := Node}) -> - wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID)); -manage_listener(restart_listener, #{id := ID, node := Node}) -> - wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)). - --spec do_update_listener(string(), emqx_config:update_request()) -> - map() | {error, _}. -do_update_listener(Id, Config) -> - case emqx_listeners:parse_listener_id(Id) of - {error, {invalid_listener_id, Id}} -> - {error, {invalid_listener_id, Id}}; - {Type, Name} -> - case emqx:update_config([listeners, Type, Name], Config, #{}) of - {ok, #{raw_config := RawConf}} -> - RawConf#{node => node(), id => Id, running => true}; - {error, Reason} -> - {error, Reason} - end - end. - -update_listener(Id, Config) -> - [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. - -update_listener(Node, Id, Config) -> - wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). - -remove_listener(Id) -> - [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()]. - --spec do_remove_listener(string()) -> ok. -do_remove_listener(Id) -> - {Type, Name} = emqx_listeners:parse_listener_id(Id), - case emqx:remove_config([listeners, Type, Name], #{}) of - {ok, _} -> ok; - {error, Reason} -> - error(Reason) - end. - -remove_listener(Node, Id) -> - wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). - %%-------------------------------------------------------------------- %% Get Alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 606705187..6b3620517 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -18,372 +18,415 @@ -behaviour(minirest_api). --export([api_spec/0]). +-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). +-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). -export([ list_listeners/2 , crud_listeners_by_id/2 , list_listeners_on_node/2 , crud_listener_by_id_on_node/2 - , manage_listeners/2 - , jsonable_resp/2 + , action_listeners/2 ]). --export([format/1]). +%% for rpc call +-export([ do_list_listeners/0 + , do_update_listener/2 + , do_remove_listener/1 + ]). -include_lib("emqx/include/emqx.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). -define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>). -define(NODE_NOT_FOUND_OR_DOWN, <<"Node not found or Down">>). -define(LISTENER_NOT_FOUND, <<"Listener id not found">>). +-define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>). -define(ADDR_PORT_INUSE, <<"Addr port in use">>). --define(CONFIG_SCHEMA_ERROR, <<"Config schema error">>). --define(INVALID_LISTENER_PROTOCOL, <<"Invalid listener type">>). --define(UPDATE_CONFIG_FAILED, <<"Update configuration failed">>). --define(OPERATION_FAILED, <<"Operation failed">>). + +-define(OPTS(_Type_), #{rawconf_with_defaults => true, override_to => _Type_}). + +namespace() -> "listeners". api_spec() -> - { - [ - api_list_listeners(), - api_list_update_listeners_by_id(), - api_manage_listeners(), - api_list_listeners_on_node(), - api_get_update_listener_by_id_on_node(), - api_manage_listeners_on_node() - ], - [] - }. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). --define(TYPES_ATOM, [tcp, ssl, ws, wss, quic]). -req_schema() -> - Schema = [emqx_mgmt_api_configs:gen_schema( - emqx:get_raw_config([listeners, T, default], #{})) - || T <- ?TYPES_ATOM], - #{'oneOf' => Schema}. +paths() -> + [ + "/listeners", + "/listeners/:id", + "/listeners/:id/:action", + "/nodes/:node/listeners", + "/nodes/:node/listeners/:id", + "/nodes/:node/listeners/:id/:action" + ]. -resp_schema() -> - #{'oneOf' := Schema} = req_schema(), - AddMetadata = fun(Prop) -> - Prop#{running => #{type => boolean}, - id => #{type => string}, - node => #{type => string}} - end, - Schema1 = [S#{properties => AddMetadata(Prop)} - || S = #{properties := Prop} <- Schema], - #{'oneOf' => Schema1}. - -api_list_listeners() -> - Metadata = #{ +schema("/listeners") -> + #{ + 'operationId' => list_listeners, get => #{ - description => <<"List listeners from all nodes in the cluster">>, - responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), - <<"List listeners successfully">>)}}}, - {"/listeners", Metadata, list_listeners}. - -api_list_update_listeners_by_id() -> - Metadata = #{ + tags => [<<"listeners">>], + desc => <<"List all running node's listeners.">>, + responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + } + }; +schema("/listeners/:id") -> + #{ + 'operationId' => crud_listeners_by_id, get => #{ - description => <<"List listeners by a given Id from all nodes in the cluster">>, - parameters => [param_path_id()], + tags => [<<"listeners">>], + desc => <<"List all running node's listeners for the specified id.">>, + parameters => [?R_REF(listener_id)], responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}, + 200 => ?HOCON(?ARRAY(?R_REF(listeners))) + } + }, put => #{ - description => - <<"Create or update a listener by a given Id to all nodes in the cluster">>, - parameters => [param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), + tags => [<<"listeners">>], + desc => <<"Create or update the specified listener on all nodes.">>, + parameters => [?R_REF(listener_id)], + 'requestBody' => ?HOCON(listener_schema(), #{}), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED, - ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']), - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"500">> => - emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), - <<"Create or update listener successfully">>)}}, + 200 => ?HOCON(listener_schema(), #{}), + 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) + } + }, delete => #{ - description => <<"Delete a listener by a given Id to all nodes in the cluster">>, - parameters => [param_path_id()], + tags => [<<"listeners">>], + desc => <<"Delete the specified listener on all nodes.">>, + parameters => [?R_REF(listener_id)], responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"204">> => - emqx_mgmt_util:schema(<<"Delete listener successfully">>)}} - }, - {"/listeners/:id", Metadata, crud_listeners_by_id}. - -api_list_listeners_on_node() -> - Metadata = #{ - get => #{ - description => <<"List listeners in one node">>, - parameters => [param_path_node()], - responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?NODE_NOT_FOUND_OR_DOWN, ['RESOURCE_NOT_FOUND']), - <<"500">> => - emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"List listeners successfully">>)}}}, - {"/nodes/:node/listeners", Metadata, list_listeners_on_node}. - -api_get_update_listener_by_id_on_node() -> - Metadata = #{ - get => #{ - description => <<"Get a listener by a given Id on a specific node">>, - parameters => [param_path_node(), param_path_id()], - responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, - ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}}, - put => #{ - description => <<"Create or update a listener by a given Id on a specific node">>, - parameters => [param_path_node(), param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), - responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED, - ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']), - <<"404">> => - emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, - ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), - <<"500">> => - emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}}, - delete => #{ - description => <<"Delete a listener by a given Id to all nodes in the cluster">>, - parameters => [param_path_node(), param_path_id()], - responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"204">> => - emqx_mgmt_util:schema(<<"Delete listener successfully">>)}} - }, - {"/nodes/:node/listeners/:id", Metadata, crud_listener_by_id_on_node}. - -api_manage_listeners() -> - Metadata = #{ + 204 => <<"Listener deleted">>, + 400 => error_codes(['BAD_REQUEST']) + } + } + }; +schema("/listeners/:id/:action") -> + #{ + 'operationId' => action_listeners, post => #{ - description => <<"Restart listeners on all nodes in the cluster">>, + tags => [<<"listeners">>], + desc => <<"Start/stop/restart listeners on all nodes.">>, parameters => [ - param_path_id(), - param_path_operation()], + ?R_REF(listener_id), + ?R_REF(action)], responses => #{ - <<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/listeners/:id/operation/:operation", Metadata, manage_listeners}. - -api_manage_listeners_on_node() -> - Metadata = #{ + 200 => <<"Updated">>, + 400 => error_codes(['BAD_REQUEST']) + } + } + }; +schema("/nodes/:node/listeners") -> + #{ + 'operationId' => list_listeners_on_node, + get => #{ + tags => [<<"listeners">>], + desc => <<"List all listeners on the specified node.">>, + parameters => [?R_REF(node)], + responses => #{ + 200 => ?HOCON(?ARRAY(listener_schema())), + 400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN) + } + } + }; +schema("/nodes/:node/listeners/:id") -> + #{ + 'operationId' => crud_listener_by_id_on_node, + get => #{ + tags => [<<"listeners">>], + desc => <<"Get the specified listener on the specified node.">>, + parameters => [ + ?R_REF(listener_id), + ?R_REF(node)], + responses => #{ + 200 => ?HOCON(listener_schema()), + 400 => error_codes(['BAD_REQUEST']), + 404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND) + } + }, put => #{ - description => <<"Restart listeners on all nodes in the cluster">>, + tags => [<<"listeners">>], + desc => <<"Create or update the specified listener on the specified node.">>, parameters => [ - param_path_node(), - param_path_id(), - param_path_operation()], + ?R_REF(listener_id), + ?R_REF(node)], + 'requestBody' => ?HOCON(listener_schema()), responses => #{ - <<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/nodes/:node/listeners/:id/operation/:operation", Metadata, manage_listeners}. - -%%%============================================================================================== -%% parameters -param_path_node() -> + 200 => ?HOCON(listener_schema()), + 400 => error_codes(['BAD_REQUEST']) + }}, + delete => #{ + tags => [<<"listeners">>], + desc => <<"Delete the specified listener on the specified node.">>, + parameters => [ + ?R_REF(listener_id), + ?R_REF(node)], + responses => #{ + 204 => <<"Listener deleted">>, + 400 => error_codes(['BAD_REQUEST'])} + } + }; +schema("/nodes/:node/listeners/:id/:action") -> #{ - name => node, - in => path, - schema => #{type => string}, - required => true, - example => node() + 'operationId' => action_listeners, + post => #{ + tags => [<<"listeners">>], + desc => <<"Start/stop/restart listeners on a specified node.">>, + parameters => [ + ?R_REF(node), + ?R_REF(listener_id), + ?R_REF(action)], + responses => #{ + 200 => <<"Updated">>, + 400 => error_codes(['BAD_REQUEST'])} + } }. -param_path_id() -> - #{ - name => id, - in => path, - schema => #{type => string, example => emqx_listeners:id_example()}, - required => true - }. +fields(listeners) -> + [ + {"node", ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1", + required => true}) + }, + {"listeners", ?ARRAY(listener_schema())} + ]; +fields(listener_id) -> + [ + {id, ?HOCON(atom(), #{ + desc => "Listener id", + example => 'tcp:default', + validator => fun validate_id/1, + in => path}) + } + ]; +fields(action) -> + [ + {action, ?HOCON(?ENUM([start, stop, restart]), #{ + desc => "listener action", + example => start, + in => path}) + } + ]; +fields(node) -> + [ + {"node", ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1", + in => path}) + } + ]; +fields(Type) -> + Listeners = listeners_info(), + [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], + Schema. -param_path_operation()-> - #{ - name => operation, - in => path, - required => true, - schema => #{ - type => string, - enum => [start, stop, restart]}, - example => restart - }. +listener_schema() -> + ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())). + +listeners_info() -> + Listeners = hocon_schema:fields(emqx_schema, "listeners"), + lists:map(fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> + Fields0 = hocon_schema:fields(Mod, Field), + Fields1 = lists:keydelete("authentication", 1, Fields0), + TypeAtom = list_to_existing_atom(Type), + #{ref => ?R_REF(TypeAtom), + schema => [ + {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, + {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, + {id, ?HOCON(atom(), #{desc => "Listener id", required => true, + validator => fun validate_id/1})} + | Fields1] + } + end, Listeners). + +validate_id(Id) -> + case emqx_listeners:parse_listener_id(Id) of + {error, Reason} -> {error, Reason}; + _ -> ok + end. -%%%============================================================================================== %% api list_listeners(get, _Request) -> - {200, format(emqx_mgmt:list_listeners())}. + {200, list_listeners()}. crud_listeners_by_id(get, #{bindings := #{id := Id}}) -> - case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(), - atom_to_binary(Id0, latin1) =:= Id] of - [] -> - {400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}}; - Listeners -> - {200, format(Listeners)} + {200, list_listeners_by_id(Id)}; +crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> + case parse_listener_conf(Body0) of + {Id, Type, Name, Conf} -> + case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + _ -> + {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} end; -crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) -> - Results = format(emqx_mgmt:update_listener(Id, Conf)), - case lists:filter(fun filter_errors/1, Results) of - [{error, {invalid_listener_id, Id}} | _] -> - {400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}}; - [{error, {emqx_conf_schema, _}} | _] -> - {400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}}; - [{error, {eaddrinuse, _}} | _] -> - {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; - [{error, Reason} | _] -> - {500, #{code => 'UNKNOWN_ERROR', message => err_msg(Reason)}}; - [] -> - {200, Results} - end; - crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> - Results = emqx_mgmt:remove_listener(Id), - case lists:filter(fun filter_errors/1, Results) of - [] -> {204}; - Errors -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Errors)}} + {Type, Name} = emqx_listeners:parse_listener_id(Id), + case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of + {ok, _} -> {204}; + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end. + +parse_listener_conf(Conf0) -> + Conf1 = maps:remove(<<"running">>, Conf0), + {IdBin, Conf2} = maps:take(<<"id">>, Conf1), + {TypeBin, Conf3} = maps:take(<<"type">>, Conf2), + {Type, Name} = emqx_listeners:parse_listener_id(IdBin), + TypeAtom = binary_to_existing_atom(TypeBin), + case Type =:= TypeAtom of + true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; + false -> {error, listener_type_inconsistent} end. list_listeners_on_node(get, #{bindings := #{node := Node}}) -> - case emqx_mgmt:list_listeners(atom(Node)) of + case list_listeners(Node) of {error, nodedown} -> - {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}}; + {400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}}; {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; - Listener -> - {200, format(Listener)} + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + #{<<"listeners">> := Listener} -> + {200, Listener} end. crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> - case emqx_mgmt:get_listener(atom(Node), atom(Id)) of + case get_listener(Node, Id) of {error, not_found} -> - {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}}; + {404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}}; {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; Listener -> - {200, format(Listener)} + {200, Listener} end; -crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Conf}) -> - case emqx_mgmt:update_listener(atom(Node), Id, Conf) of - {error, nodedown} -> - {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}}; - {error, {invalid_listener_id, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}}; - {error, {emqx_conf_schema, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}}; - {error, {eaddrinuse, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; - {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; - Listener -> - {200, format(Listener)} +crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) -> + case parse_listener_conf(Body) of + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {Id, Type, _Name, Conf} -> + case update_listener(Node, Id, Conf) of + {error, nodedown} -> + {400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}}; + {error, {eaddrinuse, _}} -> %% TODO + {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {ok, Listener} -> + {200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}} + end; + _ -> {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}} end; crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> - case emqx_mgmt:remove_listener(atom(Node), Id) of + case remove_listener(Node, Id) of ok -> {204}; - {error, Reason} -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. -manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) -> - {_, Result} = do_manage_listeners(Node, Id, Oper), +action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) -> + {_, Result} = action_listeners(Node, Id, Action), Result; -manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> - Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()], +action_listeners(post, #{bindings := #{id := Id, action := Action}}) -> + Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of [] -> {200}; - Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} + Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}} end. %%%============================================================================================== -%% util functions -do_manage_listeners(Node, Id, Oper) -> - Param = #{node => atom(Node), id => atom(Id)}, - {Node, do_manage_listeners2(Oper, Param)}. +action_listeners(Node, Id, Action) -> + {Node, do_action_listeners(Action, Node, Id)}. -do_manage_listeners2(<<"start">>, Param) -> - case emqx_mgmt:manage_listener(start_listener, Param) of +do_action_listeners(start, Node, Id) -> + case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of ok -> {200}; {error, {already_started, _}} -> {200}; {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; -do_manage_listeners2(<<"stop">>, Param) -> - case emqx_mgmt:manage_listener(stop_listener, Param) of +do_action_listeners(stop, Node, Id) -> + case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of ok -> {200}; {error, not_found} -> {200}; {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; -do_manage_listeners2(<<"restart">>, Param) -> - case emqx_mgmt:manage_listener(restart_listener, Param) of +do_action_listeners(restart, Node, Id) -> + case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of ok -> {200}; - {error, not_found} -> do_manage_listeners2(<<"start">>, Param); + {error, not_found} -> do_action_listeners(start, Node, Id); {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. -manage_listeners_err(Errors) -> +action_listeners_err(Errors) -> list_to_binary(lists:foldl(fun({Node, Err}, Str) -> - err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str - end, "", Errors)). + err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str + end, "", Errors)). -format(Listeners) when is_list(Listeners) -> - [format(Listener) || Listener <- Listeners]; - -format({error, Reason}) -> - {error, Reason}; - -format(#{node := _Node, id := _Id} = Conf) when is_map(Conf) -> - emqx_map_lib:jsonable_map(Conf#{ - running => trans_running(Conf) - }, fun ?MODULE:jsonable_resp/2). - -trans_running(Conf) -> - case maps:get(running, Conf) of - {error, _} -> - false; - Running -> - Running - end. - -filter_errors({error, _}) -> - true; -filter_errors(_) -> - false. - -jsonable_resp(bind, Port) when is_integer(Port) -> - {bind, Port}; -jsonable_resp(bind, {Addr, Port}) when is_tuple(Addr); is_integer(Port)-> - {bind, inet:ntoa(Addr) ++ ":" ++ integer_to_list(Port)}; -jsonable_resp(user_lookup_fun, _) -> - drop; -jsonable_resp(K, V) -> - {K, V}. - -atom(B) when is_binary(B) -> binary_to_atom(B, utf8); -atom(S) when is_list(S) -> list_to_atom(S); -atom(A) when is_atom(A) -> A. - -err_msg(Reason) -> - list_to_binary(err_msg_str(Reason)). +err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom); +err_msg(Reason) -> list_to_binary(err_msg_str(Reason)). err_msg_str(Reason) -> io_lib:format("~p", [Reason]). + +list_listeners() -> + [list_listeners(Node) || Node <- mria_mnesia:running_nodes()]. + +list_listeners(Node) -> + wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). + +list_listeners_by_id(Id) -> + listener_id_filter(Id, list_listeners()). + +get_listener(Node, Id) -> + case listener_id_filter(Id, [list_listeners(Node)]) of + [#{<<"listeners">> := []}] -> {error, not_found}; + [#{<<"listeners">> := [Listener]}] -> Listener + end. + +listener_id_filter(Id, Listeners) -> + lists:map(fun(Conf = #{<<"listeners">> := Listeners0}) -> + Conf#{<<"listeners">> => + [C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]} + end, Listeners). + +update_listener(Node, Id, Config) -> + wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). + +remove_listener(Node, Id) -> + wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). + +-spec do_list_listeners() -> map(). +do_list_listeners() -> + Listeners = [Conf#{<<"id">> => Id, <<"type">> => Type} + || {Id, Type, Conf} <- emqx_listeners:list_raw()], + #{ + <<"node">> => node(), + <<"listeners">> => Listeners + }. + +-spec do_update_listener(string(), emqx_config:update_request()) -> + {ok, map()} | {error, _}. +do_update_listener(Id, Config) -> + {Type, Name} = emqx_listeners:parse_listener_id(Id), + case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of + {ok, #{raw_config := RawConf}} -> {ok, RawConf}; + {error, Reason} -> {error, Reason} + end. + +-spec do_remove_listener(string()) -> ok. +do_remove_listener(Id) -> + {Type, Name} = emqx_listeners:parse_listener_id(Id), + case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of + {ok, _} -> ok; + {error, Reason} -> error(Reason) + end. + +wrap_rpc({badrpc, Reason}) -> + {error, Reason}; +wrap_rpc(Res) -> + Res. diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 34aff22fb..b535f1e62 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -518,7 +518,7 @@ trace_type(_, _) -> error. %% @doc Listeners Command listeners([]) -> - lists:foreach(fun({ID, Conf}) -> + lists:foreach(fun({ID, _Name, Conf}) -> {Host, Port} = maps:get(bind, Conf), Acceptors = maps:get(acceptors, Conf), ProxyProtocol = maps:get(proxy_protocol, Conf, undefined), diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl index 306145ade..8efaa4084 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -55,16 +55,16 @@ list_subscriptions(Node) -> -spec list_listeners(node()) -> [map()] | {badrpc, _}. list_listeners(Node) -> - rpc:call(Node, emqx_mgmt, do_list_listeners, []). + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). -spec remove_listener(node(), string()) -> ok | {badrpc, _}. remove_listener(Node, Id) -> - rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]). + rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]). -spec update_listener(node(), string(), emqx_config:update_request()) -> map() | {error, _} | {badrpc, _}. update_listener(Node, Id, Config) -> - rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]). + rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]). -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> {subscribe, _} | {error, atom()} | {badrpc, _}. diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 68fe3acc5..65568135b 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -24,53 +24,131 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), + emqx_mgmt_api_test_util:init_suite([emqx_conf]), Config. end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). + emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}), + emqx_conf:remove([listeners, tcp, new1], #{override_to => local}), + emqx_mgmt_api_test_util:end_suite([emqx_conf]). t_list_listeners(_) -> Path = emqx_mgmt_api_test_util:api_path(["listeners"]), - get_api(Path). + Res = request(get, Path, [], []), + Expect = emqx_mgmt_api_listeners:do_list_listeners(), + ?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)), + ok. -t_list_node_listeners(_) -> - Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]), - get_api(Path). +t_crud_listeners_by_id(_) -> + TcpListenerId = <<"tcp:default">>, + NewListenerId = <<"tcp:new">>, + TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]), + NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), + [#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []), + ?assertEqual(atom_to_binary(node()), Node), -t_get_listeners(_) -> - LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - ID = maps:get(id, LocalListener), - Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(ID)]), - get_api(Path). + %% create + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), + [#{<<"listeners">> := [Create]}] = request(put, NewPath, [], TcpListener#{ + <<"id">> => NewListenerId, + <<"bind">> => <<"0.0.0.0:2883">> + }), + ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), + [#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []), + ?assertMatch(Create, Get1), + ?assert(is_running(NewListenerId)), -t_get_node_listeners(_) -> - LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - ID = maps:get(id, LocalListener), - Path = emqx_mgmt_api_test_util:api_path( - ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(ID)]), - get_api(Path). + %% update + #{<<"acceptors">> := Acceptors} = Create, + Acceptors1 = Acceptors + 10, + [#{<<"listeners">> := [Update]}] = + request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), + [#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), -t_manage_listener(_) -> + %% delete + ?assertEqual([], delete(NewPath)), + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), + ?assertEqual([], delete(NewPath)), + ok. + +t_list_listeners_on_node(_) -> + Node = atom_to_list(node()), + Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]), + Listeners = request(get, Path, [], []), + #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(), + ?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)), + ok. + +t_crud_listener_by_id_on_node(_) -> + TcpListenerId = <<"tcp:default">>, + NewListenerId = <<"tcp:new1">>, + Node = atom_to_list(node()), + TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]), + NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]), + TcpListener = request(get, TcpPath, [], []), + + %% create + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), + Create = request(put, NewPath, [], TcpListener#{ + <<"id">> => NewListenerId, + <<"bind">> => <<"0.0.0.0:3883">> + }), + ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), + Get1 = request(get, NewPath, [], []), + ?assertMatch(Create, Get1), + ?assert(is_running(NewListenerId)), + + %% update + #{<<"acceptors">> := Acceptors} = Create, + Acceptors1 = Acceptors + 10, + Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), + Get2 = request(get, NewPath, [], []), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), + + %% delete + ?assertEqual([], delete(NewPath)), + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), + ?assertEqual([], delete(NewPath)), + ok. + +t_action_listeners(_) -> ID = "tcp:default", - manage_listener(ID, "stop", false), - manage_listener(ID, "start", true), - manage_listener(ID, "restart", true). + action_listener(ID, "stop", false), + action_listener(ID, "start", true), + action_listener(ID, "restart", true). -manage_listener(ID, Operation, Running) -> - Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, "operation", Operation]), +action_listener(ID, Action, Running) -> + Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, Action]), {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path), timer:sleep(500), GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]), - {ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath), - Listeners = emqx_json:decode(ListenersResponse, [return_maps]), + [#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []), [listener_stats(Listener, Running) || Listener <- Listeners]. +request(Method, Url, QueryParams, Body) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(Method, Url, QueryParams, AuthHeader, Body) of + {ok, Res} -> emqx_json:decode(Res, [return_maps]); + Error -> Error + end. + +delete(Url) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + {ok, Res} = emqx_mgmt_api_test_util:request_api(delete, Url, AuthHeader), + Res. + get_api(Path) -> {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path), LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()), case emqx_json:decode(ListenersData, [return_maps]) of - [Listener] -> + [#{<<"node">> := _, <<"listeners">> := [Listener]}] -> ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), Filter = fun(Local) -> @@ -78,7 +156,7 @@ get_api(Path) -> end, LocalListener = hd(lists:filter(Filter, LocalListeners)), comparison_listener(LocalListener, Listener); - Listeners when is_list(Listeners) -> + [#{<<"node">> := _, <<"listeners">> := Listeners}] when is_list(Listeners) -> ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)), Fun = fun(LocalListener) -> @@ -111,3 +189,6 @@ comparison_listener(Local, Response) -> listener_stats(Listener, ExpectedStats) -> ?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)). + +is_running(Id) -> + emqx_listeners:is_running(Id).