diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 2123b579e..02cb91900 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -74,7 +74,7 @@ format_list(Listener) -> [ begin Running = is_running(Type, listener_id(Type, LName), LConf), - {Type, LName, maps:put(running, Running, LConf)} + {listener_id(Type, LName), maps:put(running, Running, LConf)} end || {LName, LConf} <- maps:to_list(Conf), is_map(LConf) ]. @@ -100,13 +100,11 @@ format_raw_listeners({Type, Conf}) -> -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> - {Type, Name} = parse_listener_id(ListenerId), case [ Running - || {Type0, Name0, #{running := Running}} <- list(), - Type0 =:= Type, - Name0 =:= Name + || {Id, #{running := Running}} <- list(), + Id =:= ListenerId ] of [] -> {error, not_found}; @@ -144,7 +142,7 @@ is_running(quic, _ListenerId, _Conf) -> false. current_conns(ID, ListenOn) -> - {Type, Name} = parse_listener_id(ID), + {ok, #{type := Type, name := Name}} = parse_listener_id(ID), current_conns(Type, Name, ListenOn). current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl -> @@ -155,7 +153,7 @@ current_conns(_, _, _) -> {error, not_support}. max_conns(ID, ListenOn) -> - {Type, Name} = parse_listener_id(ID), + {ok, #{type := Type, name := Name}} = parse_listener_id(ID), max_conns(Type, Name, ListenOn). max_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl -> @@ -362,7 +360,7 @@ post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) -> perform_listener_changes(Action, MapConfs) -> lists:foreach( fun({Id, Conf}) -> - {Type, Name} = parse_listener_id(Id), + {ok, #{type := Type, name := Name}} = parse_listener_id(Id), Action(Type, Name, Conf) end, maps:to_list(MapConfs) @@ -485,7 +483,7 @@ parse_listener_id(Id) -> case string:split(str(Id), ":", leading) of [Type, Name] -> case lists:member(Type, ?TYPES_STRING) of - true -> {list_to_existing_atom(Type), list_to_atom(Name)}; + true -> {ok, #{type => list_to_existing_atom(Type), name => list_to_atom(Name)}}; false -> {error, {invalid_listener_id, Id}} end; _ -> @@ -518,25 +516,27 @@ tcp_opts(Opts) -> foreach_listeners(Do) -> lists:foreach( - fun({Type, LName, LConf}) -> - Do(Type, LName, LConf) + fun({Id, LConf}) -> + {ok, #{type := Type, name := Name}} = parse_listener_id(Id), + Do(Type, Name, LConf) end, list() ). has_enabled_listener_conf_by_type(Type) -> lists:any( - fun({Type0, _LName, LConf}) when is_map(LConf) -> + fun({Id, LConf}) when is_map(LConf) -> + {ok, #{type := Type0}} = parse_listener_id(Id), Type =:= Type0 andalso maps:get(enabled, LConf, true) end, list() ). apply_on_listener(ListenerId, Do) -> - {Type, ListenerName} = parse_listener_id(ListenerId), - case emqx_config:find_listener_conf(Type, ListenerName, []) of - {not_found, _, _} -> error({listener_config_not_found, Type, ListenerName}); - {ok, Conf} -> Do(Type, ListenerName, Conf) + {ok, #{type := Type, name := Name}} = parse_listener_id(ListenerId), + case emqx_config:find_listener_conf(Type, Name, []) of + {not_found, _, _} -> error({listener_config_not_found, Type, Name}); + {ok, Conf} -> Do(Type, Name, Conf) end. str(A) when is_atom(A) -> diff --git a/apps/emqx_authn/src/emqx_authn_app.erl b/apps/emqx_authn/src/emqx_authn_app.erl index c3295d18b..f761bfe33 100644 --- a/apps/emqx_authn/src/emqx_authn_app.erl +++ b/apps/emqx_authn/src/emqx_authn_app.erl @@ -74,7 +74,7 @@ global_chain_config() -> listener_chain_configs() -> lists:map( - fun({ListenerID, _, _}) -> + fun({ListenerID, _}) -> {ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])} end, emqx_listeners:list() diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index fadec7ee7..a2dbce1aa 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -26,7 +26,8 @@ crud_listeners_by_id/2, list_listeners_on_node/2, crud_listener_by_id_on_node/2, - action_listeners/2 + action_listeners_by_id/2, + action_listeners_by_id_on_node/2 ]). %% for rpc call @@ -45,7 +46,7 @@ -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>). -define(ADDR_PORT_INUSE, <<"Addr port in use">>). --define(OPTS(_Type_), #{rawconf_with_defaults => true, override_to => _Type_}). +-define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}). namespace() -> "listeners". @@ -104,7 +105,7 @@ schema("/listeners/:id") -> }; schema("/listeners/:id/:action") -> #{ - 'operationId' => action_listeners, + 'operationId' => action_listeners_by_id, post => #{ tags => [<<"listeners">>], desc => <<"Start/stop/restart listeners on all nodes.">>, @@ -175,7 +176,7 @@ schema("/nodes/:node/listeners/:id") -> }; schema("/nodes/:node/listeners/:id/:action") -> #{ - 'operationId' => action_listeners, + 'operationId' => action_listeners_by_id_on_node, post => #{ tags => [<<"listeners">>], desc => <<"Start/stop/restart listeners on a specified node.">>, @@ -265,7 +266,7 @@ listeners_info() -> validate_id(Id) -> case emqx_listeners:parse_listener_id(Id) of {error, Reason} -> {error, Reason}; - _ -> ok + {ok, _} -> ok end. %% api @@ -289,7 +290,7 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} end; crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> - {Type, Name} = emqx_listeners:parse_listener_id(Id), + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of {ok, _} -> {204}; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} @@ -299,7 +300,7 @@ parse_listener_conf(Conf0) -> Conf1 = maps:remove(<<"running">>, Conf0), {IdBin, Conf2} = maps:take(<<"id">>, Conf1), {TypeBin, Conf3} = maps:take(<<"type">>, Conf2), - {Type, Name} = emqx_listeners:parse_listener_id(IdBin), + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), TypeAtom = binary_to_existing_atom(TypeBin), case Type =:= TypeAtom of true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; @@ -350,10 +351,12 @@ crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. -action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) -> +action_listeners_by_id_on_node(post, + #{bindings := #{id := Id, action := Action, node := Node}}) -> {_, Result} = action_listeners(Node, Id, Action), - Result; -action_listeners(post, #{bindings := #{id := Id, action := Action}}) -> + Result. + +action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) -> Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], case lists:filter( @@ -455,7 +458,7 @@ do_list_listeners() -> -spec do_update_listener(string(), emqx_config:update_request()) -> {ok, map()} | {error, _}. do_update_listener(Id, Config) -> - {Type, Name} = emqx_listeners:parse_listener_id(Id), + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of {ok, #{raw_config := RawConf}} -> {ok, RawConf}; {error, Reason} -> {error, Reason} @@ -463,7 +466,7 @@ do_update_listener(Id, Config) -> -spec do_remove_listener(string()) -> ok. do_remove_listener(Id) -> - {Type, Name} = emqx_listeners:parse_listener_id(Id), + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of {ok, _} -> ok; {error, Reason} -> error(Reason) diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index b535f1e62..232011319 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -26,36 +26,39 @@ -export([load/0]). --export([ status/1 - , broker/1 - , cluster/1 - , clients/1 - , routes/1 - , subscriptions/1 - , plugins/1 - , listeners/1 - , vm/1 - , mnesia/1 - , trace/1 - , traces/1 - , log/1 - , authz/1 - , olp/1 - ]). +-export([ + status/1, + broker/1, + cluster/1, + clients/1, + routes/1, + subscriptions/1, + plugins/1, + listeners/1, + vm/1, + mnesia/1, + trace/1, + traces/1, + log/1, + authz/1, + olp/1 +]). --define(PROC_INFOKEYS, [status, - memory, - message_queue_len, - total_heap_size, - heap_size, - stack_size, - reductions]). +-define(PROC_INFOKEYS, [ + status, + memory, + message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions +]). -define(MAX_LIMIT, 10000). -define(APP, emqx). --spec(load() -> ok). +-spec load() -> ok. load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). @@ -70,7 +73,7 @@ status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), emqx_ctl:print("Node ~p ~ts is ~p~n", [node(), emqx_app:get_release(), InternalStatus]); status(_) -> - emqx_ctl:usage("status", "Show broker status"). + emqx_ctl:usage("status", "Show broker status"). %%-------------------------------------------------------------------- %% @doc Query broker @@ -79,19 +82,22 @@ broker([]) -> Funs = [sysdescr, version, datetime], [emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs], emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]); - broker(["stats"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) - || {Stat, Val} <- lists:sort(emqx_stats:getstats())]; - + [ + emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) + || {Stat, Val} <- lists:sort(emqx_stats:getstats()) + ]; broker(["metrics"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) - || {Metric, Val} <- lists:sort(emqx_metrics:all())]; - + [ + emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) + || {Metric, Val} <- lists:sort(emqx_metrics:all()) + ]; broker(_) -> - emqx_ctl:usage([{"broker", "Show broker version, uptime and description"}, - {"broker stats", "Show broker statistics of clients, topics, subscribers"}, - {"broker metrics", "Show broker metrics"}]). + emqx_ctl:usage([ + {"broker", "Show broker version, uptime and description"}, + {"broker stats", "Show broker statistics of clients, topics, subscribers"}, + {"broker metrics", "Show broker metrics"} + ]). %%----------------------------------------------------------------------------- %% @doc Cluster with other nodes @@ -106,7 +112,6 @@ cluster(["join", SNode]) -> {error, Error} -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; - cluster(["leave"]) -> case ekka:leave() of ok -> @@ -115,7 +120,6 @@ cluster(["leave"]) -> {error, Error} -> emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error]) end; - cluster(["force-leave", SNode]) -> case ekka:force_leave(ekka_node:parse_name(SNode)) of ok -> @@ -126,38 +130,37 @@ cluster(["force-leave", SNode]) -> {error, Error} -> emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error]) end; - cluster(["status"]) -> emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]); - cluster(_) -> - emqx_ctl:usage([{"cluster join ", "Join the cluster"}, - {"cluster leave", "Leave the cluster"}, - {"cluster force-leave ","Force the node leave from cluster"}, - {"cluster status", "Cluster status"}]). + emqx_ctl:usage([ + {"cluster join ", "Join the cluster"}, + {"cluster leave", "Leave the cluster"}, + {"cluster force-leave ", "Force the node leave from cluster"}, + {"cluster status", "Cluster status"} + ]). %%-------------------------------------------------------------------- %% @doc Query clients clients(["list"]) -> dump(emqx_channel, client); - clients(["show", ClientId]) -> if_client(ClientId, fun print/1); - clients(["kick", ClientId]) -> ok = emqx_cm:kick_session(bin(ClientId)), emqx_ctl:print("ok~n"); - clients(_) -> - emqx_ctl:usage([{"clients list", "List all clients"}, - {"clients show ", "Show a client"}, - {"clients kick ", "Kick out a client"}]). + emqx_ctl:usage([ + {"clients list", "List all clients"}, + {"clients show ", "Show a client"}, + {"clients kick ", "Kick out a client"} + ]). if_client(ClientId, Fun) -> case ets:lookup(emqx_channel, (bin(ClientId))) of [] -> emqx_ctl:print("Not Found.~n"); - [Channel] -> Fun({client, Channel}) + [Channel] -> Fun({client, Channel}) end. %%-------------------------------------------------------------------- @@ -165,20 +168,22 @@ if_client(ClientId, Fun) -> routes(["list"]) -> dump(emqx_route); - routes(["show", Topic]) -> Routes = ets:lookup(emqx_route, bin(Topic)), [print({emqx_route, Route}) || Route <- Routes]; - routes(_) -> - emqx_ctl:usage([{"routes list", "List all routes"}, - {"routes show ", "Show a route"}]). + emqx_ctl:usage([ + {"routes list", "List all routes"}, + {"routes show ", "Show a route"} + ]). subscriptions(["list"]) -> - lists:foreach(fun(Suboption) -> - print({emqx_suboption, Suboption}) - end, ets:tab2list(emqx_suboption)); - + lists:foreach( + fun(Suboption) -> + print({emqx_suboption, Suboption}) + end, + ets:tab2list(emqx_suboption) + ); subscriptions(["show", ClientId]) -> case ets:lookup(emqx_subid, bin(ClientId)) of [] -> @@ -186,43 +191,45 @@ subscriptions(["show", ClientId]) -> [{_, Pid}] -> case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); - Suboption -> - [print({emqx_suboption, Sub}) || Sub <- Suboption] + Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] end end; - subscriptions(["add", ClientId, Topic, QoS]) -> - if_valid_qos(QoS, fun(IntQos) -> - case ets:lookup(emqx_channel, bin(ClientId)) of - [] -> emqx_ctl:print("Error: Channel not found!"); - [{_, Pid}] -> - {Topic1, Options} = emqx_topic:parse(bin(Topic)), - Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]}, - emqx_ctl:print("ok~n") - end - end); - + if_valid_qos(QoS, fun(IntQos) -> + case ets:lookup(emqx_channel, bin(ClientId)) of + [] -> + emqx_ctl:print("Error: Channel not found!"); + [{_, Pid}] -> + {Topic1, Options} = emqx_topic:parse(bin(Topic)), + Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]}, + emqx_ctl:print("ok~n") + end + end); subscriptions(["del", ClientId, Topic]) -> case ets:lookup(emqx_channel, bin(ClientId)) of - [] -> emqx_ctl:print("Error: Channel not found!"); + [] -> + emqx_ctl:print("Error: Channel not found!"); [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]}, emqx_ctl:print("ok~n") end; - subscriptions(_) -> emqx_ctl:usage( - [{"subscriptions list", "List all subscriptions"}, - {"subscriptions show ", "Show subscriptions of a client"}, - {"subscriptions add ", "Add a static subscription manually"}, - {"subscriptions del ", "Delete a static subscription manually"}]). + [ + {"subscriptions list", "List all subscriptions"}, + {"subscriptions show ", "Show subscriptions of a client"}, + {"subscriptions add ", "Add a static subscription manually"}, + {"subscriptions del ", "Delete a static subscription manually"} + ] + ). if_valid_qos(QoS, Fun) -> try list_to_integer(QoS) of Int when ?IS_QOS(Int) -> Fun(Int); _ -> emqx_ctl:print("QoS should be 0, 1, 2~n") - catch _:_ -> - emqx_ctl:print("QoS should be 0, 1, 2~n") + catch + _:_ -> + emqx_ctl:print("QoS should be 0, 1, 2~n") end. plugins(["list"]) -> @@ -231,7 +238,7 @@ plugins(["describe", NameVsn]) -> emqx_plugins_cli:describe(NameVsn, fun emqx_ctl:print/2); plugins(["install", NameVsn]) -> emqx_plugins_cli:ensure_installed(NameVsn, fun emqx_ctl:print/2); -plugins(["uninstall", NameVsn])-> +plugins(["uninstall", NameVsn]) -> emqx_plugins_cli:ensure_uninstalled(NameVsn, fun emqx_ctl:print/2); plugins(["start", NameVsn]) -> emqx_plugins_cli:ensure_started(NameVsn, fun emqx_ctl:print/2); @@ -251,69 +258,73 @@ plugins(["enable", NameVsn, "before", Other]) -> emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2); plugins(_) -> emqx_ctl:usage( - [{"plugins [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"}, - {"plugins list", "List all installed plugins"}, - {"plugins describe Name-Vsn", "Describe an installed plugins"}, - {"plugins install Name-Vsn", "Install a plugin package placed\n" - "in plugin'sinstall_dir"}, - {"plugins uninstall Name-Vsn", "Uninstall a plugin. NOTE: it deletes\n" - "all files in install_dir/Name-Vsn"}, - {"plugins start Name-Vsn", "Start a plugin"}, - {"plugins stop Name-Vsn", "Stop a plugin"}, - {"plugins restart Name-Vsn", "Stop then start a plugin"}, - {"plugins disable Name-Vsn", "Disable auto-boot"}, - {"plugins enable Name-Vsn [Position]", - "Enable auto-boot at Position in the boot list, where Position could be\n" - "'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n" - "The Position parameter can be used to adjust the boot order.\n" - "If no Position is given, an already configured plugin\n" - "will stay at is old position; a newly plugin is appended to the rear\n" - "e.g. plugins disable foo-0.1.0 front\n" - " plugins enable bar-0.2.0 before foo-0.1.0"} - ]). + [ + {"plugins [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"}, + {"plugins list", "List all installed plugins"}, + {"plugins describe Name-Vsn", "Describe an installed plugins"}, + {"plugins install Name-Vsn", + "Install a plugin package placed\n" + "in plugin'sinstall_dir"}, + {"plugins uninstall Name-Vsn", + "Uninstall a plugin. NOTE: it deletes\n" + "all files in install_dir/Name-Vsn"}, + {"plugins start Name-Vsn", "Start a plugin"}, + {"plugins stop Name-Vsn", "Stop a plugin"}, + {"plugins restart Name-Vsn", "Stop then start a plugin"}, + {"plugins disable Name-Vsn", "Disable auto-boot"}, + {"plugins enable Name-Vsn [Position]", + "Enable auto-boot at Position in the boot list, where Position could be\n" + "'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n" + "The Position parameter can be used to adjust the boot order.\n" + "If no Position is given, an already configured plugin\n" + "will stay at is old position; a newly plugin is appended to the rear\n" + "e.g. plugins disable foo-0.1.0 front\n" + " plugins enable bar-0.2.0 before foo-0.1.0"} + ] + ). %%-------------------------------------------------------------------- %% @doc vm command vm([]) -> vm(["all"]); - vm(["all"]) -> [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]]; - vm(["load"]) -> [emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()]; - vm(["memory"]) -> [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; - vm(["process"]) -> - [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) - || {Name, Key} <- [{limit, process_limit}, {count, process_count}]]; - + [ + emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + || {Name, Key} <- [{limit, process_limit}, {count, process_count}] + ]; vm(["io"]) -> IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))), - [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) - || Key <- [max_fds, active_fds]]; - + [ + emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) + || Key <- [max_fds, active_fds] + ]; vm(["ports"]) -> - [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) - || {Name, Key} <- [{count, port_count}, {limit, port_limit}]]; - + [ + emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) + || {Name, Key} <- [{count, port_count}, {limit, port_limit}] + ]; vm(_) -> - emqx_ctl:usage([{"vm all", "Show info of Erlang VM"}, - {"vm load", "Show load of Erlang VM"}, - {"vm memory", "Show memory of Erlang VM"}, - {"vm process", "Show process of Erlang VM"}, - {"vm io", "Show IO of Erlang VM"}, - {"vm ports", "Show Ports of Erlang VM"}]). + emqx_ctl:usage([ + {"vm all", "Show info of Erlang VM"}, + {"vm load", "Show load of Erlang VM"}, + {"vm memory", "Show memory of Erlang VM"}, + {"vm process", "Show process of Erlang VM"}, + {"vm io", "Show IO of Erlang VM"}, + {"vm ports", "Show Ports of Erlang VM"} + ]). %%-------------------------------------------------------------------- %% @doc mnesia Command mnesia([]) -> mnesia:system_info(); - mnesia(_) -> emqx_ctl:usage([{"mnesia", "Mnesia system info"}]). @@ -325,40 +336,40 @@ log(["set-level", Level]) -> ok -> emqx_ctl:print("~ts~n", [Level]); Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error]) end; - log(["primary-level"]) -> Level = emqx_logger:get_primary_log_level(), emqx_ctl:print("~ts~n", [Level]); - log(["primary-level", Level]) -> _ = emqx_logger:set_primary_log_level(list_to_atom(Level)), emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]); - log(["handlers", "list"]) -> - _ = [emqx_ctl:print( + _ = [ + emqx_ctl:print( "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n", [Id, Level, Dst, Status] - ) - || #{id := Id, - level := Level, - dst := Dst, - status := Status} <- emqx_logger:get_log_handlers()], + ) + || #{ + id := Id, + level := Level, + dst := Dst, + status := Status + } <- emqx_logger:get_log_handlers() + ], ok; - log(["handlers", "start", HandlerId]) -> case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of - ok -> emqx_ctl:print("log handler ~ts started~n", [HandlerId]); + ok -> + emqx_ctl:print("log handler ~ts started~n", [HandlerId]); {error, Reason} -> emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [HandlerId, Reason]) end; - log(["handlers", "stop", HandlerId]) -> case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of - ok -> emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]); + ok -> + emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]); {error, Reason} -> emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [HandlerId, Reason]) end; - log(["handlers", "set-level", HandlerId, Level]) -> case emqx_logger:set_log_handler_level(list_to_atom(HandlerId), list_to_atom(Level)) of ok -> @@ -367,57 +378,60 @@ log(["handlers", "set-level", HandlerId, Level]) -> {error, Error} -> emqx_ctl:print("[error] ~p~n", [Error]) end; - log(_) -> emqx_ctl:usage( - [{"log set-level ", "Set the overall log level"}, - {"log primary-level", "Show the primary log level now"}, - {"log primary-level ","Set the primary log level"}, - {"log handlers list", "Show log handlers"}, - {"log handlers start ", "Start a log handler"}, - {"log handlers stop ", "Stop a log handler"}, - {"log handlers set-level ", "Set log level of a log handler"}]). + [ + {"log set-level ", "Set the overall log level"}, + {"log primary-level", "Show the primary log level now"}, + {"log primary-level ", "Set the primary log level"}, + {"log handlers list", "Show log handlers"}, + {"log handlers start ", "Start a log handler"}, + {"log handlers stop ", "Stop a log handler"}, + {"log handlers set-level ", "Set log level of a log handler"} + ] + ). %%-------------------------------------------------------------------- %% @doc Trace Command trace(["list"]) -> - lists:foreach(fun(Trace) -> - #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, - emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) - end, emqx_trace_handler:running()); - + lists:foreach( + fun(Trace) -> + #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, + emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) + end, + emqx_trace_handler:running() + ); trace(["stop", Operation, Filter0]) -> case trace_type(Operation, Filter0) of {ok, Type, Filter} -> trace_off(Type, Filter); error -> trace([]) end; - trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); - trace(["start", Operation, Filter0, LogFile, Level]) -> case trace_type(Operation, Filter0) of {ok, Type, Filter} -> - trace_on(name(Filter0), Type, Filter, - list_to_existing_atom(Level), LogFile); - error -> trace([]) + trace_on( + name(Filter0), + Type, + Filter, + list_to_existing_atom(Level), + LogFile + ); + error -> + trace([]) end; - trace(_) -> - emqx_ctl:usage([{"trace list", "List all traces started on local node"}, - {"trace start client []", - "Traces for a client on local node"}, - {"trace stop client ", - "Stop tracing for a client on local node"}, - {"trace start topic [] ", - "Traces for a topic on local node"}, - {"trace stop topic ", - "Stop tracing for a topic on local node"}, + emqx_ctl:usage([ + {"trace list", "List all traces started on local node"}, + {"trace start client []", "Traces for a client on local node"}, + {"trace stop client ", "Stop tracing for a client on local node"}, + {"trace start topic [] ", "Traces for a topic on local node"}, + {"trace stop topic ", "Stop tracing for a topic on local node"}, {"trace start ip_address [] ", "Traces for a client ip on local node"}, - {"trace stop ip_addresss ", - "Stop tracing for a client ip on local node"} + {"trace stop ip_addresss ", "Stop tracing for a client ip on local node"} ]). trace_on(Name, Type, Filter, Level, LogFile) -> @@ -447,32 +461,37 @@ traces(["list"]) -> [] -> emqx_ctl:print("Cluster Trace is empty~n", []); _ -> - lists:foreach(fun(Trace) -> - #{type := Type, name := Name, status := Status, - log_size := LogSize} = Trace, - emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n", - [Name, Type, maps:get(Type, Trace), Status, LogSize]) - end, List) + lists:foreach( + fun(Trace) -> + #{ + type := Type, + name := Name, + status := Status, + log_size := LogSize + } = Trace, + emqx_ctl:print( + "Trace(~s: ~s=~s, ~s, LogSize:~p)~n", + [Name, Type, maps:get(Type, Trace), Status, LogSize] + ) + end, + List + ) end, length(List); - traces(["stop", Name]) -> trace_cluster_off(Name); - traces(["delete", Name]) -> trace_cluster_del(Name); - traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter, "900"]); - traces(["start", Name, Operation, Filter0, DurationS]) -> case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); + {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); error -> traces([]) end; - traces(_) -> - emqx_ctl:usage([{"traces list", "List all cluster traces started"}, + emqx_ctl:usage([ + {"traces list", "List all cluster traces started"}, {"traces start client ", "Traces for a client in cluster"}, {"traces start topic ", "Traces for a topic in cluster"}, {"traces start ip_address ", "Traces for a IP in cluster"}, @@ -483,18 +502,21 @@ traces(_) -> trace_cluster_on(Name, Type, Filter, DurationS0) -> DurationS = list_to_integer(DurationS0), Now = erlang:system_time(second), - Trace = #{ name => list_to_binary(Name) - , type => atom_to_binary(Type) - , Type => list_to_binary(Filter) - , start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)) - , end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS)) - }, + Trace = #{ + name => list_to_binary(Name), + type => atom_to_binary(Type), + Type => list_to_binary(Filter), + start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)), + end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS)) + }, case emqx_trace:create(Trace) of {ok, _} -> emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]); {error, Error} -> - emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n", - [Name, Type, Filter, Error]) + emqx_ctl:print( + "[error] cluster_trace ~s ~s=~s ~p~n", + [Name, Type, Filter, Error] + ) end. trace_cluster_del(Name) -> @@ -518,29 +540,34 @@ trace_type(_, _) -> error. %% @doc Listeners Command listeners([]) -> - lists:foreach(fun({ID, _Name, Conf}) -> - {Host, Port} = maps:get(bind, Conf), - Acceptors = maps:get(acceptors, Conf), - ProxyProtocol = maps:get(proxy_protocol, Conf, undefined), - Running = maps:get(running, Conf), - CurrentConns = case emqx_listeners:current_conns(ID, {Host, Port}) of - {error, _} -> []; - CC -> [{current_conn, CC}] - end, - MaxConn = case emqx_listeners:max_conns(ID, {Host, Port}) of - {error, _} -> []; - MC -> [{max_conns, MC}] - end, - Info = [ - {listen_on, {string, format_listen_on(Port)}}, - {acceptors, Acceptors}, - {proxy_protocol, ProxyProtocol}, - {running, Running} + lists:foreach( + fun({ID, Conf}) -> + {Host, Port} = maps:get(bind, Conf), + Acceptors = maps:get(acceptors, Conf), + ProxyProtocol = maps:get(proxy_protocol, Conf, undefined), + Running = maps:get(running, Conf), + CurrentConns = + case emqx_listeners:current_conns(ID, {Host, Port}) of + {error, _} -> []; + CC -> [{current_conn, CC}] + end, + MaxConn = + case emqx_listeners:max_conns(ID, {Host, Port}) of + {error, _} -> []; + MC -> [{max_conns, MC}] + end, + Info = + [ + {listen_on, {string, format_listen_on(Port)}}, + {acceptors, Acceptors}, + {proxy_protocol, ProxyProtocol}, + {running, Running} ] ++ CurrentConns ++ MaxConn, - emqx_ctl:print("~ts~n", [ID]), - lists:foreach(fun indent_print/1, Info) - end, emqx_listeners:list()); - + emqx_ctl:print("~ts~n", [ID]), + lists:foreach(fun indent_print/1, Info) + end, + emqx_listeners:list() + ); listeners(["stop", ListenerId]) -> case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of ok -> @@ -548,7 +575,6 @@ listeners(["stop", ListenerId]) -> {error, Error} -> emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error]) end; - listeners(["start", ListenerId]) -> case emqx_listeners:start_listener(list_to_atom(ListenerId)) of ok -> @@ -556,7 +582,6 @@ listeners(["start", ListenerId]) -> {error, Error} -> emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error]) end; - listeners(["restart", ListenerId]) -> case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of ok -> @@ -564,13 +589,13 @@ listeners(["restart", ListenerId]) -> {error, Error} -> emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error]) end; - listeners(_) -> - emqx_ctl:usage([{"listeners", "List listeners"}, - {"listeners stop ", "Stop a listener"}, - {"listeners start ", "Start a listener"}, - {"listeners restart ", "Restart a listener"} - ]). + emqx_ctl:usage([ + {"listeners", "List listeners"}, + {"listeners stop ", "Stop a listener"}, + {"listeners start ", "Start a listener"}, + {"listeners restart ", "Restart a listener"} + ]). %%-------------------------------------------------------------------- %% @doc authz Command @@ -582,7 +607,6 @@ authz(["cache-clean", "node", Node]) -> {error, Reason} -> emqx_ctl:print("Authorization drain failed on node ~ts: ~0p.~n", [Node, Reason]) end; - authz(["cache-clean", "all"]) -> case emqx_mgmt:clean_authz_cache_all() of ok -> @@ -590,22 +614,22 @@ authz(["cache-clean", "all"]) -> {error, Reason} -> emqx_ctl:print("Authorization cache-clean failed: ~p.~n", [Reason]) end; - authz(["cache-clean", ClientId]) -> emqx_mgmt:clean_authz_cache(ClientId); - authz(_) -> emqx_ctl:usage( - [{"authz cache-clean all", "Clears authorization cache on all nodes"}, - {"authz cache-clean node ", "Clears authorization cache on given node"}, - {"authz cache-clean ", "Clears authorization cache for given client"} - ]). - + [ + {"authz cache-clean all", "Clears authorization cache on all nodes"}, + {"authz cache-clean node ", "Clears authorization cache on given node"}, + {"authz cache-clean ", "Clears authorization cache for given client"} + ] + ). %%-------------------------------------------------------------------- %% @doc OLP (Overload Protection related) olp(["status"]) -> - S = case emqx_olp:is_overloaded() of + S = + case emqx_olp:is_overloaded() of true -> "overloaded"; false -> "not overloaded" end, @@ -617,10 +641,11 @@ olp(["enable"]) -> Res = emqx_olp:enable(), emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]); olp(_) -> - emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"}, - {"olp enable", "Enable overload protection"}, - {"olp disable", "Disable overload protection"} - ]). + emqx_ctl:usage([ + {"olp status", "Return OLP status if system is overloaded"}, + {"olp enable", "Enable overload protection"}, + {"olp disable", "Disable overload protection"} + ]). %%-------------------------------------------------------------------- %% Dump ETS @@ -634,78 +659,114 @@ dump(Table, Tag) -> dump(_Table, _, '$end_of_table', Result) -> lists:reverse(Result); - dump(Table, Tag, Key, Result) -> PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)], dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]). print({_, []}) -> ok; - print({client, {ClientId, ChanPid}}) -> - Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of - undefined -> #{}; - Attrs0 -> Attrs0 - end, - Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of - undefined -> #{}; - Stats0 -> maps:from_list(Stats0) - end, + Attrs = + case emqx_cm:get_chan_info(ClientId, ChanPid) of + undefined -> #{}; + Attrs0 -> Attrs0 + end, + Stats = + case emqx_cm:get_chan_stats(ClientId, ChanPid) of + undefined -> #{}; + Stats0 -> maps:from_list(Stats0) + end, ClientInfo = maps:get(clientinfo, Attrs, #{}), ConnInfo = maps:get(conninfo, Attrs, #{}), Session = maps:get(session, Attrs, #{}), - Connected = case maps:get(conn_state, Attrs) of - connected -> true; - _ -> false - end, - Info = lists:foldl(fun(Items, Acc) -> - maps:merge(Items, Acc) - end, #{connected => Connected}, - [maps:with([subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, - mqueue_len, mqueue_dropped, send_msg], Stats), - maps:with([clientid, username], ClientInfo), - maps:with([peername, clean_start, keepalive, expiry_interval, - connected_at, disconnected_at], ConnInfo), - maps:with([created_at], Session)]), - InfoKeys = [clientid, username, peername, clean_start, keepalive, - expiry_interval, subscriptions_cnt, inflight_cnt, - awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, - connected, created_at, connected_at] ++ - case maps:is_key(disconnected_at, Info) of - true -> [disconnected_at]; - false -> [] - end, + Connected = + case maps:get(conn_state, Attrs) of + connected -> true; + _ -> false + end, + Info = lists:foldl( + fun(Items, Acc) -> + maps:merge(Items, Acc) + end, + #{connected => Connected}, + [ + maps:with( + [ + subscriptions_cnt, + inflight_cnt, + awaiting_rel_cnt, + mqueue_len, + mqueue_dropped, + send_msg + ], + Stats + ), + maps:with([clientid, username], ClientInfo), + maps:with( + [ + peername, + clean_start, + keepalive, + expiry_interval, + connected_at, + disconnected_at + ], + ConnInfo + ), + maps:with([created_at], Session) + ] + ), + InfoKeys = + [ + clientid, + username, + peername, + clean_start, + keepalive, + expiry_interval, + subscriptions_cnt, + inflight_cnt, + awaiting_rel_cnt, + send_msg, + mqueue_len, + mqueue_dropped, + connected, + created_at, + connected_at + ] ++ + case maps:is_key(disconnected_at, Info) of + true -> [disconnected_at]; + false -> [] + end, Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000}, emqx_ctl:print( "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, " "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, " "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, " - "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" - ++ case maps:is_key(disconnected_at, Info1) of - true -> ", disconnected_at=~w)~n"; - false -> ")~n" - end, - [format(K, maps:get(K, Info1)) || K <- InfoKeys]); - + "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" ++ + case maps:is_key(disconnected_at, Info1) of + true -> ", disconnected_at=~w)~n"; + false -> ")~n" + end, + [format(K, maps:get(K, Info1)) || K <- InfoKeys] + ); print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); print({emqx_route, #route{topic = Topic, dest = Node}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); - print(#plugin{name = Name, descr = Descr, active = Active}) -> - emqx_ctl:print("Plugin(~ts, description=~ts, active=~ts)~n", - [Name, Descr, Active]); - + emqx_ctl:print( + "Plugin(~ts, description=~ts, active=~ts)~n", + [Name, Descr, Active] + ); print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) -> emqx_ctl:print("~ts -> ~ts~n", [maps:get(subid, Options), Topic]). format(_, undefined) -> undefined; - format(peername, {IPAddr, Port}) -> IPStr = emqx_mgmt_util:ntoa(IPAddr), io_lib:format("~ts:~p", [IPStr, Port]); - format(_, Val) -> Val. diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 65568135b..a146f48ef 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -50,15 +50,25 @@ t_crud_listeners_by_id(_) -> %% create ?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), - [#{<<"listeners">> := [Create]}] = request(put, NewPath, [], TcpListener#{ + NewConf = TcpListener#{ <<"id">> => NewListenerId, <<"bind">> => <<"0.0.0.0:2883">> - }), + }, + [#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf), ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), [#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []), ?assertMatch(Create, Get1), ?assert(is_running(NewListenerId)), + %% bad create(same port) + BadId = <<"tcp:bad">>, + BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]), + BadConf = TcpListener#{ + <<"id">> => BadId, + <<"bind">> => <<"0.0.0.0:2883">> + }, + ?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)), + %% update #{<<"acceptors">> := Acceptors} = Create, Acceptors1 = Acceptors + 10, @@ -144,51 +154,8 @@ delete(Url) -> {ok, Res} = emqx_mgmt_api_test_util:request_api(delete, Url, AuthHeader), Res. -get_api(Path) -> - {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path), - LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()), - case emqx_json:decode(ListenersData, [return_maps]) of - [#{<<"node">> := _, <<"listeners">> := [Listener]}] -> - ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), - Filter = - fun(Local) -> - maps:get(id, Local) =:= ID - end, - LocalListener = hd(lists:filter(Filter, LocalListeners)), - comparison_listener(LocalListener, Listener); - [#{<<"node">> := _, <<"listeners">> := Listeners}] when is_list(Listeners) -> - ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)), - Fun = - fun(LocalListener) -> - ID = maps:get(id, LocalListener), - IDBinary = atom_to_binary(ID, utf8), - Filter = - fun(Listener) -> - maps:get(<<"id">>, Listener) =:= IDBinary - end, - Listener = hd(lists:filter(Filter, Listeners)), - comparison_listener(LocalListener, Listener) - end, - lists:foreach(Fun, LocalListeners); - Listener when is_map(Listener) -> - ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), - Filter = - fun(Local) -> - maps:get(id, Local) =:= ID - end, - LocalListener = hd(lists:filter(Filter, LocalListeners)), - comparison_listener(LocalListener, Listener) - end. - -comparison_listener(Local, Response) -> - ?assertEqual(maps:get(id, Local), binary_to_atom(maps:get(<<"id">>, Response))), - ?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))), - ?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)), - ?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)). - - listener_stats(Listener, ExpectedStats) -> ?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)). is_running(Id) -> - emqx_listeners:is_running(Id). + emqx_listeners:is_running(binary_to_atom(Id)). diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 76fb10f65..9952686c5 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -92,7 +92,8 @@ do_request_api(Method, Request)-> {ok, {{"HTTP/1.1", Code, _}, _, Return} } when Code >= 200 andalso Code =< 299 -> {ok, Return}; - {ok, {Reason, _, _}} -> + {ok, {Reason, _, _} = Error} -> + ct:pal("error: ~p~n", [Error]), {error, Reason} end.