Add http management APIs

This commit is contained in:
turtled 2017-08-09 10:14:29 +08:00
parent 1c63bdd90d
commit 7c1ee6610d
5 changed files with 963 additions and 91 deletions

View File

@ -59,3 +59,17 @@
-define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]).
-define(SUCCESS, 0). %% Success
-define(ERROR1, 101). %% badrpc
-define(ERROR2, 102). %% Unknown error
-define(ERROR3, 103). %% Username or password error
-define(ERROR4, 104). %% Empty username or password
-define(ERROR5, 105). %% User does not exist
-define(ERROR6, 106). %% Admin can not be deleted
-define(ERROR7, 107). %% Missing request parameter
-define(ERROR8, 108). %% Request parameter type error
-define(ERROR9, 109). %% Request parameter is not a json
-define(ERROR10, 110). %% Plugin has been loaded
-define(ERROR11, 111). %% Plugin has been loaded
-define(ERROR12, 112). %% Client not online

View File

@ -184,7 +184,7 @@ start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []});
start_listener({Proto, ListenOn, Opts}) when Proto == api ->
mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}).
mochiweb:start_http('mqtt:api', ListenOn, Opts, emqttd_http:http_handler()).
start_listener(Proto, ListenOn, Opts) ->
Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])),

View File

@ -26,15 +26,41 @@
-import(proplists, [get_value/2, get_value/3]).
-export([handle_request/1]).
-export([http_handler/0, handle_request/2, http_api/0]).
handle_request(Req) ->
handle_request(Req:get(method), Req:get(path), Req).
-include("emqttd_internal.hrl").
handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' ->
-record(state, {dispatch}).
http_handler() ->
APIs = http_api(),
State = #state{dispatch = dispatcher(APIs)},
{?MODULE, handle_request, [State]}.
http_api() ->
Attr = emqttd_rest_api:module_info(attributes),
[{Regexp, Method, Function, Args} || {http_api, [{Regexp, Method, Function, Args}]} <- Attr].
%%--------------------------------------------------------------------
%% Handle HTTP Request
%%--------------------------------------------------------------------
handle_request(Req, State) ->
Path = Req:get(path),
case Path of
"/status" ->
handle_request("/status", Req, Req:get(method));
"/" ->
handle_request("/", Req, Req:get(method));
_ ->
if_authorized(Req, fun() -> handle_request(Path, Req, State) end)
end.
handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) ->
Dispatch(Req, Url);
handle_request("/status", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
AppStatus =
case lists:keysearch(emqttd, 1, application:which_applications()) of
AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of
false -> not_running;
{value, _Val} -> running
end,
@ -42,87 +68,69 @@ handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET'
[node(), InternalStatus, AppStatus]),
Req:ok({"text/plain", iolist_to_binary(Status)});
%%--------------------------------------------------------------------
%% HTTP Publish API
%%--------------------------------------------------------------------
handle_request("/", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' ->
respond(Req, 200, api_list());
handle_request('POST', "/mqtt/publish", Req) ->
Params = parse_params(Req),
case authorized(Req, Params) of
true -> http_publish(Req, Params);
false -> Req:respond({401, [], <<"Unauthorized">>})
end;
handle_request(_, Req, #state{}) ->
respond(Req, 404, []).
%%--------------------------------------------------------------------
%% Get static files
%%--------------------------------------------------------------------
handle_request('GET', "/" ++ File, Req) ->
lager:info("HTTP GET File: ~s", [File]),
mochiweb_request:serve_file(File, docroot(), Req);
handle_request(Method, Path, Req) ->
lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
Req:not_found().
%%--------------------------------------------------------------------
%% HTTP Publish
%%--------------------------------------------------------------------
http_publish(Req, Params) ->
lager:debug("HTTP Publish: ~p", [Params]),
Topics = topics(Params),
ClientId = get_value(<<"client">>, Params, http),
Qos = int(get_value(<<"qos">>, Params, "0")),
Retain = bool(get_value(<<"retain">>, Params, "0")),
Payload = iolist_to_binary(get_value(<<"message">>, Params)),
case {validate(qos, Qos), validate(topics, Topics)} of
{true, true} ->
lists:foreach(fun(Topic) ->
Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
emqttd:publish(Msg#mqtt_message{retain = Retain})
end, Topics),
Req:ok({"text/plain", <<"OK">>});
{false, _} ->
Req:respond({400, [], <<"Bad QoS">>});
{_, false} ->
Req:respond({400, [], <<"Bad Topics">>})
dispatcher(APIs) ->
fun(Req, Url) ->
Method = Req:get(method),
case filter(APIs, Url, Method) of
[{Regexp, _Method, Function, FilterArgs}] ->
case params(Req) of
{error, Error1} ->
respond(Req, 200, Error1);
Params ->
case {check_params(Params, FilterArgs),
check_params_type(Params, FilterArgs)} of
{true, true} ->
{match, [MatchList]} = re:run(Url, Regexp, [global, {capture, all_but_first, list}]),
Args = lists:append([[Method, Params], MatchList]),
lager:debug("Mod:~p, Fun:~p, Args:~p", [emqttd_rest_api, Function, Args]),
case catch apply(emqttd_rest_api, Function, Args) of
{ok, Data} ->
respond(Req, 200, [{code, ?SUCCESS}, {result, Data}]);
{error, Error} ->
respond(Req, 200, Error);
{'EXIT', Reason} ->
lager:error("Execute API '~s' Error: ~p", [Url, Reason]),
respond(Req, 404, [])
end;
{false, _} ->
respond(Req, 200, [{code, ?ERROR7}, {message, <<"params error">>}]);
{_, false} ->
respond(Req, 200, [{code, ?ERROR8}, {message, <<"params type error">>}])
end
end;
_ ->
lager:error("No match Url:~p", [Url]),
respond(Req, 404, [])
end
end.
parse_params(Req) ->
[{iolist_to_binary(K), V} || {K, V} <- mochiweb_request:parse_post(Req)].
% %%--------------------------------------------------------------------
% %% Basic Authorization
% %%--------------------------------------------------------------------
if_authorized(Req, Fun) ->
case authorized(Req) of
true -> Fun();
false -> respond(Req, 401, [])
end.
topics(Params) ->
Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")],
[iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined].
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
validate(topics, [Topic|Left]) ->
case validate(topic, Topic) of
true -> validate(topics, Left);
false -> false
end;
validate(topics, []) ->
true;
validate(topic, Topic) ->
emqttd_topic:validate({name, Topic}).
%%--------------------------------------------------------------------
%% basic authorization
%%--------------------------------------------------------------------
authorized(Req, Params) ->
ClientId = get_value(<<"client">>, Params, http),
authorized(Req) ->
case Req:get_header_value("Authorization") of
undefined ->
false;
"Basic " ++ BasicAuth ->
{Username, Password} = user_passwd(BasicAuth),
{ok, Peer} = Req:get(peername),
case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, username = Username, peername = Peer}, Password) of
Params = params(Req),
ClientId = get_value(<<"client">>, Params, http),
case emqttd_access_control:auth(#mqtt_client{client_id = ClientId,
username = Username,
peername = Peer}, Password) of
ok -> true;
{ok, _IsSuper} ->
true;
@ -133,21 +141,89 @@ authorized(Req, Params) ->
end.
user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
int(I) when is_integer(I)-> I;
int(B) when is_binary(B)-> binary_to_integer(B);
int(S) -> list_to_integer(S).
respond(Req, 401, Data) ->
Req:respond({401, [{"WWW-Authenticate", "Basic Realm=\"emqx control center\""}], Data});
respond(Req, 404, Data) ->
Req:respond({404, [{"Content-Type", "text/plain"}], Data});
respond(Req, 200, Data) ->
Req:respond({200, [{"Content-Type", "application/json"}], to_json(Data)});
respond(Req, Code, Data) ->
Req:respond({Code, [{"Content-Type", "text/plain"}], Data}).
bool(0) -> false;
bool(1) -> true;
bool("0") -> false;
bool("1") -> true;
bool(<<"0">>) -> false;
bool(<<"1">>) -> true.
filter(APIs, Url, Method) ->
lists:filter(fun({Regexp, Method1, _Function, _Args}) ->
case re:run(Url, Regexp, [global, {capture, all_but_first, list}]) of
{match, _} -> Method =:= Method1;
_ -> false
end
end, APIs).
docroot() ->
{file, Here} = code:is_loaded(?MODULE),
Dir = filename:dirname(filename:dirname(Here)),
filename:join([Dir, "priv", "www"]).
params(Req) ->
Method = Req:get(method),
case Method of
'GET' ->
mochiweb_request:parse_qs(Req);
_ ->
case Req:recv_body() of
<<>> -> [];
undefined -> [];
Body ->
case jsx:is_json(Body) of
true -> jsx:decode(Body);
false ->
lager:error("Body:~p", [Body]),
{error, [{code, ?ERROR9}, {message, <<"Body not json">>}]}
end
end
end.
check_params(_Params, Args) when Args =:= [] ->
true;
check_params(Params, Args)->
not lists:any(fun({Item, _Type}) -> undefined =:= proplists:get_value(Item, Params) end, Args).
check_params_type(_Params, Args) when Args =:= [] ->
true;
check_params_type(Params, Args) ->
not lists:any(fun({Item, Type}) ->
Val = proplists:get_value(Item, Params),
case Type of
int -> not is_integer(Val);
binary -> not is_binary(Val);
bool -> not is_boolean(Val)
end
end, Args).
to_json([]) -> <<"[]">>;
to_json(Data) -> iolist_to_binary(mochijson2:encode(Data)).
api_list() ->
[{paths, [<<"api/v2/management/nodes">>,
<<"api/v2/management/nodes/{node_name}">>,
<<"api/v2/monitoring/nodes">>,
<<"api/v2/monitoring/nodes/{node_name}">>,
<<"api/v2/monitoring/listeners">>,
<<"api/v2/monitoring/listeners/{node_name}">>,
<<"api/v2/monitoring/metrics/">>,
<<"api/v2/monitoring/metrics/{node_name}">>,
<<"api/v2/monitoring/stats">>,
<<"api/v2/monitoring/stats/{node_name}">>,
<<"api/v2/nodes/{node_name}/clients">>,
<<"api/v2/nodes/{node_name}/clients/{clientid}">>,
<<"api/v2/clients/{clientid}">>,
<<"api/v2/kick_client/{clientid}">>,
<<"api/v2/nodes/{node_name}/sessions">>,
<<"api/v2/nodes/{node_name}/sessions/{clientid}">>,
<<"api/v2/sessions/{clientid}">>,
<<"api/v2/nodes/{node_name}/subscriptions">>,
<<"api/v2/nodes/{node_name}/subscriptions/{clientid}">>,
<<"api/v2/subscriptions/{clientid}">>,
<<"api/v2/routes">>,
<<"api/v2/routes/{topic}">>,
<<"api/v2/mqtt/publish">>,
<<"api/v2/mqtt/subscribe">>,
<<"api/v2/mqtt/unsubscribe">>,
<<"api/v2/nodes/{node_name}/plugins">>,
<<"api/v2/nodes/{node_name}/plugins/{plugin_name}">>]}].

383
src/emqttd_mgmt.erl Normal file
View File

@ -0,0 +1,383 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_mgmt).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-include("emqttd_protocol.hrl").
-include("emqttd_internal.hrl").
-include_lib("stdlib/include/qlc.hrl").
-import(proplists, [get_value/2]).
-export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0,
plugins/0, plugins/1, listeners/0, listener/1, nodes_info/0, node_info/1]).
-export([plugin_list/1, plugin_unload/2, plugin_load/2]).
-export([client_list/4, session_list/4, route_list/3, subscription_list/4, alarm_list/0]).
-export([client/1, session/1, route/1, subscription/1]).
-export([query_table/4, lookup_table/3]).
-export([publish/1, subscribe/1, unsubscribe/1]).
-export([kick_client/1]).
-define(KB, 1024).
-define(MB, (1024*1024)).
-define(GB, (1024*1024*1024)).
-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))).
brokers() ->
[{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()].
broker(Node) when Node =:= node() ->
emqttd_broker:info();
broker(Node) ->
rpc_call(Node, broker, [Node]).
metrics() ->
[{Node, metrics(Node)} || Node <- ekka_mnesia:running_nodes()].
metrics(Node) when Node =:= node() ->
emqttd_metrics:all();
metrics(Node) ->
rpc_call(Node, metrics, [Node]).
stats() ->
[{Node, stats(Node)} || Node <- ekka_mnesia:running_nodes()].
stats(Node) when Node =:= node() ->
emqttd_stats:getstats();
stats(Node) ->
rpc_call(Node, stats, [Node]).
plugins() ->
[{Node, plugins(Node)} || Node <- ekka_mnesia:running_nodes()].
plugins(Node) when Node =:= node() ->
emqttd_plugins:list(Node);
plugins(Node) ->
rpc_call(Node, plugins, [Node]).
listeners() ->
[{Node, listener(Node)} || Node <- ekka_mnesia:running_nodes()].
listener(Node) when Node =:= node() ->
lists:map(fun({{Protocol, ListenOn}, Pid}) ->
Info = [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)},
{current_clients,esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}],
{Protocol, ListenOn, Info}
end, esockd:listeners());
listener(Node) ->
rpc_call(Node, listener, [Node]).
nodes_info() ->
Running = mnesia:system_info(running_db_nodes),
Stopped = mnesia:system_info(db_nodes) -- Running,
DownNodes = lists:map(fun stop_node/1, Stopped),
[node_info(Node) || Node <- Running] ++ DownNodes.
node_info(Node) when Node =:= node() ->
CpuInfo = [{K, list_to_binary(V)} || {K, V} <- emqttd_vm:loads()],
Memory = emqttd_vm:get_memory(),
OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
[{name, node()},
{otp_release, list_to_binary(OtpRel)},
{memory_total, kmg(get_value(allocated, Memory))},
{memory_used, kmg(get_value(used, Memory))},
{process_available, erlang:system_info(process_limit)},
{process_used, erlang:system_info(process_count)},
{max_fds, get_value(max_fds, erlang:system_info(check_io))},
{clients, ets:info(mqtt_client, size)},
{node_status, 'Running'} | CpuInfo];
node_info(Node) ->
rpc_call(Node, node_info, [Node]).
stop_node(Node) ->
[{name, Node}, {node_status, 'Stopped'}].
%%--------------------------------------------------------
%% plugins
%%--------------------------------------------------------
plugin_list(Node) when Node =:= node() ->
emqttd_plugins:list();
plugin_list(Node) ->
rpc_call(Node, plugin_list, [Node]).
plugin_load(Node, PluginName) when Node =:= node() ->
emqttd_plugins:load(PluginName);
plugin_load(Node, PluginName) ->
rpc_call(Node, plugin_load, [Node, PluginName]).
plugin_unload(Node, PluginName) when Node =:= node() ->
emqttd_plugins:unload(PluginName);
plugin_unload(Node, PluginName) ->
rpc_call(Node, plugin_unload, [Node, PluginName]).
%%--------------------------------------------------------
%% client
%%--------------------------------------------------------
client_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
client_list(Key, PageNo, PageSize);
client_list(Node, Key, PageNo, PageSize) ->
rpc_call(Node, client_list, [Node, Key, PageNo, PageSize]).
client(ClientId) ->
lists:flatten([client_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
%%--------------------------------------------------------
%% session
%%--------------------------------------------------------
session_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
session_list(Key, PageNo, PageSize);
session_list(Node, Key, PageNo, PageSize) ->
rpc_call(Node, session_list, [Node, Key, PageNo, PageSize]).
session(ClientId) ->
lists:flatten([session_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
%%--------------------------------------------------------
%% subscription
%%--------------------------------------------------------
subscription_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
subscription_list(Key, PageNo, PageSize);
subscription_list(Node, Key, PageNo, PageSize) ->
rpc_call(Node, subscription_list, [Node, Key, PageNo, PageSize]).
subscription(Key) ->
lists:flatten([subscription_list(Node, Key, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
%%--------------------------------------------------------
%% Routes
%%--------------------------------------------------------
route(Key) -> route_list(Key, 1, 20).
%%--------------------------------------------------------
%% alarm
%%--------------------------------------------------------
alarm_list() ->
emqttd_alarm:get_alarms().
query_table(Qh, PageNo, PageSize, TotalNum) ->
Cursor = qlc:cursor(Qh),
case PageNo > 1 of
true -> qlc:next_answers(Cursor, (PageNo - 1) * PageSize);
false -> ok
end,
Rows = qlc:next_answers(Cursor, PageSize),
qlc:delete_cursor(Cursor),
[{totalNum, TotalNum},
{totalPage, total_page(TotalNum, PageSize)},
{result, Rows}].
total_page(TotalNum, PageSize) ->
case TotalNum rem PageSize of
0 -> TotalNum div PageSize;
_ -> (TotalNum div PageSize) + 1
end.
%%TODO: refactor later...
lookup_table(LookupFun, _PageNo, _PageSize) ->
Rows = LookupFun(),
Rows.
%%--------------------------------------------------------------------
%% mqtt
%%--------------------------------------------------------------------
publish({ClientId, Topic, Payload, Qos, Retain}) ->
case validate(topic, Topic) of
true ->
Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
emqttd:publish(Msg#mqtt_message{retain = Retain}),
ok;
false ->
{error, format_error(Topic, "validate topic: ${0} fail")}
end.
subscribe({ClientId, Topic, Qos}) ->
case validate(topic, Topic) of
true ->
case emqttd_sm:lookup_session(ClientId) of
undefined ->
{error, format_error(ClientId, "Clientid: ${0} not found")};
#mqtt_session{sess_pid = SessPid} ->
emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
ok
end;
false ->
{error, format_error(Topic, "validate topic: ${0} fail")}
end.
unsubscribe({ClientId, Topic})->
case validate(topic, Topic) of
true ->
case emqttd_sm:lookup_session(ClientId) of
undefined ->
{error, format_error(ClientId, "Clientid: ${0} not found")};
#mqtt_session{sess_pid = SessPid} ->
emqttd_session:unsubscribe(SessPid, [{Topic, []}]),
ok
end;
false ->
{error, format_error(Topic, "validate topic: ${0} fail")}
end.
% publish(Messages) ->
% lists:foldl(
% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) ->
% case validate(topic, Topic) of
% true ->
% Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
% emqttd:publish(Msg#mqtt_message{retain = Retain}),
% {[[{topic, Topic}]| Success], Failed};
% false ->
% {Success, [[{topic, Topic}]| Failed]}
% end
% end, {[], []}, Messages).
% subscribers(Subscribers) ->
% lists:foldl(
% fun({ClientId, Topic, Qos}, {Success, Failed}) ->
% case emqttd_sm:lookup_session(ClientId) of
% undefined ->
% {Success, [[{client_id, ClientId}]|Failed]};
% #mqtt_session{sess_pid = SessPid} ->
% emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
% {[[{client_id, ClientId}]| Success], Failed}
% end
% end,{[], []}, Subscribers).
% unsubscribers(UnSubscribers)->
% lists:foldl(
% fun({ClientId, Topic}, {Success, Failed}) ->
% case emqttd_sm:lookup_session(ClientId) of
% undefined ->
% {Success, [[{client_id, ClientId}]|Failed]};
% #mqtt_session{sess_pid = SessPid} ->
% emqttd_session:unsubscriber(SessPid, [{Topic, []}]),
% {[[{client_id, ClientId}]| Success], Failed}
% end
% end, {[], []}, UnSubscribers).
%%--------------------------------------------------------------------
%% manager API
%%--------------------------------------------------------------------
kick_client(ClientId) ->
Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Result) of
true -> {ok, [{status, success}]};
false -> {ok, [{status, failure}]}
end.
kick_client(Node, ClientId) when Node =:= node() ->
case emqttd_cm:lookup(ClientId) of
undefined -> error;
#mqtt_client{client_pid = Pid}-> emqttd_client:kick(Pid)
end;
kick_client(Node, ClientId) ->
rpc_call(Node, kick_client, [Node, ClientId]).
%%--------------------------------------------------------------------
%% Internel Functions.
%%--------------------------------------------------------------------
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
kmg(Byte) when Byte > ?GB ->
float(Byte / ?GB, "G");
kmg(Byte) when Byte > ?MB ->
float(Byte / ?MB, "M");
kmg(Byte) when Byte > ?KB ->
float(Byte / ?MB, "K");
kmg(Byte) ->
Byte.
float(F, S) ->
iolist_to_binary(io_lib:format("~.2f~s", [F, S])).
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
validate(topic, Topic) ->
emqttd_topic:validate({name, Topic}).
client_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) ->
TotalNum = ets:info(mqtt_client, size),
Qh = qlc:q([R || R <- ets:table(mqtt_client)]),
query_table(Qh, PageNo, PageSize, TotalNum);
client_list(ClientId, PageNo, PageSize) ->
Fun = fun() -> ets:lookup(mqtt_client, ClientId) end,
lookup_table(Fun, PageNo, PageSize).
session_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) ->
TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_session]]),
Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_session]]),
query_table(Qh, PageNo, PageSize, TotalNum);
session_list(ClientId, PageNo, PageSize) ->
MP = {ClientId, '_', '_', '_'},
Fun = fun() -> lists:append([ets:match_object(Tab, MP) || Tab <- [mqtt_local_session]]) end,
lookup_table(Fun, PageNo, PageSize).
subscription_list(Key, PageNo, PageSize) when ?EMPTY_KEY(Key) ->
TotalNum = ets:info(mqtt_subproperty, size),
Qh = qlc:q([E || E <- ets:table(mqtt_subproperty)]),
query_table(Qh, PageNo, PageSize, TotalNum);
subscription_list(Key, PageNo, PageSize) ->
Keys = ets:lookup(mqtt_subscription, Key),
Fun = case length(Keys) == 0 of
true ->
MP = {{Key, '_'}, '_'},
fun() -> ets:match_object(mqtt_subproperty, MP) end;
false ->
fun() ->
lists:map(fun({S, T}) ->[R] = ets:lookup(mqtt_subproperty, {T, S}), R end, Keys)
end
end,
lookup_table(Fun, PageNo, PageSize).
route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) ->
TotalNum = lists:sum([ets:info(Tab, size) || Tab <- tables()]),
Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- tables()]),
query_table(Qh, PageNo, PageSize, TotalNum);
route_list(Topic, PageNo, PageSize) ->
Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- tables()]) end,
lookup_table(Fun, PageNo, PageSize).
tables() ->
[mqtt_route, mqtt_local_route].
format_error(Val, Msg) ->
re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]).

399
src/emqttd_rest_api.erl Normal file
View File

@ -0,0 +1,399 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module (emqttd_rest_api).
-include("emqttd.hrl").
-include("emqttd_internal.hrl").
-http_api({"^nodes/(.+?)/alarms/?$", 'GET', alarm_list, []}).
-http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}).
-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}).
-http_api({"^clients/(.+?)/?$", 'GET', client, []}).
-http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}).
-http_api({"^routes?$", 'GET', route_list, []}).
-http_api({"^routes/(.+?)/?$", 'GET', route, []}).
-http_api({"^nodes/(.+?)/sessions/?$", 'GET', session_list, []}).
-http_api({"^nodes/(.+?)/sessions/(.+?)/?$", 'GET', session_list, []}).
-http_api({"^sessions/(.+?)/?$", 'GET', session, []}).
-http_api({"^nodes/(.+?)/subscriptions/?$", 'GET', subscription_list, []}).
-http_api({"^nodes/(.+?)/subscriptions/(.+?)/?$", 'GET', subscription_list, []}).
-http_api({"^subscriptions/(.+?)/?$", 'GET', subscription, []}).
-http_api({"^mqtt/publish?$", 'POST', publish, [{<<"topic">>, binary}]}).
-http_api({"^mqtt/subscribe?$", 'POST', subscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}).
-http_api({"^mqtt/unsubscribe?$", 'POST', unsubscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}).
-http_api({"^management/nodes/?$", 'GET', brokers, []}).
-http_api({"^management/nodes/(.+?)/?$", 'GET', broker, []}).
-http_api({"^monitoring/nodes/?$", 'GET', nodes, []}).
-http_api({"^monitoring/nodes/(.+?)/?$", 'GET', node, []}).
-http_api({"^monitoring/listeners/?$", 'GET', listeners, []}).
-http_api({"^monitoring/listeners/(.+?)/?$", 'GET', listener, []}).
-http_api({"^monitoring/metrics/?$", 'GET', metrics, []}).
-http_api({"^monitoring/metrics/(.+?)/?$", 'GET', metric, []}).
-http_api({"^monitoring/stats/?$", 'GET', stats, []}).
-http_api({"^monitoring/stats/(.+?)/?$", 'GET', stat, []}).
-http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}).
-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}).
-export([alarm_list/3]).
-export([client/3, client_list/3, client_list/4, kick_client/3]).
-export([route/3, route_list/2]).
-export([session/3, session_list/3, session_list/4]).
-export([subscription/3, subscription_list/3, subscription_list/4]).
-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]).
-export([publish/2, subscribe/2, unsubscribe/2]).
-export([plugin_list/3, enabled/4]).
%%--------------------------------------------------------------------------
%% alarm
%%--------------------------------------------------------------------------
alarm_list('GET', _Req, _Node) ->
Alarms = emqttd_mgmt:alarm_list(),
{ok, lists:map(fun alarm_row/1, Alarms)}.
alarm_row(#mqtt_alarm{id = AlarmId,
severity = Severity,
title = Title,
summary = Summary,
timestamp = Timestamp}) ->
[{id, AlarmId},
{severity, Severity},
{title, l2b(Title)},
{summary, l2b(Summary)},
{occurred_at, l2b(strftime(Timestamp))}].
%%--------------------------------------------------------------------------
%% client
%%--------------------------------------------------------------------------
client('GET', _Params, Key) ->
Data = emqttd_mgmt:client(l2b(Key)),
{ok, [{objects, [client_row(Row) || Row <- Data]}]}.
client_list('GET', Params, Node) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize),
Rows = proplists:get_value(result, Data),
TotalPage = proplists:get_value(totalPage, Data),
TotalNum = proplists:get_value(totalNum, Data),
{ok, [{current_page, PageNo},
{page_size, PageSize},
{total_num, TotalNum},
{total_page, TotalPage},
{objects, [client_row(Row) || Row <- Rows]}]}.
client_list('GET', Params, Node, Key) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize),
{ok, [{objects, [client_row(Row) || Row <- Data]}]}.
kick_client('PUT', _Params, Key) ->
case emqttd_mgmt:kick_client(l2b(Key)) of
ok -> {ok, []};
error -> {error, [{code, ?ERROR12}]}
end.
client_row(#mqtt_client{client_id = ClientId,
peername = {IpAddr, Port},
username = Username,
clean_sess = CleanSess,
proto_ver = ProtoVer,
keepalive = KeepAlvie,
connected_at = ConnectedAt}) ->
[{client_id, ClientId},
{username, Username},
{ipaddress, l2b(ntoa(IpAddr))},
{port, Port},
{clean_sess, CleanSess},
{proto_ver, ProtoVer},
{keepalive, KeepAlvie},
{connected_at, l2b(strftime(ConnectedAt))}].
%%--------------------------------------------------------------------------
%% route
%%--------------------------------------------------------------------------
route('GET', _Params, Key) ->
Data = emqttd_mgmt:route(l2b(Key)),
{ok, [{objects, [route_row(Row) || Row <- Data]}]}.
route_list('GET', Params) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize),
Rows = proplists:get_value(result, Data),
TotalPage = proplists:get_value(totalPage, Data),
TotalNum = proplists:get_value(totalNum, Data),
{ok, [{current_page, PageNo},
{page_size, PageSize},
{total_num, TotalNum},
{total_page, TotalPage},
{objects, [route_row(Row) || Row <- Rows]}]}.
route_row(Route) when is_record(Route, mqtt_route) ->
[{topic, Route#mqtt_route.topic}, {node, Route#mqtt_route.node}];
route_row({Topic, Node}) ->
[{topic, Topic}, {node, Node}].
%%--------------------------------------------------------------------------
%% session
%%--------------------------------------------------------------------------
session('GET', _Params, Key) ->
Data = emqttd_mgmt:session(l2b(Key)),
{ok, [{objects, [session_row(Row) || Row <- Data]}]}.
session_list('GET', Params, Node) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize),
Rows = proplists:get_value(result, Data),
TotalPage = proplists:get_value(totalPage, Data),
TotalNum = proplists:get_value(totalNum, Data),
{ok, [{current_page, PageNo},
{page_size, PageSize},
{total_num, TotalNum},
{total_page, TotalPage},
{objects, [session_row(Row) || Row <- Rows]}]}.
session_list('GET', Params, Node, ClientId) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:session_list(l2a(Node), l2b(ClientId), PageNo, PageSize),
{ok, [{objects, [session_row(Row) || Row <- Data]}]}.
session_row({ClientId, _Pid, _Persistent, Session}) ->
InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue,
message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at],
[{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]].
%%--------------------------------------------------------------------------
%% subscription
%%--------------------------------------------------------------------------
subscription('GET', _Params, Key) ->
Data = emqttd_mgmt:subscription(l2b(Key)),
{ok, [{objects, [subscription_row(Row) || Row <- Data]}]}.
subscription_list('GET', Params, Node) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize),
Rows = proplists:get_value(result, Data),
TotalPage = proplists:get_value(totalPage, Data),
TotalNum = proplists:get_value(totalNum, Data),
{ok, [{current_page, PageNo},
{page_size, PageSize},
{total_num, TotalNum},
{total_page, TotalPage},
{objects, [subscription_row(Row) || Row <- Rows]}]}.
subscription_list('GET', Params, Node, Key) ->
{PageNo, PageSize} = page_params(Params),
Data = emqttd_mgmt:subscription_list(l2a(Node), l2b(Key), PageNo, PageSize),
{ok, [{objects, [subscription_row(Row) || Row <- Data]}]}.
subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) ->
subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option});
subscription_row({{Topic, ClientId}, Option}) ->
Qos = proplists:get_value(qos, Option),
[{client_id, ClientId}, {topic, Topic}, {qos, Qos}].
%%--------------------------------------------------------------------------
%% management/monitoring
%%--------------------------------------------------------------------------
nodes('GET', _Params) ->
Data = emqttd_mgmt:nodes_info(),
{ok, format_broker(Data)}.
node('GET', _Params, Node) ->
Data = emqttd_mgmt:node_info(l2a(Node)),
{ok, format_broker(Data)}.
brokers('GET', _Params) ->
Data = emqttd_mgmt:brokers(),
{ok, [format_broker(Node, Broker) || {Node, Broker} <- Data]}.
broker('GET', _Params, Node) ->
Data = emqttd_mgmt:broker(l2a(Node)),
{ok, format_broker(Data)}.
listeners('GET', _Params) ->
Data = emqttd_mgmt:listeners(),
{ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}.
listener('GET', _Params, Node) ->
Data = emqttd_mgmt:listener(l2a(Node)),
{ok, [format_listener(Listeners) || Listeners <- Data]}.
metrics('GET', _Params) ->
Data = emqttd_mgmt:metrics(),
{ok, Data}.
metric('GET', _Params, Node) ->
Data = emqttd_mgmt:metrics(l2a(Node)),
{ok, Data}.
stats('GET', _Params) ->
Data = emqttd_mgmt:stats(),
{ok, Data}.
stat('GET', _Params, Node) ->
Data = emqttd_mgmt:stats(l2a(Node)),
{ok, Data}.
format_broker(Node, Broker) ->
OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
[{name, Node},
{version, bin(proplists:get_value(version, Broker))},
{sysdescr, bin(proplists:get_value(sysdescr, Broker))},
{uptime, bin(proplists:get_value(uptime, Broker))},
{datetime, bin(proplists:get_value(datetime, Broker))},
{otp_release, l2b(OtpRel)},
{node_status, 'Running'}].
format_broker(Broker) ->
OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
[{version, bin(proplists:get_value(version, Broker))},
{sysdescr, bin(proplists:get_value(sysdescr, Broker))},
{uptime, bin(proplists:get_value(uptime, Broker))},
{datetime, bin(proplists:get_value(datetime, Broker))},
{otp_release, l2b(OtpRel)},
{node_status, 'Running'}].
format_listeners([], Acc) ->
Acc;
format_listeners([{Protocol, ListenOn, Info}| Listeners], Acc) ->
format_listeners(Listeners, [format_listener({Protocol, ListenOn, Info}) | Acc]).
format_listener({Protocol, ListenOn, Info}) ->
Listen = l2b(esockd:to_string(ListenOn)),
lists:append([{protocol, Protocol}, {listen, Listen}], Info).
%%--------------------------------------------------------------------------
%% mqtt
%%--------------------------------------------------------------------------
publish('POST', Params) ->
Topic = proplists:get_value(<<"topic">>, Params),
ClientId = proplists:get_value(<<"client_id">>, Params, http),
Payload = proplists:get_value(<<"payload">>, Params, <<>>),
Qos = proplists:get_value(<<"qos">>, Params, 0),
Retain = proplists:get_value(<<"retain">>, Params, false),
case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of
ok ->
{ok, []};
{error, Error} ->
{error, [{code, ?ERROR2}, {message, Error}]}
end.
subscribe('POST', Params) ->
ClientId = proplists:get_value(<<"client_id">>, Params),
Topic = proplists:get_value(<<"topic">>, Params),
Qos = proplists:get_value(<<"qos">>, Params, 0),
case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of
ok ->
{ok, []};
{error, Error} ->
{error, [{code, ?ERROR2}, {message, Error}]}
end.
unsubscribe('POST', Params) ->
ClientId = proplists:get_value(<<"client_id">>, Params),
Topic = proplists:get_value(<<"topic">>, Params),
case emqttd_mgmt:unsubscribe({ClientId, Topic})of
ok ->
{ok, []};
{error, Error} ->
{error, [{code, ?ERROR2}, {message, Error}]}
end.
%%--------------------------------------------------------------------------
%% plugins
%%--------------------------------------------------------------------------
plugin_list('GET', _Params, Node) ->
Plugins = lists:map(fun plugin/1, emqttd_mgmt:plugin_list(l2a(Node))),
{ok, Plugins}.
enabled('PUT', Params, Node, PluginName) ->
Active = proplists:get_value(<<"active">>, Params),
case Active of
true ->
return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName)));
false ->
return(emqttd_mgmt:plugin_unload(l2a(Node), l2a(PluginName)))
end.
return(Result) ->
case Result of
{ok, _} ->
{ok, []};
{error, already_started} ->
{error, [{code, ?ERROR10}, {message, <<"already_started">>}]};
{error, not_started} ->
{error, [{code, ?ERROR11}, {message, <<"not_started">>}]};
Error ->
lager:error("error:~p", [Error]),
{error, [{code, ?ERROR2}, {message, <<"unknown">>}]}
end.
plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr,
active = Active}) ->
[{name, Name},
{version, iolist_to_binary(Ver)},
{description, iolist_to_binary(Descr)},
{active, Active}].
%%--------------------------------------------------------------------------
%% Inner function
%%--------------------------------------------------------------------------
format(created_at, Val) ->
l2b(strftime(Val));
format(_, Val) ->
Val.
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
inet_parse:ntoa(IP).
%%--------------------------------------------------------------------
%% Strftime
%%--------------------------------------------------------------------
strftime({MegaSecs, Secs, _MicroSecs}) ->
strftime(datetime(MegaSecs * 1000000 + Secs));
strftime({{Y,M,D}, {H,MM,S}}) ->
lists:flatten(
io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
datetime(Timestamp) when is_integer(Timestamp) ->
Universal = calendar:gregorian_seconds_to_datetime(Timestamp +
calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})),
calendar:universal_time_to_local_time(Universal).
bin(S) when is_list(S) -> l2b(S);
bin(A) when is_atom(A) -> bin(atom_to_list(A));
bin(B) when is_binary(B) -> B;
bin(undefined) -> <<>>.
int(L) -> list_to_integer(L).
l2a(L) -> list_to_atom(L).
l2b(L) -> list_to_binary(L).
page_params(Params) ->
PageNo = int(proplists:get_value("curr_page", Params, "1")),
PageSize = int(proplists:get_value("page_size", Params, "20")),
{PageNo, PageSize}.