style(api): minirest imports removed

This commit is contained in:
Karol Kaczmarek 2021-03-18 21:03:03 +01:00 committed by Rory Z
parent 20086bcff1
commit ea384ec6b5
13 changed files with 101 additions and 135 deletions

View File

@ -20,10 +20,6 @@
-import(proplists, [get_value/2]).
-import(minirest, [ return/0
, return/1
]).
-rest_api(#{name => add_app,
method => 'POST',
path => "/apps/",
@ -69,30 +65,30 @@ add_app(_Bindings, Params) ->
Status = get_value(<<"status">>, Params),
Expired = get_value(<<"expired">>, Params),
case emqx_mgmt_auth:add_app(AppId, Name, Secret, Desc, Status, Expired) of
{ok, AppSecret} -> return({ok, #{secret => AppSecret}});
{error, Reason} -> return({error, Reason})
{ok, AppSecret} -> minirest:return({ok, #{secret => AppSecret}});
{error, Reason} -> minirest:return({error, Reason})
end.
del_app(#{appid := AppId}, _Params) ->
case emqx_mgmt_auth:del_app(AppId) of
ok -> return();
{error, Reason} -> return({error, Reason})
ok -> minirest:return();
{error, Reason} -> minirest:return({error, Reason})
end.
list_apps(_Bindings, _Params) ->
return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}).
minirest:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}).
lookup_app(#{appid := AppId}, _Params) ->
case emqx_mgmt_auth:lookup_app(AppId) of
{AppId, AppSecret, Name, Desc, Status, Expired} ->
return({ok, #{app_id => AppId,
minirest:return({ok, #{app_id => AppId,
secret => AppSecret,
name => Name,
desc => Desc,
status => Status,
expired => Expired}});
undefined ->
return({ok, #{}})
minirest:return({ok, #{}})
end.
update_app(#{appid := AppId}, Params) ->
@ -101,8 +97,8 @@ update_app(#{appid := AppId}, Params) ->
Status = get_value(<<"status">>, Params),
Expired = get_value(<<"expired">>, Params),
case emqx_mgmt_auth:update_app(AppId, Name, Desc, Status, Expired) of
ok -> return();
{error, Reason} -> return({error, Reason})
ok -> minirest:return();
{error, Reason} -> minirest:return({error, Reason})
end.
format({AppId, _AppSecret, Name, Desc, Status, Expired}) ->

View File

@ -22,10 +22,6 @@
-import(proplists, [get_value/2]).
-import(minirest, [ return/0
, return/1
]).
-rest_api(#{name => list_banned,
method => 'GET',
path => "/banned/",
@ -50,7 +46,7 @@
]).
list(_Bindings, Params) ->
return({ok, emqx_mgmt_api:paginate(emqx_banned, Params, fun format/1)}).
minirest:return({ok, emqx_mgmt_api:paginate(emqx_banned, Params, fun format/1)}).
create(_Bindings, Params) ->
case pipeline([fun ensure_required/1,
@ -58,9 +54,9 @@ create(_Bindings, Params) ->
{ok, NParams} ->
{ok, Banned} = pack_banned(NParams),
ok = emqx_mgmt:create_banned(Banned),
return({ok, maps:from_list(Params)});
minirest:return({ok, maps:from_list(Params)});
{error, Code, Message} ->
return({error, Code, Message})
minirest:return({error, Code, Message})
end.
delete(#{as := As, who := Who}, _) ->
@ -70,9 +66,9 @@ delete(#{as := As, who := Who}, _) ->
fun validate_params/1], Params) of
{ok, NParams} ->
do_delete(get_value(<<"as">>, NParams), get_value(<<"who">>, NParams)),
return();
minirest:return();
{error, Code, Message} ->
return({error, Code, Message})
minirest:return({error, Code, Message})
end.
pipeline([], Params) ->

View File

@ -18,8 +18,6 @@
-include("emqx_mgmt.hrl").
-import(minirest, [return/1]).
-rest_api(#{name => list_brokers,
method => 'GET',
path => "/brokers/",
@ -37,13 +35,13 @@
]).
list(_Bindings, _Params) ->
return({ok, [Info || {_Node, Info} <- emqx_mgmt:list_brokers()]}).
minirest:return({ok, [Info || {_Node, Info} <- emqx_mgmt:list_brokers()]}).
get(#{node := Node}, _Params) ->
case emqx_mgmt:lookup_broker(Node) of
{error, Reason} ->
return({error, ?ERROR2, Reason});
minirest:return({error, ?ERROR2, Reason});
Info ->
return({ok, Info})
minirest:return({ok, Info})
end.

View File

@ -21,10 +21,6 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-import(minirest, [ return/0
, return/1
]).
-import(proplists, [get_value/2]).
-define(CLIENT_QS_SCHEMA, {emqx_channel_info,
@ -146,87 +142,87 @@
-define(format_fun, {?MODULE, format_channel_info}).
list(Bindings, Params) when map_size(Bindings) == 0 ->
return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
list(#{node := Node}, Params) when Node =:= node() ->
return({ok, emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
minirest:return({ok, emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
list(Bindings = #{node := Node}, Params) ->
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
{badrpc, Reason} -> return({error, ?ERROR1, Reason});
{badrpc, Reason} -> minirest:return({error, ?ERROR1, Reason});
Res -> Res
end.
lookup(#{node := Node, clientid := ClientId}, _Params) ->
return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
minirest:return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
lookup(#{clientid := ClientId}, _Params) ->
return({ok, emqx_mgmt:lookup_client({clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
minirest:return({ok, emqx_mgmt:lookup_client({clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
lookup(#{node := Node, username := Username}, _Params) ->
return({ok, emqx_mgmt:lookup_client(Node, {username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)});
minirest:return({ok, emqx_mgmt:lookup_client(Node, {username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)});
lookup(#{username := Username}, _Params) ->
return({ok, emqx_mgmt:lookup_client({username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)}).
minirest:return({ok, emqx_mgmt:lookup_client({username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)}).
kickout(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:kickout_client(emqx_mgmt_util:urldecode(ClientId)) of
ok -> return();
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason})
ok -> minirest:return();
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
end.
clean_acl_cache(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:clean_acl_cache(emqx_mgmt_util:urldecode(ClientId)) of
ok -> return();
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason})
ok -> minirest:return();
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
end.
list_acl_cache(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:list_acl_cache(emqx_mgmt_util:urldecode(ClientId)) of
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason});
Caches -> return({ok, [format_acl_cache(Cache) || Cache <- Caches]})
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason});
Caches -> minirest:return({ok, [format_acl_cache(Cache) || Cache <- Caches]})
end.
set_ratelimit_policy(#{clientid := ClientId}, Params) ->
P = [{conn_bytes_in, get_value(<<"conn_bytes_in">>, Params)},
{conn_messages_in, get_value(<<"conn_messages_in">>, Params)}],
case [{K, parse_ratelimit_str(V)} || {K, V} <- P, V =/= undefined] of
[] -> return();
[] -> minirest:return();
Policy ->
case emqx_mgmt:set_ratelimit_policy(emqx_mgmt_util:urldecode(ClientId), Policy) of
ok -> return();
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason})
ok -> minirest:return();
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
end
end.
clean_ratelimit(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:set_ratelimit_policy(emqx_mgmt_util:urldecode(ClientId), []) of
ok -> return();
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason})
ok -> minirest:return();
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
end.
set_quota_policy(#{clientid := ClientId}, Params) ->
P = [{conn_messages_routing, get_value(<<"conn_messages_routing">>, Params)}],
case [{K, parse_ratelimit_str(V)} || {K, V} <- P, V =/= undefined] of
[] -> return();
[] -> minirest:return();
Policy ->
case emqx_mgmt:set_quota_policy(emqx_mgmt_util:urldecode(ClientId), Policy) of
ok -> return();
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason})
ok -> minirest:return();
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
end
end.
clean_quota(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:set_quota_policy(emqx_mgmt_util:urldecode(ClientId), []) of
ok -> return();
{error, not_found} -> return({error, ?ERROR12, not_found});
{error, Reason} -> return({error, ?ERROR1, Reason})
ok -> minirest:return();
{error, not_found} -> minirest:return({error, ?ERROR12, not_found});
{error, Reason} -> minirest:return({error, ?ERROR1, Reason})
end.
%% @private

View File

@ -22,10 +22,6 @@
-include("emqx_mgmt.hrl").
-import(minirest, [ return/0
, return/1
]).
-rest_api(#{name => export,
method => 'POST',
path => "/data/export",
@ -77,14 +73,14 @@
export(_Bindings, _Params) ->
case emqx_mgmt_data_backup:export() of
{ok, File = #{filename := Filename}} ->
return({ok, File#{filename => filename:basename(Filename)}});
Return -> return(Return)
minirest:return({ok, File#{filename => filename:basename(Filename)}});
Return -> minirest:return(Return)
end.
list_exported(_Bindings, _Params) ->
List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ],
NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))),
return({ok, NList}).
minirest:return({ok, NList}).
get_list_exported() ->
Dir = emqx:get_env(data_dir),
@ -114,7 +110,7 @@ get_list_exported() ->
import(_Bindings, Params) ->
case proplists:get_value(<<"filename">>, Params) of
undefined ->
return({error, missing_required_params});
minirest:return({error, missing_required_params});
Filename ->
Result = case proplists:get_value(<<"node">>, Params) of
undefined -> do_import(Filename);
@ -122,11 +118,11 @@ import(_Bindings, Params) ->
case lists:member(Node,
[ erlang:atom_to_binary(N, utf8) || N <- ekka_mnesia:running_nodes() ]
) of
true -> return(rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]));
false -> return({error, no_existent_node})
true -> minirest:return(rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]));
false -> minirest:return({error, no_existent_node})
end
end,
return(Result)
minirest:return(Result)
end.
do_import(Filename) ->
@ -140,7 +136,7 @@ download(#{filename := Filename}, _Params) ->
{ok, #{filename => list_to_binary(Filename),
file => Bin}};
{error, Reason} ->
return({error, Reason})
minirest:return({error, Reason})
end.
upload(Bindings, Params) ->
@ -151,9 +147,9 @@ do_upload(_Bindings, #{<<"filename">> := Filename,
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:write_file(FullFilename, Bin) of
ok ->
return({ok, [{node, node()}]});
minirest:return({ok, [{node, node()}]});
{error, Reason} ->
return({error, Reason})
minirest:return({error, Reason})
end;
do_upload(Bindings, Params = #{<<"file">> := _}) ->
Seconds = erlang:system_time(second),
@ -161,13 +157,13 @@ do_upload(Bindings, Params = #{<<"file">> := _}) ->
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
do_upload(Bindings, Params#{<<"filename">> => Filename});
do_upload(_Bindings, _Params) ->
return({error, missing_required_params}).
minirest:return({error, missing_required_params}).
delete(#{filename := Filename}, _Params) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:delete(FullFilename) of
ok ->
return();
minirest:return();
{error, Reason} ->
return({error, Reason})
minirest:return({error, Reason})
end.

View File

@ -16,8 +16,6 @@
-module(emqx_mgmt_api_listeners).
-import(minirest, [return/1]).
-rest_api(#{name => list_listeners,
method => 'GET',
path => "/listeners/",
@ -46,18 +44,18 @@
%% List listeners on a node.
list(#{node := Node}, _Params) ->
return({ok, format(emqx_mgmt:list_listeners(Node))});
minirest:return({ok, format(emqx_mgmt:list_listeners(Node))});
%% List listeners in the cluster.
list(_Binding, _Params) ->
return({ok, [#{node => Node, listeners => format(Listeners)}
minirest:return({ok, [#{node => Node, listeners => format(Listeners)}
|| {Node, Listeners} <- emqx_mgmt:list_listeners()]}).
%% Restart listeners on a node.
restart(#{node := Node, identifier := Identifier}, _Params) ->
case emqx_mgmt:restart_listener(Node, Identifier) of
ok -> return({ok, "Listener restarted."});
{error, Error} -> return({error, Error})
ok -> minirest:return({ok, "Listener restarted."});
{error, Error} -> minirest:return({error, Error})
end;
%% Restart listeners in the cluster.
@ -66,8 +64,8 @@ restart(#{identifier := <<"http", _/binary>>}, _Params) ->
restart(#{identifier := Identifier}, _Params) ->
Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of
[] -> return(ok);
Errors -> return({error, {restart, Errors}})
[] -> minirest:return(ok);
Errors -> minirest:return({error, {restart, Errors}})
end.
format(Listeners) when is_list(Listeners) ->

View File

@ -16,8 +16,6 @@
-module(emqx_mgmt_api_metrics).
-import(minirest, [return/1]).
-rest_api(#{name => list_all_metrics,
method => 'GET',
path => "/metrics",
@ -33,12 +31,12 @@
-export([list/2]).
list(Bindings, _Params) when map_size(Bindings) == 0 ->
return({ok, [#{node => Node, metrics => maps:from_list(Metrics)}
minirest:return({ok, [#{node => Node, metrics => maps:from_list(Metrics)}
|| {Node, Metrics} <- emqx_mgmt:get_metrics()]});
list(#{node := Node}, _Params) ->
case emqx_mgmt:get_metrics(Node) of
{error, Reason} -> return({error, Reason});
Metrics -> return({ok, maps:from_list(Metrics)})
{error, Reason} -> minirest:return({error, Reason});
Metrics -> minirest:return({ok, maps:from_list(Metrics)})
end.

View File

@ -16,8 +16,6 @@
-module(emqx_mgmt_api_nodes).
-import(minirest, [return/1]).
-rest_api(#{name => list_nodes,
method => 'GET',
path => "/nodes/",
@ -35,10 +33,10 @@
]).
list(_Bindings, _Params) ->
return({ok, [format(Node, Info) || {Node, Info} <- emqx_mgmt:list_nodes()]}).
minirest:return({ok, [format(Node, Info) || {Node, Info} <- emqx_mgmt:list_nodes()]}).
get(#{node := Node}, _Params) ->
return({ok, emqx_mgmt:lookup_node(Node)}).
minirest:return({ok, emqx_mgmt:lookup_node(Node)}).
format(Node, {error, Reason}) -> #{node => Node, error => Reason};

View File

@ -20,8 +20,6 @@
-include_lib("emqx/include/emqx.hrl").
-import(minirest, [return/1]).
-rest_api(#{name => list_all_plugins,
method => 'GET',
path => "/plugins/",
@ -71,36 +69,36 @@
]).
list(#{node := Node}, _Params) ->
return({ok, [format(Plugin) || Plugin <- emqx_mgmt:list_plugins(Node)]});
minirest:return({ok, [format(Plugin) || Plugin <- emqx_mgmt:list_plugins(Node)]});
list(_Bindings, _Params) ->
return({ok, [format({Node, Plugins}) || {Node, Plugins} <- emqx_mgmt:list_plugins()]}).
minirest:return({ok, [format({Node, Plugins}) || {Node, Plugins} <- emqx_mgmt:list_plugins()]}).
load(#{node := Node, plugin := Plugin}, _Params) ->
return(emqx_mgmt:load_plugin(Node, Plugin)).
minirest:return(emqx_mgmt:load_plugin(Node, Plugin)).
unload(#{node := Node, plugin := Plugin}, _Params) ->
return(emqx_mgmt:unload_plugin(Node, Plugin));
minirest:return(emqx_mgmt:unload_plugin(Node, Plugin));
unload(#{plugin := Plugin}, _Params) ->
Results = [emqx_mgmt:unload_plugin(Node, Plugin) || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
minirest:return(ok);
Errors ->
return(lists:last(Errors))
minirest:return(lists:last(Errors))
end.
reload(#{node := Node, plugin := Plugin}, _Params) ->
return(emqx_mgmt:reload_plugin(Node, Plugin));
minirest:return(emqx_mgmt:reload_plugin(Node, Plugin));
reload(#{plugin := Plugin}, _Params) ->
Results = [emqx_mgmt:reload_plugin(Node, Plugin) || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
minirest:return(ok);
Errors ->
return(lists:last(Errors))
minirest:return(lists:last(Errors))
end.
format({Node, Plugins}) ->

View File

@ -24,8 +24,6 @@
, get_value/3
]).
-import(minirest, [return/1]).
-rest_api(#{name => mqtt_subscribe,
method => 'POST',
path => "/mqtt/subscribe",
@ -73,7 +71,7 @@
subscribe(_Bindings, Params) ->
logger:debug("API subscribe Params:~p", [Params]),
{ClientId, Topic, QoS} = parse_subscribe_params(Params),
return(do_subscribe(ClientId, Topic, QoS)).
minirest:return(do_subscribe(ClientId, Topic, QoS)).
publish(_Bindings, Params) ->
logger:debug("API publish Params:~p", [Params]),
@ -81,33 +79,33 @@ publish(_Bindings, Params) ->
case do_publish(ClientId, Topic, Qos, Retain, Payload) of
{ok, MsgIds} ->
case get_value(<<"return">>, Params, undefined) of
undefined -> return(ok);
undefined -> minirest:return(ok);
_Val ->
case get_value(<<"topics">>, Params, undefined) of
undefined -> return({ok, #{msgid => lists:last(MsgIds)}});
_ -> return({ok, #{msgids => MsgIds}})
undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}});
_ -> minirest:return({ok, #{msgids => MsgIds}})
end
end;
Result ->
return(Result)
minirest:return(Result)
end.
unsubscribe(_Bindings, Params) ->
logger:debug("API unsubscribe Params:~p", [Params]),
{ClientId, Topic} = parse_unsubscribe_params(Params),
return(do_unsubscribe(ClientId, Topic)).
minirest:return(do_unsubscribe(ClientId, Topic)).
subscribe_batch(_Bindings, Params) ->
logger:debug("API subscribe batch Params:~p", [Params]),
return({ok, loop_subscribe(Params)}).
minirest:return({ok, loop_subscribe(Params)}).
publish_batch(_Bindings, Params) ->
logger:debug("API publish batch Params:~p", [Params]),
return({ok, loop_publish(Params)}).
minirest:return({ok, loop_publish(Params)}).
unsubscribe_batch(_Bindings, Params) ->
logger:debug("API unsubscribe batch Params:~p", [Params]),
return({ok, loop_unsubscribe(Params)}).
minirest:return({ok, loop_unsubscribe(Params)}).
loop_subscribe(Params) ->
loop_subscribe(Params, []).

View File

@ -18,8 +18,6 @@
-include_lib("emqx/include/emqx.hrl").
-import(minirest, [return/1]).
-rest_api(#{name => list_routes,
method => 'GET',
path => "/routes/",
@ -37,11 +35,11 @@
]).
list(Bindings, Params) when map_size(Bindings) == 0 ->
return({ok, emqx_mgmt_api:paginate(emqx_route, Params, fun format/1)}).
minirest:return({ok, emqx_mgmt_api:paginate(emqx_route, Params, fun format/1)}).
lookup(#{topic := Topic}, _Params) ->
Topic1 = emqx_mgmt_util:urldecode(Topic),
return({ok, [format(R) || R <- emqx_mgmt:lookup_routes(Topic1)]}).
minirest:return({ok, [format(R) || R <- emqx_mgmt:lookup_routes(Topic1)]}).
format(#route{topic = Topic, dest = {_, Node}}) ->
#{topic => Topic, node => Node};
format(#route{topic = Topic, dest = Node}) ->

View File

@ -16,8 +16,6 @@
-module(emqx_mgmt_api_stats).
-import(minirest, [return/1]).
-rest_api(#{name => list_stats,
method => 'GET',
path => "/stats/",
@ -36,12 +34,12 @@
%% List stats of all nodes
list(Bindings, _Params) when map_size(Bindings) == 0 ->
return({ok, [#{node => Node, stats => maps:from_list(Stats)}
minirest:return({ok, [#{node => Node, stats => maps:from_list(Stats)}
|| {Node, Stats} <- emqx_mgmt:get_stats()]}).
%% List stats of a node
lookup(#{node := Node}, _Params) ->
case emqx_mgmt:get_stats(Node) of
{error, Reason} -> return({error, Reason});
Stats -> return({ok, maps:from_list(Stats)})
{error, Reason} -> minirest:return({error, Reason});
Stats -> minirest:return({ok, maps:from_list(Stats)})
end.

View File

@ -18,8 +18,6 @@
-include_lib("emqx/include/emqx.hrl").
-import(minirest, [return/1]).
-define(SUBS_QS_SCHEMA, {emqx_suboption,
[{<<"clientid">>, binary},
{<<"topic">>, binary},
@ -65,9 +63,9 @@
list(Bindings, Params) when map_size(Bindings) == 0 ->
case proplists:get_value(<<"topic">>, Params) of
undefined ->
return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
Topic ->
return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
end;
list(#{node := Node} = Bindings, Params) ->
@ -75,22 +73,22 @@ list(#{node := Node} = Bindings, Params) ->
undefined ->
case Node =:= node() of
true ->
return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)});
minirest:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)});
false ->
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
{badrpc, Reason} -> return({error, Reason});
{badrpc, Reason} -> minirest:return({error, Reason});
Res -> Res
end
end;
Topic ->
return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
end.
lookup(#{node := Node, clientid := ClientId}, _Params) ->
return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
lookup(#{clientid := ClientId}, _Params) ->
return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
format(Items) when is_list(Items) ->
[format(Item) || Item <- Items];