Merge pull request #7542 from zhongwencool/api-listeners
feat: refactor api_listeners api
This commit is contained in:
commit
2375eee006
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([
|
-export([
|
||||||
|
list_raw/0,
|
||||||
list/0,
|
list/0,
|
||||||
start/0,
|
start/0,
|
||||||
restart/0,
|
restart/0,
|
||||||
|
@ -57,50 +58,70 @@
|
||||||
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
|
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
|
||||||
|
|
||||||
-spec id_example() -> atom().
|
-spec id_example() -> atom().
|
||||||
id_example() ->
|
id_example() -> 'tcp:default'.
|
||||||
id_example(list()).
|
|
||||||
|
|
||||||
id_example([]) ->
|
|
||||||
{ID, _} = hd(list()),
|
|
||||||
ID;
|
|
||||||
id_example([{'tcp:default', _} | _]) ->
|
|
||||||
'tcp:default';
|
|
||||||
id_example([_ | Listeners]) ->
|
|
||||||
id_example(Listeners).
|
|
||||||
|
|
||||||
%% @doc List configured listeners.
|
%% @doc List configured listeners.
|
||||||
-spec list() -> [{ListenerId :: atom(), ListenerConf :: map()}].
|
-spec list_raw() -> [{ListenerId :: atom(), Type :: binary(), ListenerConf :: map()}].
|
||||||
|
list_raw() ->
|
||||||
|
[{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()].
|
||||||
|
|
||||||
list() ->
|
list() ->
|
||||||
[{listener_id(Type, LName), LConf} || {Type, LName, LConf} <- do_list()].
|
|
||||||
|
|
||||||
do_list() ->
|
|
||||||
Listeners = maps:to_list(emqx:get_config([listeners], #{})),
|
Listeners = maps:to_list(emqx:get_config([listeners], #{})),
|
||||||
lists:append([list(Type, maps:to_list(Conf)) || {Type, Conf} <- Listeners]).
|
lists:flatmap(fun format_list/1, Listeners).
|
||||||
|
|
||||||
list(Type, Conf) ->
|
format_list(Listener) ->
|
||||||
|
{Type, Conf} = Listener,
|
||||||
[
|
[
|
||||||
begin
|
begin
|
||||||
Running = is_running(Type, listener_id(Type, LName), LConf),
|
Running = is_running(Type, listener_id(Type, LName), LConf),
|
||||||
{Type, LName, maps:put(running, Running, LConf)}
|
{listener_id(Type, LName), maps:put(running, Running, LConf)}
|
||||||
end
|
end
|
||||||
|| {LName, LConf} <- Conf, is_map(LConf)
|
|| {LName, LConf} <- maps:to_list(Conf), is_map(LConf)
|
||||||
].
|
].
|
||||||
|
|
||||||
-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
|
do_list_raw() ->
|
||||||
|
Key = <<"listeners">>,
|
||||||
|
Raw = emqx_config:get_raw([Key], #{}),
|
||||||
|
SchemaMod = emqx_config:get_schema_mod(Key),
|
||||||
|
#{Key := RawWithDefault} = emqx_config:fill_defaults(SchemaMod, #{Key => Raw}),
|
||||||
|
Listeners = maps:to_list(RawWithDefault),
|
||||||
|
lists:flatmap(fun format_raw_listeners/1, Listeners).
|
||||||
|
|
||||||
|
format_raw_listeners({Type, Conf}) ->
|
||||||
|
lists:map(
|
||||||
|
fun({LName, LConf0}) when is_map(LConf0) ->
|
||||||
|
Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0),
|
||||||
|
LConf1 = maps:remove(<<"authentication">>, LConf0),
|
||||||
|
LConf2 = maps:put(<<"running">>, Running, LConf1),
|
||||||
|
{Type, LName, LConf2}
|
||||||
|
end,
|
||||||
|
maps:to_list(Conf)
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
|
||||||
is_running(ListenerId) ->
|
is_running(ListenerId) ->
|
||||||
case
|
case
|
||||||
lists:filtermap(
|
[
|
||||||
fun({_Type, Id, #{running := IsRunning}}) ->
|
Running
|
||||||
Id =:= ListenerId andalso {true, IsRunning}
|
|| {Id, #{running := Running}} <- list(),
|
||||||
end,
|
Id =:= ListenerId
|
||||||
do_list()
|
]
|
||||||
)
|
|
||||||
of
|
of
|
||||||
[IsRunning] -> IsRunning;
|
[] -> {error, not_found};
|
||||||
[] -> {error, not_found}
|
[IsRunning] -> IsRunning
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_running(Type, ListenerId, #{bind := ListenOn}) when Type =:= tcp; Type =:= ssl ->
|
is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
|
||||||
|
ListenOn =
|
||||||
|
case Conf of
|
||||||
|
#{bind := Bind} ->
|
||||||
|
Bind;
|
||||||
|
#{<<"bind">> := Bind} ->
|
||||||
|
case emqx_schema:to_ip_port(binary_to_list(Bind)) of
|
||||||
|
{ok, L} -> L;
|
||||||
|
{error, _} -> binary_to_integer(Bind)
|
||||||
|
end
|
||||||
|
end,
|
||||||
try esockd:listener({ListenerId, ListenOn}) of
|
try esockd:listener({ListenerId, ListenOn}) of
|
||||||
Pid when is_pid(Pid) ->
|
Pid when is_pid(Pid) ->
|
||||||
true
|
true
|
||||||
|
@ -118,10 +139,10 @@ is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss ->
|
||||||
end;
|
end;
|
||||||
is_running(quic, _ListenerId, _Conf) ->
|
is_running(quic, _ListenerId, _Conf) ->
|
||||||
%% TODO: quic support
|
%% TODO: quic support
|
||||||
{error, no_found}.
|
false.
|
||||||
|
|
||||||
current_conns(ID, ListenOn) ->
|
current_conns(ID, ListenOn) ->
|
||||||
{Type, Name} = parse_listener_id(ID),
|
{ok, #{type := Type, name := Name}} = parse_listener_id(ID),
|
||||||
current_conns(Type, Name, ListenOn).
|
current_conns(Type, Name, ListenOn).
|
||||||
|
|
||||||
current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
|
current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
|
||||||
|
@ -132,7 +153,7 @@ current_conns(_, _, _) ->
|
||||||
{error, not_support}.
|
{error, not_support}.
|
||||||
|
|
||||||
max_conns(ID, ListenOn) ->
|
max_conns(ID, ListenOn) ->
|
||||||
{Type, Name} = parse_listener_id(ID),
|
{ok, #{type := Type, name := Name}} = parse_listener_id(ID),
|
||||||
max_conns(Type, Name, ListenOn).
|
max_conns(Type, Name, ListenOn).
|
||||||
|
|
||||||
max_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
|
max_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
|
||||||
|
@ -164,20 +185,24 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
||||||
console_print(
|
console_print(
|
||||||
"Listener ~ts is NOT started due to: ~p~n.",
|
"Listener ~ts is NOT started due to: ~p~n.",
|
||||||
[listener_id(Type, ListenerName), Reason]
|
[listener_id(Type, ListenerName), Reason]
|
||||||
);
|
),
|
||||||
|
ok;
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
console_print(
|
console_print(
|
||||||
"Listener ~ts on ~ts started.~n",
|
"Listener ~ts on ~ts started.~n",
|
||||||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
[listener_id(Type, ListenerName), format_addr(Bind)]
|
||||||
);
|
),
|
||||||
|
ok;
|
||||||
{error, {already_started, Pid}} ->
|
{error, {already_started, Pid}} ->
|
||||||
{error, {already_started, Pid}};
|
{error, {already_started, Pid}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
ListenerId = listener_id(Type, ListenerName),
|
||||||
|
BindStr = format_addr(Bind),
|
||||||
?ELOG(
|
?ELOG(
|
||||||
"Failed to start listener ~ts on ~ts: ~0p~n",
|
"Failed to start listener ~ts on ~ts: ~0p~n",
|
||||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]
|
[ListenerId, BindStr, Reason]
|
||||||
),
|
),
|
||||||
error(Reason)
|
error({failed_to_start, ListenerId, BindStr, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Restart all listeners
|
%% @doc Restart all listeners
|
||||||
|
@ -316,15 +341,26 @@ delete_authentication(Type, ListenerName, _Conf) ->
|
||||||
post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
|
post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
|
||||||
#{added := Added, removed := Removed, changed := Updated} =
|
#{added := Added, removed := Removed, changed := Updated} =
|
||||||
diff_listeners(NewListeners, OldListeners),
|
diff_listeners(NewListeners, OldListeners),
|
||||||
|
try
|
||||||
perform_listener_changes(fun stop_listener/3, Removed),
|
perform_listener_changes(fun stop_listener/3, Removed),
|
||||||
perform_listener_changes(fun delete_authentication/3, Removed),
|
perform_listener_changes(fun delete_authentication/3, Removed),
|
||||||
perform_listener_changes(fun start_listener/3, Added),
|
perform_listener_changes(fun start_listener/3, Added),
|
||||||
perform_listener_changes(fun restart_listener/3, Updated).
|
perform_listener_changes(fun restart_listener/3, Updated)
|
||||||
|
catch
|
||||||
|
error:{failed_to_start, ListenerId, Bind, Reason} ->
|
||||||
|
Error = lists:flatten(
|
||||||
|
io_lib:format(
|
||||||
|
"~ts(~ts) failed with ~ts",
|
||||||
|
[ListenerId, Bind, element(1, Reason)]
|
||||||
|
)
|
||||||
|
),
|
||||||
|
{error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
perform_listener_changes(Action, MapConfs) ->
|
perform_listener_changes(Action, MapConfs) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({Id, Conf}) ->
|
fun({Id, Conf}) ->
|
||||||
{Type, Name} = parse_listener_id(Id),
|
{ok, #{type := Type, name := Name}} = parse_listener_id(Id),
|
||||||
Action(Type, Name, Conf)
|
Action(Type, Name, Conf)
|
||||||
end,
|
end,
|
||||||
maps:to_list(MapConfs)
|
maps:to_list(MapConfs)
|
||||||
|
@ -447,7 +483,7 @@ parse_listener_id(Id) ->
|
||||||
case string:split(str(Id), ":", leading) of
|
case string:split(str(Id), ":", leading) of
|
||||||
[Type, Name] ->
|
[Type, Name] ->
|
||||||
case lists:member(Type, ?TYPES_STRING) of
|
case lists:member(Type, ?TYPES_STRING) of
|
||||||
true -> {list_to_existing_atom(Type), list_to_atom(Name)};
|
true -> {ok, #{type => list_to_existing_atom(Type), name => list_to_atom(Name)}};
|
||||||
false -> {error, {invalid_listener_id, Id}}
|
false -> {error, {invalid_listener_id, Id}}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -480,25 +516,27 @@ tcp_opts(Opts) ->
|
||||||
|
|
||||||
foreach_listeners(Do) ->
|
foreach_listeners(Do) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({Type, LName, LConf}) ->
|
fun({Id, LConf}) ->
|
||||||
Do(Type, LName, LConf)
|
{ok, #{type := Type, name := Name}} = parse_listener_id(Id),
|
||||||
|
Do(Type, Name, LConf)
|
||||||
end,
|
end,
|
||||||
do_list()
|
list()
|
||||||
).
|
).
|
||||||
|
|
||||||
has_enabled_listener_conf_by_type(Type) ->
|
has_enabled_listener_conf_by_type(Type) ->
|
||||||
lists:any(
|
lists:any(
|
||||||
fun({Type0, _LName, LConf}) when is_map(LConf) ->
|
fun({Id, LConf}) when is_map(LConf) ->
|
||||||
|
{ok, #{type := Type0}} = parse_listener_id(Id),
|
||||||
Type =:= Type0 andalso maps:get(enabled, LConf, true)
|
Type =:= Type0 andalso maps:get(enabled, LConf, true)
|
||||||
end,
|
end,
|
||||||
do_list()
|
list()
|
||||||
).
|
).
|
||||||
|
|
||||||
apply_on_listener(ListenerId, Do) ->
|
apply_on_listener(ListenerId, Do) ->
|
||||||
{Type, ListenerName} = parse_listener_id(ListenerId),
|
{ok, #{type := Type, name := Name}} = parse_listener_id(ListenerId),
|
||||||
case emqx_config:find_listener_conf(Type, ListenerName, []) of
|
case emqx_config:find_listener_conf(Type, Name, []) of
|
||||||
{not_found, _, _} -> error({listener_config_not_found, Type, ListenerName});
|
{not_found, _, _} -> error({listener_config_not_found, Type, Name});
|
||||||
{ok, Conf} -> Do(Type, ListenerName, Conf)
|
{ok, Conf} -> Do(Type, Name, Conf)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
str(A) when is_atom(A) ->
|
str(A) when is_atom(A) ->
|
||||||
|
|
|
@ -83,21 +83,6 @@
|
||||||
, do_unsubscribe/2
|
, do_unsubscribe/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Listeners
|
|
||||||
-export([ do_list_listeners/0
|
|
||||||
, list_listeners/0
|
|
||||||
, list_listeners/1
|
|
||||||
, list_listeners_by_id/1
|
|
||||||
, get_listener/2
|
|
||||||
, manage_listener/2
|
|
||||||
, do_update_listener/2
|
|
||||||
, update_listener/2
|
|
||||||
, update_listener/3
|
|
||||||
, do_remove_listener/1
|
|
||||||
, remove_listener/1
|
|
||||||
, remove_listener/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Alarms
|
%% Alarms
|
||||||
-export([ get_alarms/1
|
-export([ get_alarms/1
|
||||||
, get_alarms/2
|
, get_alarms/2
|
||||||
|
@ -434,80 +419,6 @@ do_unsubscribe(ClientId, Topic) ->
|
||||||
Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
|
Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Listeners
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
do_list_listeners() ->
|
|
||||||
[Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()].
|
|
||||||
|
|
||||||
list_listeners() ->
|
|
||||||
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
|
|
||||||
|
|
||||||
list_listeners(Node) ->
|
|
||||||
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
|
|
||||||
|
|
||||||
list_listeners_by_id(Id) ->
|
|
||||||
listener_id_filter(Id, list_listeners()).
|
|
||||||
|
|
||||||
get_listener(Node, Id) ->
|
|
||||||
case listener_id_filter(Id, list_listeners(Node)) of
|
|
||||||
[] ->
|
|
||||||
{error, not_found};
|
|
||||||
[Listener] ->
|
|
||||||
Listener
|
|
||||||
end.
|
|
||||||
|
|
||||||
listener_id_filter(Id, Listeners) ->
|
|
||||||
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
|
|
||||||
lists:filter(Filter, Listeners).
|
|
||||||
|
|
||||||
-spec manage_listener( start_listener | stop_listener | restart_listener
|
|
||||||
, #{id := atom(), node := node()}
|
|
||||||
) -> ok | {error, Reason :: term()}.
|
|
||||||
manage_listener(start_listener, #{id := ID, node := Node}) ->
|
|
||||||
wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID));
|
|
||||||
manage_listener(stop_listener, #{id := ID, node := Node}) ->
|
|
||||||
wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID));
|
|
||||||
manage_listener(restart_listener, #{id := ID, node := Node}) ->
|
|
||||||
wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)).
|
|
||||||
|
|
||||||
-spec do_update_listener(string(), emqx_config:update_request()) ->
|
|
||||||
map() | {error, _}.
|
|
||||||
do_update_listener(Id, Config) ->
|
|
||||||
case emqx_listeners:parse_listener_id(Id) of
|
|
||||||
{error, {invalid_listener_id, Id}} ->
|
|
||||||
{error, {invalid_listener_id, Id}};
|
|
||||||
{Type, Name} ->
|
|
||||||
case emqx:update_config([listeners, Type, Name], Config, #{}) of
|
|
||||||
{ok, #{raw_config := RawConf}} ->
|
|
||||||
RawConf#{node => node(), id => Id, running => true};
|
|
||||||
{error, Reason} ->
|
|
||||||
{error, Reason}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
update_listener(Id, Config) ->
|
|
||||||
[update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
|
|
||||||
|
|
||||||
update_listener(Node, Id, Config) ->
|
|
||||||
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
|
|
||||||
|
|
||||||
remove_listener(Id) ->
|
|
||||||
[remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
|
|
||||||
|
|
||||||
-spec do_remove_listener(string()) -> ok.
|
|
||||||
do_remove_listener(Id) ->
|
|
||||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
|
||||||
case emqx:remove_config([listeners, Type, Name], #{}) of
|
|
||||||
{ok, _} -> ok;
|
|
||||||
{error, Reason} ->
|
|
||||||
error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
remove_listener(Node, Id) ->
|
|
||||||
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Get Alarms
|
%% Get Alarms
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -18,372 +18,461 @@
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
-export([api_spec/0]).
|
-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
|
||||||
|
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
|
||||||
|
|
||||||
-export([ list_listeners/2
|
-export([
|
||||||
, crud_listeners_by_id/2
|
list_listeners/2,
|
||||||
, list_listeners_on_node/2
|
crud_listeners_by_id/2,
|
||||||
, crud_listener_by_id_on_node/2
|
list_listeners_on_node/2,
|
||||||
, manage_listeners/2
|
crud_listener_by_id_on_node/2,
|
||||||
, jsonable_resp/2
|
action_listeners_by_id/2,
|
||||||
|
action_listeners_by_id_on_node/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([format/1]).
|
%% for rpc call
|
||||||
|
-export([
|
||||||
|
do_list_listeners/0,
|
||||||
|
do_update_listener/2,
|
||||||
|
do_remove_listener/1
|
||||||
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
|
||||||
-define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>).
|
-define(NODE_LISTENER_NOT_FOUND, <<"Node name or listener id not found">>).
|
||||||
-define(NODE_NOT_FOUND_OR_DOWN, <<"Node not found or Down">>).
|
-define(NODE_NOT_FOUND_OR_DOWN, <<"Node not found or Down">>).
|
||||||
-define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
|
-define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
|
||||||
|
-define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
|
||||||
-define(ADDR_PORT_INUSE, <<"Addr port in use">>).
|
-define(ADDR_PORT_INUSE, <<"Addr port in use">>).
|
||||||
-define(CONFIG_SCHEMA_ERROR, <<"Config schema error">>).
|
|
||||||
-define(INVALID_LISTENER_PROTOCOL, <<"Invalid listener type">>).
|
-define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
|
||||||
-define(UPDATE_CONFIG_FAILED, <<"Update configuration failed">>).
|
|
||||||
-define(OPERATION_FAILED, <<"Operation failed">>).
|
namespace() -> "listeners".
|
||||||
|
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
{
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||||
|
|
||||||
|
paths() ->
|
||||||
[
|
[
|
||||||
api_list_listeners(),
|
"/listeners",
|
||||||
api_list_update_listeners_by_id(),
|
"/listeners/:id",
|
||||||
api_manage_listeners(),
|
"/listeners/:id/:action",
|
||||||
api_list_listeners_on_node(),
|
"/nodes/:node/listeners",
|
||||||
api_get_update_listener_by_id_on_node(),
|
"/nodes/:node/listeners/:id",
|
||||||
api_manage_listeners_on_node()
|
"/nodes/:node/listeners/:id/:action"
|
||||||
],
|
].
|
||||||
[]
|
|
||||||
}.
|
|
||||||
|
|
||||||
-define(TYPES_ATOM, [tcp, ssl, ws, wss, quic]).
|
schema("/listeners") ->
|
||||||
req_schema() ->
|
#{
|
||||||
Schema = [emqx_mgmt_api_configs:gen_schema(
|
'operationId' => list_listeners,
|
||||||
emqx:get_raw_config([listeners, T, default], #{}))
|
|
||||||
|| T <- ?TYPES_ATOM],
|
|
||||||
#{'oneOf' => Schema}.
|
|
||||||
|
|
||||||
resp_schema() ->
|
|
||||||
#{'oneOf' := Schema} = req_schema(),
|
|
||||||
AddMetadata = fun(Prop) ->
|
|
||||||
Prop#{running => #{type => boolean},
|
|
||||||
id => #{type => string},
|
|
||||||
node => #{type => string}}
|
|
||||||
end,
|
|
||||||
Schema1 = [S#{properties => AddMetadata(Prop)}
|
|
||||||
|| S = #{properties := Prop} <- Schema],
|
|
||||||
#{'oneOf' => Schema1}.
|
|
||||||
|
|
||||||
api_list_listeners() ->
|
|
||||||
Metadata = #{
|
|
||||||
get => #{
|
get => #{
|
||||||
description => <<"List listeners from all nodes in the cluster">>,
|
tags => [<<"listeners">>],
|
||||||
responses => #{
|
desc => <<"List all running node's listeners.">>,
|
||||||
<<"200">> =>
|
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))}
|
||||||
emqx_mgmt_util:array_schema(resp_schema(),
|
}
|
||||||
<<"List listeners successfully">>)}}},
|
};
|
||||||
{"/listeners", Metadata, list_listeners}.
|
schema("/listeners/:id") ->
|
||||||
|
#{
|
||||||
api_list_update_listeners_by_id() ->
|
'operationId' => crud_listeners_by_id,
|
||||||
Metadata = #{
|
|
||||||
get => #{
|
get => #{
|
||||||
description => <<"List listeners by a given Id from all nodes in the cluster">>,
|
tags => [<<"listeners">>],
|
||||||
parameters => [param_path_id()],
|
desc => <<"List all running node's listeners for the specified id.">>,
|
||||||
|
parameters => [?R_REF(listener_id)],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"404">> =>
|
200 => ?HOCON(?ARRAY(?R_REF(listeners)))
|
||||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
}
|
||||||
<<"200">> =>
|
|
||||||
emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}},
|
|
||||||
put => #{
|
|
||||||
description =>
|
|
||||||
<<"Create or update a listener by a given Id to all nodes in the cluster">>,
|
|
||||||
parameters => [param_path_id()],
|
|
||||||
'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
|
|
||||||
responses => #{
|
|
||||||
<<"400">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED,
|
|
||||||
['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
|
|
||||||
<<"404">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
|
||||||
<<"500">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
|
||||||
<<"200">> =>
|
|
||||||
emqx_mgmt_util:array_schema(resp_schema(),
|
|
||||||
<<"Create or update listener successfully">>)}},
|
|
||||||
delete => #{
|
|
||||||
description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
|
|
||||||
parameters => [param_path_id()],
|
|
||||||
responses => #{
|
|
||||||
<<"404">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
|
||||||
<<"204">> =>
|
|
||||||
emqx_mgmt_util:schema(<<"Delete listener successfully">>)}}
|
|
||||||
},
|
},
|
||||||
{"/listeners/:id", Metadata, crud_listeners_by_id}.
|
|
||||||
|
|
||||||
api_list_listeners_on_node() ->
|
|
||||||
Metadata = #{
|
|
||||||
get => #{
|
|
||||||
description => <<"List listeners in one node">>,
|
|
||||||
parameters => [param_path_node()],
|
|
||||||
responses => #{
|
|
||||||
<<"404">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?NODE_NOT_FOUND_OR_DOWN, ['RESOURCE_NOT_FOUND']),
|
|
||||||
<<"500">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
|
||||||
<<"200">> =>
|
|
||||||
emqx_mgmt_util:schema(resp_schema(), <<"List listeners successfully">>)}}},
|
|
||||||
{"/nodes/:node/listeners", Metadata, list_listeners_on_node}.
|
|
||||||
|
|
||||||
api_get_update_listener_by_id_on_node() ->
|
|
||||||
Metadata = #{
|
|
||||||
get => #{
|
|
||||||
description => <<"Get a listener by a given Id on a specific node">>,
|
|
||||||
parameters => [param_path_node(), param_path_id()],
|
|
||||||
responses => #{
|
|
||||||
<<"404">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
|
|
||||||
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
|
|
||||||
<<"200">> =>
|
|
||||||
emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}},
|
|
||||||
put => #{
|
put => #{
|
||||||
description => <<"Create or update a listener by a given Id on a specific node">>,
|
tags => [<<"listeners">>],
|
||||||
parameters => [param_path_node(), param_path_id()],
|
desc => <<"Create or update the specified listener on all nodes.">>,
|
||||||
'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
|
parameters => [?R_REF(listener_id)],
|
||||||
|
'requestBody' => ?HOCON(listener_schema(), #{}),
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"400">> =>
|
200 => ?HOCON(listener_schema(), #{}),
|
||||||
emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED,
|
400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
|
||||||
['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
|
}
|
||||||
<<"404">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
|
|
||||||
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
|
|
||||||
<<"500">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
|
||||||
<<"200">> =>
|
|
||||||
emqx_mgmt_util:schema(resp_schema(), <<"Get listener successfully">>)}},
|
|
||||||
delete => #{
|
|
||||||
description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
|
|
||||||
parameters => [param_path_node(), param_path_id()],
|
|
||||||
responses => #{
|
|
||||||
<<"404">> =>
|
|
||||||
emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
|
|
||||||
<<"204">> =>
|
|
||||||
emqx_mgmt_util:schema(<<"Delete listener successfully">>)}}
|
|
||||||
},
|
},
|
||||||
{"/nodes/:node/listeners/:id", Metadata, crud_listener_by_id_on_node}.
|
delete => #{
|
||||||
|
tags => [<<"listeners">>],
|
||||||
api_manage_listeners() ->
|
desc => <<"Delete the specified listener on all nodes.">>,
|
||||||
Metadata = #{
|
parameters => [?R_REF(listener_id)],
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Listener deleted">>,
|
||||||
|
400 => error_codes(['BAD_REQUEST'])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/listeners/:id/:action") ->
|
||||||
|
#{
|
||||||
|
'operationId' => action_listeners_by_id,
|
||||||
post => #{
|
post => #{
|
||||||
description => <<"Restart listeners on all nodes in the cluster">>,
|
tags => [<<"listeners">>],
|
||||||
|
desc => <<"Start/stop/restart listeners on all nodes.">>,
|
||||||
parameters => [
|
parameters => [
|
||||||
param_path_id(),
|
?R_REF(listener_id),
|
||||||
param_path_operation()],
|
?R_REF(action)
|
||||||
|
],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
200 => <<"Updated">>,
|
||||||
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
|
400 => error_codes(['BAD_REQUEST'])
|
||||||
{"/listeners/:id/operation/:operation", Metadata, manage_listeners}.
|
}
|
||||||
|
}
|
||||||
api_manage_listeners_on_node() ->
|
};
|
||||||
Metadata = #{
|
schema("/nodes/:node/listeners") ->
|
||||||
|
#{
|
||||||
|
'operationId' => list_listeners_on_node,
|
||||||
|
get => #{
|
||||||
|
tags => [<<"listeners">>],
|
||||||
|
desc => <<"List all listeners on the specified node.">>,
|
||||||
|
parameters => [?R_REF(node)],
|
||||||
|
responses => #{
|
||||||
|
200 => ?HOCON(?ARRAY(listener_schema())),
|
||||||
|
400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/nodes/:node/listeners/:id") ->
|
||||||
|
#{
|
||||||
|
'operationId' => crud_listener_by_id_on_node,
|
||||||
|
get => #{
|
||||||
|
tags => [<<"listeners">>],
|
||||||
|
desc => <<"Get the specified listener on the specified node.">>,
|
||||||
|
parameters => [
|
||||||
|
?R_REF(listener_id),
|
||||||
|
?R_REF(node)
|
||||||
|
],
|
||||||
|
responses => #{
|
||||||
|
200 => ?HOCON(listener_schema()),
|
||||||
|
400 => error_codes(['BAD_REQUEST']),
|
||||||
|
404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND)
|
||||||
|
}
|
||||||
|
},
|
||||||
put => #{
|
put => #{
|
||||||
description => <<"Restart listeners on all nodes in the cluster">>,
|
tags => [<<"listeners">>],
|
||||||
|
desc => <<"Create or update the specified listener on the specified node.">>,
|
||||||
parameters => [
|
parameters => [
|
||||||
param_path_node(),
|
?R_REF(listener_id),
|
||||||
param_path_id(),
|
?R_REF(node)
|
||||||
param_path_operation()],
|
],
|
||||||
|
'requestBody' => ?HOCON(listener_schema()),
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"500">> => emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
|
200 => ?HOCON(listener_schema()),
|
||||||
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
|
400 => error_codes(['BAD_REQUEST'])
|
||||||
{"/nodes/:node/listeners/:id/operation/:operation", Metadata, manage_listeners}.
|
}
|
||||||
|
},
|
||||||
%%%==============================================================================================
|
delete => #{
|
||||||
%% parameters
|
tags => [<<"listeners">>],
|
||||||
param_path_node() ->
|
desc => <<"Delete the specified listener on the specified node.">>,
|
||||||
|
parameters => [
|
||||||
|
?R_REF(listener_id),
|
||||||
|
?R_REF(node)
|
||||||
|
],
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Listener deleted">>,
|
||||||
|
400 => error_codes(['BAD_REQUEST'])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/nodes/:node/listeners/:id/:action") ->
|
||||||
#{
|
#{
|
||||||
name => node,
|
'operationId' => action_listeners_by_id_on_node,
|
||||||
in => path,
|
post => #{
|
||||||
schema => #{type => string},
|
tags => [<<"listeners">>],
|
||||||
required => true,
|
desc => <<"Start/stop/restart listeners on a specified node.">>,
|
||||||
example => node()
|
parameters => [
|
||||||
|
?R_REF(node),
|
||||||
|
?R_REF(listener_id),
|
||||||
|
?R_REF(action)
|
||||||
|
],
|
||||||
|
responses => #{
|
||||||
|
200 => <<"Updated">>,
|
||||||
|
400 => error_codes(['BAD_REQUEST'])
|
||||||
|
}
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
param_path_id() ->
|
fields(listeners) ->
|
||||||
#{
|
[
|
||||||
name => id,
|
{"node",
|
||||||
in => path,
|
?HOCON(atom(), #{
|
||||||
schema => #{type => string, example => emqx_listeners:id_example()},
|
desc => "Node name",
|
||||||
|
example => "emqx@127.0.0.1",
|
||||||
required => true
|
required => true
|
||||||
}.
|
})},
|
||||||
|
{"listeners", ?ARRAY(listener_schema())}
|
||||||
|
];
|
||||||
|
fields(listener_id) ->
|
||||||
|
[
|
||||||
|
{id,
|
||||||
|
?HOCON(atom(), #{
|
||||||
|
desc => "Listener id",
|
||||||
|
example => 'tcp:default',
|
||||||
|
validator => fun validate_id/1,
|
||||||
|
in => path
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(action) ->
|
||||||
|
[
|
||||||
|
{action,
|
||||||
|
?HOCON(?ENUM([start, stop, restart]), #{
|
||||||
|
desc => "listener action",
|
||||||
|
example => start,
|
||||||
|
in => path
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(node) ->
|
||||||
|
[
|
||||||
|
{"node",
|
||||||
|
?HOCON(atom(), #{
|
||||||
|
desc => "Node name",
|
||||||
|
example => "emqx@127.0.0.1",
|
||||||
|
in => path
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(Type) ->
|
||||||
|
Listeners = listeners_info(),
|
||||||
|
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
||||||
|
Schema.
|
||||||
|
|
||||||
param_path_operation()->
|
listener_schema() ->
|
||||||
|
?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())).
|
||||||
|
|
||||||
|
listeners_info() ->
|
||||||
|
Listeners = hocon_schema:fields(emqx_schema, "listeners"),
|
||||||
|
lists:map(
|
||||||
|
fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
|
||||||
|
Fields0 = hocon_schema:fields(Mod, Field),
|
||||||
|
Fields1 = lists:keydelete("authentication", 1, Fields0),
|
||||||
|
TypeAtom = list_to_existing_atom(Type),
|
||||||
#{
|
#{
|
||||||
name => operation,
|
ref => ?R_REF(TypeAtom),
|
||||||
in => path,
|
schema => [
|
||||||
|
{type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})},
|
||||||
|
{running, ?HOCON(boolean(), #{desc => "Listener status", required => false})},
|
||||||
|
{id,
|
||||||
|
?HOCON(atom(), #{
|
||||||
|
desc => "Listener id",
|
||||||
required => true,
|
required => true,
|
||||||
schema => #{
|
validator => fun validate_id/1
|
||||||
type => string,
|
})}
|
||||||
enum => [start, stop, restart]},
|
| Fields1
|
||||||
example => restart
|
]
|
||||||
}.
|
}
|
||||||
|
end,
|
||||||
|
Listeners
|
||||||
|
).
|
||||||
|
|
||||||
|
validate_id(Id) ->
|
||||||
|
case emqx_listeners:parse_listener_id(Id) of
|
||||||
|
{error, Reason} -> {error, Reason};
|
||||||
|
{ok, _} -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%%==============================================================================================
|
|
||||||
%% api
|
%% api
|
||||||
list_listeners(get, _Request) ->
|
list_listeners(get, _Request) ->
|
||||||
{200, format(emqx_mgmt:list_listeners())}.
|
{200, list_listeners()}.
|
||||||
|
|
||||||
crud_listeners_by_id(get, #{bindings := #{id := Id}}) ->
|
crud_listeners_by_id(get, #{bindings := #{id := Id}}) ->
|
||||||
case [L || L = #{id := Id0} <- emqx_mgmt:list_listeners(),
|
{200, list_listeners_by_id(Id)};
|
||||||
atom_to_binary(Id0, latin1) =:= Id] of
|
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
|
||||||
[] ->
|
case parse_listener_conf(Body0) of
|
||||||
{400, #{code => 'RESOURCE_NOT_FOUND', message => ?LISTENER_NOT_FOUND}};
|
{Id, Type, Name, Conf} ->
|
||||||
Listeners ->
|
case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
|
||||||
{200, format(Listeners)}
|
{ok, #{raw_config := _RawConf}} ->
|
||||||
|
crud_listeners_by_id(get, #{bindings => #{id => Id}});
|
||||||
|
{error, Reason} ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||||
end;
|
end;
|
||||||
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) ->
|
{error, Reason} ->
|
||||||
Results = format(emqx_mgmt:update_listener(Id, Conf)),
|
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||||
case lists:filter(fun filter_errors/1, Results) of
|
_ ->
|
||||||
[{error, {invalid_listener_id, Id}} | _] ->
|
{400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}};
|
|
||||||
[{error, {emqx_conf_schema, _}} | _] ->
|
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}};
|
|
||||||
[{error, {eaddrinuse, _}} | _] ->
|
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
|
||||||
[{error, Reason} | _] ->
|
|
||||||
{500, #{code => 'UNKNOWN_ERROR', message => err_msg(Reason)}};
|
|
||||||
[] ->
|
|
||||||
{200, Results}
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
|
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
|
||||||
Results = emqx_mgmt:remove_listener(Id),
|
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||||
case lists:filter(fun filter_errors/1, Results) of
|
case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of
|
||||||
[] -> {204};
|
{ok, _} -> {204};
|
||||||
Errors -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Errors)}}
|
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
parse_listener_conf(Conf0) ->
|
||||||
|
Conf1 = maps:remove(<<"running">>, Conf0),
|
||||||
|
{IdBin, Conf2} = maps:take(<<"id">>, Conf1),
|
||||||
|
{TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
|
||||||
|
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
|
||||||
|
TypeAtom = binary_to_existing_atom(TypeBin),
|
||||||
|
case Type =:= TypeAtom of
|
||||||
|
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
|
||||||
|
false -> {error, listener_type_inconsistent}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
|
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
|
||||||
case emqx_mgmt:list_listeners(atom(Node)) of
|
case list_listeners(Node) of
|
||||||
{error, nodedown} ->
|
{error, nodedown} ->
|
||||||
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
{400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
|
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||||
Listener ->
|
#{<<"listeners">> := Listener} ->
|
||||||
{200, format(Listener)}
|
{200, Listener}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
|
crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
|
||||||
case emqx_mgmt:get_listener(atom(Node), atom(Id)) of
|
case get_listener(Node, Id) of
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_LISTENER_NOT_FOUND}};
|
{404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
|
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||||
Listener ->
|
Listener ->
|
||||||
{200, format(Listener)}
|
{200, Listener}
|
||||||
end;
|
end;
|
||||||
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Conf}) ->
|
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) ->
|
||||||
case emqx_mgmt:update_listener(atom(Node), Id, Conf) of
|
case parse_listener_conf(Body) of
|
||||||
|
{error, Reason} ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||||
|
{Id, Type, _Name, Conf} ->
|
||||||
|
case update_listener(Node, Id, Conf) of
|
||||||
{error, nodedown} ->
|
{error, nodedown} ->
|
||||||
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
{400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||||
{error, {invalid_listener_id, _}} ->
|
%% TODO
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}};
|
|
||||||
{error, {emqx_conf_schema, _}} ->
|
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}};
|
|
||||||
{error, {eaddrinuse, _}} ->
|
{error, {eaddrinuse, _}} ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}};
|
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||||
Listener ->
|
{ok, Listener} ->
|
||||||
{200, format(Listener)}
|
{200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}}
|
||||||
end;
|
end;
|
||||||
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
|
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
|
||||||
case emqx_mgmt:remove_listener(atom(Node), Id) of
|
case remove_listener(Node, Id) of
|
||||||
ok -> {204};
|
ok -> {204};
|
||||||
{error, Reason} -> {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) ->
|
action_listeners_by_id_on_node(post,
|
||||||
{_, Result} = do_manage_listeners(Node, Id, Oper),
|
#{bindings := #{id := Id, action := Action, node := Node}}) ->
|
||||||
Result;
|
{_, Result} = action_listeners(Node, Id, Action),
|
||||||
|
Result.
|
||||||
|
|
||||||
manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) ->
|
action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
|
||||||
Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()],
|
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
|
||||||
case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of
|
case
|
||||||
|
lists:filter(
|
||||||
|
fun
|
||||||
|
({_, {200}}) -> false;
|
||||||
|
(_) -> true
|
||||||
|
end,
|
||||||
|
Results
|
||||||
|
)
|
||||||
|
of
|
||||||
[] -> {200};
|
[] -> {200};
|
||||||
Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}
|
Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%%==============================================================================================
|
%%%==============================================================================================
|
||||||
%% util functions
|
|
||||||
|
|
||||||
do_manage_listeners(Node, Id, Oper) ->
|
action_listeners(Node, Id, Action) ->
|
||||||
Param = #{node => atom(Node), id => atom(Id)},
|
{Node, do_action_listeners(Action, Node, Id)}.
|
||||||
{Node, do_manage_listeners2(Oper, Param)}.
|
|
||||||
|
|
||||||
do_manage_listeners2(<<"start">>, Param) ->
|
do_action_listeners(start, Node, Id) ->
|
||||||
case emqx_mgmt:manage_listener(start_listener, Param) of
|
case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of
|
||||||
ok -> {200};
|
ok -> {200};
|
||||||
{error, {already_started, _}} -> {200};
|
{error, {already_started, _}} -> {200};
|
||||||
{error, Reason} ->
|
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
|
||||||
end;
|
end;
|
||||||
do_manage_listeners2(<<"stop">>, Param) ->
|
do_action_listeners(stop, Node, Id) ->
|
||||||
case emqx_mgmt:manage_listener(stop_listener, Param) of
|
case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of
|
||||||
ok -> {200};
|
ok -> {200};
|
||||||
{error, not_found} -> {200};
|
{error, not_found} -> {200};
|
||||||
{error, Reason} ->
|
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
|
||||||
end;
|
end;
|
||||||
do_manage_listeners2(<<"restart">>, Param) ->
|
do_action_listeners(restart, Node, Id) ->
|
||||||
case emqx_mgmt:manage_listener(restart_listener, Param) of
|
case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of
|
||||||
ok -> {200};
|
ok -> {200};
|
||||||
{error, not_found} -> do_manage_listeners2(<<"start">>, Param);
|
{error, not_found} -> do_action_listeners(start, Node, Id);
|
||||||
{error, Reason} ->
|
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||||
{500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
manage_listeners_err(Errors) ->
|
action_listeners_err(Errors) ->
|
||||||
list_to_binary(lists:foldl(fun({Node, Err}, Str) ->
|
list_to_binary(
|
||||||
|
lists:foldl(
|
||||||
|
fun({Node, Err}, Str) ->
|
||||||
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
|
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
|
||||||
end, "", Errors)).
|
end,
|
||||||
|
"",
|
||||||
|
Errors
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
format(Listeners) when is_list(Listeners) ->
|
err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
|
||||||
[format(Listener) || Listener <- Listeners];
|
err_msg(Reason) -> list_to_binary(err_msg_str(Reason)).
|
||||||
|
|
||||||
format({error, Reason}) ->
|
|
||||||
{error, Reason};
|
|
||||||
|
|
||||||
format(#{node := _Node, id := _Id} = Conf) when is_map(Conf) ->
|
|
||||||
emqx_map_lib:jsonable_map(Conf#{
|
|
||||||
running => trans_running(Conf)
|
|
||||||
}, fun ?MODULE:jsonable_resp/2).
|
|
||||||
|
|
||||||
trans_running(Conf) ->
|
|
||||||
case maps:get(running, Conf) of
|
|
||||||
{error, _} ->
|
|
||||||
false;
|
|
||||||
Running ->
|
|
||||||
Running
|
|
||||||
end.
|
|
||||||
|
|
||||||
filter_errors({error, _}) ->
|
|
||||||
true;
|
|
||||||
filter_errors(_) ->
|
|
||||||
false.
|
|
||||||
|
|
||||||
jsonable_resp(bind, Port) when is_integer(Port) ->
|
|
||||||
{bind, Port};
|
|
||||||
jsonable_resp(bind, {Addr, Port}) when is_tuple(Addr); is_integer(Port)->
|
|
||||||
{bind, inet:ntoa(Addr) ++ ":" ++ integer_to_list(Port)};
|
|
||||||
jsonable_resp(user_lookup_fun, _) ->
|
|
||||||
drop;
|
|
||||||
jsonable_resp(K, V) ->
|
|
||||||
{K, V}.
|
|
||||||
|
|
||||||
atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
|
|
||||||
atom(S) when is_list(S) -> list_to_atom(S);
|
|
||||||
atom(A) when is_atom(A) -> A.
|
|
||||||
|
|
||||||
err_msg(Reason) ->
|
|
||||||
list_to_binary(err_msg_str(Reason)).
|
|
||||||
|
|
||||||
err_msg_str(Reason) ->
|
err_msg_str(Reason) ->
|
||||||
io_lib:format("~p", [Reason]).
|
io_lib:format("~p", [Reason]).
|
||||||
|
|
||||||
|
list_listeners() ->
|
||||||
|
[list_listeners(Node) || Node <- mria_mnesia:running_nodes()].
|
||||||
|
|
||||||
|
list_listeners(Node) ->
|
||||||
|
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
|
||||||
|
|
||||||
|
list_listeners_by_id(Id) ->
|
||||||
|
listener_id_filter(Id, list_listeners()).
|
||||||
|
|
||||||
|
get_listener(Node, Id) ->
|
||||||
|
case listener_id_filter(Id, [list_listeners(Node)]) of
|
||||||
|
[#{<<"listeners">> := []}] -> {error, not_found};
|
||||||
|
[#{<<"listeners">> := [Listener]}] -> Listener
|
||||||
|
end.
|
||||||
|
|
||||||
|
listener_id_filter(Id, Listeners) ->
|
||||||
|
lists:map(
|
||||||
|
fun(Conf = #{<<"listeners">> := Listeners0}) ->
|
||||||
|
Conf#{
|
||||||
|
<<"listeners">> =>
|
||||||
|
[C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
Listeners
|
||||||
|
).
|
||||||
|
|
||||||
|
update_listener(Node, Id, Config) ->
|
||||||
|
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
|
||||||
|
|
||||||
|
remove_listener(Node, Id) ->
|
||||||
|
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
|
||||||
|
|
||||||
|
-spec do_list_listeners() -> map().
|
||||||
|
do_list_listeners() ->
|
||||||
|
Listeners = [
|
||||||
|
Conf#{<<"id">> => Id, <<"type">> => Type}
|
||||||
|
|| {Id, Type, Conf} <- emqx_listeners:list_raw()
|
||||||
|
],
|
||||||
|
#{
|
||||||
|
<<"node">> => node(),
|
||||||
|
<<"listeners">> => Listeners
|
||||||
|
}.
|
||||||
|
|
||||||
|
-spec do_update_listener(string(), emqx_config:update_request()) ->
|
||||||
|
{ok, map()} | {error, _}.
|
||||||
|
do_update_listener(Id, Config) ->
|
||||||
|
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||||
|
case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of
|
||||||
|
{ok, #{raw_config := RawConf}} -> {ok, RawConf};
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec do_remove_listener(string()) -> ok.
|
||||||
|
do_remove_listener(Id) ->
|
||||||
|
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||||
|
case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of
|
||||||
|
{ok, _} -> ok;
|
||||||
|
{error, Reason} -> error(Reason)
|
||||||
|
end.
|
||||||
|
|
||||||
|
wrap_rpc({badrpc, Reason}) ->
|
||||||
|
{error, Reason};
|
||||||
|
wrap_rpc(Res) ->
|
||||||
|
Res.
|
||||||
|
|
|
@ -26,36 +26,39 @@
|
||||||
|
|
||||||
-export([load/0]).
|
-export([load/0]).
|
||||||
|
|
||||||
-export([ status/1
|
-export([
|
||||||
, broker/1
|
status/1,
|
||||||
, cluster/1
|
broker/1,
|
||||||
, clients/1
|
cluster/1,
|
||||||
, routes/1
|
clients/1,
|
||||||
, subscriptions/1
|
routes/1,
|
||||||
, plugins/1
|
subscriptions/1,
|
||||||
, listeners/1
|
plugins/1,
|
||||||
, vm/1
|
listeners/1,
|
||||||
, mnesia/1
|
vm/1,
|
||||||
, trace/1
|
mnesia/1,
|
||||||
, traces/1
|
trace/1,
|
||||||
, log/1
|
traces/1,
|
||||||
, authz/1
|
log/1,
|
||||||
, olp/1
|
authz/1,
|
||||||
|
olp/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(PROC_INFOKEYS, [status,
|
-define(PROC_INFOKEYS, [
|
||||||
|
status,
|
||||||
memory,
|
memory,
|
||||||
message_queue_len,
|
message_queue_len,
|
||||||
total_heap_size,
|
total_heap_size,
|
||||||
heap_size,
|
heap_size,
|
||||||
stack_size,
|
stack_size,
|
||||||
reductions]).
|
reductions
|
||||||
|
]).
|
||||||
|
|
||||||
-define(MAX_LIMIT, 10000).
|
-define(MAX_LIMIT, 10000).
|
||||||
|
|
||||||
-define(APP, emqx).
|
-define(APP, emqx).
|
||||||
|
|
||||||
-spec(load() -> ok).
|
-spec load() -> ok.
|
||||||
load() ->
|
load() ->
|
||||||
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
|
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
|
||||||
lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
|
lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
|
||||||
|
@ -79,19 +82,22 @@ broker([]) ->
|
||||||
Funs = [sysdescr, version, datetime],
|
Funs = [sysdescr, version, datetime],
|
||||||
[emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs],
|
[emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs],
|
||||||
emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]);
|
emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]);
|
||||||
|
|
||||||
broker(["stats"]) ->
|
broker(["stats"]) ->
|
||||||
[emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
|
[
|
||||||
|| {Stat, Val} <- lists:sort(emqx_stats:getstats())];
|
emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
|
||||||
|
|| {Stat, Val} <- lists:sort(emqx_stats:getstats())
|
||||||
|
];
|
||||||
broker(["metrics"]) ->
|
broker(["metrics"]) ->
|
||||||
[emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
|
[
|
||||||
|| {Metric, Val} <- lists:sort(emqx_metrics:all())];
|
emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
|
||||||
|
|| {Metric, Val} <- lists:sort(emqx_metrics:all())
|
||||||
|
];
|
||||||
broker(_) ->
|
broker(_) ->
|
||||||
emqx_ctl:usage([{"broker", "Show broker version, uptime and description"},
|
emqx_ctl:usage([
|
||||||
|
{"broker", "Show broker version, uptime and description"},
|
||||||
{"broker stats", "Show broker statistics of clients, topics, subscribers"},
|
{"broker stats", "Show broker statistics of clients, topics, subscribers"},
|
||||||
{"broker metrics", "Show broker metrics"}]).
|
{"broker metrics", "Show broker metrics"}
|
||||||
|
]).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
%% @doc Cluster with other nodes
|
%% @doc Cluster with other nodes
|
||||||
|
@ -106,7 +112,6 @@ cluster(["join", SNode]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
|
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
cluster(["leave"]) ->
|
cluster(["leave"]) ->
|
||||||
case ekka:leave() of
|
case ekka:leave() of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -115,7 +120,6 @@ cluster(["leave"]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
|
emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
cluster(["force-leave", SNode]) ->
|
cluster(["force-leave", SNode]) ->
|
||||||
case ekka:force_leave(ekka_node:parse_name(SNode)) of
|
case ekka:force_leave(ekka_node:parse_name(SNode)) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -126,33 +130,32 @@ cluster(["force-leave", SNode]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
|
emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
cluster(["status"]) ->
|
cluster(["status"]) ->
|
||||||
emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
|
emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
|
||||||
|
|
||||||
cluster(_) ->
|
cluster(_) ->
|
||||||
emqx_ctl:usage([{"cluster join <Node>", "Join the cluster"},
|
emqx_ctl:usage([
|
||||||
|
{"cluster join <Node>", "Join the cluster"},
|
||||||
{"cluster leave", "Leave the cluster"},
|
{"cluster leave", "Leave the cluster"},
|
||||||
{"cluster force-leave <Node>", "Force the node leave from cluster"},
|
{"cluster force-leave <Node>", "Force the node leave from cluster"},
|
||||||
{"cluster status", "Cluster status"}]).
|
{"cluster status", "Cluster status"}
|
||||||
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc Query clients
|
%% @doc Query clients
|
||||||
|
|
||||||
clients(["list"]) ->
|
clients(["list"]) ->
|
||||||
dump(emqx_channel, client);
|
dump(emqx_channel, client);
|
||||||
|
|
||||||
clients(["show", ClientId]) ->
|
clients(["show", ClientId]) ->
|
||||||
if_client(ClientId, fun print/1);
|
if_client(ClientId, fun print/1);
|
||||||
|
|
||||||
clients(["kick", ClientId]) ->
|
clients(["kick", ClientId]) ->
|
||||||
ok = emqx_cm:kick_session(bin(ClientId)),
|
ok = emqx_cm:kick_session(bin(ClientId)),
|
||||||
emqx_ctl:print("ok~n");
|
emqx_ctl:print("ok~n");
|
||||||
|
|
||||||
clients(_) ->
|
clients(_) ->
|
||||||
emqx_ctl:usage([{"clients list", "List all clients"},
|
emqx_ctl:usage([
|
||||||
|
{"clients list", "List all clients"},
|
||||||
{"clients show <ClientId>", "Show a client"},
|
{"clients show <ClientId>", "Show a client"},
|
||||||
{"clients kick <ClientId>", "Kick out a client"}]).
|
{"clients kick <ClientId>", "Kick out a client"}
|
||||||
|
]).
|
||||||
|
|
||||||
if_client(ClientId, Fun) ->
|
if_client(ClientId, Fun) ->
|
||||||
case ets:lookup(emqx_channel, (bin(ClientId))) of
|
case ets:lookup(emqx_channel, (bin(ClientId))) of
|
||||||
|
@ -165,20 +168,22 @@ if_client(ClientId, Fun) ->
|
||||||
|
|
||||||
routes(["list"]) ->
|
routes(["list"]) ->
|
||||||
dump(emqx_route);
|
dump(emqx_route);
|
||||||
|
|
||||||
routes(["show", Topic]) ->
|
routes(["show", Topic]) ->
|
||||||
Routes = ets:lookup(emqx_route, bin(Topic)),
|
Routes = ets:lookup(emqx_route, bin(Topic)),
|
||||||
[print({emqx_route, Route}) || Route <- Routes];
|
[print({emqx_route, Route}) || Route <- Routes];
|
||||||
|
|
||||||
routes(_) ->
|
routes(_) ->
|
||||||
emqx_ctl:usage([{"routes list", "List all routes"},
|
emqx_ctl:usage([
|
||||||
{"routes show <Topic>", "Show a route"}]).
|
{"routes list", "List all routes"},
|
||||||
|
{"routes show <Topic>", "Show a route"}
|
||||||
|
]).
|
||||||
|
|
||||||
subscriptions(["list"]) ->
|
subscriptions(["list"]) ->
|
||||||
lists:foreach(fun(Suboption) ->
|
lists:foreach(
|
||||||
|
fun(Suboption) ->
|
||||||
print({emqx_suboption, Suboption})
|
print({emqx_suboption, Suboption})
|
||||||
end, ets:tab2list(emqx_suboption));
|
end,
|
||||||
|
ets:tab2list(emqx_suboption)
|
||||||
|
);
|
||||||
subscriptions(["show", ClientId]) ->
|
subscriptions(["show", ClientId]) ->
|
||||||
case ets:lookup(emqx_subid, bin(ClientId)) of
|
case ets:lookup(emqx_subid, bin(ClientId)) of
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -186,42 +191,44 @@ subscriptions(["show", ClientId]) ->
|
||||||
[{_, Pid}] ->
|
[{_, Pid}] ->
|
||||||
case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of
|
case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of
|
||||||
[] -> emqx_ctl:print("Not Found.~n");
|
[] -> emqx_ctl:print("Not Found.~n");
|
||||||
Suboption ->
|
Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption]
|
||||||
[print({emqx_suboption, Sub}) || Sub <- Suboption]
|
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
subscriptions(["add", ClientId, Topic, QoS]) ->
|
subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||||
if_valid_qos(QoS, fun(IntQos) ->
|
if_valid_qos(QoS, fun(IntQos) ->
|
||||||
case ets:lookup(emqx_channel, bin(ClientId)) of
|
case ets:lookup(emqx_channel, bin(ClientId)) of
|
||||||
[] -> emqx_ctl:print("Error: Channel not found!");
|
[] ->
|
||||||
|
emqx_ctl:print("Error: Channel not found!");
|
||||||
[{_, Pid}] ->
|
[{_, Pid}] ->
|
||||||
{Topic1, Options} = emqx_topic:parse(bin(Topic)),
|
{Topic1, Options} = emqx_topic:parse(bin(Topic)),
|
||||||
Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
|
Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
|
||||||
emqx_ctl:print("ok~n")
|
emqx_ctl:print("ok~n")
|
||||||
end
|
end
|
||||||
end);
|
end);
|
||||||
|
|
||||||
subscriptions(["del", ClientId, Topic]) ->
|
subscriptions(["del", ClientId, Topic]) ->
|
||||||
case ets:lookup(emqx_channel, bin(ClientId)) of
|
case ets:lookup(emqx_channel, bin(ClientId)) of
|
||||||
[] -> emqx_ctl:print("Error: Channel not found!");
|
[] ->
|
||||||
|
emqx_ctl:print("Error: Channel not found!");
|
||||||
[{_, Pid}] ->
|
[{_, Pid}] ->
|
||||||
Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]},
|
Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]},
|
||||||
emqx_ctl:print("ok~n")
|
emqx_ctl:print("ok~n")
|
||||||
end;
|
end;
|
||||||
|
|
||||||
subscriptions(_) ->
|
subscriptions(_) ->
|
||||||
emqx_ctl:usage(
|
emqx_ctl:usage(
|
||||||
[{"subscriptions list", "List all subscriptions"},
|
[
|
||||||
|
{"subscriptions list", "List all subscriptions"},
|
||||||
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
||||||
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
||||||
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
|
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}
|
||||||
|
]
|
||||||
|
).
|
||||||
|
|
||||||
if_valid_qos(QoS, Fun) ->
|
if_valid_qos(QoS, Fun) ->
|
||||||
try list_to_integer(QoS) of
|
try list_to_integer(QoS) of
|
||||||
Int when ?IS_QOS(Int) -> Fun(Int);
|
Int when ?IS_QOS(Int) -> Fun(Int);
|
||||||
_ -> emqx_ctl:print("QoS should be 0, 1, 2~n")
|
_ -> emqx_ctl:print("QoS should be 0, 1, 2~n")
|
||||||
catch _:_ ->
|
catch
|
||||||
|
_:_ ->
|
||||||
emqx_ctl:print("QoS should be 0, 1, 2~n")
|
emqx_ctl:print("QoS should be 0, 1, 2~n")
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -251,12 +258,15 @@ plugins(["enable", NameVsn, "before", Other]) ->
|
||||||
emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2);
|
emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2);
|
||||||
plugins(_) ->
|
plugins(_) ->
|
||||||
emqx_ctl:usage(
|
emqx_ctl:usage(
|
||||||
[{"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
|
[
|
||||||
|
{"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
|
||||||
{"plugins list", "List all installed plugins"},
|
{"plugins list", "List all installed plugins"},
|
||||||
{"plugins describe Name-Vsn", "Describe an installed plugins"},
|
{"plugins describe Name-Vsn", "Describe an installed plugins"},
|
||||||
{"plugins install Name-Vsn", "Install a plugin package placed\n"
|
{"plugins install Name-Vsn",
|
||||||
|
"Install a plugin package placed\n"
|
||||||
"in plugin'sinstall_dir"},
|
"in plugin'sinstall_dir"},
|
||||||
{"plugins uninstall Name-Vsn", "Uninstall a plugin. NOTE: it deletes\n"
|
{"plugins uninstall Name-Vsn",
|
||||||
|
"Uninstall a plugin. NOTE: it deletes\n"
|
||||||
"all files in install_dir/Name-Vsn"},
|
"all files in install_dir/Name-Vsn"},
|
||||||
{"plugins start Name-Vsn", "Start a plugin"},
|
{"plugins start Name-Vsn", "Start a plugin"},
|
||||||
{"plugins stop Name-Vsn", "Stop a plugin"},
|
{"plugins stop Name-Vsn", "Stop a plugin"},
|
||||||
|
@ -270,50 +280,51 @@ plugins(_) ->
|
||||||
"will stay at is old position; a newly plugin is appended to the rear\n"
|
"will stay at is old position; a newly plugin is appended to the rear\n"
|
||||||
"e.g. plugins disable foo-0.1.0 front\n"
|
"e.g. plugins disable foo-0.1.0 front\n"
|
||||||
" plugins enable bar-0.2.0 before foo-0.1.0"}
|
" plugins enable bar-0.2.0 before foo-0.1.0"}
|
||||||
]).
|
]
|
||||||
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc vm command
|
%% @doc vm command
|
||||||
|
|
||||||
vm([]) ->
|
vm([]) ->
|
||||||
vm(["all"]);
|
vm(["all"]);
|
||||||
|
|
||||||
vm(["all"]) ->
|
vm(["all"]) ->
|
||||||
[vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
|
[vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
|
||||||
|
|
||||||
vm(["load"]) ->
|
vm(["load"]) ->
|
||||||
[emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()];
|
[emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()];
|
||||||
|
|
||||||
vm(["memory"]) ->
|
vm(["memory"]) ->
|
||||||
[emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
|
[emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
|
||||||
|
|
||||||
vm(["process"]) ->
|
vm(["process"]) ->
|
||||||
[emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
|
[
|
||||||
|| {Name, Key} <- [{limit, process_limit}, {count, process_count}]];
|
emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
|
||||||
|
|| {Name, Key} <- [{limit, process_limit}, {count, process_count}]
|
||||||
|
];
|
||||||
vm(["io"]) ->
|
vm(["io"]) ->
|
||||||
IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
|
IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
|
||||||
[emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
|
[
|
||||||
|| Key <- [max_fds, active_fds]];
|
emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
|
||||||
|
|| Key <- [max_fds, active_fds]
|
||||||
|
];
|
||||||
vm(["ports"]) ->
|
vm(["ports"]) ->
|
||||||
[emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
|
[
|
||||||
|| {Name, Key} <- [{count, port_count}, {limit, port_limit}]];
|
emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
|
||||||
|
|| {Name, Key} <- [{count, port_count}, {limit, port_limit}]
|
||||||
|
];
|
||||||
vm(_) ->
|
vm(_) ->
|
||||||
emqx_ctl:usage([{"vm all", "Show info of Erlang VM"},
|
emqx_ctl:usage([
|
||||||
|
{"vm all", "Show info of Erlang VM"},
|
||||||
{"vm load", "Show load of Erlang VM"},
|
{"vm load", "Show load of Erlang VM"},
|
||||||
{"vm memory", "Show memory of Erlang VM"},
|
{"vm memory", "Show memory of Erlang VM"},
|
||||||
{"vm process", "Show process of Erlang VM"},
|
{"vm process", "Show process of Erlang VM"},
|
||||||
{"vm io", "Show IO of Erlang VM"},
|
{"vm io", "Show IO of Erlang VM"},
|
||||||
{"vm ports", "Show Ports of Erlang VM"}]).
|
{"vm ports", "Show Ports of Erlang VM"}
|
||||||
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc mnesia Command
|
%% @doc mnesia Command
|
||||||
|
|
||||||
mnesia([]) ->
|
mnesia([]) ->
|
||||||
mnesia:system_info();
|
mnesia:system_info();
|
||||||
|
|
||||||
mnesia(_) ->
|
mnesia(_) ->
|
||||||
emqx_ctl:usage([{"mnesia", "Mnesia system info"}]).
|
emqx_ctl:usage([{"mnesia", "Mnesia system info"}]).
|
||||||
|
|
||||||
|
@ -325,40 +336,40 @@ log(["set-level", Level]) ->
|
||||||
ok -> emqx_ctl:print("~ts~n", [Level]);
|
ok -> emqx_ctl:print("~ts~n", [Level]);
|
||||||
Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error])
|
Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
log(["primary-level"]) ->
|
log(["primary-level"]) ->
|
||||||
Level = emqx_logger:get_primary_log_level(),
|
Level = emqx_logger:get_primary_log_level(),
|
||||||
emqx_ctl:print("~ts~n", [Level]);
|
emqx_ctl:print("~ts~n", [Level]);
|
||||||
|
|
||||||
log(["primary-level", Level]) ->
|
log(["primary-level", Level]) ->
|
||||||
_ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
|
_ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
|
||||||
emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
|
emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
|
||||||
|
|
||||||
log(["handlers", "list"]) ->
|
log(["handlers", "list"]) ->
|
||||||
_ = [emqx_ctl:print(
|
_ = [
|
||||||
|
emqx_ctl:print(
|
||||||
"LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
|
"LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
|
||||||
[Id, Level, Dst, Status]
|
[Id, Level, Dst, Status]
|
||||||
)
|
)
|
||||||
|| #{id := Id,
|
|| #{
|
||||||
|
id := Id,
|
||||||
level := Level,
|
level := Level,
|
||||||
dst := Dst,
|
dst := Dst,
|
||||||
status := Status} <- emqx_logger:get_log_handlers()],
|
status := Status
|
||||||
|
} <- emqx_logger:get_log_handlers()
|
||||||
|
],
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
log(["handlers", "start", HandlerId]) ->
|
log(["handlers", "start", HandlerId]) ->
|
||||||
case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of
|
case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of
|
||||||
ok -> emqx_ctl:print("log handler ~ts started~n", [HandlerId]);
|
ok ->
|
||||||
|
emqx_ctl:print("log handler ~ts started~n", [HandlerId]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [HandlerId, Reason])
|
emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [HandlerId, Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
log(["handlers", "stop", HandlerId]) ->
|
log(["handlers", "stop", HandlerId]) ->
|
||||||
case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of
|
case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of
|
||||||
ok -> emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]);
|
ok ->
|
||||||
|
emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [HandlerId, Reason])
|
emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [HandlerId, Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
log(["handlers", "set-level", HandlerId, Level]) ->
|
log(["handlers", "set-level", HandlerId, Level]) ->
|
||||||
case emqx_logger:set_log_handler_level(list_to_atom(HandlerId), list_to_atom(Level)) of
|
case emqx_logger:set_log_handler_level(list_to_atom(HandlerId), list_to_atom(Level)) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -367,57 +378,60 @@ log(["handlers", "set-level", HandlerId, Level]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("[error] ~p~n", [Error])
|
emqx_ctl:print("[error] ~p~n", [Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
log(_) ->
|
log(_) ->
|
||||||
emqx_ctl:usage(
|
emqx_ctl:usage(
|
||||||
[{"log set-level <Level>", "Set the overall log level"},
|
[
|
||||||
|
{"log set-level <Level>", "Set the overall log level"},
|
||||||
{"log primary-level", "Show the primary log level now"},
|
{"log primary-level", "Show the primary log level now"},
|
||||||
{"log primary-level <Level>", "Set the primary log level"},
|
{"log primary-level <Level>", "Set the primary log level"},
|
||||||
{"log handlers list", "Show log handlers"},
|
{"log handlers list", "Show log handlers"},
|
||||||
{"log handlers start <HandlerId>", "Start a log handler"},
|
{"log handlers start <HandlerId>", "Start a log handler"},
|
||||||
{"log handlers stop <HandlerId>", "Stop a log handler"},
|
{"log handlers stop <HandlerId>", "Stop a log handler"},
|
||||||
{"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}]).
|
{"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}
|
||||||
|
]
|
||||||
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc Trace Command
|
%% @doc Trace Command
|
||||||
|
|
||||||
trace(["list"]) ->
|
trace(["list"]) ->
|
||||||
lists:foreach(fun(Trace) ->
|
lists:foreach(
|
||||||
|
fun(Trace) ->
|
||||||
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
||||||
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
|
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
|
||||||
end, emqx_trace_handler:running());
|
end,
|
||||||
|
emqx_trace_handler:running()
|
||||||
|
);
|
||||||
trace(["stop", Operation, Filter0]) ->
|
trace(["stop", Operation, Filter0]) ->
|
||||||
case trace_type(Operation, Filter0) of
|
case trace_type(Operation, Filter0) of
|
||||||
{ok, Type, Filter} -> trace_off(Type, Filter);
|
{ok, Type, Filter} -> trace_off(Type, Filter);
|
||||||
error -> trace([])
|
error -> trace([])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
trace(["start", Operation, ClientId, LogFile]) ->
|
trace(["start", Operation, ClientId, LogFile]) ->
|
||||||
trace(["start", Operation, ClientId, LogFile, "all"]);
|
trace(["start", Operation, ClientId, LogFile, "all"]);
|
||||||
|
|
||||||
trace(["start", Operation, Filter0, LogFile, Level]) ->
|
trace(["start", Operation, Filter0, LogFile, Level]) ->
|
||||||
case trace_type(Operation, Filter0) of
|
case trace_type(Operation, Filter0) of
|
||||||
{ok, Type, Filter} ->
|
{ok, Type, Filter} ->
|
||||||
trace_on(name(Filter0), Type, Filter,
|
trace_on(
|
||||||
list_to_existing_atom(Level), LogFile);
|
name(Filter0),
|
||||||
error -> trace([])
|
Type,
|
||||||
|
Filter,
|
||||||
|
list_to_existing_atom(Level),
|
||||||
|
LogFile
|
||||||
|
);
|
||||||
|
error ->
|
||||||
|
trace([])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
trace(_) ->
|
trace(_) ->
|
||||||
emqx_ctl:usage([{"trace list", "List all traces started on local node"},
|
emqx_ctl:usage([
|
||||||
{"trace start client <ClientId> <File> [<Level>]",
|
{"trace list", "List all traces started on local node"},
|
||||||
"Traces for a client on local node"},
|
{"trace start client <ClientId> <File> [<Level>]", "Traces for a client on local node"},
|
||||||
{"trace stop client <ClientId>",
|
{"trace stop client <ClientId>", "Stop tracing for a client on local node"},
|
||||||
"Stop tracing for a client on local node"},
|
{"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic on local node"},
|
||||||
{"trace start topic <Topic> <File> [<Level>] ",
|
{"trace stop topic <Topic> ", "Stop tracing for a topic on local node"},
|
||||||
"Traces for a topic on local node"},
|
|
||||||
{"trace stop topic <Topic> ",
|
|
||||||
"Stop tracing for a topic on local node"},
|
|
||||||
{"trace start ip_address <IP> <File> [<Level>] ",
|
{"trace start ip_address <IP> <File> [<Level>] ",
|
||||||
"Traces for a client ip on local node"},
|
"Traces for a client ip on local node"},
|
||||||
{"trace stop ip_addresss <IP> ",
|
{"trace stop ip_addresss <IP> ", "Stop tracing for a client ip on local node"}
|
||||||
"Stop tracing for a client ip on local node"}
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
trace_on(Name, Type, Filter, Level, LogFile) ->
|
trace_on(Name, Type, Filter, Level, LogFile) ->
|
||||||
|
@ -447,32 +461,37 @@ traces(["list"]) ->
|
||||||
[] ->
|
[] ->
|
||||||
emqx_ctl:print("Cluster Trace is empty~n", []);
|
emqx_ctl:print("Cluster Trace is empty~n", []);
|
||||||
_ ->
|
_ ->
|
||||||
lists:foreach(fun(Trace) ->
|
lists:foreach(
|
||||||
#{type := Type, name := Name, status := Status,
|
fun(Trace) ->
|
||||||
log_size := LogSize} = Trace,
|
#{
|
||||||
emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
|
type := Type,
|
||||||
[Name, Type, maps:get(Type, Trace), Status, LogSize])
|
name := Name,
|
||||||
end, List)
|
status := Status,
|
||||||
|
log_size := LogSize
|
||||||
|
} = Trace,
|
||||||
|
emqx_ctl:print(
|
||||||
|
"Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
|
||||||
|
[Name, Type, maps:get(Type, Trace), Status, LogSize]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
List
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
length(List);
|
length(List);
|
||||||
|
|
||||||
traces(["stop", Name]) ->
|
traces(["stop", Name]) ->
|
||||||
trace_cluster_off(Name);
|
trace_cluster_off(Name);
|
||||||
|
|
||||||
traces(["delete", Name]) ->
|
traces(["delete", Name]) ->
|
||||||
trace_cluster_del(Name);
|
trace_cluster_del(Name);
|
||||||
|
|
||||||
traces(["start", Name, Operation, Filter]) ->
|
traces(["start", Name, Operation, Filter]) ->
|
||||||
traces(["start", Name, Operation, Filter, "900"]);
|
traces(["start", Name, Operation, Filter, "900"]);
|
||||||
|
|
||||||
traces(["start", Name, Operation, Filter0, DurationS]) ->
|
traces(["start", Name, Operation, Filter0, DurationS]) ->
|
||||||
case trace_type(Operation, Filter0) of
|
case trace_type(Operation, Filter0) of
|
||||||
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
|
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
|
||||||
error -> traces([])
|
error -> traces([])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
traces(_) ->
|
traces(_) ->
|
||||||
emqx_ctl:usage([{"traces list", "List all cluster traces started"},
|
emqx_ctl:usage([
|
||||||
|
{"traces list", "List all cluster traces started"},
|
||||||
{"traces start <Name> client <ClientId>", "Traces for a client in cluster"},
|
{"traces start <Name> client <ClientId>", "Traces for a client in cluster"},
|
||||||
{"traces start <Name> topic <Topic>", "Traces for a topic in cluster"},
|
{"traces start <Name> topic <Topic>", "Traces for a topic in cluster"},
|
||||||
{"traces start <Name> ip_address <IPAddr>", "Traces for a IP in cluster"},
|
{"traces start <Name> ip_address <IPAddr>", "Traces for a IP in cluster"},
|
||||||
|
@ -483,18 +502,21 @@ traces(_) ->
|
||||||
trace_cluster_on(Name, Type, Filter, DurationS0) ->
|
trace_cluster_on(Name, Type, Filter, DurationS0) ->
|
||||||
DurationS = list_to_integer(DurationS0),
|
DurationS = list_to_integer(DurationS0),
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Trace = #{ name => list_to_binary(Name)
|
Trace = #{
|
||||||
, type => atom_to_binary(Type)
|
name => list_to_binary(Name),
|
||||||
, Type => list_to_binary(Filter)
|
type => atom_to_binary(Type),
|
||||||
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Now))
|
Type => list_to_binary(Filter),
|
||||||
, end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
|
start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)),
|
||||||
|
end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
|
||||||
},
|
},
|
||||||
case emqx_trace:create(Trace) of
|
case emqx_trace:create(Trace) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
|
emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n",
|
emqx_ctl:print(
|
||||||
[Name, Type, Filter, Error])
|
"[error] cluster_trace ~s ~s=~s ~p~n",
|
||||||
|
[Name, Type, Filter, Error]
|
||||||
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
trace_cluster_del(Name) ->
|
trace_cluster_del(Name) ->
|
||||||
|
@ -518,20 +540,24 @@ trace_type(_, _) -> error.
|
||||||
%% @doc Listeners Command
|
%% @doc Listeners Command
|
||||||
|
|
||||||
listeners([]) ->
|
listeners([]) ->
|
||||||
lists:foreach(fun({ID, Conf}) ->
|
lists:foreach(
|
||||||
|
fun({ID, Conf}) ->
|
||||||
{Host, Port} = maps:get(bind, Conf),
|
{Host, Port} = maps:get(bind, Conf),
|
||||||
Acceptors = maps:get(acceptors, Conf),
|
Acceptors = maps:get(acceptors, Conf),
|
||||||
ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
|
ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
|
||||||
Running = maps:get(running, Conf),
|
Running = maps:get(running, Conf),
|
||||||
CurrentConns = case emqx_listeners:current_conns(ID, {Host, Port}) of
|
CurrentConns =
|
||||||
|
case emqx_listeners:current_conns(ID, {Host, Port}) of
|
||||||
{error, _} -> [];
|
{error, _} -> [];
|
||||||
CC -> [{current_conn, CC}]
|
CC -> [{current_conn, CC}]
|
||||||
end,
|
end,
|
||||||
MaxConn = case emqx_listeners:max_conns(ID, {Host, Port}) of
|
MaxConn =
|
||||||
|
case emqx_listeners:max_conns(ID, {Host, Port}) of
|
||||||
{error, _} -> [];
|
{error, _} -> [];
|
||||||
MC -> [{max_conns, MC}]
|
MC -> [{max_conns, MC}]
|
||||||
end,
|
end,
|
||||||
Info = [
|
Info =
|
||||||
|
[
|
||||||
{listen_on, {string, format_listen_on(Port)}},
|
{listen_on, {string, format_listen_on(Port)}},
|
||||||
{acceptors, Acceptors},
|
{acceptors, Acceptors},
|
||||||
{proxy_protocol, ProxyProtocol},
|
{proxy_protocol, ProxyProtocol},
|
||||||
|
@ -539,8 +565,9 @@ listeners([]) ->
|
||||||
] ++ CurrentConns ++ MaxConn,
|
] ++ CurrentConns ++ MaxConn,
|
||||||
emqx_ctl:print("~ts~n", [ID]),
|
emqx_ctl:print("~ts~n", [ID]),
|
||||||
lists:foreach(fun indent_print/1, Info)
|
lists:foreach(fun indent_print/1, Info)
|
||||||
end, emqx_listeners:list());
|
end,
|
||||||
|
emqx_listeners:list()
|
||||||
|
);
|
||||||
listeners(["stop", ListenerId]) ->
|
listeners(["stop", ListenerId]) ->
|
||||||
case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of
|
case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -548,7 +575,6 @@ listeners(["stop", ListenerId]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error])
|
emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
listeners(["start", ListenerId]) ->
|
listeners(["start", ListenerId]) ->
|
||||||
case emqx_listeners:start_listener(list_to_atom(ListenerId)) of
|
case emqx_listeners:start_listener(list_to_atom(ListenerId)) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -556,7 +582,6 @@ listeners(["start", ListenerId]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error])
|
emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
listeners(["restart", ListenerId]) ->
|
listeners(["restart", ListenerId]) ->
|
||||||
case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of
|
case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -564,9 +589,9 @@ listeners(["restart", ListenerId]) ->
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error])
|
emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
listeners(_) ->
|
listeners(_) ->
|
||||||
emqx_ctl:usage([{"listeners", "List listeners"},
|
emqx_ctl:usage([
|
||||||
|
{"listeners", "List listeners"},
|
||||||
{"listeners stop <Identifier>", "Stop a listener"},
|
{"listeners stop <Identifier>", "Stop a listener"},
|
||||||
{"listeners start <Identifier>", "Start a listener"},
|
{"listeners start <Identifier>", "Start a listener"},
|
||||||
{"listeners restart <Identifier>", "Restart a listener"}
|
{"listeners restart <Identifier>", "Restart a listener"}
|
||||||
|
@ -582,7 +607,6 @@ authz(["cache-clean", "node", Node]) ->
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:print("Authorization drain failed on node ~ts: ~0p.~n", [Node, Reason])
|
emqx_ctl:print("Authorization drain failed on node ~ts: ~0p.~n", [Node, Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
authz(["cache-clean", "all"]) ->
|
authz(["cache-clean", "all"]) ->
|
||||||
case emqx_mgmt:clean_authz_cache_all() of
|
case emqx_mgmt:clean_authz_cache_all() of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -590,22 +614,22 @@ authz(["cache-clean", "all"]) ->
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:print("Authorization cache-clean failed: ~p.~n", [Reason])
|
emqx_ctl:print("Authorization cache-clean failed: ~p.~n", [Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
authz(["cache-clean", ClientId]) ->
|
authz(["cache-clean", ClientId]) ->
|
||||||
emqx_mgmt:clean_authz_cache(ClientId);
|
emqx_mgmt:clean_authz_cache(ClientId);
|
||||||
|
|
||||||
authz(_) ->
|
authz(_) ->
|
||||||
emqx_ctl:usage(
|
emqx_ctl:usage(
|
||||||
[{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
[
|
||||||
|
{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
||||||
{"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
{"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
||||||
{"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
{"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
||||||
]).
|
]
|
||||||
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc OLP (Overload Protection related)
|
%% @doc OLP (Overload Protection related)
|
||||||
olp(["status"]) ->
|
olp(["status"]) ->
|
||||||
S = case emqx_olp:is_overloaded() of
|
S =
|
||||||
|
case emqx_olp:is_overloaded() of
|
||||||
true -> "overloaded";
|
true -> "overloaded";
|
||||||
false -> "not overloaded"
|
false -> "not overloaded"
|
||||||
end,
|
end,
|
||||||
|
@ -617,7 +641,8 @@ olp(["enable"]) ->
|
||||||
Res = emqx_olp:enable(),
|
Res = emqx_olp:enable(),
|
||||||
emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]);
|
emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]);
|
||||||
olp(_) ->
|
olp(_) ->
|
||||||
emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"},
|
emqx_ctl:usage([
|
||||||
|
{"olp status", "Return OLP status if system is overloaded"},
|
||||||
{"olp enable", "Enable overload protection"},
|
{"olp enable", "Enable overload protection"},
|
||||||
{"olp disable", "Disable overload protection"}
|
{"olp disable", "Disable overload protection"}
|
||||||
]).
|
]).
|
||||||
|
@ -634,43 +659,81 @@ dump(Table, Tag) ->
|
||||||
|
|
||||||
dump(_Table, _, '$end_of_table', Result) ->
|
dump(_Table, _, '$end_of_table', Result) ->
|
||||||
lists:reverse(Result);
|
lists:reverse(Result);
|
||||||
|
|
||||||
dump(Table, Tag, Key, Result) ->
|
dump(Table, Tag, Key, Result) ->
|
||||||
PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
|
PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
|
||||||
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
|
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
|
||||||
|
|
||||||
print({_, []}) ->
|
print({_, []}) ->
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
print({client, {ClientId, ChanPid}}) ->
|
print({client, {ClientId, ChanPid}}) ->
|
||||||
Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of
|
Attrs =
|
||||||
|
case emqx_cm:get_chan_info(ClientId, ChanPid) of
|
||||||
undefined -> #{};
|
undefined -> #{};
|
||||||
Attrs0 -> Attrs0
|
Attrs0 -> Attrs0
|
||||||
end,
|
end,
|
||||||
Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of
|
Stats =
|
||||||
|
case emqx_cm:get_chan_stats(ClientId, ChanPid) of
|
||||||
undefined -> #{};
|
undefined -> #{};
|
||||||
Stats0 -> maps:from_list(Stats0)
|
Stats0 -> maps:from_list(Stats0)
|
||||||
end,
|
end,
|
||||||
ClientInfo = maps:get(clientinfo, Attrs, #{}),
|
ClientInfo = maps:get(clientinfo, Attrs, #{}),
|
||||||
ConnInfo = maps:get(conninfo, Attrs, #{}),
|
ConnInfo = maps:get(conninfo, Attrs, #{}),
|
||||||
Session = maps:get(session, Attrs, #{}),
|
Session = maps:get(session, Attrs, #{}),
|
||||||
Connected = case maps:get(conn_state, Attrs) of
|
Connected =
|
||||||
|
case maps:get(conn_state, Attrs) of
|
||||||
connected -> true;
|
connected -> true;
|
||||||
_ -> false
|
_ -> false
|
||||||
end,
|
end,
|
||||||
Info = lists:foldl(fun(Items, Acc) ->
|
Info = lists:foldl(
|
||||||
|
fun(Items, Acc) ->
|
||||||
maps:merge(Items, Acc)
|
maps:merge(Items, Acc)
|
||||||
end, #{connected => Connected},
|
end,
|
||||||
[maps:with([subscriptions_cnt, inflight_cnt, awaiting_rel_cnt,
|
#{connected => Connected},
|
||||||
mqueue_len, mqueue_dropped, send_msg], Stats),
|
[
|
||||||
|
maps:with(
|
||||||
|
[
|
||||||
|
subscriptions_cnt,
|
||||||
|
inflight_cnt,
|
||||||
|
awaiting_rel_cnt,
|
||||||
|
mqueue_len,
|
||||||
|
mqueue_dropped,
|
||||||
|
send_msg
|
||||||
|
],
|
||||||
|
Stats
|
||||||
|
),
|
||||||
maps:with([clientid, username], ClientInfo),
|
maps:with([clientid, username], ClientInfo),
|
||||||
maps:with([peername, clean_start, keepalive, expiry_interval,
|
maps:with(
|
||||||
connected_at, disconnected_at], ConnInfo),
|
[
|
||||||
maps:with([created_at], Session)]),
|
peername,
|
||||||
InfoKeys = [clientid, username, peername, clean_start, keepalive,
|
clean_start,
|
||||||
expiry_interval, subscriptions_cnt, inflight_cnt,
|
keepalive,
|
||||||
awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
|
expiry_interval,
|
||||||
connected, created_at, connected_at] ++
|
connected_at,
|
||||||
|
disconnected_at
|
||||||
|
],
|
||||||
|
ConnInfo
|
||||||
|
),
|
||||||
|
maps:with([created_at], Session)
|
||||||
|
]
|
||||||
|
),
|
||||||
|
InfoKeys =
|
||||||
|
[
|
||||||
|
clientid,
|
||||||
|
username,
|
||||||
|
peername,
|
||||||
|
clean_start,
|
||||||
|
keepalive,
|
||||||
|
expiry_interval,
|
||||||
|
subscriptions_cnt,
|
||||||
|
inflight_cnt,
|
||||||
|
awaiting_rel_cnt,
|
||||||
|
send_msg,
|
||||||
|
mqueue_len,
|
||||||
|
mqueue_dropped,
|
||||||
|
connected,
|
||||||
|
created_at,
|
||||||
|
connected_at
|
||||||
|
] ++
|
||||||
case maps:is_key(disconnected_at, Info) of
|
case maps:is_key(disconnected_at, Info) of
|
||||||
true -> [disconnected_at];
|
true -> [disconnected_at];
|
||||||
false -> []
|
false -> []
|
||||||
|
@ -680,32 +743,30 @@ print({client, {ClientId, ChanPid}}) ->
|
||||||
"Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
|
"Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
|
||||||
"keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
|
"keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
|
||||||
"inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
|
"inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
|
||||||
"dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w"
|
"dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" ++
|
||||||
++ case maps:is_key(disconnected_at, Info1) of
|
case maps:is_key(disconnected_at, Info1) of
|
||||||
true -> ", disconnected_at=~w)~n";
|
true -> ", disconnected_at=~w)~n";
|
||||||
false -> ")~n"
|
false -> ")~n"
|
||||||
end,
|
end,
|
||||||
[format(K, maps:get(K, Info1)) || K <- InfoKeys]);
|
[format(K, maps:get(K, Info1)) || K <- InfoKeys]
|
||||||
|
);
|
||||||
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
|
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
|
||||||
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
|
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
|
||||||
print({emqx_route, #route{topic = Topic, dest = Node}}) ->
|
print({emqx_route, #route{topic = Topic, dest = Node}}) ->
|
||||||
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
|
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
|
||||||
|
|
||||||
print(#plugin{name = Name, descr = Descr, active = Active}) ->
|
print(#plugin{name = Name, descr = Descr, active = Active}) ->
|
||||||
emqx_ctl:print("Plugin(~ts, description=~ts, active=~ts)~n",
|
emqx_ctl:print(
|
||||||
[Name, Descr, Active]);
|
"Plugin(~ts, description=~ts, active=~ts)~n",
|
||||||
|
[Name, Descr, Active]
|
||||||
|
);
|
||||||
print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) ->
|
print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) ->
|
||||||
emqx_ctl:print("~ts -> ~ts~n", [maps:get(subid, Options), Topic]).
|
emqx_ctl:print("~ts -> ~ts~n", [maps:get(subid, Options), Topic]).
|
||||||
|
|
||||||
format(_, undefined) ->
|
format(_, undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
|
|
||||||
format(peername, {IPAddr, Port}) ->
|
format(peername, {IPAddr, Port}) ->
|
||||||
IPStr = emqx_mgmt_util:ntoa(IPAddr),
|
IPStr = emqx_mgmt_util:ntoa(IPAddr),
|
||||||
io_lib:format("~ts:~p", [IPStr, Port]);
|
io_lib:format("~ts:~p", [IPStr, Port]);
|
||||||
|
|
||||||
format(_, Val) ->
|
format(_, Val) ->
|
||||||
Val.
|
Val.
|
||||||
|
|
||||||
|
|
|
@ -18,22 +18,23 @@
|
||||||
|
|
||||||
-behaviour(emqx_bpapi).
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
-export([ introduced_in/0
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
|
||||||
, node_info/1
|
node_info/1,
|
||||||
, broker_info/1
|
broker_info/1,
|
||||||
, list_subscriptions/1
|
list_subscriptions/1,
|
||||||
|
|
||||||
, list_listeners/1
|
list_listeners/1,
|
||||||
, remove_listener/2
|
remove_listener/2,
|
||||||
|
|
||||||
, update_listener/3
|
update_listener/3,
|
||||||
, subscribe/3
|
subscribe/3,
|
||||||
, unsubscribe/3
|
unsubscribe/3,
|
||||||
|
|
||||||
, call_client/3
|
call_client/3,
|
||||||
|
|
||||||
, get_full_config/1
|
get_full_config/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
@ -53,18 +54,18 @@ broker_info(Node) ->
|
||||||
list_subscriptions(Node) ->
|
list_subscriptions(Node) ->
|
||||||
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
||||||
|
|
||||||
-spec list_listeners(node()) -> [map()] | {badrpc, _}.
|
-spec list_listeners(node()) -> map() | {badrpc, _}.
|
||||||
list_listeners(Node) ->
|
list_listeners(Node) ->
|
||||||
rpc:call(Node, emqx_mgmt, do_list_listeners, []).
|
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
|
||||||
|
|
||||||
-spec remove_listener(node(), string()) -> ok | {badrpc, _}.
|
-spec remove_listener(node(), string()) -> ok | {badrpc, _}.
|
||||||
remove_listener(Node, Id) ->
|
remove_listener(Node, Id) ->
|
||||||
rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]).
|
rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]).
|
||||||
|
|
||||||
-spec update_listener(node(), string(), emqx_config:update_request()) ->
|
-spec update_listener(node(), atom(), emqx_config:update_request()) ->
|
||||||
map() | {error, _} | {badrpc, _}.
|
{ok, map()} | {error, _} | {badrpc, _}.
|
||||||
update_listener(Node, Id, Config) ->
|
update_listener(Node, Id, Config) ->
|
||||||
rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]).
|
rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]).
|
||||||
|
|
||||||
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||||
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
||||||
|
|
|
@ -24,90 +24,138 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
emqx_mgmt_api_test_util:end_suite().
|
emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}),
|
||||||
|
emqx_conf:remove([listeners, tcp, new1], #{override_to => local}),
|
||||||
|
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
|
||||||
|
|
||||||
t_list_listeners(_) ->
|
t_list_listeners(_) ->
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
|
Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
|
||||||
get_api(Path).
|
Res = request(get, Path, [], []),
|
||||||
|
Expect = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||||
|
?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_list_node_listeners(_) ->
|
t_crud_listeners_by_id(_) ->
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]),
|
TcpListenerId = <<"tcp:default">>,
|
||||||
get_api(Path).
|
NewListenerId = <<"tcp:new">>,
|
||||||
|
TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]),
|
||||||
|
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
|
||||||
|
[#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []),
|
||||||
|
?assertEqual(atom_to_binary(node()), Node),
|
||||||
|
|
||||||
t_get_listeners(_) ->
|
%% create
|
||||||
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
|
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||||
ID = maps:get(id, LocalListener),
|
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(ID)]),
|
NewConf = TcpListener#{
|
||||||
get_api(Path).
|
<<"id">> => NewListenerId,
|
||||||
|
<<"bind">> => <<"0.0.0.0:2883">>
|
||||||
|
},
|
||||||
|
[#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf),
|
||||||
|
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||||
|
[#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []),
|
||||||
|
?assertMatch(Create, Get1),
|
||||||
|
?assert(is_running(NewListenerId)),
|
||||||
|
|
||||||
t_get_node_listeners(_) ->
|
%% bad create(same port)
|
||||||
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
|
BadId = <<"tcp:bad">>,
|
||||||
ID = maps:get(id, LocalListener),
|
BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]),
|
||||||
Path = emqx_mgmt_api_test_util:api_path(
|
BadConf = TcpListener#{
|
||||||
["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(ID)]),
|
<<"id">> => BadId,
|
||||||
get_api(Path).
|
<<"bind">> => <<"0.0.0.0:2883">>
|
||||||
|
},
|
||||||
|
?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)),
|
||||||
|
|
||||||
t_manage_listener(_) ->
|
%% update
|
||||||
|
#{<<"acceptors">> := Acceptors} = Create,
|
||||||
|
Acceptors1 = Acceptors + 10,
|
||||||
|
[#{<<"listeners">> := [Update]}] =
|
||||||
|
request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
|
||||||
|
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
|
||||||
|
[#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []),
|
||||||
|
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
|
||||||
|
|
||||||
|
%% delete
|
||||||
|
?assertEqual([], delete(NewPath)),
|
||||||
|
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||||
|
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||||
|
?assertEqual([], delete(NewPath)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_list_listeners_on_node(_) ->
|
||||||
|
Node = atom_to_list(node()),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]),
|
||||||
|
Listeners = request(get, Path, [], []),
|
||||||
|
#{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||||
|
?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_crud_listener_by_id_on_node(_) ->
|
||||||
|
TcpListenerId = <<"tcp:default">>,
|
||||||
|
NewListenerId = <<"tcp:new1">>,
|
||||||
|
Node = atom_to_list(node()),
|
||||||
|
TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]),
|
||||||
|
NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]),
|
||||||
|
TcpListener = request(get, TcpPath, [], []),
|
||||||
|
|
||||||
|
%% create
|
||||||
|
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||||
|
?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
|
||||||
|
Create = request(put, NewPath, [], TcpListener#{
|
||||||
|
<<"id">> => NewListenerId,
|
||||||
|
<<"bind">> => <<"0.0.0.0:3883">>
|
||||||
|
}),
|
||||||
|
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||||
|
Get1 = request(get, NewPath, [], []),
|
||||||
|
?assertMatch(Create, Get1),
|
||||||
|
?assert(is_running(NewListenerId)),
|
||||||
|
|
||||||
|
%% update
|
||||||
|
#{<<"acceptors">> := Acceptors} = Create,
|
||||||
|
Acceptors1 = Acceptors + 10,
|
||||||
|
Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
|
||||||
|
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
|
||||||
|
Get2 = request(get, NewPath, [], []),
|
||||||
|
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
|
||||||
|
|
||||||
|
%% delete
|
||||||
|
?assertEqual([], delete(NewPath)),
|
||||||
|
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||||
|
?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
|
||||||
|
?assertEqual([], delete(NewPath)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_action_listeners(_) ->
|
||||||
ID = "tcp:default",
|
ID = "tcp:default",
|
||||||
manage_listener(ID, "stop", false),
|
action_listener(ID, "stop", false),
|
||||||
manage_listener(ID, "start", true),
|
action_listener(ID, "start", true),
|
||||||
manage_listener(ID, "restart", true).
|
action_listener(ID, "restart", true).
|
||||||
|
|
||||||
manage_listener(ID, Operation, Running) ->
|
action_listener(ID, Action, Running) ->
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, "operation", Operation]),
|
Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, Action]),
|
||||||
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path),
|
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]),
|
GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]),
|
||||||
{ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath),
|
[#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []),
|
||||||
Listeners = emqx_json:decode(ListenersResponse, [return_maps]),
|
|
||||||
[listener_stats(Listener, Running) || Listener <- Listeners].
|
[listener_stats(Listener, Running) || Listener <- Listeners].
|
||||||
|
|
||||||
get_api(Path) ->
|
request(Method, Url, QueryParams, Body) ->
|
||||||
{ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path),
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()),
|
case emqx_mgmt_api_test_util:request_api(Method, Url, QueryParams, AuthHeader, Body) of
|
||||||
case emqx_json:decode(ListenersData, [return_maps]) of
|
{ok, Res} -> emqx_json:decode(Res, [return_maps]);
|
||||||
[Listener] ->
|
Error -> Error
|
||||||
ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8),
|
|
||||||
Filter =
|
|
||||||
fun(Local) ->
|
|
||||||
maps:get(id, Local) =:= ID
|
|
||||||
end,
|
|
||||||
LocalListener = hd(lists:filter(Filter, LocalListeners)),
|
|
||||||
comparison_listener(LocalListener, Listener);
|
|
||||||
Listeners when is_list(Listeners) ->
|
|
||||||
?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)),
|
|
||||||
Fun =
|
|
||||||
fun(LocalListener) ->
|
|
||||||
ID = maps:get(id, LocalListener),
|
|
||||||
IDBinary = atom_to_binary(ID, utf8),
|
|
||||||
Filter =
|
|
||||||
fun(Listener) ->
|
|
||||||
maps:get(<<"id">>, Listener) =:= IDBinary
|
|
||||||
end,
|
|
||||||
Listener = hd(lists:filter(Filter, Listeners)),
|
|
||||||
comparison_listener(LocalListener, Listener)
|
|
||||||
end,
|
|
||||||
lists:foreach(Fun, LocalListeners);
|
|
||||||
Listener when is_map(Listener) ->
|
|
||||||
ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8),
|
|
||||||
Filter =
|
|
||||||
fun(Local) ->
|
|
||||||
maps:get(id, Local) =:= ID
|
|
||||||
end,
|
|
||||||
LocalListener = hd(lists:filter(Filter, LocalListeners)),
|
|
||||||
comparison_listener(LocalListener, Listener)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
comparison_listener(Local, Response) ->
|
delete(Url) ->
|
||||||
?assertEqual(maps:get(id, Local), binary_to_atom(maps:get(<<"id">>, Response))),
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))),
|
{ok, Res} = emqx_mgmt_api_test_util:request_api(delete, Url, AuthHeader),
|
||||||
?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)),
|
Res.
|
||||||
?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)).
|
|
||||||
|
|
||||||
|
|
||||||
listener_stats(Listener, ExpectedStats) ->
|
listener_stats(Listener, ExpectedStats) ->
|
||||||
?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)).
|
?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)).
|
||||||
|
|
||||||
|
is_running(Id) ->
|
||||||
|
emqx_listeners:is_running(binary_to_atom(Id)).
|
||||||
|
|
|
@ -92,7 +92,8 @@ do_request_api(Method, Request)->
|
||||||
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
|
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
|
||||||
when Code >= 200 andalso Code =< 299 ->
|
when Code >= 200 andalso Code =< 299 ->
|
||||||
{ok, Return};
|
{ok, Return};
|
||||||
{ok, {Reason, _, _}} ->
|
{ok, {Reason, _, _} = Error} ->
|
||||||
|
ct:pal("error: ~p~n", [Error]),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue