diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 6e1c1e151..02cb91900 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -24,6 +24,7 @@ %% APIs -export([ + list_raw/0, list/0, start/0, restart/0, @@ -57,50 +58,70 @@ -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). -spec id_example() -> atom(). -id_example() -> - id_example(list()). - -id_example([]) -> - {ID, _} = hd(list()), - ID; -id_example([{'tcp:default', _} | _]) -> - 'tcp:default'; -id_example([_ | Listeners]) -> - id_example(Listeners). +id_example() -> 'tcp:default'. %% @doc List configured listeners. --spec list() -> [{ListenerId :: atom(), ListenerConf :: map()}]. +-spec list_raw() -> [{ListenerId :: atom(), Type :: binary(), ListenerConf :: map()}]. +list_raw() -> + [{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()]. + list() -> - [{listener_id(Type, LName), LConf} || {Type, LName, LConf} <- do_list()]. - -do_list() -> Listeners = maps:to_list(emqx:get_config([listeners], #{})), - lists:append([list(Type, maps:to_list(Conf)) || {Type, Conf} <- Listeners]). + lists:flatmap(fun format_list/1, Listeners). -list(Type, Conf) -> +format_list(Listener) -> + {Type, Conf} = Listener, [ begin Running = is_running(Type, listener_id(Type, LName), LConf), - {Type, LName, maps:put(running, Running, LConf)} + {listener_id(Type, LName), maps:put(running, Running, LConf)} end - || {LName, LConf} <- Conf, is_map(LConf) + || {LName, LConf} <- maps:to_list(Conf), is_map(LConf) ]. --spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}. +do_list_raw() -> + Key = <<"listeners">>, + Raw = emqx_config:get_raw([Key], #{}), + SchemaMod = emqx_config:get_schema_mod(Key), + #{Key := RawWithDefault} = emqx_config:fill_defaults(SchemaMod, #{Key => Raw}), + Listeners = maps:to_list(RawWithDefault), + lists:flatmap(fun format_raw_listeners/1, Listeners). + +format_raw_listeners({Type, Conf}) -> + lists:map( + fun({LName, LConf0}) when is_map(LConf0) -> + Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0), + LConf1 = maps:remove(<<"authentication">>, LConf0), + LConf2 = maps:put(<<"running">>, Running, LConf1), + {Type, LName, LConf2} + end, + maps:to_list(Conf) + ). + +-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> case - lists:filtermap( - fun({_Type, Id, #{running := IsRunning}}) -> - Id =:= ListenerId andalso {true, IsRunning} - end, - do_list() - ) + [ + Running + || {Id, #{running := Running}} <- list(), + Id =:= ListenerId + ] of - [IsRunning] -> IsRunning; - [] -> {error, not_found} + [] -> {error, not_found}; + [IsRunning] -> IsRunning end. -is_running(Type, ListenerId, #{bind := ListenOn}) when Type =:= tcp; Type =:= ssl -> +is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> + ListenOn = + case Conf of + #{bind := Bind} -> + Bind; + #{<<"bind">> := Bind} -> + case emqx_schema:to_ip_port(binary_to_list(Bind)) of + {ok, L} -> L; + {error, _} -> binary_to_integer(Bind) + end + end, try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> true @@ -118,10 +139,10 @@ is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss -> end; is_running(quic, _ListenerId, _Conf) -> %% TODO: quic support - {error, no_found}. + false. current_conns(ID, ListenOn) -> - {Type, Name} = parse_listener_id(ID), + {ok, #{type := Type, name := Name}} = parse_listener_id(ID), current_conns(Type, Name, ListenOn). current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl -> @@ -132,7 +153,7 @@ current_conns(_, _, _) -> {error, not_support}. max_conns(ID, ListenOn) -> - {Type, Name} = parse_listener_id(ID), + {ok, #{type := Type, name := Name}} = parse_listener_id(ID), max_conns(Type, Name, ListenOn). max_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl -> @@ -164,20 +185,24 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> console_print( "Listener ~ts is NOT started due to: ~p~n.", [listener_id(Type, ListenerName), Reason] - ); + ), + ok; {ok, _} -> console_print( "Listener ~ts on ~ts started.~n", [listener_id(Type, ListenerName), format_addr(Bind)] - ); + ), + ok; {error, {already_started, Pid}} -> {error, {already_started, Pid}}; {error, Reason} -> + ListenerId = listener_id(Type, ListenerName), + BindStr = format_addr(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p~n", - [listener_id(Type, ListenerName), format_addr(Bind), Reason] + [ListenerId, BindStr, Reason] ), - error(Reason) + error({failed_to_start, ListenerId, BindStr, Reason}) end. %% @doc Restart all listeners @@ -316,15 +341,26 @@ delete_authentication(Type, ListenerName, _Conf) -> post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = diff_listeners(NewListeners, OldListeners), - perform_listener_changes(fun stop_listener/3, Removed), - perform_listener_changes(fun delete_authentication/3, Removed), - perform_listener_changes(fun start_listener/3, Added), - perform_listener_changes(fun restart_listener/3, Updated). + try + perform_listener_changes(fun stop_listener/3, Removed), + perform_listener_changes(fun delete_authentication/3, Removed), + perform_listener_changes(fun start_listener/3, Added), + perform_listener_changes(fun restart_listener/3, Updated) + catch + error:{failed_to_start, ListenerId, Bind, Reason} -> + Error = lists:flatten( + io_lib:format( + "~ts(~ts) failed with ~ts", + [ListenerId, Bind, element(1, Reason)] + ) + ), + {error, Error} + end. perform_listener_changes(Action, MapConfs) -> lists:foreach( fun({Id, Conf}) -> - {Type, Name} = parse_listener_id(Id), + {ok, #{type := Type, name := Name}} = parse_listener_id(Id), Action(Type, Name, Conf) end, maps:to_list(MapConfs) @@ -447,7 +483,7 @@ parse_listener_id(Id) -> case string:split(str(Id), ":", leading) of [Type, Name] -> case lists:member(Type, ?TYPES_STRING) of - true -> {list_to_existing_atom(Type), list_to_atom(Name)}; + true -> {ok, #{type => list_to_existing_atom(Type), name => list_to_atom(Name)}}; false -> {error, {invalid_listener_id, Id}} end; _ -> @@ -480,25 +516,27 @@ tcp_opts(Opts) -> foreach_listeners(Do) -> lists:foreach( - fun({Type, LName, LConf}) -> - Do(Type, LName, LConf) + fun({Id, LConf}) -> + {ok, #{type := Type, name := Name}} = parse_listener_id(Id), + Do(Type, Name, LConf) end, - do_list() + list() ). has_enabled_listener_conf_by_type(Type) -> lists:any( - fun({Type0, _LName, LConf}) when is_map(LConf) -> + fun({Id, LConf}) when is_map(LConf) -> + {ok, #{type := Type0}} = parse_listener_id(Id), Type =:= Type0 andalso maps:get(enabled, LConf, true) end, - do_list() + list() ). apply_on_listener(ListenerId, Do) -> - {Type, ListenerName} = parse_listener_id(ListenerId), - case emqx_config:find_listener_conf(Type, ListenerName, []) of - {not_found, _, _} -> error({listener_config_not_found, Type, ListenerName}); - {ok, Conf} -> Do(Type, ListenerName, Conf) + {ok, #{type := Type, name := Name}} = parse_listener_id(ListenerId), + case emqx_config:find_listener_conf(Type, Name, []) of + {not_found, _, _} -> error({listener_config_not_found, Type, Name}); + {ok, Conf} -> Do(Type, Name, Conf) end. str(A) when is_atom(A) -> diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index e9a154423..8261808da 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -83,21 +83,6 @@ , do_unsubscribe/2 ]). -%% Listeners --export([ do_list_listeners/0 - , list_listeners/0 - , list_listeners/1 - , list_listeners_by_id/1 - , get_listener/2 - , manage_listener/2 - , do_update_listener/2 - , update_listener/2 - , update_listener/3 - , do_remove_listener/1 - , remove_listener/1 - , remove_listener/2 - ]). - %% Alarms -export([ get_alarms/1 , get_alarms/2 @@ -434,80 +419,6 @@ do_unsubscribe(ClientId, Topic) -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} end. -%%-------------------------------------------------------------------- -%% Listeners -%%-------------------------------------------------------------------- - -do_list_listeners() -> - [Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()]. - -list_listeners() -> - lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). - -list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). - -list_listeners_by_id(Id) -> - listener_id_filter(Id, list_listeners()). - -get_listener(Node, Id) -> - case listener_id_filter(Id, list_listeners(Node)) of - [] -> - {error, not_found}; - [Listener] -> - Listener - end. - -listener_id_filter(Id, Listeners) -> - Filter = fun(#{id := Id0}) -> Id0 =:= Id end, - lists:filter(Filter, Listeners). - --spec manage_listener( start_listener | stop_listener | restart_listener - , #{id := atom(), node := node()} - ) -> ok | {error, Reason :: term()}. -manage_listener(start_listener, #{id := ID, node := Node}) -> - wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID)); -manage_listener(stop_listener, #{id := ID, node := Node}) -> - wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID)); -manage_listener(restart_listener, #{id := ID, node := Node}) -> - wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)). - --spec do_update_listener(string(), emqx_config:update_request()) -> - map() | {error, _}. -do_update_listener(Id, Config) -> - case emqx_listeners:parse_listener_id(Id) of - {error, {invalid_listener_id, Id}} -> - {error, {invalid_listener_id, Id}}; - {Type, Name} -> - case emqx:update_config([listeners, Type, Name], Config, #{}) of - {ok, #{raw_config := RawConf}} -> - RawConf#{node => node(), id => Id, running => true}; - {error, Reason} -> - {error, Reason} - end - end. - -update_listener(Id, Config) -> - [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. - -update_listener(Node, Id, Config) -> - wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). - -remove_listener(Id) -> - [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()]. - --spec do_remove_listener(string()) -> ok. -do_remove_listener(Id) -> - {Type, Name} = emqx_listeners:parse_listener_id(Id), - case emqx:remove_config([listeners, Type, Name], #{}) of - {ok, _} -> ok; - {error, Reason} -> - error(Reason) - end. - -remove_listener(Node, Id) -> - wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). - %%-------------------------------------------------------------------- %% Get Alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 606705187..a2dbce1aa 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -18,372 +18,461 @@ -behaviour(minirest_api). --export([api_spec/0]). +-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). +-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). --export([ list_listeners/2 - , crud_listeners_by_id/2 - , list_listeners_on_node/2 - , crud_listener_by_id_on_node/2 - , manage_listeners/2 - , jsonable_resp/2 - ]). +-export([ + list_listeners/2, + crud_listeners_by_id/2, + list_listeners_on_node/2, + crud_listener_by_id_on_node/2, + action_listeners_by_id/2, + action_listeners_by_id_on_node/2 +]). --export([format/1]). +%% for rpc call +-export([ + do_list_listeners/0, + do_update_listener/2, + do_remove_listener/1 +]). -include_lib("emqx/include/emqx.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). -define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>). -define(NODE_NOT_FOUND_OR_DOWN, <<"Node not found or Down">>). -define(LISTENER_NOT_FOUND, <<"Listener id not found">>). +-define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>). -define(ADDR_PORT_INUSE, <<"Addr port in use">>). --define(CONFIG_SCHEMA_ERROR, <<"Config schema error">>). --define(INVALID_LISTENER_PROTOCOL, <<"Invalid listener type">>). --define(UPDATE_CONFIG_FAILED, <<"Update configuration failed">>). --define(OPERATION_FAILED, <<"Operation failed">>). + +-define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}). + +namespace() -> "listeners". api_spec() -> - { - [ - api_list_listeners(), - api_list_update_listeners_by_id(), - api_manage_listeners(), - api_list_listeners_on_node(), - api_get_update_listener_by_id_on_node(), - api_manage_listeners_on_node() - ], - [] - }. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). --define(TYPES_ATOM, [tcp, ssl, ws, wss, quic]). -req_schema() -> - Schema = [emqx_mgmt_api_configs:gen_schema( - emqx:get_raw_config([listeners, T, default], #{})) - || T <- ?TYPES_ATOM], - #{'oneOf' => Schema}. +paths() -> + [ + "/listeners", + "/listeners/:id", + "/listeners/:id/:action", + "/nodes/:node/listeners", + "/nodes/:node/listeners/:id", + "/nodes/:node/listeners/:id/:action" + ]. -resp_schema() -> - #{'oneOf' := Schema} = req_schema(), - AddMetadata = fun(Prop) -> - Prop#{running => #{type => boolean}, - id => #{type => string}, - node => #{type => string}} - end, - Schema1 = [S#{properties => AddMetadata(Prop)} - || S = #{properties := Prop} <- Schema], - #{'oneOf' => Schema1}. - -api_list_listeners() -> - Metadata = #{ +schema("/listeners") -> + #{ + 'operationId' => list_listeners, get => #{ - description => <<"List listeners from all nodes in the cluster">>, - responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), - <<"List listeners successfully">>)}}}, - {"/listeners", Metadata, list_listeners}. - -api_list_update_listeners_by_id() -> - Metadata = #{ + tags => [<<"listeners">>], + desc => <<"List all running node's listeners.">>, + responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + } + }; +schema("/listeners/:id") -> + #{ + 'operationId' => crud_listeners_by_id, get => #{ - description => <<"List listeners by a given Id from all nodes in the cluster">>, - parameters => [param_path_id()], + tags => [<<"listeners">>], + desc => <<"List all running node's listeners for the specified id.">>, + parameters => [?R_REF(listener_id)], responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}, + 200 => ?HOCON(?ARRAY(?R_REF(listeners))) + } + }, put => #{ - description => - <<"Create or update a listener by a given Id to all nodes in the cluster">>, - parameters => [param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), + tags => [<<"listeners">>], + desc => <<"Create or update the specified listener on all nodes.">>, + parameters => [?R_REF(listener_id)], + 'requestBody' => ?HOCON(listener_schema(), #{}), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED, - ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']), - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"500">> => - emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), - <<"Create or update listener successfully">>)}}, + 200 => ?HOCON(listener_schema(), #{}), + 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) + } + }, delete => #{ - description => <<"Delete a listener by a given Id to all nodes in the cluster">>, - parameters => [param_path_id()], + tags => [<<"listeners">>], + desc => <<"Delete the specified listener on all nodes.">>, + parameters => [?R_REF(listener_id)], responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"204">> => - emqx_mgmt_util:schema(<<"Delete listener successfully">>)}} - }, - {"/listeners/:id", Metadata, crud_listeners_by_id}. - -api_list_listeners_on_node() -> - Metadata = #{ - get => #{ - description => <<"List listeners in one node">>, - parameters => [param_path_node()], - responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?NODE_NOT_FOUND_OR_DOWN, ['RESOURCE_NOT_FOUND']), - <<"500">> => - emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"List listeners successfully">>)}}}, - {"/nodes/:node/listeners", Metadata, list_listeners_on_node}. - -api_get_update_listener_by_id_on_node() -> - Metadata = #{ - get => #{ - description => <<"Get a listener by a given Id on a specific node">>, - parameters => [param_path_node(), param_path_id()], - responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, - ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}}, - put => #{ - description => <<"Create or update a listener by a given Id on a specific node">>, - parameters => [param_path_node(), param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>), - responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED, - ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']), - <<"404">> => - emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND, - ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), - <<"500">> => - emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}}, - delete => #{ - description => <<"Delete a listener by a given Id to all nodes in the cluster">>, - parameters => [param_path_node(), param_path_id()], - responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']), - <<"204">> => - emqx_mgmt_util:schema(<<"Delete listener successfully">>)}} - }, - {"/nodes/:node/listeners/:id", Metadata, crud_listener_by_id_on_node}. - -api_manage_listeners() -> - Metadata = #{ + 204 => <<"Listener deleted">>, + 400 => error_codes(['BAD_REQUEST']) + } + } + }; +schema("/listeners/:id/:action") -> + #{ + 'operationId' => action_listeners_by_id, post => #{ - description => <<"Restart listeners on all nodes in the cluster">>, + tags => [<<"listeners">>], + desc => <<"Start/stop/restart listeners on all nodes.">>, parameters => [ - param_path_id(), - param_path_operation()], + ?R_REF(listener_id), + ?R_REF(action) + ], responses => #{ - <<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/listeners/:id/operation/:operation", Metadata, manage_listeners}. - -api_manage_listeners_on_node() -> - Metadata = #{ + 200 => <<"Updated">>, + 400 => error_codes(['BAD_REQUEST']) + } + } + }; +schema("/nodes/:node/listeners") -> + #{ + 'operationId' => list_listeners_on_node, + get => #{ + tags => [<<"listeners">>], + desc => <<"List all listeners on the specified node.">>, + parameters => [?R_REF(node)], + responses => #{ + 200 => ?HOCON(?ARRAY(listener_schema())), + 400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN) + } + } + }; +schema("/nodes/:node/listeners/:id") -> + #{ + 'operationId' => crud_listener_by_id_on_node, + get => #{ + tags => [<<"listeners">>], + desc => <<"Get the specified listener on the specified node.">>, + parameters => [ + ?R_REF(listener_id), + ?R_REF(node) + ], + responses => #{ + 200 => ?HOCON(listener_schema()), + 400 => error_codes(['BAD_REQUEST']), + 404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND) + } + }, put => #{ - description => <<"Restart listeners on all nodes in the cluster">>, + tags => [<<"listeners">>], + desc => <<"Create or update the specified listener on the specified node.">>, parameters => [ - param_path_node(), - param_path_id(), - param_path_operation()], + ?R_REF(listener_id), + ?R_REF(node) + ], + 'requestBody' => ?HOCON(listener_schema()), responses => #{ - <<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/nodes/:node/listeners/:id/operation/:operation", Metadata, manage_listeners}. - -%%%============================================================================================== -%% parameters -param_path_node() -> + 200 => ?HOCON(listener_schema()), + 400 => error_codes(['BAD_REQUEST']) + } + }, + delete => #{ + tags => [<<"listeners">>], + desc => <<"Delete the specified listener on the specified node.">>, + parameters => [ + ?R_REF(listener_id), + ?R_REF(node) + ], + responses => #{ + 204 => <<"Listener deleted">>, + 400 => error_codes(['BAD_REQUEST']) + } + } + }; +schema("/nodes/:node/listeners/:id/:action") -> #{ - name => node, - in => path, - schema => #{type => string}, - required => true, - example => node() + 'operationId' => action_listeners_by_id_on_node, + post => #{ + tags => [<<"listeners">>], + desc => <<"Start/stop/restart listeners on a specified node.">>, + parameters => [ + ?R_REF(node), + ?R_REF(listener_id), + ?R_REF(action) + ], + responses => #{ + 200 => <<"Updated">>, + 400 => error_codes(['BAD_REQUEST']) + } + } }. -param_path_id() -> - #{ - name => id, - in => path, - schema => #{type => string, example => emqx_listeners:id_example()}, - required => true - }. +fields(listeners) -> + [ + {"node", + ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1", + required => true + })}, + {"listeners", ?ARRAY(listener_schema())} + ]; +fields(listener_id) -> + [ + {id, + ?HOCON(atom(), #{ + desc => "Listener id", + example => 'tcp:default', + validator => fun validate_id/1, + in => path + })} + ]; +fields(action) -> + [ + {action, + ?HOCON(?ENUM([start, stop, restart]), #{ + desc => "listener action", + example => start, + in => path + })} + ]; +fields(node) -> + [ + {"node", + ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1", + in => path + })} + ]; +fields(Type) -> + Listeners = listeners_info(), + [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], + Schema. -param_path_operation()-> - #{ - name => operation, - in => path, - required => true, - schema => #{ - type => string, - enum => [start, stop, restart]}, - example => restart - }. +listener_schema() -> + ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())). + +listeners_info() -> + Listeners = hocon_schema:fields(emqx_schema, "listeners"), + lists:map( + fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> + Fields0 = hocon_schema:fields(Mod, Field), + Fields1 = lists:keydelete("authentication", 1, Fields0), + TypeAtom = list_to_existing_atom(Type), + #{ + ref => ?R_REF(TypeAtom), + schema => [ + {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, + {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, + {id, + ?HOCON(atom(), #{ + desc => "Listener id", + required => true, + validator => fun validate_id/1 + })} + | Fields1 + ] + } + end, + Listeners + ). + +validate_id(Id) -> + case emqx_listeners:parse_listener_id(Id) of + {error, Reason} -> {error, Reason}; + {ok, _} -> ok + end. -%%%============================================================================================== %% api list_listeners(get, _Request) -> - {200, format(emqx_mgmt:list_listeners())}. + {200, list_listeners()}. crud_listeners_by_id(get, #{bindings := #{id := Id}}) -> - case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(), - atom_to_binary(Id0, latin1) =:= Id] of - [] -> - {400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}}; - Listeners -> - {200, format(Listeners)} + {200, list_listeners_by_id(Id)}; +crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> + case parse_listener_conf(Body0) of + {Id, Type, Name, Conf} -> + case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + _ -> + {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} end; -crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) -> - Results = format(emqx_mgmt:update_listener(Id, Conf)), - case lists:filter(fun filter_errors/1, Results) of - [{error, {invalid_listener_id, Id}} | _] -> - {400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}}; - [{error, {emqx_conf_schema, _}} | _] -> - {400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}}; - [{error, {eaddrinuse, _}} | _] -> - {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; - [{error, Reason} | _] -> - {500, #{code => 'UNKNOWN_ERROR', message => err_msg(Reason)}}; - [] -> - {200, Results} - end; - crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> - Results = emqx_mgmt:remove_listener(Id), - case lists:filter(fun filter_errors/1, Results) of - [] -> {204}; - Errors -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Errors)}} + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), + case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of + {ok, _} -> {204}; + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end. + +parse_listener_conf(Conf0) -> + Conf1 = maps:remove(<<"running">>, Conf0), + {IdBin, Conf2} = maps:take(<<"id">>, Conf1), + {TypeBin, Conf3} = maps:take(<<"type">>, Conf2), + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), + TypeAtom = binary_to_existing_atom(TypeBin), + case Type =:= TypeAtom of + true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; + false -> {error, listener_type_inconsistent} end. list_listeners_on_node(get, #{bindings := #{node := Node}}) -> - case emqx_mgmt:list_listeners(atom(Node)) of + case list_listeners(Node) of {error, nodedown} -> - {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}}; + {400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}}; {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; - Listener -> - {200, format(Listener)} + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + #{<<"listeners">> := Listener} -> + {200, Listener} end. crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> - case emqx_mgmt:get_listener(atom(Node), atom(Id)) of + case get_listener(Node, Id) of {error, not_found} -> - {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}}; + {404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}}; {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; Listener -> - {200, format(Listener)} + {200, Listener} end; -crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Conf}) -> - case emqx_mgmt:update_listener(atom(Node), Id, Conf) of - {error, nodedown} -> - {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}}; - {error, {invalid_listener_id, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}}; - {error, {emqx_conf_schema, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}}; - {error, {eaddrinuse, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; +crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) -> + case parse_listener_conf(Body) of {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}; - Listener -> - {200, format(Listener)} + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {Id, Type, _Name, Conf} -> + case update_listener(Node, Id, Conf) of + {error, nodedown} -> + {400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}}; + %% TODO + {error, {eaddrinuse, _}} -> + {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {ok, Listener} -> + {200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}} + end; + _ -> + {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}} end; crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> - case emqx_mgmt:remove_listener(atom(Node), Id) of + case remove_listener(Node, Id) of ok -> {204}; - {error, Reason} -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. -manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) -> - {_, Result} = do_manage_listeners(Node, Id, Oper), - Result; +action_listeners_by_id_on_node(post, + #{bindings := #{id := Id, action := Action, node := Node}}) -> + {_, Result} = action_listeners(Node, Id, Action), + Result. -manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> - Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()], - case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of +action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) -> + Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], + case + lists:filter( + fun + ({_, {200}}) -> false; + (_) -> true + end, + Results + ) + of [] -> {200}; - Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} + Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}} end. %%%============================================================================================== -%% util functions -do_manage_listeners(Node, Id, Oper) -> - Param = #{node => atom(Node), id => atom(Id)}, - {Node, do_manage_listeners2(Oper, Param)}. +action_listeners(Node, Id, Action) -> + {Node, do_action_listeners(Action, Node, Id)}. -do_manage_listeners2(<<"start">>, Param) -> - case emqx_mgmt:manage_listener(start_listener, Param) of +do_action_listeners(start, Node, Id) -> + case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of ok -> {200}; {error, {already_started, _}} -> {200}; - {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; -do_manage_listeners2(<<"stop">>, Param) -> - case emqx_mgmt:manage_listener(stop_listener, Param) of +do_action_listeners(stop, Node, Id) -> + case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of ok -> {200}; {error, not_found} -> {200}; - {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; -do_manage_listeners2(<<"restart">>, Param) -> - case emqx_mgmt:manage_listener(restart_listener, Param) of +do_action_listeners(restart, Node, Id) -> + case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of ok -> {200}; - {error, not_found} -> do_manage_listeners2(<<"start">>, Param); - {error, Reason} -> - {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + {error, not_found} -> do_action_listeners(start, Node, Id); + {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. -manage_listeners_err(Errors) -> - list_to_binary(lists:foldl(fun({Node, Err}, Str) -> - err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str - end, "", Errors)). +action_listeners_err(Errors) -> + list_to_binary( + lists:foldl( + fun({Node, Err}, Str) -> + err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str + end, + "", + Errors + ) + ). -format(Listeners) when is_list(Listeners) -> - [format(Listener) || Listener <- Listeners]; - -format({error, Reason}) -> - {error, Reason}; - -format(#{node := _Node, id := _Id} = Conf) when is_map(Conf) -> - emqx_map_lib:jsonable_map(Conf#{ - running => trans_running(Conf) - }, fun ?MODULE:jsonable_resp/2). - -trans_running(Conf) -> - case maps:get(running, Conf) of - {error, _} -> - false; - Running -> - Running - end. - -filter_errors({error, _}) -> - true; -filter_errors(_) -> - false. - -jsonable_resp(bind, Port) when is_integer(Port) -> - {bind, Port}; -jsonable_resp(bind, {Addr, Port}) when is_tuple(Addr); is_integer(Port)-> - {bind, inet:ntoa(Addr) ++ ":" ++ integer_to_list(Port)}; -jsonable_resp(user_lookup_fun, _) -> - drop; -jsonable_resp(K, V) -> - {K, V}. - -atom(B) when is_binary(B) -> binary_to_atom(B, utf8); -atom(S) when is_list(S) -> list_to_atom(S); -atom(A) when is_atom(A) -> A. - -err_msg(Reason) -> - list_to_binary(err_msg_str(Reason)). +err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom); +err_msg(Reason) -> list_to_binary(err_msg_str(Reason)). err_msg_str(Reason) -> io_lib:format("~p", [Reason]). + +list_listeners() -> + [list_listeners(Node) || Node <- mria_mnesia:running_nodes()]. + +list_listeners(Node) -> + wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). + +list_listeners_by_id(Id) -> + listener_id_filter(Id, list_listeners()). + +get_listener(Node, Id) -> + case listener_id_filter(Id, [list_listeners(Node)]) of + [#{<<"listeners">> := []}] -> {error, not_found}; + [#{<<"listeners">> := [Listener]}] -> Listener + end. + +listener_id_filter(Id, Listeners) -> + lists:map( + fun(Conf = #{<<"listeners">> := Listeners0}) -> + Conf#{ + <<"listeners">> => + [C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0] + } + end, + Listeners + ). + +update_listener(Node, Id, Config) -> + wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). + +remove_listener(Node, Id) -> + wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). + +-spec do_list_listeners() -> map(). +do_list_listeners() -> + Listeners = [ + Conf#{<<"id">> => Id, <<"type">> => Type} + || {Id, Type, Conf} <- emqx_listeners:list_raw() + ], + #{ + <<"node">> => node(), + <<"listeners">> => Listeners + }. + +-spec do_update_listener(string(), emqx_config:update_request()) -> + {ok, map()} | {error, _}. +do_update_listener(Id, Config) -> + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), + case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of + {ok, #{raw_config := RawConf}} -> {ok, RawConf}; + {error, Reason} -> {error, Reason} + end. + +-spec do_remove_listener(string()) -> ok. +do_remove_listener(Id) -> + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), + case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of + {ok, _} -> ok; + {error, Reason} -> error(Reason) + end. + +wrap_rpc({badrpc, Reason}) -> + {error, Reason}; +wrap_rpc(Res) -> + Res. diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 34aff22fb..232011319 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -26,36 +26,39 @@ -export([load/0]). --export([ status/1 - , broker/1 - , cluster/1 - , clients/1 - , routes/1 - , subscriptions/1 - , plugins/1 - , listeners/1 - , vm/1 - , mnesia/1 - , trace/1 - , traces/1 - , log/1 - , authz/1 - , olp/1 - ]). +-export([ + status/1, + broker/1, + cluster/1, + clients/1, + routes/1, + subscriptions/1, + plugins/1, + listeners/1, + vm/1, + mnesia/1, + trace/1, + traces/1, + log/1, + authz/1, + olp/1 +]). --define(PROC_INFOKEYS, [status, - memory, - message_queue_len, - total_heap_size, - heap_size, - stack_size, - reductions]). +-define(PROC_INFOKEYS, [ + status, + memory, + message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions +]). -define(MAX_LIMIT, 10000). -define(APP, emqx). --spec(load() -> ok). +-spec load() -> ok. load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). @@ -70,7 +73,7 @@ status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), emqx_ctl:print("Node ~p ~ts is ~p~n", [node(), emqx_app:get_release(), InternalStatus]); status(_) -> - emqx_ctl:usage("status", "Show broker status"). + emqx_ctl:usage("status", "Show broker status"). %%-------------------------------------------------------------------- %% @doc Query broker @@ -79,19 +82,22 @@ broker([]) -> Funs = [sysdescr, version, datetime], [emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs], emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]); - broker(["stats"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) - || {Stat, Val} <- lists:sort(emqx_stats:getstats())]; - + [ + emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) + || {Stat, Val} <- lists:sort(emqx_stats:getstats()) + ]; broker(["metrics"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) - || {Metric, Val} <- lists:sort(emqx_metrics:all())]; - + [ + emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) + || {Metric, Val} <- lists:sort(emqx_metrics:all()) + ]; broker(_) -> - emqx_ctl:usage([{"broker", "Show broker version, uptime and description"}, - {"broker stats", "Show broker statistics of clients, topics, subscribers"}, - {"broker metrics", "Show broker metrics"}]). + emqx_ctl:usage([ + {"broker", "Show broker version, uptime and description"}, + {"broker stats", "Show broker statistics of clients, topics, subscribers"}, + {"broker metrics", "Show broker metrics"} + ]). %%----------------------------------------------------------------------------- %% @doc Cluster with other nodes @@ -106,7 +112,6 @@ cluster(["join", SNode]) -> {error, Error} -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; - cluster(["leave"]) -> case ekka:leave() of ok -> @@ -115,7 +120,6 @@ cluster(["leave"]) -> {error, Error} -> emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error]) end; - cluster(["force-leave", SNode]) -> case ekka:force_leave(ekka_node:parse_name(SNode)) of ok -> @@ -126,38 +130,37 @@ cluster(["force-leave", SNode]) -> {error, Error} -> emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error]) end; - cluster(["status"]) -> emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]); - cluster(_) -> - emqx_ctl:usage([{"cluster join ", "Join the cluster"}, - {"cluster leave", "Leave the cluster"}, - {"cluster force-leave ","Force the node leave from cluster"}, - {"cluster status", "Cluster status"}]). + emqx_ctl:usage([ + {"cluster join ", "Join the cluster"}, + {"cluster leave", "Leave the cluster"}, + {"cluster force-leave ", "Force the node leave from cluster"}, + {"cluster status", "Cluster status"} + ]). %%-------------------------------------------------------------------- %% @doc Query clients clients(["list"]) -> dump(emqx_channel, client); - clients(["show", ClientId]) -> if_client(ClientId, fun print/1); - clients(["kick", ClientId]) -> ok = emqx_cm:kick_session(bin(ClientId)), emqx_ctl:print("ok~n"); - clients(_) -> - emqx_ctl:usage([{"clients list", "List all clients"}, - {"clients show ", "Show a client"}, - {"clients kick ", "Kick out a client"}]). + emqx_ctl:usage([ + {"clients list", "List all clients"}, + {"clients show ", "Show a client"}, + {"clients kick ", "Kick out a client"} + ]). if_client(ClientId, Fun) -> case ets:lookup(emqx_channel, (bin(ClientId))) of [] -> emqx_ctl:print("Not Found.~n"); - [Channel] -> Fun({client, Channel}) + [Channel] -> Fun({client, Channel}) end. %%-------------------------------------------------------------------- @@ -165,20 +168,22 @@ if_client(ClientId, Fun) -> routes(["list"]) -> dump(emqx_route); - routes(["show", Topic]) -> Routes = ets:lookup(emqx_route, bin(Topic)), [print({emqx_route, Route}) || Route <- Routes]; - routes(_) -> - emqx_ctl:usage([{"routes list", "List all routes"}, - {"routes show ", "Show a route"}]). + emqx_ctl:usage([ + {"routes list", "List all routes"}, + {"routes show ", "Show a route"} + ]). subscriptions(["list"]) -> - lists:foreach(fun(Suboption) -> - print({emqx_suboption, Suboption}) - end, ets:tab2list(emqx_suboption)); - + lists:foreach( + fun(Suboption) -> + print({emqx_suboption, Suboption}) + end, + ets:tab2list(emqx_suboption) + ); subscriptions(["show", ClientId]) -> case ets:lookup(emqx_subid, bin(ClientId)) of [] -> @@ -186,43 +191,45 @@ subscriptions(["show", ClientId]) -> [{_, Pid}] -> case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); - Suboption -> - [print({emqx_suboption, Sub}) || Sub <- Suboption] + Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] end end; - subscriptions(["add", ClientId, Topic, QoS]) -> - if_valid_qos(QoS, fun(IntQos) -> - case ets:lookup(emqx_channel, bin(ClientId)) of - [] -> emqx_ctl:print("Error: Channel not found!"); - [{_, Pid}] -> - {Topic1, Options} = emqx_topic:parse(bin(Topic)), - Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]}, - emqx_ctl:print("ok~n") - end - end); - + if_valid_qos(QoS, fun(IntQos) -> + case ets:lookup(emqx_channel, bin(ClientId)) of + [] -> + emqx_ctl:print("Error: Channel not found!"); + [{_, Pid}] -> + {Topic1, Options} = emqx_topic:parse(bin(Topic)), + Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]}, + emqx_ctl:print("ok~n") + end + end); subscriptions(["del", ClientId, Topic]) -> case ets:lookup(emqx_channel, bin(ClientId)) of - [] -> emqx_ctl:print("Error: Channel not found!"); + [] -> + emqx_ctl:print("Error: Channel not found!"); [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]}, emqx_ctl:print("ok~n") end; - subscriptions(_) -> emqx_ctl:usage( - [{"subscriptions list", "List all subscriptions"}, - {"subscriptions show ", "Show subscriptions of a client"}, - {"subscriptions add ", "Add a static subscription manually"}, - {"subscriptions del ", "Delete a static subscription manually"}]). + [ + {"subscriptions list", "List all subscriptions"}, + {"subscriptions show ", "Show subscriptions of a client"}, + {"subscriptions add ", "Add a static subscription manually"}, + {"subscriptions del ", "Delete a static subscription manually"} + ] + ). if_valid_qos(QoS, Fun) -> try list_to_integer(QoS) of Int when ?IS_QOS(Int) -> Fun(Int); _ -> emqx_ctl:print("QoS should be 0, 1, 2~n") - catch _:_ -> - emqx_ctl:print("QoS should be 0, 1, 2~n") + catch + _:_ -> + emqx_ctl:print("QoS should be 0, 1, 2~n") end. plugins(["list"]) -> @@ -231,7 +238,7 @@ plugins(["describe", NameVsn]) -> emqx_plugins_cli:describe(NameVsn, fun emqx_ctl:print/2); plugins(["install", NameVsn]) -> emqx_plugins_cli:ensure_installed(NameVsn, fun emqx_ctl:print/2); -plugins(["uninstall", NameVsn])-> +plugins(["uninstall", NameVsn]) -> emqx_plugins_cli:ensure_uninstalled(NameVsn, fun emqx_ctl:print/2); plugins(["start", NameVsn]) -> emqx_plugins_cli:ensure_started(NameVsn, fun emqx_ctl:print/2); @@ -251,69 +258,73 @@ plugins(["enable", NameVsn, "before", Other]) -> emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2); plugins(_) -> emqx_ctl:usage( - [{"plugins [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"}, - {"plugins list", "List all installed plugins"}, - {"plugins describe Name-Vsn", "Describe an installed plugins"}, - {"plugins install Name-Vsn", "Install a plugin package placed\n" - "in plugin'sinstall_dir"}, - {"plugins uninstall Name-Vsn", "Uninstall a plugin. NOTE: it deletes\n" - "all files in install_dir/Name-Vsn"}, - {"plugins start Name-Vsn", "Start a plugin"}, - {"plugins stop Name-Vsn", "Stop a plugin"}, - {"plugins restart Name-Vsn", "Stop then start a plugin"}, - {"plugins disable Name-Vsn", "Disable auto-boot"}, - {"plugins enable Name-Vsn [Position]", - "Enable auto-boot at Position in the boot list, where Position could be\n" - "'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n" - "The Position parameter can be used to adjust the boot order.\n" - "If no Position is given, an already configured plugin\n" - "will stay at is old position; a newly plugin is appended to the rear\n" - "e.g. plugins disable foo-0.1.0 front\n" - " plugins enable bar-0.2.0 before foo-0.1.0"} - ]). + [ + {"plugins [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"}, + {"plugins list", "List all installed plugins"}, + {"plugins describe Name-Vsn", "Describe an installed plugins"}, + {"plugins install Name-Vsn", + "Install a plugin package placed\n" + "in plugin'sinstall_dir"}, + {"plugins uninstall Name-Vsn", + "Uninstall a plugin. NOTE: it deletes\n" + "all files in install_dir/Name-Vsn"}, + {"plugins start Name-Vsn", "Start a plugin"}, + {"plugins stop Name-Vsn", "Stop a plugin"}, + {"plugins restart Name-Vsn", "Stop then start a plugin"}, + {"plugins disable Name-Vsn", "Disable auto-boot"}, + {"plugins enable Name-Vsn [Position]", + "Enable auto-boot at Position in the boot list, where Position could be\n" + "'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n" + "The Position parameter can be used to adjust the boot order.\n" + "If no Position is given, an already configured plugin\n" + "will stay at is old position; a newly plugin is appended to the rear\n" + "e.g. plugins disable foo-0.1.0 front\n" + " plugins enable bar-0.2.0 before foo-0.1.0"} + ] + ). %%-------------------------------------------------------------------- %% @doc vm command vm([]) -> vm(["all"]); - vm(["all"]) -> [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]]; - vm(["load"]) -> [emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()]; - vm(["memory"]) -> [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; - vm(["process"]) -> - [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) - || {Name, Key} <- [{limit, process_limit}, {count, process_count}]]; - + [ + emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + || {Name, Key} <- [{limit, process_limit}, {count, process_count}] + ]; vm(["io"]) -> IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))), - [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) - || Key <- [max_fds, active_fds]]; - + [ + emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) + || Key <- [max_fds, active_fds] + ]; vm(["ports"]) -> - [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) - || {Name, Key} <- [{count, port_count}, {limit, port_limit}]]; - + [ + emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) + || {Name, Key} <- [{count, port_count}, {limit, port_limit}] + ]; vm(_) -> - emqx_ctl:usage([{"vm all", "Show info of Erlang VM"}, - {"vm load", "Show load of Erlang VM"}, - {"vm memory", "Show memory of Erlang VM"}, - {"vm process", "Show process of Erlang VM"}, - {"vm io", "Show IO of Erlang VM"}, - {"vm ports", "Show Ports of Erlang VM"}]). + emqx_ctl:usage([ + {"vm all", "Show info of Erlang VM"}, + {"vm load", "Show load of Erlang VM"}, + {"vm memory", "Show memory of Erlang VM"}, + {"vm process", "Show process of Erlang VM"}, + {"vm io", "Show IO of Erlang VM"}, + {"vm ports", "Show Ports of Erlang VM"} + ]). %%-------------------------------------------------------------------- %% @doc mnesia Command mnesia([]) -> mnesia:system_info(); - mnesia(_) -> emqx_ctl:usage([{"mnesia", "Mnesia system info"}]). @@ -325,40 +336,40 @@ log(["set-level", Level]) -> ok -> emqx_ctl:print("~ts~n", [Level]); Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error]) end; - log(["primary-level"]) -> Level = emqx_logger:get_primary_log_level(), emqx_ctl:print("~ts~n", [Level]); - log(["primary-level", Level]) -> _ = emqx_logger:set_primary_log_level(list_to_atom(Level)), emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]); - log(["handlers", "list"]) -> - _ = [emqx_ctl:print( + _ = [ + emqx_ctl:print( "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n", [Id, Level, Dst, Status] - ) - || #{id := Id, - level := Level, - dst := Dst, - status := Status} <- emqx_logger:get_log_handlers()], + ) + || #{ + id := Id, + level := Level, + dst := Dst, + status := Status + } <- emqx_logger:get_log_handlers() + ], ok; - log(["handlers", "start", HandlerId]) -> case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of - ok -> emqx_ctl:print("log handler ~ts started~n", [HandlerId]); + ok -> + emqx_ctl:print("log handler ~ts started~n", [HandlerId]); {error, Reason} -> emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [HandlerId, Reason]) end; - log(["handlers", "stop", HandlerId]) -> case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of - ok -> emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]); + ok -> + emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]); {error, Reason} -> emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [HandlerId, Reason]) end; - log(["handlers", "set-level", HandlerId, Level]) -> case emqx_logger:set_log_handler_level(list_to_atom(HandlerId), list_to_atom(Level)) of ok -> @@ -367,57 +378,60 @@ log(["handlers", "set-level", HandlerId, Level]) -> {error, Error} -> emqx_ctl:print("[error] ~p~n", [Error]) end; - log(_) -> emqx_ctl:usage( - [{"log set-level ", "Set the overall log level"}, - {"log primary-level", "Show the primary log level now"}, - {"log primary-level ","Set the primary log level"}, - {"log handlers list", "Show log handlers"}, - {"log handlers start ", "Start a log handler"}, - {"log handlers stop ", "Stop a log handler"}, - {"log handlers set-level ", "Set log level of a log handler"}]). + [ + {"log set-level ", "Set the overall log level"}, + {"log primary-level", "Show the primary log level now"}, + {"log primary-level ", "Set the primary log level"}, + {"log handlers list", "Show log handlers"}, + {"log handlers start ", "Start a log handler"}, + {"log handlers stop ", "Stop a log handler"}, + {"log handlers set-level ", "Set log level of a log handler"} + ] + ). %%-------------------------------------------------------------------- %% @doc Trace Command trace(["list"]) -> - lists:foreach(fun(Trace) -> - #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, - emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) - end, emqx_trace_handler:running()); - + lists:foreach( + fun(Trace) -> + #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, + emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) + end, + emqx_trace_handler:running() + ); trace(["stop", Operation, Filter0]) -> case trace_type(Operation, Filter0) of {ok, Type, Filter} -> trace_off(Type, Filter); error -> trace([]) end; - trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); - trace(["start", Operation, Filter0, LogFile, Level]) -> case trace_type(Operation, Filter0) of {ok, Type, Filter} -> - trace_on(name(Filter0), Type, Filter, - list_to_existing_atom(Level), LogFile); - error -> trace([]) + trace_on( + name(Filter0), + Type, + Filter, + list_to_existing_atom(Level), + LogFile + ); + error -> + trace([]) end; - trace(_) -> - emqx_ctl:usage([{"trace list", "List all traces started on local node"}, - {"trace start client []", - "Traces for a client on local node"}, - {"trace stop client ", - "Stop tracing for a client on local node"}, - {"trace start topic [] ", - "Traces for a topic on local node"}, - {"trace stop topic ", - "Stop tracing for a topic on local node"}, + emqx_ctl:usage([ + {"trace list", "List all traces started on local node"}, + {"trace start client []", "Traces for a client on local node"}, + {"trace stop client ", "Stop tracing for a client on local node"}, + {"trace start topic [] ", "Traces for a topic on local node"}, + {"trace stop topic ", "Stop tracing for a topic on local node"}, {"trace start ip_address [] ", "Traces for a client ip on local node"}, - {"trace stop ip_addresss ", - "Stop tracing for a client ip on local node"} + {"trace stop ip_addresss ", "Stop tracing for a client ip on local node"} ]). trace_on(Name, Type, Filter, Level, LogFile) -> @@ -447,32 +461,37 @@ traces(["list"]) -> [] -> emqx_ctl:print("Cluster Trace is empty~n", []); _ -> - lists:foreach(fun(Trace) -> - #{type := Type, name := Name, status := Status, - log_size := LogSize} = Trace, - emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n", - [Name, Type, maps:get(Type, Trace), Status, LogSize]) - end, List) + lists:foreach( + fun(Trace) -> + #{ + type := Type, + name := Name, + status := Status, + log_size := LogSize + } = Trace, + emqx_ctl:print( + "Trace(~s: ~s=~s, ~s, LogSize:~p)~n", + [Name, Type, maps:get(Type, Trace), Status, LogSize] + ) + end, + List + ) end, length(List); - traces(["stop", Name]) -> trace_cluster_off(Name); - traces(["delete", Name]) -> trace_cluster_del(Name); - traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter, "900"]); - traces(["start", Name, Operation, Filter0, DurationS]) -> case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); + {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); error -> traces([]) end; - traces(_) -> - emqx_ctl:usage([{"traces list", "List all cluster traces started"}, + emqx_ctl:usage([ + {"traces list", "List all cluster traces started"}, {"traces start client ", "Traces for a client in cluster"}, {"traces start topic ", "Traces for a topic in cluster"}, {"traces start ip_address ", "Traces for a IP in cluster"}, @@ -483,18 +502,21 @@ traces(_) -> trace_cluster_on(Name, Type, Filter, DurationS0) -> DurationS = list_to_integer(DurationS0), Now = erlang:system_time(second), - Trace = #{ name => list_to_binary(Name) - , type => atom_to_binary(Type) - , Type => list_to_binary(Filter) - , start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)) - , end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS)) - }, + Trace = #{ + name => list_to_binary(Name), + type => atom_to_binary(Type), + Type => list_to_binary(Filter), + start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)), + end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS)) + }, case emqx_trace:create(Trace) of {ok, _} -> emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]); {error, Error} -> - emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n", - [Name, Type, Filter, Error]) + emqx_ctl:print( + "[error] cluster_trace ~s ~s=~s ~p~n", + [Name, Type, Filter, Error] + ) end. trace_cluster_del(Name) -> @@ -518,29 +540,34 @@ trace_type(_, _) -> error. %% @doc Listeners Command listeners([]) -> - lists:foreach(fun({ID, Conf}) -> - {Host, Port} = maps:get(bind, Conf), - Acceptors = maps:get(acceptors, Conf), - ProxyProtocol = maps:get(proxy_protocol, Conf, undefined), - Running = maps:get(running, Conf), - CurrentConns = case emqx_listeners:current_conns(ID, {Host, Port}) of - {error, _} -> []; - CC -> [{current_conn, CC}] - end, - MaxConn = case emqx_listeners:max_conns(ID, {Host, Port}) of - {error, _} -> []; - MC -> [{max_conns, MC}] - end, - Info = [ - {listen_on, {string, format_listen_on(Port)}}, - {acceptors, Acceptors}, - {proxy_protocol, ProxyProtocol}, - {running, Running} + lists:foreach( + fun({ID, Conf}) -> + {Host, Port} = maps:get(bind, Conf), + Acceptors = maps:get(acceptors, Conf), + ProxyProtocol = maps:get(proxy_protocol, Conf, undefined), + Running = maps:get(running, Conf), + CurrentConns = + case emqx_listeners:current_conns(ID, {Host, Port}) of + {error, _} -> []; + CC -> [{current_conn, CC}] + end, + MaxConn = + case emqx_listeners:max_conns(ID, {Host, Port}) of + {error, _} -> []; + MC -> [{max_conns, MC}] + end, + Info = + [ + {listen_on, {string, format_listen_on(Port)}}, + {acceptors, Acceptors}, + {proxy_protocol, ProxyProtocol}, + {running, Running} ] ++ CurrentConns ++ MaxConn, - emqx_ctl:print("~ts~n", [ID]), - lists:foreach(fun indent_print/1, Info) - end, emqx_listeners:list()); - + emqx_ctl:print("~ts~n", [ID]), + lists:foreach(fun indent_print/1, Info) + end, + emqx_listeners:list() + ); listeners(["stop", ListenerId]) -> case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of ok -> @@ -548,7 +575,6 @@ listeners(["stop", ListenerId]) -> {error, Error} -> emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error]) end; - listeners(["start", ListenerId]) -> case emqx_listeners:start_listener(list_to_atom(ListenerId)) of ok -> @@ -556,7 +582,6 @@ listeners(["start", ListenerId]) -> {error, Error} -> emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error]) end; - listeners(["restart", ListenerId]) -> case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of ok -> @@ -564,13 +589,13 @@ listeners(["restart", ListenerId]) -> {error, Error} -> emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error]) end; - listeners(_) -> - emqx_ctl:usage([{"listeners", "List listeners"}, - {"listeners stop ", "Stop a listener"}, - {"listeners start ", "Start a listener"}, - {"listeners restart ", "Restart a listener"} - ]). + emqx_ctl:usage([ + {"listeners", "List listeners"}, + {"listeners stop ", "Stop a listener"}, + {"listeners start ", "Start a listener"}, + {"listeners restart ", "Restart a listener"} + ]). %%-------------------------------------------------------------------- %% @doc authz Command @@ -582,7 +607,6 @@ authz(["cache-clean", "node", Node]) -> {error, Reason} -> emqx_ctl:print("Authorization drain failed on node ~ts: ~0p.~n", [Node, Reason]) end; - authz(["cache-clean", "all"]) -> case emqx_mgmt:clean_authz_cache_all() of ok -> @@ -590,22 +614,22 @@ authz(["cache-clean", "all"]) -> {error, Reason} -> emqx_ctl:print("Authorization cache-clean failed: ~p.~n", [Reason]) end; - authz(["cache-clean", ClientId]) -> emqx_mgmt:clean_authz_cache(ClientId); - authz(_) -> emqx_ctl:usage( - [{"authz cache-clean all", "Clears authorization cache on all nodes"}, - {"authz cache-clean node ", "Clears authorization cache on given node"}, - {"authz cache-clean ", "Clears authorization cache for given client"} - ]). - + [ + {"authz cache-clean all", "Clears authorization cache on all nodes"}, + {"authz cache-clean node ", "Clears authorization cache on given node"}, + {"authz cache-clean ", "Clears authorization cache for given client"} + ] + ). %%-------------------------------------------------------------------- %% @doc OLP (Overload Protection related) olp(["status"]) -> - S = case emqx_olp:is_overloaded() of + S = + case emqx_olp:is_overloaded() of true -> "overloaded"; false -> "not overloaded" end, @@ -617,10 +641,11 @@ olp(["enable"]) -> Res = emqx_olp:enable(), emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]); olp(_) -> - emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"}, - {"olp enable", "Enable overload protection"}, - {"olp disable", "Disable overload protection"} - ]). + emqx_ctl:usage([ + {"olp status", "Return OLP status if system is overloaded"}, + {"olp enable", "Enable overload protection"}, + {"olp disable", "Disable overload protection"} + ]). %%-------------------------------------------------------------------- %% Dump ETS @@ -634,78 +659,114 @@ dump(Table, Tag) -> dump(_Table, _, '$end_of_table', Result) -> lists:reverse(Result); - dump(Table, Tag, Key, Result) -> PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)], dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]). print({_, []}) -> ok; - print({client, {ClientId, ChanPid}}) -> - Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of - undefined -> #{}; - Attrs0 -> Attrs0 - end, - Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of - undefined -> #{}; - Stats0 -> maps:from_list(Stats0) - end, + Attrs = + case emqx_cm:get_chan_info(ClientId, ChanPid) of + undefined -> #{}; + Attrs0 -> Attrs0 + end, + Stats = + case emqx_cm:get_chan_stats(ClientId, ChanPid) of + undefined -> #{}; + Stats0 -> maps:from_list(Stats0) + end, ClientInfo = maps:get(clientinfo, Attrs, #{}), ConnInfo = maps:get(conninfo, Attrs, #{}), Session = maps:get(session, Attrs, #{}), - Connected = case maps:get(conn_state, Attrs) of - connected -> true; - _ -> false - end, - Info = lists:foldl(fun(Items, Acc) -> - maps:merge(Items, Acc) - end, #{connected => Connected}, - [maps:with([subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, - mqueue_len, mqueue_dropped, send_msg], Stats), - maps:with([clientid, username], ClientInfo), - maps:with([peername, clean_start, keepalive, expiry_interval, - connected_at, disconnected_at], ConnInfo), - maps:with([created_at], Session)]), - InfoKeys = [clientid, username, peername, clean_start, keepalive, - expiry_interval, subscriptions_cnt, inflight_cnt, - awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, - connected, created_at, connected_at] ++ - case maps:is_key(disconnected_at, Info) of - true -> [disconnected_at]; - false -> [] - end, + Connected = + case maps:get(conn_state, Attrs) of + connected -> true; + _ -> false + end, + Info = lists:foldl( + fun(Items, Acc) -> + maps:merge(Items, Acc) + end, + #{connected => Connected}, + [ + maps:with( + [ + subscriptions_cnt, + inflight_cnt, + awaiting_rel_cnt, + mqueue_len, + mqueue_dropped, + send_msg + ], + Stats + ), + maps:with([clientid, username], ClientInfo), + maps:with( + [ + peername, + clean_start, + keepalive, + expiry_interval, + connected_at, + disconnected_at + ], + ConnInfo + ), + maps:with([created_at], Session) + ] + ), + InfoKeys = + [ + clientid, + username, + peername, + clean_start, + keepalive, + expiry_interval, + subscriptions_cnt, + inflight_cnt, + awaiting_rel_cnt, + send_msg, + mqueue_len, + mqueue_dropped, + connected, + created_at, + connected_at + ] ++ + case maps:is_key(disconnected_at, Info) of + true -> [disconnected_at]; + false -> [] + end, Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000}, emqx_ctl:print( "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, " "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, " "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, " - "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" - ++ case maps:is_key(disconnected_at, Info1) of - true -> ", disconnected_at=~w)~n"; - false -> ")~n" - end, - [format(K, maps:get(K, Info1)) || K <- InfoKeys]); - + "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" ++ + case maps:is_key(disconnected_at, Info1) of + true -> ", disconnected_at=~w)~n"; + false -> ")~n" + end, + [format(K, maps:get(K, Info1)) || K <- InfoKeys] + ); print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); print({emqx_route, #route{topic = Topic, dest = Node}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); - print(#plugin{name = Name, descr = Descr, active = Active}) -> - emqx_ctl:print("Plugin(~ts, description=~ts, active=~ts)~n", - [Name, Descr, Active]); - + emqx_ctl:print( + "Plugin(~ts, description=~ts, active=~ts)~n", + [Name, Descr, Active] + ); print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) -> emqx_ctl:print("~ts -> ~ts~n", [maps:get(subid, Options), Topic]). format(_, undefined) -> undefined; - format(peername, {IPAddr, Port}) -> IPStr = emqx_mgmt_util:ntoa(IPAddr), io_lib:format("~ts:~p", [IPStr, Port]); - format(_, Val) -> Val. diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl index 306145ade..d3099ca39 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -18,23 +18,24 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 +-export([ + introduced_in/0, - , node_info/1 - , broker_info/1 - , list_subscriptions/1 + node_info/1, + broker_info/1, + list_subscriptions/1, - , list_listeners/1 - , remove_listener/2 + list_listeners/1, + remove_listener/2, - , update_listener/3 - , subscribe/3 - , unsubscribe/3 + update_listener/3, + subscribe/3, + unsubscribe/3, - , call_client/3 + call_client/3, - , get_full_config/1 - ]). + get_full_config/1 +]). -include_lib("emqx/include/bpapi.hrl"). @@ -53,26 +54,26 @@ broker_info(Node) -> list_subscriptions(Node) -> rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). --spec list_listeners(node()) -> [map()] | {badrpc, _}. +-spec list_listeners(node()) -> map() | {badrpc, _}. list_listeners(Node) -> - rpc:call(Node, emqx_mgmt, do_list_listeners, []). + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). -spec remove_listener(node(), string()) -> ok | {badrpc, _}. remove_listener(Node, Id) -> - rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]). + rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]). --spec update_listener(node(), string(), emqx_config:update_request()) -> - map() | {error, _} | {badrpc, _}. +-spec update_listener(node(), atom(), emqx_config:update_request()) -> + {ok, map()} | {error, _} | {badrpc, _}. update_listener(Node, Id, Config) -> - rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]). + rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]). -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> - {subscribe, _} | {error, atom()} | {badrpc, _}. + {subscribe, _} | {error, atom()} | {badrpc, _}. subscribe(Node, ClientId, TopicTables) -> rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). -spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> - {unsubscribe, _} | {error, _} | {badrpc, _}. + {unsubscribe, _} | {error, _} | {badrpc, _}. unsubscribe(Node, ClientId, Topic) -> rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 68fe3acc5..a146f48ef 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -24,90 +24,138 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), + emqx_mgmt_api_test_util:init_suite([emqx_conf]), Config. end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). + emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}), + emqx_conf:remove([listeners, tcp, new1], #{override_to => local}), + emqx_mgmt_api_test_util:end_suite([emqx_conf]). t_list_listeners(_) -> Path = emqx_mgmt_api_test_util:api_path(["listeners"]), - get_api(Path). + Res = request(get, Path, [], []), + Expect = emqx_mgmt_api_listeners:do_list_listeners(), + ?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)), + ok. -t_list_node_listeners(_) -> - Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]), - get_api(Path). +t_crud_listeners_by_id(_) -> + TcpListenerId = <<"tcp:default">>, + NewListenerId = <<"tcp:new">>, + TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]), + NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), + [#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []), + ?assertEqual(atom_to_binary(node()), Node), -t_get_listeners(_) -> - LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - ID = maps:get(id, LocalListener), - Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(ID)]), - get_api(Path). + %% create + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), + NewConf = TcpListener#{ + <<"id">> => NewListenerId, + <<"bind">> => <<"0.0.0.0:2883">> + }, + [#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf), + ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), + [#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []), + ?assertMatch(Create, Get1), + ?assert(is_running(NewListenerId)), -t_get_node_listeners(_) -> - LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - ID = maps:get(id, LocalListener), - Path = emqx_mgmt_api_test_util:api_path( - ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(ID)]), - get_api(Path). + %% bad create(same port) + BadId = <<"tcp:bad">>, + BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]), + BadConf = TcpListener#{ + <<"id">> => BadId, + <<"bind">> => <<"0.0.0.0:2883">> + }, + ?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)), -t_manage_listener(_) -> + %% update + #{<<"acceptors">> := Acceptors} = Create, + Acceptors1 = Acceptors + 10, + [#{<<"listeners">> := [Update]}] = + request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), + [#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), + + %% delete + ?assertEqual([], delete(NewPath)), + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), + ?assertEqual([], delete(NewPath)), + ok. + +t_list_listeners_on_node(_) -> + Node = atom_to_list(node()), + Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]), + Listeners = request(get, Path, [], []), + #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(), + ?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)), + ok. + +t_crud_listener_by_id_on_node(_) -> + TcpListenerId = <<"tcp:default">>, + NewListenerId = <<"tcp:new1">>, + Node = atom_to_list(node()), + TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]), + NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]), + TcpListener = request(get, TcpPath, [], []), + + %% create + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), + Create = request(put, NewPath, [], TcpListener#{ + <<"id">> => NewListenerId, + <<"bind">> => <<"0.0.0.0:3883">> + }), + ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), + Get1 = request(get, NewPath, [], []), + ?assertMatch(Create, Get1), + ?assert(is_running(NewListenerId)), + + %% update + #{<<"acceptors">> := Acceptors} = Create, + Acceptors1 = Acceptors + 10, + Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), + Get2 = request(get, NewPath, [], []), + ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), + + %% delete + ?assertEqual([], delete(NewPath)), + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), + ?assertEqual([], delete(NewPath)), + ok. + +t_action_listeners(_) -> ID = "tcp:default", - manage_listener(ID, "stop", false), - manage_listener(ID, "start", true), - manage_listener(ID, "restart", true). + action_listener(ID, "stop", false), + action_listener(ID, "start", true), + action_listener(ID, "restart", true). -manage_listener(ID, Operation, Running) -> - Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, "operation", Operation]), +action_listener(ID, Action, Running) -> + Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, Action]), {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path), timer:sleep(500), GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]), - {ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath), - Listeners = emqx_json:decode(ListenersResponse, [return_maps]), + [#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []), [listener_stats(Listener, Running) || Listener <- Listeners]. -get_api(Path) -> - {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path), - LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()), - case emqx_json:decode(ListenersData, [return_maps]) of - [Listener] -> - ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), - Filter = - fun(Local) -> - maps:get(id, Local) =:= ID - end, - LocalListener = hd(lists:filter(Filter, LocalListeners)), - comparison_listener(LocalListener, Listener); - Listeners when is_list(Listeners) -> - ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)), - Fun = - fun(LocalListener) -> - ID = maps:get(id, LocalListener), - IDBinary = atom_to_binary(ID, utf8), - Filter = - fun(Listener) -> - maps:get(<<"id">>, Listener) =:= IDBinary - end, - Listener = hd(lists:filter(Filter, Listeners)), - comparison_listener(LocalListener, Listener) - end, - lists:foreach(Fun, LocalListeners); - Listener when is_map(Listener) -> - ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), - Filter = - fun(Local) -> - maps:get(id, Local) =:= ID - end, - LocalListener = hd(lists:filter(Filter, LocalListeners)), - comparison_listener(LocalListener, Listener) +request(Method, Url, QueryParams, Body) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(Method, Url, QueryParams, AuthHeader, Body) of + {ok, Res} -> emqx_json:decode(Res, [return_maps]); + Error -> Error end. -comparison_listener(Local, Response) -> - ?assertEqual(maps:get(id, Local), binary_to_atom(maps:get(<<"id">>, Response))), - ?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))), - ?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)), - ?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)). - +delete(Url) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + {ok, Res} = emqx_mgmt_api_test_util:request_api(delete, Url, AuthHeader), + Res. listener_stats(Listener, ExpectedStats) -> ?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)). + +is_running(Id) -> + emqx_listeners:is_running(binary_to_atom(Id)). diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 76fb10f65..9952686c5 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -92,7 +92,8 @@ do_request_api(Method, Request)-> {ok, {{"HTTP/1.1", Code, _}, _, Return} } when Code >= 200 andalso Code =< 299 -> {ok, Return}; - {ok, {Reason, _, _}} -> + {ok, {Reason, _, _} = Error} -> + ct:pal("error: ~p~n", [Error]), {error, Reason} end.