fix: mgmt conf & schema; prepare minirest (#5178)
This commit is contained in:
parent
612d25fdb3
commit
dc98cff27b
|
@ -59,9 +59,8 @@ set_special_configs(emqx_authz) ->
|
|||
ok;
|
||||
|
||||
set_special_configs(emqx_management) ->
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}],
|
||||
default_application_id => <<"admin">>,
|
||||
default_application_secret => <<"public">>}),
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
|
||||
applications =>[#{id => "admin", secret => "public"}]}),
|
||||
ok;
|
||||
|
||||
set_special_configs(_App) ->
|
||||
|
|
|
@ -51,9 +51,8 @@ end_per_suite(_Config) ->
|
|||
ekka_mnesia:ensure_stopped().
|
||||
|
||||
set_special_configs(emqx_management) ->
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}],
|
||||
default_application_id => <<"admin">>,
|
||||
default_application_secret => <<"public">>}),
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
|
||||
applications =>[#{id => "admin", secret => "public"}]}),
|
||||
ok;
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
emqx_management:{
|
||||
default_application_id: "admin"
|
||||
default_application_secret: "public"
|
||||
applications: [
|
||||
{
|
||||
id: "admin",
|
||||
secret: "public"
|
||||
}
|
||||
]
|
||||
max_row_limit: 10000
|
||||
listeners: [
|
||||
{
|
||||
num_acceptors: 4
|
||||
max_connections: 512
|
||||
protocol: "http"
|
||||
protocol: http
|
||||
port: 8081
|
||||
backlog: 512
|
||||
send_timeout: 15s
|
||||
|
|
|
@ -25,14 +25,19 @@
|
|||
structs() -> ["emqx_management"].
|
||||
|
||||
fields("emqx_management") ->
|
||||
[ {default_application_id, fun default_application_id/1}
|
||||
, {default_application_secret, fun default_application_secret/1}
|
||||
[ {applications, hoconsc:array(hoconsc:ref(?MODULE, "application"))}
|
||||
, {max_row_limit, fun max_row_limit/1}
|
||||
, {listeners, hoconsc:array(hoconsc:union([hoconsc:ref(?MODULE, "http"), hoconsc:ref(?MODULE, "https")]))}
|
||||
];
|
||||
|
||||
fields("application") ->
|
||||
[ {"id", emqx_schema:t(string(), undefined, "admin")}
|
||||
, {"secret", emqx_schema:t(string(), undefined, "public")}
|
||||
];
|
||||
|
||||
|
||||
fields("http") ->
|
||||
[ {"protocol", emqx_schema:t(string(), undefined, "http")}
|
||||
[ {"protocol", hoconsc:enum([http, https])}
|
||||
, {"port", emqx_schema:t(integer(), undefined, 8081)}
|
||||
, {"num_acceptors", emqx_schema:t(integer(), undefined, 4)}
|
||||
, {"max_connections", emqx_schema:t(integer(), undefined, 512)}
|
||||
|
@ -46,16 +51,6 @@ fields("http") ->
|
|||
fields("https") ->
|
||||
emqx_schema:ssl(undefined, #{enable => true}) ++ fields("http").
|
||||
|
||||
default_application_id(type) -> string();
|
||||
default_application_id(default) -> "admin";
|
||||
default_application_id(nullable) -> true;
|
||||
default_application_id(_) -> undefined.
|
||||
|
||||
default_application_secret(type) -> string();
|
||||
default_application_secret(default) -> "public";
|
||||
default_application_secret(nullable) -> true;
|
||||
default_application_secret(_) -> undefined.
|
||||
|
||||
max_row_limit(type) -> integer();
|
||||
max_row_limit(default) -> 1000;
|
||||
max_row_limit(nullable) -> false;
|
||||
|
|
|
@ -106,10 +106,18 @@
|
|||
, max_row_limit/0
|
||||
]).
|
||||
|
||||
-export([ return/0
|
||||
, return/1]).
|
||||
|
||||
-define(MAX_ROW_LIMIT, 10000).
|
||||
|
||||
-define(APP, emqx_management).
|
||||
|
||||
return() ->
|
||||
minirest:return().
|
||||
return(Response) ->
|
||||
minirest:return(Response).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Node Info
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -36,12 +36,12 @@
|
|||
|
||||
clean_all(_Bindings, _Params) ->
|
||||
case emqx_mgmt:clean_acl_cache_all() of
|
||||
ok -> minirest:return();
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end.
|
||||
|
||||
clean_node(#{node := Node}, _Params) ->
|
||||
case emqx_mgmt:clean_acl_cache_all(Node) of
|
||||
ok -> minirest:return();
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end.
|
||||
|
|
|
@ -125,13 +125,13 @@ get_name(Params) ->
|
|||
binary_to_atom(proplists:get_value(<<"name">>, Params, undefined), utf8).
|
||||
|
||||
do_deactivate(undefined, _) ->
|
||||
minirest:return({error, missing_param});
|
||||
emqx_mgmt:return({error, missing_param});
|
||||
do_deactivate(_, undefined) ->
|
||||
minirest:return({error, missing_param});
|
||||
emqx_mgmt:return({error, missing_param});
|
||||
do_deactivate(Node, Name) ->
|
||||
case emqx_mgmt:deactivate(Node, Name) of
|
||||
ok ->
|
||||
minirest:return();
|
||||
emqx_mgmt:return();
|
||||
{error, Reason} ->
|
||||
minirest:return({error, Reason})
|
||||
emqx_mgmt:return({error, Reason})
|
||||
end.
|
||||
|
|
|
@ -63,30 +63,30 @@ add_app(_Bindings, Params) ->
|
|||
Status = proplists:get_value(<<"status">>, Params),
|
||||
Expired = proplists:get_value(<<"expired">>, Params),
|
||||
case emqx_mgmt_auth:add_app(AppId, Name, Secret, Desc, Status, Expired) of
|
||||
{ok, AppSecret} -> minirest:return({ok, #{secret => AppSecret}});
|
||||
{error, Reason} -> minirest:return({error, Reason})
|
||||
{ok, AppSecret} -> emqx_mgmt:return({ok, #{secret => AppSecret}});
|
||||
{error, Reason} -> emqx_mgmt:return({error, Reason})
|
||||
end.
|
||||
|
||||
del_app(#{appid := AppId}, _Params) ->
|
||||
case emqx_mgmt_auth:del_app(AppId) of
|
||||
ok -> minirest:return();
|
||||
{error, Reason} -> minirest:return({error, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, Reason} -> emqx_mgmt:return({error, Reason})
|
||||
end.
|
||||
|
||||
list_apps(_Bindings, _Params) ->
|
||||
minirest:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}).
|
||||
emqx_mgmt:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}).
|
||||
|
||||
lookup_app(#{appid := AppId}, _Params) ->
|
||||
case emqx_mgmt_auth:lookup_app(AppId) of
|
||||
{AppId, AppSecret, Name, Desc, Status, Expired} ->
|
||||
minirest:return({ok, #{app_id => AppId,
|
||||
emqx_mgmt:return({ok, #{app_id => AppId,
|
||||
secret => AppSecret,
|
||||
name => Name,
|
||||
desc => Desc,
|
||||
status => Status,
|
||||
expired => Expired}});
|
||||
undefined ->
|
||||
minirest:return({ok, #{}})
|
||||
emqx_mgmt:return({ok, #{}})
|
||||
end.
|
||||
|
||||
update_app(#{appid := AppId}, Params) ->
|
||||
|
@ -95,8 +95,8 @@ update_app(#{appid := AppId}, Params) ->
|
|||
Status = proplists:get_value(<<"status">>, Params),
|
||||
Expired = proplists:get_value(<<"expired">>, Params),
|
||||
case emqx_mgmt_auth:update_app(AppId, Name, Desc, Status, Expired) of
|
||||
ok -> minirest:return();
|
||||
{error, Reason} -> minirest:return({error, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, Reason} -> emqx_mgmt:return({error, Reason})
|
||||
end.
|
||||
|
||||
format({AppId, _AppSecret, Name, Desc, Status, Expired}) ->
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
]).
|
||||
|
||||
list(_Bindings, Params) ->
|
||||
minirest:return({ok, emqx_mgmt_api:paginate(emqx_banned, Params, fun format/1)}).
|
||||
emqx_mgmt:return({ok, emqx_mgmt_api:paginate(emqx_banned, Params, fun format/1)}).
|
||||
|
||||
create(_Bindings, Params) ->
|
||||
case pipeline([fun ensure_required/1,
|
||||
|
@ -52,9 +52,9 @@ create(_Bindings, Params) ->
|
|||
{ok, NParams} ->
|
||||
{ok, Banned} = pack_banned(NParams),
|
||||
ok = emqx_mgmt:create_banned(Banned),
|
||||
minirest:return({ok, maps:from_list(Params)});
|
||||
emqx_mgmt:return({ok, maps:from_list(Params)});
|
||||
{error, Code, Message} ->
|
||||
minirest:return({error, Code, Message})
|
||||
emqx_mgmt:return({error, Code, Message})
|
||||
end.
|
||||
|
||||
delete(#{as := As, who := Who}, _) ->
|
||||
|
@ -64,9 +64,9 @@ delete(#{as := As, who := Who}, _) ->
|
|||
fun validate_params/1], Params) of
|
||||
{ok, NParams} ->
|
||||
do_delete(proplists:get_value(<<"as">>, NParams), proplists:get_value(<<"who">>, NParams)),
|
||||
minirest:return();
|
||||
emqx_mgmt:return();
|
||||
{error, Code, Message} ->
|
||||
minirest:return({error, Code, Message})
|
||||
emqx_mgmt:return({error, Code, Message})
|
||||
end.
|
||||
|
||||
pipeline([], Params) ->
|
||||
|
|
|
@ -35,13 +35,13 @@
|
|||
]).
|
||||
|
||||
list(_Bindings, _Params) ->
|
||||
minirest:return({ok, [Info || {_Node, Info} <- emqx_mgmt:list_brokers()]}).
|
||||
emqx_mgmt:return({ok, [Info || {_Node, Info} <- emqx_mgmt:list_brokers()]}).
|
||||
|
||||
get(#{node := Node}, _Params) ->
|
||||
case emqx_mgmt:lookup_broker(Node) of
|
||||
{error, Reason} ->
|
||||
minirest:return({error, ?ERROR2, Reason});
|
||||
emqx_mgmt:return({error, ?ERROR2, Reason});
|
||||
Info ->
|
||||
minirest:return({ok, Info})
|
||||
emqx_mgmt:return({ok, Info})
|
||||
end.
|
||||
|
||||
|
|
|
@ -151,93 +151,93 @@ list(#{node := Node}, Params) when Node =:= node() ->
|
|||
|
||||
list(Bindings = #{node := Node}, Params) ->
|
||||
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
|
||||
{badrpc, Reason} -> minirest:return({error, ?ERROR1, Reason});
|
||||
{badrpc, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason});
|
||||
Res -> Res
|
||||
end.
|
||||
|
||||
%% @private
|
||||
fence(Func) ->
|
||||
try
|
||||
minirest:return({ok, Func()})
|
||||
emqx_mgmt:return({ok, Func()})
|
||||
catch
|
||||
throw : {bad_value_type, {_Key, Type, Value}} ->
|
||||
Reason = iolist_to_binary(
|
||||
io_lib:format("Can't convert ~p to ~p type",
|
||||
[Value, Type])
|
||||
),
|
||||
minirest:return({error, ?ERROR8, Reason})
|
||||
emqx_mgmt:return({error, ?ERROR8, Reason})
|
||||
end.
|
||||
|
||||
lookup(#{node := Node, clientid := ClientId}, _Params) ->
|
||||
minirest:return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
|
||||
emqx_mgmt:return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
|
||||
|
||||
lookup(#{clientid := ClientId}, _Params) ->
|
||||
minirest:return({ok, emqx_mgmt:lookup_client({clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
|
||||
emqx_mgmt:return({ok, emqx_mgmt:lookup_client({clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
|
||||
|
||||
lookup(#{node := Node, username := Username}, _Params) ->
|
||||
minirest:return({ok, emqx_mgmt:lookup_client(Node, {username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)});
|
||||
emqx_mgmt:return({ok, emqx_mgmt:lookup_client(Node, {username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)});
|
||||
|
||||
lookup(#{username := Username}, _Params) ->
|
||||
minirest:return({ok, emqx_mgmt:lookup_client({username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)}).
|
||||
emqx_mgmt:return({ok, emqx_mgmt:lookup_client({username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)}).
|
||||
|
||||
kickout(#{clientid := ClientId}, _Params) ->
|
||||
case emqx_mgmt:kickout_client(emqx_mgmt_util:urldecode(ClientId)) of
|
||||
ok -> minirest:return();
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end.
|
||||
|
||||
clean_acl_cache(#{clientid := ClientId}, _Params) ->
|
||||
case emqx_mgmt:clean_acl_cache(emqx_mgmt_util:urldecode(ClientId)) of
|
||||
ok -> minirest:return();
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end.
|
||||
|
||||
list_acl_cache(#{clientid := ClientId}, _Params) ->
|
||||
case emqx_mgmt:list_acl_cache(emqx_mgmt_util:urldecode(ClientId)) of
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason});
|
||||
Caches -> minirest:return({ok, [format_acl_cache(Cache) || Cache <- Caches]})
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason});
|
||||
Caches -> emqx_mgmt:return({ok, [format_acl_cache(Cache) || Cache <- Caches]})
|
||||
end.
|
||||
|
||||
set_ratelimit_policy(#{clientid := ClientId}, Params) ->
|
||||
P = [{conn_bytes_in, proplists:get_value(<<"conn_bytes_in">>, Params)},
|
||||
{conn_messages_in, proplists:get_value(<<"conn_messages_in">>, Params)}],
|
||||
case [{K, parse_ratelimit_str(V)} || {K, V} <- P, V =/= undefined] of
|
||||
[] -> minirest:return();
|
||||
[] -> emqx_mgmt:return();
|
||||
Policy ->
|
||||
case emqx_mgmt:set_ratelimit_policy(emqx_mgmt_util:urldecode(ClientId), Policy) of
|
||||
ok -> minirest:return();
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end
|
||||
end.
|
||||
|
||||
clean_ratelimit(#{clientid := ClientId}, _Params) ->
|
||||
case emqx_mgmt:set_ratelimit_policy(emqx_mgmt_util:urldecode(ClientId), []) of
|
||||
ok -> minirest:return();
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end.
|
||||
|
||||
set_quota_policy(#{clientid := ClientId}, Params) ->
|
||||
P = [{conn_messages_routing, proplists:get_value(<<"conn_messages_routing">>, Params)}],
|
||||
case [{K, parse_ratelimit_str(V)} || {K, V} <- P, V =/= undefined] of
|
||||
[] -> minirest:return();
|
||||
[] -> emqx_mgmt:return();
|
||||
Policy ->
|
||||
case emqx_mgmt:set_quota_policy(emqx_mgmt_util:urldecode(ClientId), Policy) of
|
||||
ok -> minirest:return();
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end
|
||||
end.
|
||||
|
||||
clean_quota(#{clientid := ClientId}, _Params) ->
|
||||
case emqx_mgmt:set_quota_policy(emqx_mgmt_util:urldecode(ClientId), []) of
|
||||
ok -> minirest:return();
|
||||
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
|
||||
ok -> emqx_mgmt:return();
|
||||
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
|
||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||
end.
|
||||
|
||||
%% @private
|
||||
|
|
|
@ -44,18 +44,18 @@
|
|||
|
||||
%% List listeners on a node.
|
||||
list(#{node := Node}, _Params) ->
|
||||
minirest:return({ok, format(emqx_mgmt:list_listeners(Node))});
|
||||
emqx_mgmt:return({ok, format(emqx_mgmt:list_listeners(Node))});
|
||||
|
||||
%% List listeners in the cluster.
|
||||
list(_Binding, _Params) ->
|
||||
minirest:return({ok, [#{node => Node, listeners => format(Listeners)}
|
||||
emqx_mgmt:return({ok, [#{node => Node, listeners => format(Listeners)}
|
||||
|| {Node, Listeners} <- emqx_mgmt:list_listeners()]}).
|
||||
|
||||
%% Restart listeners on a node.
|
||||
restart(#{node := Node, identifier := Identifier}, _Params) ->
|
||||
case emqx_mgmt:restart_listener(Node, Identifier) of
|
||||
ok -> minirest:return({ok, "Listener restarted."});
|
||||
{error, Error} -> minirest:return({error, Error})
|
||||
ok -> emqx_mgmt:return({ok, "Listener restarted."});
|
||||
{error, Error} -> emqx_mgmt:return({error, Error})
|
||||
end;
|
||||
|
||||
%% Restart listeners in the cluster.
|
||||
|
@ -64,8 +64,8 @@ restart(#{identifier := <<"http", _/binary>>}, _Params) ->
|
|||
restart(#{identifier := Identifier}, _Params) ->
|
||||
Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
|
||||
case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of
|
||||
[] -> minirest:return(ok);
|
||||
Errors -> minirest:return({error, {restart, Errors}})
|
||||
[] -> emqx_mgmt:return(ok);
|
||||
Errors -> emqx_mgmt:return({error, {restart, Errors}})
|
||||
end.
|
||||
|
||||
format(Listeners) when is_list(Listeners) ->
|
||||
|
|
|
@ -31,12 +31,12 @@
|
|||
-export([list/2]).
|
||||
|
||||
list(Bindings, _Params) when map_size(Bindings) == 0 ->
|
||||
minirest:return({ok, [#{node => Node, metrics => maps:from_list(Metrics)}
|
||||
emqx_mgmt:return({ok, [#{node => Node, metrics => maps:from_list(Metrics)}
|
||||
|| {Node, Metrics} <- emqx_mgmt:get_metrics()]});
|
||||
|
||||
list(#{node := Node}, _Params) ->
|
||||
case emqx_mgmt:get_metrics(Node) of
|
||||
{error, Reason} -> minirest:return({error, Reason});
|
||||
Metrics -> minirest:return({ok, maps:from_list(Metrics)})
|
||||
{error, Reason} -> emqx_mgmt:return({error, Reason});
|
||||
Metrics -> emqx_mgmt:return({ok, maps:from_list(Metrics)})
|
||||
end.
|
||||
|
||||
|
|
|
@ -33,10 +33,10 @@
|
|||
]).
|
||||
|
||||
list(_Bindings, _Params) ->
|
||||
minirest:return({ok, [format(Node, Info) || {Node, Info} <- emqx_mgmt:list_nodes()]}).
|
||||
emqx_mgmt:return({ok, [format(Node, Info) || {Node, Info} <- emqx_mgmt:list_nodes()]}).
|
||||
|
||||
get(#{node := Node}, _Params) ->
|
||||
minirest:return({ok, emqx_mgmt:lookup_node(Node)}).
|
||||
emqx_mgmt:return({ok, emqx_mgmt:lookup_node(Node)}).
|
||||
|
||||
format(Node, {error, Reason}) -> #{node => Node, error => Reason};
|
||||
|
||||
|
|
|
@ -69,36 +69,36 @@
|
|||
]).
|
||||
|
||||
list(#{node := Node}, _Params) ->
|
||||
minirest:return({ok, [format(Plugin) || Plugin <- emqx_mgmt:list_plugins(Node)]});
|
||||
emqx_mgmt:return({ok, [format(Plugin) || Plugin <- emqx_mgmt:list_plugins(Node)]});
|
||||
|
||||
list(_Bindings, _Params) ->
|
||||
minirest:return({ok, [format({Node, Plugins}) || {Node, Plugins} <- emqx_mgmt:list_plugins()]}).
|
||||
emqx_mgmt:return({ok, [format({Node, Plugins}) || {Node, Plugins} <- emqx_mgmt:list_plugins()]}).
|
||||
|
||||
load(#{node := Node, plugin := Plugin}, _Params) ->
|
||||
minirest:return(emqx_mgmt:load_plugin(Node, Plugin)).
|
||||
emqx_mgmt:return(emqx_mgmt:load_plugin(Node, Plugin)).
|
||||
|
||||
unload(#{node := Node, plugin := Plugin}, _Params) ->
|
||||
minirest:return(emqx_mgmt:unload_plugin(Node, Plugin));
|
||||
emqx_mgmt:return(emqx_mgmt:unload_plugin(Node, Plugin));
|
||||
|
||||
unload(#{plugin := Plugin}, _Params) ->
|
||||
Results = [emqx_mgmt:unload_plugin(Node, Plugin) || {Node, _Info} <- emqx_mgmt:list_nodes()],
|
||||
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
|
||||
[] ->
|
||||
minirest:return(ok);
|
||||
emqx_mgmt:return(ok);
|
||||
Errors ->
|
||||
minirest:return(lists:last(Errors))
|
||||
emqx_mgmt:return(lists:last(Errors))
|
||||
end.
|
||||
|
||||
reload(#{node := Node, plugin := Plugin}, _Params) ->
|
||||
minirest:return(emqx_mgmt:reload_plugin(Node, Plugin));
|
||||
emqx_mgmt:return(emqx_mgmt:reload_plugin(Node, Plugin));
|
||||
|
||||
reload(#{plugin := Plugin}, _Params) ->
|
||||
Results = [emqx_mgmt:reload_plugin(Node, Plugin) || {Node, _Info} <- emqx_mgmt:list_nodes()],
|
||||
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
|
||||
[] ->
|
||||
minirest:return(ok);
|
||||
emqx_mgmt:return(ok);
|
||||
Errors ->
|
||||
minirest:return(lists:last(Errors))
|
||||
emqx_mgmt:return(lists:last(Errors))
|
||||
end.
|
||||
|
||||
format({Node, Plugins}) ->
|
||||
|
|
|
@ -67,7 +67,7 @@
|
|||
subscribe(_Bindings, Params) ->
|
||||
logger:debug("API subscribe Params:~p", [Params]),
|
||||
{ClientId, Topic, QoS} = parse_subscribe_params(Params),
|
||||
minirest:return(do_subscribe(ClientId, Topic, QoS)).
|
||||
emqx_mgmt:return(do_subscribe(ClientId, Topic, QoS)).
|
||||
|
||||
publish(_Bindings, Params) ->
|
||||
logger:debug("API publish Params:~p", [Params]),
|
||||
|
@ -75,33 +75,33 @@ publish(_Bindings, Params) ->
|
|||
case do_publish(ClientId, Topic, Qos, Retain, Payload) of
|
||||
{ok, MsgIds} ->
|
||||
case proplists:get_value(<<"return">>, Params, undefined) of
|
||||
undefined -> minirest:return(ok);
|
||||
undefined -> emqx_mgmt:return(ok);
|
||||
_Val ->
|
||||
case proplists:get_value(<<"topics">>, Params, undefined) of
|
||||
undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}});
|
||||
_ -> minirest:return({ok, #{msgids => MsgIds}})
|
||||
undefined -> emqx_mgmt:return({ok, #{msgid => lists:last(MsgIds)}});
|
||||
_ -> emqx_mgmt:return({ok, #{msgids => MsgIds}})
|
||||
end
|
||||
end;
|
||||
Result ->
|
||||
minirest:return(Result)
|
||||
emqx_mgmt:return(Result)
|
||||
end.
|
||||
|
||||
unsubscribe(_Bindings, Params) ->
|
||||
logger:debug("API unsubscribe Params:~p", [Params]),
|
||||
{ClientId, Topic} = parse_unsubscribe_params(Params),
|
||||
minirest:return(do_unsubscribe(ClientId, Topic)).
|
||||
emqx_mgmt:return(do_unsubscribe(ClientId, Topic)).
|
||||
|
||||
subscribe_batch(_Bindings, Params) ->
|
||||
logger:debug("API subscribe batch Params:~p", [Params]),
|
||||
minirest:return({ok, loop_subscribe(Params)}).
|
||||
emqx_mgmt:return({ok, loop_subscribe(Params)}).
|
||||
|
||||
publish_batch(_Bindings, Params) ->
|
||||
logger:debug("API publish batch Params:~p", [Params]),
|
||||
minirest:return({ok, loop_publish(Params)}).
|
||||
emqx_mgmt:return({ok, loop_publish(Params)}).
|
||||
|
||||
unsubscribe_batch(_Bindings, Params) ->
|
||||
logger:debug("API unsubscribe batch Params:~p", [Params]),
|
||||
minirest:return({ok, loop_unsubscribe(Params)}).
|
||||
emqx_mgmt:return({ok, loop_unsubscribe(Params)}).
|
||||
|
||||
loop_subscribe(Params) ->
|
||||
loop_subscribe(Params, []).
|
||||
|
|
|
@ -35,11 +35,11 @@
|
|||
]).
|
||||
|
||||
list(Bindings, Params) when map_size(Bindings) == 0 ->
|
||||
minirest:return({ok, emqx_mgmt_api:paginate(emqx_route, Params, fun format/1)}).
|
||||
emqx_mgmt:return({ok, emqx_mgmt_api:paginate(emqx_route, Params, fun format/1)}).
|
||||
|
||||
lookup(#{topic := Topic}, _Params) ->
|
||||
Topic1 = emqx_mgmt_util:urldecode(Topic),
|
||||
minirest:return({ok, [format(R) || R <- emqx_mgmt:lookup_routes(Topic1)]}).
|
||||
emqx_mgmt:return({ok, [format(R) || R <- emqx_mgmt:lookup_routes(Topic1)]}).
|
||||
format(#route{topic = Topic, dest = {_, Node}}) ->
|
||||
#{topic => Topic, node => Node};
|
||||
format(#route{topic = Topic, dest = Node}) ->
|
||||
|
|
|
@ -34,12 +34,12 @@
|
|||
|
||||
%% List stats of all nodes
|
||||
list(Bindings, _Params) when map_size(Bindings) == 0 ->
|
||||
minirest:return({ok, [#{node => Node, stats => maps:from_list(Stats)}
|
||||
emqx_mgmt:return({ok, [#{node => Node, stats => maps:from_list(Stats)}
|
||||
|| {Node, Stats} <- emqx_mgmt:get_stats()]}).
|
||||
|
||||
%% List stats of a node
|
||||
lookup(#{node := Node}, _Params) ->
|
||||
case emqx_mgmt:get_stats(Node) of
|
||||
{error, Reason} -> minirest:return({error, Reason});
|
||||
Stats -> minirest:return({ok, maps:from_list(Stats)})
|
||||
{error, Reason} -> emqx_mgmt:return({error, Reason});
|
||||
Stats -> emqx_mgmt:return({ok, maps:from_list(Stats)})
|
||||
end.
|
||||
|
|
|
@ -63,9 +63,9 @@
|
|||
list(Bindings, Params) when map_size(Bindings) == 0 ->
|
||||
case proplists:get_value(<<"topic">>, Params) of
|
||||
undefined ->
|
||||
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
||||
emqx_mgmt:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
||||
Topic ->
|
||||
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
||||
emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
||||
end;
|
||||
|
||||
list(#{node := Node} = Bindings, Params) ->
|
||||
|
@ -73,22 +73,22 @@ list(#{node := Node} = Bindings, Params) ->
|
|||
undefined ->
|
||||
case Node =:= node() of
|
||||
true ->
|
||||
minirest:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
||||
emqx_mgmt:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
||||
false ->
|
||||
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
|
||||
{badrpc, Reason} -> minirest:return({error, Reason});
|
||||
{badrpc, Reason} -> emqx_mgmt:return({error, Reason});
|
||||
Res -> Res
|
||||
end
|
||||
end;
|
||||
Topic ->
|
||||
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
||||
emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
||||
end.
|
||||
|
||||
lookup(#{node := Node, clientid := ClientId}, _Params) ->
|
||||
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
|
||||
emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
|
||||
|
||||
lookup(#{clientid := ClientId}, _Params) ->
|
||||
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
|
||||
emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
|
||||
|
||||
format(Items) when is_list(Items) ->
|
||||
[format(Item) || Item <- Items];
|
||||
|
|
|
@ -66,10 +66,10 @@ mnesia(copy) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Manage Apps
|
||||
%%--------------------------------------------------------------------
|
||||
-spec(add_default_app() -> ok | {ok, appsecret()} | {error, term()}).
|
||||
-spec(add_default_app() -> list()).
|
||||
add_default_app() ->
|
||||
AppId = emqx_config:get([?APP, default_application_id], undefined),
|
||||
AppSecret = emqx_config:get([?APP, default_application_secret], undefined),
|
||||
Apps = emqx_config:get([?APP, applications], []),
|
||||
[ begin
|
||||
case {AppId, AppSecret} of
|
||||
{undefined, _} -> ok;
|
||||
{_, undefined} -> ok;
|
||||
|
@ -77,7 +77,9 @@ add_default_app() ->
|
|||
AppId1 = to_binary(AppId),
|
||||
AppSecret1 = to_binary(AppSecret),
|
||||
add_app(AppId1, <<"Default">>, AppSecret1, <<"Application user">>, true, undefined)
|
||||
end.
|
||||
end
|
||||
end
|
||||
|| #{id := AppId, secret := AppSecret} <- Apps].
|
||||
|
||||
-spec(add_app(appid(), binary()) -> {ok, appsecret()} | {error, term()}).
|
||||
add_app(AppId, Name) when is_binary(AppId) ->
|
||||
|
|
|
@ -78,7 +78,7 @@ stop_listener({Proto, Port, _}) ->
|
|||
minirest:stop_http(listener_name(Proto)).
|
||||
|
||||
listeners() ->
|
||||
[{list_to_atom(Protocol), Port, maps:to_list(maps:without([protocol, port], Map))}
|
||||
[{Protocol, Port, maps:to_list(maps:without([protocol, port], Map))}
|
||||
|| Map = #{protocol := Protocol,port := Port}
|
||||
<- emqx_config:get([emqx_management, listeners], [])].
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ end_per_suite(_Config) ->
|
|||
emqx_ct_helpers:stop_apps([emqx_management, emqx_retainer]).
|
||||
|
||||
set_special_configs(emqx_management) ->
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}]}),
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}]}),
|
||||
ok;
|
||||
set_special_configs(_App) ->
|
||||
ok.
|
||||
|
|
|
@ -51,9 +51,8 @@ end_per_testcase(_, Config) ->
|
|||
Config.
|
||||
|
||||
set_special_configs(emqx_management) ->
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}],
|
||||
default_application_id => <<"admin">>,
|
||||
default_application_secret => <<"public">>}),
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
|
||||
applications =>[#{id => "admin", secret => "public"}]}),
|
||||
ok;
|
||||
set_special_configs(_App) ->
|
||||
ok.
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
emqx_management:{
|
||||
default_application_id: "admin"
|
||||
default_application_secret: "public"
|
||||
applications: [
|
||||
{
|
||||
id: "admin",
|
||||
secret: "public"
|
||||
}
|
||||
]
|
||||
max_row_limit: 10000
|
||||
listeners: [
|
||||
{
|
||||
num_acceptors: 4
|
||||
max_connections: 512
|
||||
protocol: "http"
|
||||
protocol: http
|
||||
port: 8080
|
||||
backlog: 512
|
||||
send_timeout: 15s
|
||||
|
|
|
@ -37,9 +37,8 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
set_special_configs(emqx_management) ->
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}],
|
||||
default_application_id => <<"admin">>,
|
||||
default_application_secret => <<"public">>}),
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
|
||||
applications =>[#{id => "admin", secret => "public"}]}),
|
||||
ok;
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -56,9 +56,8 @@ init_per_testcase(_, Config) ->
|
|||
set_special_configs(emqx_retainer) ->
|
||||
init_emqx_retainer_conf(0);
|
||||
set_special_configs(emqx_management) ->
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => "http", port => 8081}],
|
||||
default_application_id => <<"admin">>,
|
||||
default_application_secret => <<"public">>}),
|
||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
|
||||
applications =>[#{id => "admin", secret => "public"}]}),
|
||||
ok;
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue