diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index b86dd0b48..2123b579e 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -61,7 +61,7 @@ id_example() -> 'tcp:default'. %% @doc List configured listeners. --spec list_raw() -> [{ListenerId :: atom(), Type :: atom(), ListenerConf :: map()}]. +-spec list_raw() -> [{ListenerId :: atom(), Type :: binary(), ListenerConf :: map()}]. list_raw() -> [{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()]. @@ -76,7 +76,7 @@ format_list(Listener) -> Running = is_running(Type, listener_id(Type, LName), LConf), {Type, LName, maps:put(running, Running, LConf)} end - || {LName, LConf} <- maps:to_list(Conf), is_map(LConf) + || {LName, LConf} <- maps:to_list(Conf), is_map(LConf) ]. do_list_raw() -> @@ -94,13 +94,20 @@ format_raw_listeners({Type, Conf}) -> LConf1 = maps:remove(<<"authentication">>, LConf0), LConf2 = maps:put(<<"running">>, Running, LConf1), {Type, LName, LConf2} - end, maps:to_list(Conf)). + end, + maps:to_list(Conf) + ). -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> {Type, Name} = parse_listener_id(ListenerId), - case [ Running || {Type0, Name0, #{running := Running}} <- list(), - Type0 =:= Type, Name0 =:= Name] + case + [ + Running + || {Type0, Name0, #{running := Running}} <- list(), + Type0 =:= Type, + Name0 =:= Name + ] of [] -> {error, not_found}; [IsRunning] -> IsRunning @@ -109,7 +116,8 @@ is_running(ListenerId) -> is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> ListenOn = case Conf of - #{bind := Bind} -> Bind; + #{bind := Bind} -> + Bind; #{<<"bind">> := Bind} -> case emqx_schema:to_ip_port(binary_to_list(Bind)) of {ok, L} -> L; @@ -340,10 +348,15 @@ post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) -> perform_listener_changes(fun delete_authentication/3, Removed), perform_listener_changes(fun start_listener/3, Added), perform_listener_changes(fun restart_listener/3, Updated) - catch error : {failed_to_start, ListenerId, Bind, Reason} -> - Error = lists:flatten(io_lib:format("~ts(~ts) failed with ~ts", - [ListenerId, Bind, element(1, Reason)])), - {error, Error} + catch + error:{failed_to_start, ListenerId, Bind, Reason} -> + Error = lists:flatten( + io_lib:format( + "~ts(~ts) failed with ~ts", + [ListenerId, Bind, element(1, Reason)] + ) + ), + {error, Error} end. perform_listener_changes(Action, MapConfs) -> diff --git a/apps/emqx_authn/src/emqx_authn_app.erl b/apps/emqx_authn/src/emqx_authn_app.erl index 89b634d02..c3295d18b 100644 --- a/apps/emqx_authn/src/emqx_authn_app.erl +++ b/apps/emqx_authn/src/emqx_authn_app.erl @@ -74,10 +74,11 @@ global_chain_config() -> listener_chain_configs() -> lists:map( - fun({ListenerID, _, _}) -> - {ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])} - end, - emqx_listeners:list()). + fun({ListenerID, _, _}) -> + {ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])} + end, + emqx_listeners:list() + ). auth_config_path(ListenerID) -> [<<"listeners">>] ++ diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 6b3620517..fadec7ee7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -21,18 +21,20 @@ -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). --export([ list_listeners/2 - , crud_listeners_by_id/2 - , list_listeners_on_node/2 - , crud_listener_by_id_on_node/2 - , action_listeners/2 - ]). +-export([ + list_listeners/2, + crud_listeners_by_id/2, + list_listeners_on_node/2, + crud_listener_by_id_on_node/2, + action_listeners/2 +]). %% for rpc call --export([ do_list_listeners/0 - , do_update_listener/2 - , do_remove_listener/1 - ]). +-export([ + do_list_listeners/0, + do_update_listener/2, + do_remove_listener/1 +]). -include_lib("emqx/include/emqx.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -108,7 +110,8 @@ schema("/listeners/:id/:action") -> desc => <<"Start/stop/restart listeners on all nodes.">>, parameters => [ ?R_REF(listener_id), - ?R_REF(action)], + ?R_REF(action) + ], responses => #{ 200 => <<"Updated">>, 400 => error_codes(['BAD_REQUEST']) @@ -136,7 +139,8 @@ schema("/nodes/:node/listeners/:id") -> desc => <<"Get the specified listener on the specified node.">>, parameters => [ ?R_REF(listener_id), - ?R_REF(node)], + ?R_REF(node) + ], responses => #{ 200 => ?HOCON(listener_schema()), 400 => error_codes(['BAD_REQUEST']), @@ -148,21 +152,25 @@ schema("/nodes/:node/listeners/:id") -> desc => <<"Create or update the specified listener on the specified node.">>, parameters => [ ?R_REF(listener_id), - ?R_REF(node)], + ?R_REF(node) + ], 'requestBody' => ?HOCON(listener_schema()), responses => #{ 200 => ?HOCON(listener_schema()), 400 => error_codes(['BAD_REQUEST']) - }}, + } + }, delete => #{ tags => [<<"listeners">>], desc => <<"Delete the specified listener on the specified node.">>, parameters => [ ?R_REF(listener_id), - ?R_REF(node)], + ?R_REF(node) + ], responses => #{ 204 => <<"Listener deleted">>, - 400 => error_codes(['BAD_REQUEST'])} + 400 => error_codes(['BAD_REQUEST']) + } } }; schema("/nodes/:node/listeners/:id/:action") -> @@ -174,46 +182,52 @@ schema("/nodes/:node/listeners/:id/:action") -> parameters => [ ?R_REF(node), ?R_REF(listener_id), - ?R_REF(action)], + ?R_REF(action) + ], responses => #{ 200 => <<"Updated">>, - 400 => error_codes(['BAD_REQUEST'])} + 400 => error_codes(['BAD_REQUEST']) + } } }. fields(listeners) -> [ - {"node", ?HOCON(atom(), #{ - desc => "Node name", - example => "emqx@127.0.0.1", - required => true}) - }, + {"node", + ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1", + required => true + })}, {"listeners", ?ARRAY(listener_schema())} ]; fields(listener_id) -> [ - {id, ?HOCON(atom(), #{ - desc => "Listener id", - example => 'tcp:default', - validator => fun validate_id/1, - in => path}) - } + {id, + ?HOCON(atom(), #{ + desc => "Listener id", + example => 'tcp:default', + validator => fun validate_id/1, + in => path + })} ]; fields(action) -> [ - {action, ?HOCON(?ENUM([start, stop, restart]), #{ - desc => "listener action", - example => start, - in => path}) - } + {action, + ?HOCON(?ENUM([start, stop, restart]), #{ + desc => "listener action", + example => start, + in => path + })} ]; fields(node) -> [ - {"node", ?HOCON(atom(), #{ - desc => "Node name", - example => "emqx@127.0.0.1", - in => path}) - } + {"node", + ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1", + in => path + })} ]; fields(Type) -> Listeners = listeners_info(), @@ -225,19 +239,28 @@ listener_schema() -> listeners_info() -> Listeners = hocon_schema:fields(emqx_schema, "listeners"), - lists:map(fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> - Fields0 = hocon_schema:fields(Mod, Field), - Fields1 = lists:keydelete("authentication", 1, Fields0), - TypeAtom = list_to_existing_atom(Type), - #{ref => ?R_REF(TypeAtom), - schema => [ - {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, - {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, - {id, ?HOCON(atom(), #{desc => "Listener id", required => true, - validator => fun validate_id/1})} - | Fields1] - } - end, Listeners). + lists:map( + fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> + Fields0 = hocon_schema:fields(Mod, Field), + Fields1 = lists:keydelete("authentication", 1, Fields0), + TypeAtom = list_to_existing_atom(Type), + #{ + ref => ?R_REF(TypeAtom), + schema => [ + {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, + {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, + {id, + ?HOCON(atom(), #{ + desc => "Listener id", + required => true, + validator => fun validate_id/1 + })} + | Fields1 + ] + } + end, + Listeners + ). validate_id(Id) -> case emqx_listeners:parse_listener_id(Id) of @@ -304,19 +327,22 @@ crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> end; crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) -> case parse_listener_conf(Body) of - {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {Id, Type, _Name, Conf} -> case update_listener(Node, Id, Conf) of {error, nodedown} -> {400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}}; - {error, {eaddrinuse, _}} -> %% TODO + %% TODO + {error, {eaddrinuse, _}} -> {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {ok, Listener} -> {200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}} end; - _ -> {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}} + _ -> + {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}} end; crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> case remove_listener(Node, Id) of @@ -327,10 +353,17 @@ crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) -> {_, Result} = action_listeners(Node, Id, Action), Result; - action_listeners(post, #{bindings := #{id := Id, action := Action}}) -> Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], - case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of + case + lists:filter( + fun + ({_, {200}}) -> false; + (_) -> true + end, + Results + ) + of [] -> {200}; Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}} end. @@ -344,28 +377,31 @@ do_action_listeners(start, Node, Id) -> case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of ok -> {200}; {error, {already_started, _}} -> {200}; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; do_action_listeners(stop, Node, Id) -> case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of ok -> {200}; {error, not_found} -> {200}; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; do_action_listeners(restart, Node, Id) -> case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of ok -> {200}; {error, not_found} -> do_action_listeners(start, Node, Id); - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. action_listeners_err(Errors) -> - list_to_binary(lists:foldl(fun({Node, Err}, Str) -> - err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str - end, "", Errors)). + list_to_binary( + lists:foldl( + fun({Node, Err}, Str) -> + err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str + end, + "", + Errors + ) + ). err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom); err_msg(Reason) -> list_to_binary(err_msg_str(Reason)). @@ -389,10 +425,15 @@ get_listener(Node, Id) -> end. listener_id_filter(Id, Listeners) -> - lists:map(fun(Conf = #{<<"listeners">> := Listeners0}) -> - Conf#{<<"listeners">> => - [C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]} - end, Listeners). + lists:map( + fun(Conf = #{<<"listeners">> := Listeners0}) -> + Conf#{ + <<"listeners">> => + [C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0] + } + end, + Listeners + ). update_listener(Node, Id, Config) -> wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). @@ -402,8 +443,10 @@ remove_listener(Node, Id) -> -spec do_list_listeners() -> map(). do_list_listeners() -> - Listeners = [Conf#{<<"id">> => Id, <<"type">> => Type} - || {Id, Type, Conf} <- emqx_listeners:list_raw()], + Listeners = [ + Conf#{<<"id">> => Id, <<"type">> => Type} + || {Id, Type, Conf} <- emqx_listeners:list_raw() + ], #{ <<"node">> => node(), <<"listeners">> => Listeners diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl index 8efaa4084..d3099ca39 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -18,23 +18,24 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 +-export([ + introduced_in/0, - , node_info/1 - , broker_info/1 - , list_subscriptions/1 + node_info/1, + broker_info/1, + list_subscriptions/1, - , list_listeners/1 - , remove_listener/2 + list_listeners/1, + remove_listener/2, - , update_listener/3 - , subscribe/3 - , unsubscribe/3 + update_listener/3, + subscribe/3, + unsubscribe/3, - , call_client/3 + call_client/3, - , get_full_config/1 - ]). + get_full_config/1 +]). -include_lib("emqx/include/bpapi.hrl"). @@ -53,7 +54,7 @@ broker_info(Node) -> list_subscriptions(Node) -> rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). --spec list_listeners(node()) -> [map()] | {badrpc, _}. +-spec list_listeners(node()) -> map() | {badrpc, _}. list_listeners(Node) -> rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). @@ -61,18 +62,18 @@ list_listeners(Node) -> remove_listener(Node, Id) -> rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]). --spec update_listener(node(), string(), emqx_config:update_request()) -> - map() | {error, _} | {badrpc, _}. +-spec update_listener(node(), atom(), emqx_config:update_request()) -> + {ok, map()} | {error, _} | {badrpc, _}. update_listener(Node, Id, Config) -> rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]). -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> - {subscribe, _} | {error, atom()} | {badrpc, _}. + {subscribe, _} | {error, atom()} | {badrpc, _}. subscribe(Node, ClientId, TopicTables) -> rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). -spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> - {unsubscribe, _} | {error, _} | {badrpc, _}. + {unsubscribe, _} | {error, _} | {badrpc, _}. unsubscribe(Node, ClientId, Topic) -> rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).