From 7c1ee6610d1a27a9666e0f23ff24df99b2e4db26 Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 9 Aug 2017 10:14:29 +0800 Subject: [PATCH] Add http management APIs --- include/emqttd_internal.hrl | 14 ++ src/emqttd_app.erl | 2 +- src/emqttd_http.erl | 256 +++++++++++++++-------- src/emqttd_mgmt.erl | 383 ++++++++++++++++++++++++++++++++++ src/emqttd_rest_api.erl | 399 ++++++++++++++++++++++++++++++++++++ 5 files changed, 963 insertions(+), 91 deletions(-) create mode 100644 src/emqttd_mgmt.erl create mode 100644 src/emqttd_rest_api.erl diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index ede858916..68e2b6ba6 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -59,3 +59,17 @@ -define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]). +-define(SUCCESS, 0). %% Success +-define(ERROR1, 101). %% badrpc +-define(ERROR2, 102). %% Unknown error +-define(ERROR3, 103). %% Username or password error +-define(ERROR4, 104). %% Empty username or password +-define(ERROR5, 105). %% User does not exist +-define(ERROR6, 106). %% Admin can not be deleted +-define(ERROR7, 107). %% Missing request parameter +-define(ERROR8, 108). %% Request parameter type error +-define(ERROR9, 109). %% Request parameter is not a json +-define(ERROR10, 110). %% Plugin has been loaded +-define(ERROR11, 111). %% Plugin has been loaded +-define(ERROR12, 112). %% Client not online + diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 0aa87458c..1e99cb951 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -184,7 +184,7 @@ start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []}); start_listener({Proto, ListenOn, Opts}) when Proto == api -> - mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}). + mochiweb:start_http('mqtt:api', ListenOn, Opts, emqttd_http:http_handler()). start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])), diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 0ae07a6f7..a8b78900e 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -26,15 +26,41 @@ -import(proplists, [get_value/2, get_value/3]). --export([handle_request/1]). +-export([http_handler/0, handle_request/2, http_api/0]). -handle_request(Req) -> - handle_request(Req:get(method), Req:get(path), Req). +-include("emqttd_internal.hrl"). -handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' -> +-record(state, {dispatch}). + +http_handler() -> + APIs = http_api(), + State = #state{dispatch = dispatcher(APIs)}, + {?MODULE, handle_request, [State]}. + +http_api() -> + Attr = emqttd_rest_api:module_info(attributes), + [{Regexp, Method, Function, Args} || {http_api, [{Regexp, Method, Function, Args}]} <- Attr]. + +%%-------------------------------------------------------------------- +%% Handle HTTP Request +%%-------------------------------------------------------------------- +handle_request(Req, State) -> + Path = Req:get(path), + case Path of + "/status" -> + handle_request("/status", Req, Req:get(method)); + "/" -> + handle_request("/", Req, Req:get(method)); + _ -> + if_authorized(Req, fun() -> handle_request(Path, Req, State) end) + end. + +handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) -> + Dispatch(Req, Url); + +handle_request("/status", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' -> {InternalStatus, _ProvidedStatus} = init:get_status(), - AppStatus = - case lists:keysearch(emqttd, 1, application:which_applications()) of + AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of false -> not_running; {value, _Val} -> running end, @@ -42,87 +68,69 @@ handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' [node(), InternalStatus, AppStatus]), Req:ok({"text/plain", iolist_to_binary(Status)}); -%%-------------------------------------------------------------------- -%% HTTP Publish API -%%-------------------------------------------------------------------- +handle_request("/", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' -> + respond(Req, 200, api_list()); -handle_request('POST', "/mqtt/publish", Req) -> - Params = parse_params(Req), - case authorized(Req, Params) of - true -> http_publish(Req, Params); - false -> Req:respond({401, [], <<"Unauthorized">>}) - end; +handle_request(_, Req, #state{}) -> + respond(Req, 404, []). -%%-------------------------------------------------------------------- -%% Get static files -%%-------------------------------------------------------------------- - -handle_request('GET', "/" ++ File, Req) -> - lager:info("HTTP GET File: ~s", [File]), - mochiweb_request:serve_file(File, docroot(), Req); - -handle_request(Method, Path, Req) -> - lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), - Req:not_found(). - -%%-------------------------------------------------------------------- -%% HTTP Publish -%%-------------------------------------------------------------------- - -http_publish(Req, Params) -> - lager:debug("HTTP Publish: ~p", [Params]), - Topics = topics(Params), - ClientId = get_value(<<"client">>, Params, http), - Qos = int(get_value(<<"qos">>, Params, "0")), - Retain = bool(get_value(<<"retain">>, Params, "0")), - Payload = iolist_to_binary(get_value(<<"message">>, Params)), - case {validate(qos, Qos), validate(topics, Topics)} of - {true, true} -> - lists:foreach(fun(Topic) -> - Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), - emqttd:publish(Msg#mqtt_message{retain = Retain}) - end, Topics), - Req:ok({"text/plain", <<"OK">>}); - {false, _} -> - Req:respond({400, [], <<"Bad QoS">>}); - {_, false} -> - Req:respond({400, [], <<"Bad Topics">>}) +dispatcher(APIs) -> + fun(Req, Url) -> + Method = Req:get(method), + case filter(APIs, Url, Method) of + [{Regexp, _Method, Function, FilterArgs}] -> + case params(Req) of + {error, Error1} -> + respond(Req, 200, Error1); + Params -> + case {check_params(Params, FilterArgs), + check_params_type(Params, FilterArgs)} of + {true, true} -> + {match, [MatchList]} = re:run(Url, Regexp, [global, {capture, all_but_first, list}]), + Args = lists:append([[Method, Params], MatchList]), + lager:debug("Mod:~p, Fun:~p, Args:~p", [emqttd_rest_api, Function, Args]), + case catch apply(emqttd_rest_api, Function, Args) of + {ok, Data} -> + respond(Req, 200, [{code, ?SUCCESS}, {result, Data}]); + {error, Error} -> + respond(Req, 200, Error); + {'EXIT', Reason} -> + lager:error("Execute API '~s' Error: ~p", [Url, Reason]), + respond(Req, 404, []) + end; + {false, _} -> + respond(Req, 200, [{code, ?ERROR7}, {message, <<"params error">>}]); + {_, false} -> + respond(Req, 200, [{code, ?ERROR8}, {message, <<"params type error">>}]) + end + end; + _ -> + lager:error("No match Url:~p", [Url]), + respond(Req, 404, []) + end end. -parse_params(Req) -> - [{iolist_to_binary(K), V} || {K, V} <- mochiweb_request:parse_post(Req)]. +% %%-------------------------------------------------------------------- +% %% Basic Authorization +% %%-------------------------------------------------------------------- +if_authorized(Req, Fun) -> + case authorized(Req) of + true -> Fun(); + false -> respond(Req, 401, []) + end. -topics(Params) -> - Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")], - [iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined]. - -validate(qos, Qos) -> - (Qos >= ?QOS_0) and (Qos =< ?QOS_2); - -validate(topics, [Topic|Left]) -> - case validate(topic, Topic) of - true -> validate(topics, Left); - false -> false - end; -validate(topics, []) -> - true; - -validate(topic, Topic) -> - emqttd_topic:validate({name, Topic}). - -%%-------------------------------------------------------------------- -%% basic authorization -%%-------------------------------------------------------------------- - -authorized(Req, Params) -> - ClientId = get_value(<<"client">>, Params, http), +authorized(Req) -> case Req:get_header_value("Authorization") of undefined -> false; "Basic " ++ BasicAuth -> {Username, Password} = user_passwd(BasicAuth), {ok, Peer} = Req:get(peername), - case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, username = Username, peername = Peer}, Password) of + Params = params(Req), + ClientId = get_value(<<"client">>, Params, http), + case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, + username = Username, + peername = Peer}, Password) of ok -> true; {ok, _IsSuper} -> true; @@ -133,21 +141,89 @@ authorized(Req, Params) -> end. user_passwd(BasicAuth) -> - list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). + list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). -int(I) when is_integer(I)-> I; -int(B) when is_binary(B)-> binary_to_integer(B); -int(S) -> list_to_integer(S). +respond(Req, 401, Data) -> + Req:respond({401, [{"WWW-Authenticate", "Basic Realm=\"emqx control center\""}], Data}); +respond(Req, 404, Data) -> + Req:respond({404, [{"Content-Type", "text/plain"}], Data}); +respond(Req, 200, Data) -> + Req:respond({200, [{"Content-Type", "application/json"}], to_json(Data)}); +respond(Req, Code, Data) -> + Req:respond({Code, [{"Content-Type", "text/plain"}], Data}). -bool(0) -> false; -bool(1) -> true; -bool("0") -> false; -bool("1") -> true; -bool(<<"0">>) -> false; -bool(<<"1">>) -> true. +filter(APIs, Url, Method) -> + lists:filter(fun({Regexp, Method1, _Function, _Args}) -> + case re:run(Url, Regexp, [global, {capture, all_but_first, list}]) of + {match, _} -> Method =:= Method1; + _ -> false + end + end, APIs). -docroot() -> - {file, Here} = code:is_loaded(?MODULE), - Dir = filename:dirname(filename:dirname(Here)), - filename:join([Dir, "priv", "www"]). +params(Req) -> + Method = Req:get(method), + case Method of + 'GET' -> + mochiweb_request:parse_qs(Req); + _ -> + case Req:recv_body() of + <<>> -> []; + undefined -> []; + Body -> + case jsx:is_json(Body) of + true -> jsx:decode(Body); + false -> + lager:error("Body:~p", [Body]), + {error, [{code, ?ERROR9}, {message, <<"Body not json">>}]} + end + end + end. +check_params(_Params, Args) when Args =:= [] -> + true; +check_params(Params, Args)-> + not lists:any(fun({Item, _Type}) -> undefined =:= proplists:get_value(Item, Params) end, Args). + +check_params_type(_Params, Args) when Args =:= [] -> + true; +check_params_type(Params, Args) -> + not lists:any(fun({Item, Type}) -> + Val = proplists:get_value(Item, Params), + case Type of + int -> not is_integer(Val); + binary -> not is_binary(Val); + bool -> not is_boolean(Val) + end + end, Args). + +to_json([]) -> <<"[]">>; +to_json(Data) -> iolist_to_binary(mochijson2:encode(Data)). + +api_list() -> + [{paths, [<<"api/v2/management/nodes">>, + <<"api/v2/management/nodes/{node_name}">>, + <<"api/v2/monitoring/nodes">>, + <<"api/v2/monitoring/nodes/{node_name}">>, + <<"api/v2/monitoring/listeners">>, + <<"api/v2/monitoring/listeners/{node_name}">>, + <<"api/v2/monitoring/metrics/">>, + <<"api/v2/monitoring/metrics/{node_name}">>, + <<"api/v2/monitoring/stats">>, + <<"api/v2/monitoring/stats/{node_name}">>, + <<"api/v2/nodes/{node_name}/clients">>, + <<"api/v2/nodes/{node_name}/clients/{clientid}">>, + <<"api/v2/clients/{clientid}">>, + <<"api/v2/kick_client/{clientid}">>, + <<"api/v2/nodes/{node_name}/sessions">>, + <<"api/v2/nodes/{node_name}/sessions/{clientid}">>, + <<"api/v2/sessions/{clientid}">>, + <<"api/v2/nodes/{node_name}/subscriptions">>, + <<"api/v2/nodes/{node_name}/subscriptions/{clientid}">>, + <<"api/v2/subscriptions/{clientid}">>, + <<"api/v2/routes">>, + <<"api/v2/routes/{topic}">>, + <<"api/v2/mqtt/publish">>, + <<"api/v2/mqtt/subscribe">>, + <<"api/v2/mqtt/unsubscribe">>, + <<"api/v2/nodes/{node_name}/plugins">>, + <<"api/v2/nodes/{node_name}/plugins/{plugin_name}">>]}]. diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl new file mode 100644 index 000000000..a46f70d09 --- /dev/null +++ b/src/emqttd_mgmt.erl @@ -0,0 +1,383 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_mgmt). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-include("emqttd_protocol.hrl"). + +-include("emqttd_internal.hrl"). + +-include_lib("stdlib/include/qlc.hrl"). + +-import(proplists, [get_value/2]). + +-export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0, + plugins/0, plugins/1, listeners/0, listener/1, nodes_info/0, node_info/1]). + +-export([plugin_list/1, plugin_unload/2, plugin_load/2]). + +-export([client_list/4, session_list/4, route_list/3, subscription_list/4, alarm_list/0]). + +-export([client/1, session/1, route/1, subscription/1]). + +-export([query_table/4, lookup_table/3]). + +-export([publish/1, subscribe/1, unsubscribe/1]). + +-export([kick_client/1]). + +-define(KB, 1024). +-define(MB, (1024*1024)). +-define(GB, (1024*1024*1024)). + +-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). + +brokers() -> + [{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()]. + +broker(Node) when Node =:= node() -> + emqttd_broker:info(); +broker(Node) -> + rpc_call(Node, broker, [Node]). + +metrics() -> + [{Node, metrics(Node)} || Node <- ekka_mnesia:running_nodes()]. + +metrics(Node) when Node =:= node() -> + emqttd_metrics:all(); +metrics(Node) -> + rpc_call(Node, metrics, [Node]). + +stats() -> + [{Node, stats(Node)} || Node <- ekka_mnesia:running_nodes()]. + +stats(Node) when Node =:= node() -> + emqttd_stats:getstats(); +stats(Node) -> + rpc_call(Node, stats, [Node]). + +plugins() -> + [{Node, plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. + +plugins(Node) when Node =:= node() -> + emqttd_plugins:list(Node); +plugins(Node) -> + rpc_call(Node, plugins, [Node]). + +listeners() -> + [{Node, listener(Node)} || Node <- ekka_mnesia:running_nodes()]. + +listener(Node) when Node =:= node() -> + lists:map(fun({{Protocol, ListenOn}, Pid}) -> + Info = [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients,esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}], + {Protocol, ListenOn, Info} + end, esockd:listeners()); + +listener(Node) -> + rpc_call(Node, listener, [Node]). + +nodes_info() -> + Running = mnesia:system_info(running_db_nodes), + Stopped = mnesia:system_info(db_nodes) -- Running, + DownNodes = lists:map(fun stop_node/1, Stopped), + [node_info(Node) || Node <- Running] ++ DownNodes. + +node_info(Node) when Node =:= node() -> + CpuInfo = [{K, list_to_binary(V)} || {K, V} <- emqttd_vm:loads()], + Memory = emqttd_vm:get_memory(), + OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), + [{name, node()}, + {otp_release, list_to_binary(OtpRel)}, + {memory_total, kmg(get_value(allocated, Memory))}, + {memory_used, kmg(get_value(used, Memory))}, + {process_available, erlang:system_info(process_limit)}, + {process_used, erlang:system_info(process_count)}, + {max_fds, get_value(max_fds, erlang:system_info(check_io))}, + {clients, ets:info(mqtt_client, size)}, + {node_status, 'Running'} | CpuInfo]; + +node_info(Node) -> + rpc_call(Node, node_info, [Node]). + +stop_node(Node) -> + [{name, Node}, {node_status, 'Stopped'}]. +%%-------------------------------------------------------- +%% plugins +%%-------------------------------------------------------- +plugin_list(Node) when Node =:= node() -> + emqttd_plugins:list(); +plugin_list(Node) -> + rpc_call(Node, plugin_list, [Node]). + +plugin_load(Node, PluginName) when Node =:= node() -> + emqttd_plugins:load(PluginName); +plugin_load(Node, PluginName) -> + rpc_call(Node, plugin_load, [Node, PluginName]). + +plugin_unload(Node, PluginName) when Node =:= node() -> + emqttd_plugins:unload(PluginName); +plugin_unload(Node, PluginName) -> + rpc_call(Node, plugin_unload, [Node, PluginName]). + +%%-------------------------------------------------------- +%% client +%%-------------------------------------------------------- +client_list(Node, Key, PageNo, PageSize) when Node =:= node() -> + client_list(Key, PageNo, PageSize); +client_list(Node, Key, PageNo, PageSize) -> + rpc_call(Node, client_list, [Node, Key, PageNo, PageSize]). + +client(ClientId) -> + lists:flatten([client_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]). + +%%-------------------------------------------------------- +%% session +%%-------------------------------------------------------- +session_list(Node, Key, PageNo, PageSize) when Node =:= node() -> + session_list(Key, PageNo, PageSize); +session_list(Node, Key, PageNo, PageSize) -> + rpc_call(Node, session_list, [Node, Key, PageNo, PageSize]). + +session(ClientId) -> + lists:flatten([session_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]). + +%%-------------------------------------------------------- +%% subscription +%%-------------------------------------------------------- +subscription_list(Node, Key, PageNo, PageSize) when Node =:= node() -> + subscription_list(Key, PageNo, PageSize); +subscription_list(Node, Key, PageNo, PageSize) -> + rpc_call(Node, subscription_list, [Node, Key, PageNo, PageSize]). + +subscription(Key) -> + lists:flatten([subscription_list(Node, Key, 1, 20) || Node <- ekka_mnesia:running_nodes()]). + +%%-------------------------------------------------------- +%% Routes +%%-------------------------------------------------------- +route(Key) -> route_list(Key, 1, 20). + +%%-------------------------------------------------------- +%% alarm +%%-------------------------------------------------------- +alarm_list() -> + emqttd_alarm:get_alarms(). + +query_table(Qh, PageNo, PageSize, TotalNum) -> + Cursor = qlc:cursor(Qh), + case PageNo > 1 of + true -> qlc:next_answers(Cursor, (PageNo - 1) * PageSize); + false -> ok + end, + Rows = qlc:next_answers(Cursor, PageSize), + qlc:delete_cursor(Cursor), + [{totalNum, TotalNum}, + {totalPage, total_page(TotalNum, PageSize)}, + {result, Rows}]. + +total_page(TotalNum, PageSize) -> + case TotalNum rem PageSize of + 0 -> TotalNum div PageSize; + _ -> (TotalNum div PageSize) + 1 + end. + +%%TODO: refactor later... +lookup_table(LookupFun, _PageNo, _PageSize) -> + Rows = LookupFun(), + Rows. + +%%-------------------------------------------------------------------- +%% mqtt +%%-------------------------------------------------------------------- +publish({ClientId, Topic, Payload, Qos, Retain}) -> + case validate(topic, Topic) of + true -> + Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), + emqttd:publish(Msg#mqtt_message{retain = Retain}), + ok; + false -> + {error, format_error(Topic, "validate topic: ${0} fail")} + end. + +subscribe({ClientId, Topic, Qos}) -> + case validate(topic, Topic) of + true -> + case emqttd_sm:lookup_session(ClientId) of + undefined -> + {error, format_error(ClientId, "Clientid: ${0} not found")}; + #mqtt_session{sess_pid = SessPid} -> + emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), + ok + end; + false -> + {error, format_error(Topic, "validate topic: ${0} fail")} + end. + +unsubscribe({ClientId, Topic})-> + case validate(topic, Topic) of + true -> + case emqttd_sm:lookup_session(ClientId) of + undefined -> + {error, format_error(ClientId, "Clientid: ${0} not found")}; + #mqtt_session{sess_pid = SessPid} -> + emqttd_session:unsubscribe(SessPid, [{Topic, []}]), + ok + end; + false -> + {error, format_error(Topic, "validate topic: ${0} fail")} + end. + +% publish(Messages) -> +% lists:foldl( +% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) -> +% case validate(topic, Topic) of +% true -> +% Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), +% emqttd:publish(Msg#mqtt_message{retain = Retain}), +% {[[{topic, Topic}]| Success], Failed}; +% false -> +% {Success, [[{topic, Topic}]| Failed]} +% end +% end, {[], []}, Messages). + +% subscribers(Subscribers) -> +% lists:foldl( +% fun({ClientId, Topic, Qos}, {Success, Failed}) -> +% case emqttd_sm:lookup_session(ClientId) of +% undefined -> +% {Success, [[{client_id, ClientId}]|Failed]}; +% #mqtt_session{sess_pid = SessPid} -> +% emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), +% {[[{client_id, ClientId}]| Success], Failed} +% end +% end,{[], []}, Subscribers). + +% unsubscribers(UnSubscribers)-> +% lists:foldl( +% fun({ClientId, Topic}, {Success, Failed}) -> +% case emqttd_sm:lookup_session(ClientId) of +% undefined -> +% {Success, [[{client_id, ClientId}]|Failed]}; +% #mqtt_session{sess_pid = SessPid} -> +% emqttd_session:unsubscriber(SessPid, [{Topic, []}]), +% {[[{client_id, ClientId}]| Success], Failed} +% end +% end, {[], []}, UnSubscribers). + +%%-------------------------------------------------------------------- +%% manager API +%%-------------------------------------------------------------------- +kick_client(ClientId) -> + Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + case lists:any(fun(Item) -> Item =:= ok end, Result) of + true -> {ok, [{status, success}]}; + false -> {ok, [{status, failure}]} + end. + +kick_client(Node, ClientId) when Node =:= node() -> + case emqttd_cm:lookup(ClientId) of + undefined -> error; + #mqtt_client{client_pid = Pid}-> emqttd_client:kick(Pid) + end; +kick_client(Node, ClientId) -> + rpc_call(Node, kick_client, [Node, ClientId]). + +%%-------------------------------------------------------------------- +%% Internel Functions. +%%-------------------------------------------------------------------- + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. + +kmg(Byte) when Byte > ?GB -> + float(Byte / ?GB, "G"); +kmg(Byte) when Byte > ?MB -> + float(Byte / ?MB, "M"); +kmg(Byte) when Byte > ?KB -> + float(Byte / ?MB, "K"); +kmg(Byte) -> + Byte. +float(F, S) -> + iolist_to_binary(io_lib:format("~.2f~s", [F, S])). + +validate(qos, Qos) -> + (Qos >= ?QOS_0) and (Qos =< ?QOS_2); + +validate(topic, Topic) -> + emqttd_topic:validate({name, Topic}). + +client_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) -> + TotalNum = ets:info(mqtt_client, size), + Qh = qlc:q([R || R <- ets:table(mqtt_client)]), + query_table(Qh, PageNo, PageSize, TotalNum); + +client_list(ClientId, PageNo, PageSize) -> + Fun = fun() -> ets:lookup(mqtt_client, ClientId) end, + lookup_table(Fun, PageNo, PageSize). + +session_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) -> + TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_session]]), + Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_session]]), + query_table(Qh, PageNo, PageSize, TotalNum); + +session_list(ClientId, PageNo, PageSize) -> + MP = {ClientId, '_', '_', '_'}, + Fun = fun() -> lists:append([ets:match_object(Tab, MP) || Tab <- [mqtt_local_session]]) end, + lookup_table(Fun, PageNo, PageSize). + +subscription_list(Key, PageNo, PageSize) when ?EMPTY_KEY(Key) -> + TotalNum = ets:info(mqtt_subproperty, size), + Qh = qlc:q([E || E <- ets:table(mqtt_subproperty)]), + query_table(Qh, PageNo, PageSize, TotalNum); + +subscription_list(Key, PageNo, PageSize) -> + Keys = ets:lookup(mqtt_subscription, Key), + Fun = case length(Keys) == 0 of + true -> + MP = {{Key, '_'}, '_'}, + fun() -> ets:match_object(mqtt_subproperty, MP) end; + false -> + fun() -> + lists:map(fun({S, T}) ->[R] = ets:lookup(mqtt_subproperty, {T, S}), R end, Keys) + end + end, + lookup_table(Fun, PageNo, PageSize). + +route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) -> + TotalNum = lists:sum([ets:info(Tab, size) || Tab <- tables()]), + Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- tables()]), + query_table(Qh, PageNo, PageSize, TotalNum); + +route_list(Topic, PageNo, PageSize) -> + Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- tables()]) end, + lookup_table(Fun, PageNo, PageSize). + +tables() -> + [mqtt_route, mqtt_local_route]. + +format_error(Val, Msg) -> + re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]). + diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl new file mode 100644 index 000000000..e776e2210 --- /dev/null +++ b/src/emqttd_rest_api.erl @@ -0,0 +1,399 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module (emqttd_rest_api). + +-include("emqttd.hrl"). + +-include("emqttd_internal.hrl"). + +-http_api({"^nodes/(.+?)/alarms/?$", 'GET', alarm_list, []}). + +-http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}). +-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}). +-http_api({"^clients/(.+?)/?$", 'GET', client, []}). +-http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}). + +-http_api({"^routes?$", 'GET', route_list, []}). +-http_api({"^routes/(.+?)/?$", 'GET', route, []}). + +-http_api({"^nodes/(.+?)/sessions/?$", 'GET', session_list, []}). +-http_api({"^nodes/(.+?)/sessions/(.+?)/?$", 'GET', session_list, []}). +-http_api({"^sessions/(.+?)/?$", 'GET', session, []}). + +-http_api({"^nodes/(.+?)/subscriptions/?$", 'GET', subscription_list, []}). +-http_api({"^nodes/(.+?)/subscriptions/(.+?)/?$", 'GET', subscription_list, []}). +-http_api({"^subscriptions/(.+?)/?$", 'GET', subscription, []}). + +-http_api({"^mqtt/publish?$", 'POST', publish, [{<<"topic">>, binary}]}). +-http_api({"^mqtt/subscribe?$", 'POST', subscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}). +-http_api({"^mqtt/unsubscribe?$", 'POST', unsubscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}). + +-http_api({"^management/nodes/?$", 'GET', brokers, []}). +-http_api({"^management/nodes/(.+?)/?$", 'GET', broker, []}). +-http_api({"^monitoring/nodes/?$", 'GET', nodes, []}). +-http_api({"^monitoring/nodes/(.+?)/?$", 'GET', node, []}). +-http_api({"^monitoring/listeners/?$", 'GET', listeners, []}). +-http_api({"^monitoring/listeners/(.+?)/?$", 'GET', listener, []}). +-http_api({"^monitoring/metrics/?$", 'GET', metrics, []}). +-http_api({"^monitoring/metrics/(.+?)/?$", 'GET', metric, []}). +-http_api({"^monitoring/stats/?$", 'GET', stats, []}). +-http_api({"^monitoring/stats/(.+?)/?$", 'GET', stat, []}). + +-http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}). +-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}). + +-export([alarm_list/3]). +-export([client/3, client_list/3, client_list/4, kick_client/3]). +-export([route/3, route_list/2]). +-export([session/3, session_list/3, session_list/4]). +-export([subscription/3, subscription_list/3, subscription_list/4]). +-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]). +-export([publish/2, subscribe/2, unsubscribe/2]). +-export([plugin_list/3, enabled/4]). + +%%-------------------------------------------------------------------------- +%% alarm +%%-------------------------------------------------------------------------- +alarm_list('GET', _Req, _Node) -> + Alarms = emqttd_mgmt:alarm_list(), + {ok, lists:map(fun alarm_row/1, Alarms)}. + +alarm_row(#mqtt_alarm{id = AlarmId, + severity = Severity, + title = Title, + summary = Summary, + timestamp = Timestamp}) -> + [{id, AlarmId}, + {severity, Severity}, + {title, l2b(Title)}, + {summary, l2b(Summary)}, + {occurred_at, l2b(strftime(Timestamp))}]. + +%%-------------------------------------------------------------------------- +%% client +%%-------------------------------------------------------------------------- +client('GET', _Params, Key) -> + Data = emqttd_mgmt:client(l2b(Key)), + {ok, [{objects, [client_row(Row) || Row <- Data]}]}. + +client_list('GET', Params, Node) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [client_row(Row) || Row <- Rows]}]}. + +client_list('GET', Params, Node, Key) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize), + {ok, [{objects, [client_row(Row) || Row <- Data]}]}. + +kick_client('PUT', _Params, Key) -> + case emqttd_mgmt:kick_client(l2b(Key)) of + ok -> {ok, []}; + error -> {error, [{code, ?ERROR12}]} + end. + +client_row(#mqtt_client{client_id = ClientId, + peername = {IpAddr, Port}, + username = Username, + clean_sess = CleanSess, + proto_ver = ProtoVer, + keepalive = KeepAlvie, + connected_at = ConnectedAt}) -> + [{client_id, ClientId}, + {username, Username}, + {ipaddress, l2b(ntoa(IpAddr))}, + {port, Port}, + {clean_sess, CleanSess}, + {proto_ver, ProtoVer}, + {keepalive, KeepAlvie}, + {connected_at, l2b(strftime(ConnectedAt))}]. + +%%-------------------------------------------------------------------------- +%% route +%%-------------------------------------------------------------------------- +route('GET', _Params, Key) -> + Data = emqttd_mgmt:route(l2b(Key)), + {ok, [{objects, [route_row(Row) || Row <- Data]}]}. + +route_list('GET', Params) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [route_row(Row) || Row <- Rows]}]}. + +route_row(Route) when is_record(Route, mqtt_route) -> + [{topic, Route#mqtt_route.topic}, {node, Route#mqtt_route.node}]; + +route_row({Topic, Node}) -> + [{topic, Topic}, {node, Node}]. + +%%-------------------------------------------------------------------------- +%% session +%%-------------------------------------------------------------------------- +session('GET', _Params, Key) -> + Data = emqttd_mgmt:session(l2b(Key)), + {ok, [{objects, [session_row(Row) || Row <- Data]}]}. + +session_list('GET', Params, Node) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [session_row(Row) || Row <- Rows]}]}. + +session_list('GET', Params, Node, ClientId) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:session_list(l2a(Node), l2b(ClientId), PageNo, PageSize), + {ok, [{objects, [session_row(Row) || Row <- Data]}]}. + +session_row({ClientId, _Pid, _Persistent, Session}) -> + InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue, + message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at], + [{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]]. + +%%-------------------------------------------------------------------------- +%% subscription +%%-------------------------------------------------------------------------- +subscription('GET', _Params, Key) -> + Data = emqttd_mgmt:subscription(l2b(Key)), + {ok, [{objects, [subscription_row(Row) || Row <- Data]}]}. + +subscription_list('GET', Params, Node) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [subscription_row(Row) || Row <- Rows]}]}. + +subscription_list('GET', Params, Node, Key) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:subscription_list(l2a(Node), l2b(Key), PageNo, PageSize), + {ok, [{objects, [subscription_row(Row) || Row <- Data]}]}. + +subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) -> + subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option}); +subscription_row({{Topic, ClientId}, Option}) -> + Qos = proplists:get_value(qos, Option), + [{client_id, ClientId}, {topic, Topic}, {qos, Qos}]. + +%%-------------------------------------------------------------------------- +%% management/monitoring +%%-------------------------------------------------------------------------- +nodes('GET', _Params) -> + Data = emqttd_mgmt:nodes_info(), + {ok, format_broker(Data)}. + +node('GET', _Params, Node) -> + Data = emqttd_mgmt:node_info(l2a(Node)), + {ok, format_broker(Data)}. + +brokers('GET', _Params) -> + Data = emqttd_mgmt:brokers(), + {ok, [format_broker(Node, Broker) || {Node, Broker} <- Data]}. + +broker('GET', _Params, Node) -> + Data = emqttd_mgmt:broker(l2a(Node)), + {ok, format_broker(Data)}. + +listeners('GET', _Params) -> + Data = emqttd_mgmt:listeners(), + {ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}. + +listener('GET', _Params, Node) -> + Data = emqttd_mgmt:listener(l2a(Node)), + {ok, [format_listener(Listeners) || Listeners <- Data]}. + +metrics('GET', _Params) -> + Data = emqttd_mgmt:metrics(), + {ok, Data}. + +metric('GET', _Params, Node) -> + Data = emqttd_mgmt:metrics(l2a(Node)), + {ok, Data}. + +stats('GET', _Params) -> + Data = emqttd_mgmt:stats(), + {ok, Data}. + +stat('GET', _Params, Node) -> + Data = emqttd_mgmt:stats(l2a(Node)), + {ok, Data}. + +format_broker(Node, Broker) -> + OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), + [{name, Node}, + {version, bin(proplists:get_value(version, Broker))}, + {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, + {uptime, bin(proplists:get_value(uptime, Broker))}, + {datetime, bin(proplists:get_value(datetime, Broker))}, + {otp_release, l2b(OtpRel)}, + {node_status, 'Running'}]. + +format_broker(Broker) -> + OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), + [{version, bin(proplists:get_value(version, Broker))}, + {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, + {uptime, bin(proplists:get_value(uptime, Broker))}, + {datetime, bin(proplists:get_value(datetime, Broker))}, + {otp_release, l2b(OtpRel)}, + {node_status, 'Running'}]. + +format_listeners([], Acc) -> + Acc; +format_listeners([{Protocol, ListenOn, Info}| Listeners], Acc) -> + format_listeners(Listeners, [format_listener({Protocol, ListenOn, Info}) | Acc]). + +format_listener({Protocol, ListenOn, Info}) -> + Listen = l2b(esockd:to_string(ListenOn)), + lists:append([{protocol, Protocol}, {listen, Listen}], Info). + +%%-------------------------------------------------------------------------- +%% mqtt +%%-------------------------------------------------------------------------- +publish('POST', Params) -> + Topic = proplists:get_value(<<"topic">>, Params), + ClientId = proplists:get_value(<<"client_id">>, Params, http), + Payload = proplists:get_value(<<"payload">>, Params, <<>>), + Qos = proplists:get_value(<<"qos">>, Params, 0), + Retain = proplists:get_value(<<"retain">>, Params, false), + case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of + ok -> + {ok, []}; + {error, Error} -> + {error, [{code, ?ERROR2}, {message, Error}]} + end. + +subscribe('POST', Params) -> + ClientId = proplists:get_value(<<"client_id">>, Params), + Topic = proplists:get_value(<<"topic">>, Params), + Qos = proplists:get_value(<<"qos">>, Params, 0), + case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of + ok -> + {ok, []}; + {error, Error} -> + {error, [{code, ?ERROR2}, {message, Error}]} + end. + +unsubscribe('POST', Params) -> + ClientId = proplists:get_value(<<"client_id">>, Params), + Topic = proplists:get_value(<<"topic">>, Params), + case emqttd_mgmt:unsubscribe({ClientId, Topic})of + ok -> + {ok, []}; + {error, Error} -> + {error, [{code, ?ERROR2}, {message, Error}]} + end. + +%%-------------------------------------------------------------------------- +%% plugins +%%-------------------------------------------------------------------------- +plugin_list('GET', _Params, Node) -> + Plugins = lists:map(fun plugin/1, emqttd_mgmt:plugin_list(l2a(Node))), + {ok, Plugins}. + +enabled('PUT', Params, Node, PluginName) -> + Active = proplists:get_value(<<"active">>, Params), + case Active of + true -> + return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName))); + false -> + return(emqttd_mgmt:plugin_unload(l2a(Node), l2a(PluginName))) + end. + +return(Result) -> + case Result of + {ok, _} -> + {ok, []}; + {error, already_started} -> + {error, [{code, ?ERROR10}, {message, <<"already_started">>}]}; + {error, not_started} -> + {error, [{code, ?ERROR11}, {message, <<"not_started">>}]}; + Error -> + lager:error("error:~p", [Error]), + {error, [{code, ?ERROR2}, {message, <<"unknown">>}]} + end. +plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr, + active = Active}) -> + [{name, Name}, + {version, iolist_to_binary(Ver)}, + {description, iolist_to_binary(Descr)}, + {active, Active}]. + +%%-------------------------------------------------------------------------- +%% Inner function +%%-------------------------------------------------------------------------- +format(created_at, Val) -> + l2b(strftime(Val)); +format(_, Val) -> + Val. + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); +ntoa(IP) -> + inet_parse:ntoa(IP). + +%%-------------------------------------------------------------------- +%% Strftime +%%-------------------------------------------------------------------- +strftime({MegaSecs, Secs, _MicroSecs}) -> + strftime(datetime(MegaSecs * 1000000 + Secs)); + +strftime({{Y,M,D}, {H,MM,S}}) -> + lists:flatten( + io_lib:format( + "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). + +datetime(Timestamp) when is_integer(Timestamp) -> + Universal = calendar:gregorian_seconds_to_datetime(Timestamp + + calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})), + calendar:universal_time_to_local_time(Universal). + +bin(S) when is_list(S) -> l2b(S); +bin(A) when is_atom(A) -> bin(atom_to_list(A)); +bin(B) when is_binary(B) -> B; +bin(undefined) -> <<>>. +int(L) -> list_to_integer(L). +l2a(L) -> list_to_atom(L). +l2b(L) -> list_to_binary(L). + + +page_params(Params) -> + PageNo = int(proplists:get_value("curr_page", Params, "1")), + PageSize = int(proplists:get_value("page_size", Params, "20")), + {PageNo, PageSize}. \ No newline at end of file