From 8325056061fa4dae7134ab91e7ee8606acc8d008 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 26 Jul 2017 13:16:46 +0800 Subject: [PATCH 01/14] Authorize HTTP Publish API with clientId --- src/emqttd_http.erl | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 36323406a..0ae07a6f7 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -47,8 +47,9 @@ handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' %%-------------------------------------------------------------------- handle_request('POST', "/mqtt/publish", Req) -> - case authorized(Req) of - true -> http_publish(Req); + Params = parse_params(Req), + case authorized(Req, Params) of + true -> http_publish(Req, Params); false -> Req:respond({401, [], <<"Unauthorized">>}) end; @@ -68,8 +69,7 @@ handle_request(Method, Path, Req) -> %% HTTP Publish %%-------------------------------------------------------------------- -http_publish(Req) -> - Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)], +http_publish(Req, Params) -> lager:debug("HTTP Publish: ~p", [Params]), Topics = topics(Params), ClientId = get_value(<<"client">>, Params, http), @@ -89,6 +89,9 @@ http_publish(Req) -> Req:respond({400, [], <<"Bad Topics">>}) end. +parse_params(Req) -> + [{iolist_to_binary(K), V} || {K, V} <- mochiweb_request:parse_post(Req)]. + topics(Params) -> Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")], [iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined]. @@ -111,24 +114,22 @@ validate(topic, Topic) -> %% basic authorization %%-------------------------------------------------------------------- -authorized(Req) -> - Params = mochiweb_request:parse_post(Req), - ClientId = get_value("client", Params, http), +authorized(Req, Params) -> + ClientId = get_value(<<"client">>, Params, http), case Req:get_header_value("Authorization") of - undefined -> - false; - "Basic " ++ BasicAuth -> - {Username, Password} = user_passwd(BasicAuth), - {ok, Peer} = Req:get(peername), - case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, username = Username, peername = Peer}, Password) of - ok -> - true; - {ok, _IsSuper} -> - true; - {error, Reason} -> - lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]), - false - end + undefined -> + false; + "Basic " ++ BasicAuth -> + {Username, Password} = user_passwd(BasicAuth), + {ok, Peer} = Req:get(peername), + case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, username = Username, peername = Peer}, Password) of + ok -> true; + {ok, _IsSuper} -> + true; + {error, Reason} -> + lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]), + false + end end. user_passwd(BasicAuth) -> From ffebce50908190e06648711662b4a586495a59c3 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 26 Jul 2017 13:58:44 +0800 Subject: [PATCH 02/14] Depend on develop branch of ekka --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e035a3d02..9270a562e 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd master -dep_ekka = git https://github.com/emqtt/ekka emq24 +dep_ekka = git https://github.com/emqtt/ekka develop dep_mochiweb = git https://github.com/emqtt/mochiweb master dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog From 925e35dcbd7705661239e0d6b89b57700bb84046 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 7 Aug 2017 12:15:52 +0800 Subject: [PATCH 03/14] Remove the fullsweep_after option --- src/emqttd_client.erl | 2 +- src/emqttd_session.erl | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index dedfcf1df..c7167c8a7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -66,7 +66,7 @@ [esockd_net:format(State#client_state.peername) | Args])). start_link(Conn, Env) -> - {ok, proc_lib:spawn_opt(?MODULE, init, [[Conn, Env]], [link | ?FULLSWEEP_OPTS])}. + {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. info(CPid) -> gen_server2:call(CPid, info). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 31934759f..e8e694530 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -174,8 +174,7 @@ %% @doc Start a Session -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}). start_link(CleanSess, {ClientId, Username}, ClientPid) -> - gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], - [{spawn_opt, ?FULLSWEEP_OPTS}]). %% Tune GC. + gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []). %%-------------------------------------------------------------------- %% PubSub API @@ -183,7 +182,7 @@ start_link(CleanSess, {ClientId, Username}, ClientPid) -> %% @doc Subscribe topics -spec(subscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok). -subscribe(Session, TopicTable) ->%%TODO: the ack function??... +subscribe(Session, TopicTable) -> %%TODO: the ack function??... gen_server2:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}). -spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqttd_topic:option()]}]) -> ok). From 88f84a4a0c25a2c69c7e58a21c7113f58b379159 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 7 Aug 2017 18:27:16 +0800 Subject: [PATCH 04/14] Support to configure keepalive backoff --- etc/emq.conf | 3 +++ priv/emq.schema | 9 ++++++++- src/emqttd_protocol.erl | 15 +++++++++------ 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 82209dde7..1c0859921 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -177,6 +177,9 @@ mqtt.max_packet_size = 64KB ## Check Websocket Protocol Header. Enum: on, off mqtt.websocket_protocol_header = on +## The Keepalive timeout: Keepalive * backoff * 2 +mqtt.keepalive_backoff = 1.25 + ##-------------------------------------------------------------------- ## MQTT Connection ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index f4c369b9c..bd533d05e 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -474,9 +474,16 @@ end}. {datatype, bytesize} ]}. +%% @doc Keepalive backoff +{mapping, "mqtt.keepalive_backoff", "emqttd.protocol", [ + {default, 1.25}, + {datatype, float} +]}. + {translation, "emqttd.protocol", fun(Conf) -> [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, - {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] + {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, + {keepalive_backoff, cuttlefish:conf_get("mqtt.keepalive_backoff", Conf)}] end}. {mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b5be7c066..0129faedd 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -42,8 +42,9 @@ %% ws_initial_headers: Headers from first HTTP request for WebSocket Client. -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, proto_ver, proto_name, username, is_superuser, - will_msg, keepalive, max_clientid_len, session, stats_data, - mountpoint, ws_initial_headers, connected_at}). + will_msg, keepalive, keepalive_backoff, max_clientid_len, + session, stats_data, mountpoint, ws_initial_headers, + connected_at}). -type(proto_state() :: #proto_state{}). @@ -58,6 +59,7 @@ %% @doc Init protocol init(Peername, SendFun, Opts) -> + Backoff = get_value(keepalive_backoff, Opts, 1.25), EnableStats = get_value(client_enable_stats, Opts, false), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = get_value(ws_initial_headers, Opts), @@ -67,6 +69,7 @@ init(Peername, SendFun, Opts) -> is_superuser = false, client_pid = self(), ws_initial_headers = WsInitialHeaders, + keepalive_backoff = Backoff, stats_data = #proto_stats{enable_stats = EnableStats}}. init(Conn, Peername, SendFun, Opts) -> @@ -202,7 +205,7 @@ process(?CONNECT_PACKET(Var), State0) -> %% Register the client emqttd_cm:reg(client(State2)), %% Start keepalive - start_keepalive(KeepAlive), + start_keepalive(KeepAlive, State2), %% Emit Stats self() ! emit_stats, %% ACCEPT @@ -411,10 +414,10 @@ send_willmsg(_Client, undefined) -> send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). -start_keepalive(0) -> ignore; +start_keepalive(0, _State) -> ignore; -start_keepalive(Sec) when Sec > 0 -> - self() ! {keepalive, start, round(Sec * 1.25)}. +start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 -> + self() ! {keepalive, start, round(Sec * Backoff)}. %%-------------------------------------------------------------------- %% Validate Packets From 1c63bdd90d8a14df147e629f5bde1b317eed8c2a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 7 Aug 2017 18:28:32 +0800 Subject: [PATCH 05/14] APIs for hot reload of configuration --- src/emqttd_config.erl | 59 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 src/emqttd_config.erl diff --git a/src/emqttd_config.erl b/src/emqttd_config.erl new file mode 100644 index 000000000..8ae8ab72d --- /dev/null +++ b/src/emqttd_config.erl @@ -0,0 +1,59 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Hot Configuration + +-module(emqttd_config). + +-export([read/1, dump/2, reload/1, get/2, get/3, set/3]). + +-type(env() :: {atom(), term()}). + +%% @doc Read the configuration of an application. +-spec(read(atom()) -> {ok, list(env())} | {error, term()}). +read(_App) -> + %% TODO + %% 1. Read the app.conf from etc folder + %% 2. Cuttlefish to read the conf + %% 3. Return the terms and schema + {error, unsupported}. + +%% @doc Reload configuration of an application. +-spec(reload(atom()) -> ok | {error, term()}). +reload(_App) -> + %% TODO + %% 1. Read the app.conf from etc folder + %% 2. Cuttlefish to generate config terms. + %% 3. set/3 to apply the config + ok. + +-spec(dump(atom(), list(env())) -> ok | {error, term()}). +dump(_App, _Terms) -> + %% TODO + ok. + +-spec(set(atom(), atom(), term()) -> ok). +set(App, Par, Val) -> + application:set_env(App, Par, Val). + +-spec(get(atom(), atom()) -> undefined | {ok, term()}). +get(App, Par) -> + application:get_env(App, Par). + +-spec(get(atom(), atom(), atom()) -> term()). +get(App, Par, Def) -> + application:get_env(App, Par, Def). + From 7c1ee6610d1a27a9666e0f23ff24df99b2e4db26 Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 9 Aug 2017 10:14:29 +0800 Subject: [PATCH 06/14] Add http management APIs --- include/emqttd_internal.hrl | 14 ++ src/emqttd_app.erl | 2 +- src/emqttd_http.erl | 256 +++++++++++++++-------- src/emqttd_mgmt.erl | 383 ++++++++++++++++++++++++++++++++++ src/emqttd_rest_api.erl | 399 ++++++++++++++++++++++++++++++++++++ 5 files changed, 963 insertions(+), 91 deletions(-) create mode 100644 src/emqttd_mgmt.erl create mode 100644 src/emqttd_rest_api.erl diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index ede858916..68e2b6ba6 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -59,3 +59,17 @@ -define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]). +-define(SUCCESS, 0). %% Success +-define(ERROR1, 101). %% badrpc +-define(ERROR2, 102). %% Unknown error +-define(ERROR3, 103). %% Username or password error +-define(ERROR4, 104). %% Empty username or password +-define(ERROR5, 105). %% User does not exist +-define(ERROR6, 106). %% Admin can not be deleted +-define(ERROR7, 107). %% Missing request parameter +-define(ERROR8, 108). %% Request parameter type error +-define(ERROR9, 109). %% Request parameter is not a json +-define(ERROR10, 110). %% Plugin has been loaded +-define(ERROR11, 111). %% Plugin has been loaded +-define(ERROR12, 112). %% Client not online + diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 0aa87458c..1e99cb951 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -184,7 +184,7 @@ start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []}); start_listener({Proto, ListenOn, Opts}) when Proto == api -> - mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}). + mochiweb:start_http('mqtt:api', ListenOn, Opts, emqttd_http:http_handler()). start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])), diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 0ae07a6f7..a8b78900e 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -26,15 +26,41 @@ -import(proplists, [get_value/2, get_value/3]). --export([handle_request/1]). +-export([http_handler/0, handle_request/2, http_api/0]). -handle_request(Req) -> - handle_request(Req:get(method), Req:get(path), Req). +-include("emqttd_internal.hrl"). -handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' -> +-record(state, {dispatch}). + +http_handler() -> + APIs = http_api(), + State = #state{dispatch = dispatcher(APIs)}, + {?MODULE, handle_request, [State]}. + +http_api() -> + Attr = emqttd_rest_api:module_info(attributes), + [{Regexp, Method, Function, Args} || {http_api, [{Regexp, Method, Function, Args}]} <- Attr]. + +%%-------------------------------------------------------------------- +%% Handle HTTP Request +%%-------------------------------------------------------------------- +handle_request(Req, State) -> + Path = Req:get(path), + case Path of + "/status" -> + handle_request("/status", Req, Req:get(method)); + "/" -> + handle_request("/", Req, Req:get(method)); + _ -> + if_authorized(Req, fun() -> handle_request(Path, Req, State) end) + end. + +handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) -> + Dispatch(Req, Url); + +handle_request("/status", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' -> {InternalStatus, _ProvidedStatus} = init:get_status(), - AppStatus = - case lists:keysearch(emqttd, 1, application:which_applications()) of + AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of false -> not_running; {value, _Val} -> running end, @@ -42,87 +68,69 @@ handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' [node(), InternalStatus, AppStatus]), Req:ok({"text/plain", iolist_to_binary(Status)}); -%%-------------------------------------------------------------------- -%% HTTP Publish API -%%-------------------------------------------------------------------- +handle_request("/", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' -> + respond(Req, 200, api_list()); -handle_request('POST', "/mqtt/publish", Req) -> - Params = parse_params(Req), - case authorized(Req, Params) of - true -> http_publish(Req, Params); - false -> Req:respond({401, [], <<"Unauthorized">>}) - end; +handle_request(_, Req, #state{}) -> + respond(Req, 404, []). -%%-------------------------------------------------------------------- -%% Get static files -%%-------------------------------------------------------------------- - -handle_request('GET', "/" ++ File, Req) -> - lager:info("HTTP GET File: ~s", [File]), - mochiweb_request:serve_file(File, docroot(), Req); - -handle_request(Method, Path, Req) -> - lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), - Req:not_found(). - -%%-------------------------------------------------------------------- -%% HTTP Publish -%%-------------------------------------------------------------------- - -http_publish(Req, Params) -> - lager:debug("HTTP Publish: ~p", [Params]), - Topics = topics(Params), - ClientId = get_value(<<"client">>, Params, http), - Qos = int(get_value(<<"qos">>, Params, "0")), - Retain = bool(get_value(<<"retain">>, Params, "0")), - Payload = iolist_to_binary(get_value(<<"message">>, Params)), - case {validate(qos, Qos), validate(topics, Topics)} of - {true, true} -> - lists:foreach(fun(Topic) -> - Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), - emqttd:publish(Msg#mqtt_message{retain = Retain}) - end, Topics), - Req:ok({"text/plain", <<"OK">>}); - {false, _} -> - Req:respond({400, [], <<"Bad QoS">>}); - {_, false} -> - Req:respond({400, [], <<"Bad Topics">>}) +dispatcher(APIs) -> + fun(Req, Url) -> + Method = Req:get(method), + case filter(APIs, Url, Method) of + [{Regexp, _Method, Function, FilterArgs}] -> + case params(Req) of + {error, Error1} -> + respond(Req, 200, Error1); + Params -> + case {check_params(Params, FilterArgs), + check_params_type(Params, FilterArgs)} of + {true, true} -> + {match, [MatchList]} = re:run(Url, Regexp, [global, {capture, all_but_first, list}]), + Args = lists:append([[Method, Params], MatchList]), + lager:debug("Mod:~p, Fun:~p, Args:~p", [emqttd_rest_api, Function, Args]), + case catch apply(emqttd_rest_api, Function, Args) of + {ok, Data} -> + respond(Req, 200, [{code, ?SUCCESS}, {result, Data}]); + {error, Error} -> + respond(Req, 200, Error); + {'EXIT', Reason} -> + lager:error("Execute API '~s' Error: ~p", [Url, Reason]), + respond(Req, 404, []) + end; + {false, _} -> + respond(Req, 200, [{code, ?ERROR7}, {message, <<"params error">>}]); + {_, false} -> + respond(Req, 200, [{code, ?ERROR8}, {message, <<"params type error">>}]) + end + end; + _ -> + lager:error("No match Url:~p", [Url]), + respond(Req, 404, []) + end end. -parse_params(Req) -> - [{iolist_to_binary(K), V} || {K, V} <- mochiweb_request:parse_post(Req)]. +% %%-------------------------------------------------------------------- +% %% Basic Authorization +% %%-------------------------------------------------------------------- +if_authorized(Req, Fun) -> + case authorized(Req) of + true -> Fun(); + false -> respond(Req, 401, []) + end. -topics(Params) -> - Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")], - [iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined]. - -validate(qos, Qos) -> - (Qos >= ?QOS_0) and (Qos =< ?QOS_2); - -validate(topics, [Topic|Left]) -> - case validate(topic, Topic) of - true -> validate(topics, Left); - false -> false - end; -validate(topics, []) -> - true; - -validate(topic, Topic) -> - emqttd_topic:validate({name, Topic}). - -%%-------------------------------------------------------------------- -%% basic authorization -%%-------------------------------------------------------------------- - -authorized(Req, Params) -> - ClientId = get_value(<<"client">>, Params, http), +authorized(Req) -> case Req:get_header_value("Authorization") of undefined -> false; "Basic " ++ BasicAuth -> {Username, Password} = user_passwd(BasicAuth), {ok, Peer} = Req:get(peername), - case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, username = Username, peername = Peer}, Password) of + Params = params(Req), + ClientId = get_value(<<"client">>, Params, http), + case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, + username = Username, + peername = Peer}, Password) of ok -> true; {ok, _IsSuper} -> true; @@ -133,21 +141,89 @@ authorized(Req, Params) -> end. user_passwd(BasicAuth) -> - list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). + list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). -int(I) when is_integer(I)-> I; -int(B) when is_binary(B)-> binary_to_integer(B); -int(S) -> list_to_integer(S). +respond(Req, 401, Data) -> + Req:respond({401, [{"WWW-Authenticate", "Basic Realm=\"emqx control center\""}], Data}); +respond(Req, 404, Data) -> + Req:respond({404, [{"Content-Type", "text/plain"}], Data}); +respond(Req, 200, Data) -> + Req:respond({200, [{"Content-Type", "application/json"}], to_json(Data)}); +respond(Req, Code, Data) -> + Req:respond({Code, [{"Content-Type", "text/plain"}], Data}). -bool(0) -> false; -bool(1) -> true; -bool("0") -> false; -bool("1") -> true; -bool(<<"0">>) -> false; -bool(<<"1">>) -> true. +filter(APIs, Url, Method) -> + lists:filter(fun({Regexp, Method1, _Function, _Args}) -> + case re:run(Url, Regexp, [global, {capture, all_but_first, list}]) of + {match, _} -> Method =:= Method1; + _ -> false + end + end, APIs). -docroot() -> - {file, Here} = code:is_loaded(?MODULE), - Dir = filename:dirname(filename:dirname(Here)), - filename:join([Dir, "priv", "www"]). +params(Req) -> + Method = Req:get(method), + case Method of + 'GET' -> + mochiweb_request:parse_qs(Req); + _ -> + case Req:recv_body() of + <<>> -> []; + undefined -> []; + Body -> + case jsx:is_json(Body) of + true -> jsx:decode(Body); + false -> + lager:error("Body:~p", [Body]), + {error, [{code, ?ERROR9}, {message, <<"Body not json">>}]} + end + end + end. +check_params(_Params, Args) when Args =:= [] -> + true; +check_params(Params, Args)-> + not lists:any(fun({Item, _Type}) -> undefined =:= proplists:get_value(Item, Params) end, Args). + +check_params_type(_Params, Args) when Args =:= [] -> + true; +check_params_type(Params, Args) -> + not lists:any(fun({Item, Type}) -> + Val = proplists:get_value(Item, Params), + case Type of + int -> not is_integer(Val); + binary -> not is_binary(Val); + bool -> not is_boolean(Val) + end + end, Args). + +to_json([]) -> <<"[]">>; +to_json(Data) -> iolist_to_binary(mochijson2:encode(Data)). + +api_list() -> + [{paths, [<<"api/v2/management/nodes">>, + <<"api/v2/management/nodes/{node_name}">>, + <<"api/v2/monitoring/nodes">>, + <<"api/v2/monitoring/nodes/{node_name}">>, + <<"api/v2/monitoring/listeners">>, + <<"api/v2/monitoring/listeners/{node_name}">>, + <<"api/v2/monitoring/metrics/">>, + <<"api/v2/monitoring/metrics/{node_name}">>, + <<"api/v2/monitoring/stats">>, + <<"api/v2/monitoring/stats/{node_name}">>, + <<"api/v2/nodes/{node_name}/clients">>, + <<"api/v2/nodes/{node_name}/clients/{clientid}">>, + <<"api/v2/clients/{clientid}">>, + <<"api/v2/kick_client/{clientid}">>, + <<"api/v2/nodes/{node_name}/sessions">>, + <<"api/v2/nodes/{node_name}/sessions/{clientid}">>, + <<"api/v2/sessions/{clientid}">>, + <<"api/v2/nodes/{node_name}/subscriptions">>, + <<"api/v2/nodes/{node_name}/subscriptions/{clientid}">>, + <<"api/v2/subscriptions/{clientid}">>, + <<"api/v2/routes">>, + <<"api/v2/routes/{topic}">>, + <<"api/v2/mqtt/publish">>, + <<"api/v2/mqtt/subscribe">>, + <<"api/v2/mqtt/unsubscribe">>, + <<"api/v2/nodes/{node_name}/plugins">>, + <<"api/v2/nodes/{node_name}/plugins/{plugin_name}">>]}]. diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl new file mode 100644 index 000000000..a46f70d09 --- /dev/null +++ b/src/emqttd_mgmt.erl @@ -0,0 +1,383 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_mgmt). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-include("emqttd_protocol.hrl"). + +-include("emqttd_internal.hrl"). + +-include_lib("stdlib/include/qlc.hrl"). + +-import(proplists, [get_value/2]). + +-export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0, + plugins/0, plugins/1, listeners/0, listener/1, nodes_info/0, node_info/1]). + +-export([plugin_list/1, plugin_unload/2, plugin_load/2]). + +-export([client_list/4, session_list/4, route_list/3, subscription_list/4, alarm_list/0]). + +-export([client/1, session/1, route/1, subscription/1]). + +-export([query_table/4, lookup_table/3]). + +-export([publish/1, subscribe/1, unsubscribe/1]). + +-export([kick_client/1]). + +-define(KB, 1024). +-define(MB, (1024*1024)). +-define(GB, (1024*1024*1024)). + +-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). + +brokers() -> + [{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()]. + +broker(Node) when Node =:= node() -> + emqttd_broker:info(); +broker(Node) -> + rpc_call(Node, broker, [Node]). + +metrics() -> + [{Node, metrics(Node)} || Node <- ekka_mnesia:running_nodes()]. + +metrics(Node) when Node =:= node() -> + emqttd_metrics:all(); +metrics(Node) -> + rpc_call(Node, metrics, [Node]). + +stats() -> + [{Node, stats(Node)} || Node <- ekka_mnesia:running_nodes()]. + +stats(Node) when Node =:= node() -> + emqttd_stats:getstats(); +stats(Node) -> + rpc_call(Node, stats, [Node]). + +plugins() -> + [{Node, plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. + +plugins(Node) when Node =:= node() -> + emqttd_plugins:list(Node); +plugins(Node) -> + rpc_call(Node, plugins, [Node]). + +listeners() -> + [{Node, listener(Node)} || Node <- ekka_mnesia:running_nodes()]. + +listener(Node) when Node =:= node() -> + lists:map(fun({{Protocol, ListenOn}, Pid}) -> + Info = [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients,esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}], + {Protocol, ListenOn, Info} + end, esockd:listeners()); + +listener(Node) -> + rpc_call(Node, listener, [Node]). + +nodes_info() -> + Running = mnesia:system_info(running_db_nodes), + Stopped = mnesia:system_info(db_nodes) -- Running, + DownNodes = lists:map(fun stop_node/1, Stopped), + [node_info(Node) || Node <- Running] ++ DownNodes. + +node_info(Node) when Node =:= node() -> + CpuInfo = [{K, list_to_binary(V)} || {K, V} <- emqttd_vm:loads()], + Memory = emqttd_vm:get_memory(), + OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), + [{name, node()}, + {otp_release, list_to_binary(OtpRel)}, + {memory_total, kmg(get_value(allocated, Memory))}, + {memory_used, kmg(get_value(used, Memory))}, + {process_available, erlang:system_info(process_limit)}, + {process_used, erlang:system_info(process_count)}, + {max_fds, get_value(max_fds, erlang:system_info(check_io))}, + {clients, ets:info(mqtt_client, size)}, + {node_status, 'Running'} | CpuInfo]; + +node_info(Node) -> + rpc_call(Node, node_info, [Node]). + +stop_node(Node) -> + [{name, Node}, {node_status, 'Stopped'}]. +%%-------------------------------------------------------- +%% plugins +%%-------------------------------------------------------- +plugin_list(Node) when Node =:= node() -> + emqttd_plugins:list(); +plugin_list(Node) -> + rpc_call(Node, plugin_list, [Node]). + +plugin_load(Node, PluginName) when Node =:= node() -> + emqttd_plugins:load(PluginName); +plugin_load(Node, PluginName) -> + rpc_call(Node, plugin_load, [Node, PluginName]). + +plugin_unload(Node, PluginName) when Node =:= node() -> + emqttd_plugins:unload(PluginName); +plugin_unload(Node, PluginName) -> + rpc_call(Node, plugin_unload, [Node, PluginName]). + +%%-------------------------------------------------------- +%% client +%%-------------------------------------------------------- +client_list(Node, Key, PageNo, PageSize) when Node =:= node() -> + client_list(Key, PageNo, PageSize); +client_list(Node, Key, PageNo, PageSize) -> + rpc_call(Node, client_list, [Node, Key, PageNo, PageSize]). + +client(ClientId) -> + lists:flatten([client_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]). + +%%-------------------------------------------------------- +%% session +%%-------------------------------------------------------- +session_list(Node, Key, PageNo, PageSize) when Node =:= node() -> + session_list(Key, PageNo, PageSize); +session_list(Node, Key, PageNo, PageSize) -> + rpc_call(Node, session_list, [Node, Key, PageNo, PageSize]). + +session(ClientId) -> + lists:flatten([session_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]). + +%%-------------------------------------------------------- +%% subscription +%%-------------------------------------------------------- +subscription_list(Node, Key, PageNo, PageSize) when Node =:= node() -> + subscription_list(Key, PageNo, PageSize); +subscription_list(Node, Key, PageNo, PageSize) -> + rpc_call(Node, subscription_list, [Node, Key, PageNo, PageSize]). + +subscription(Key) -> + lists:flatten([subscription_list(Node, Key, 1, 20) || Node <- ekka_mnesia:running_nodes()]). + +%%-------------------------------------------------------- +%% Routes +%%-------------------------------------------------------- +route(Key) -> route_list(Key, 1, 20). + +%%-------------------------------------------------------- +%% alarm +%%-------------------------------------------------------- +alarm_list() -> + emqttd_alarm:get_alarms(). + +query_table(Qh, PageNo, PageSize, TotalNum) -> + Cursor = qlc:cursor(Qh), + case PageNo > 1 of + true -> qlc:next_answers(Cursor, (PageNo - 1) * PageSize); + false -> ok + end, + Rows = qlc:next_answers(Cursor, PageSize), + qlc:delete_cursor(Cursor), + [{totalNum, TotalNum}, + {totalPage, total_page(TotalNum, PageSize)}, + {result, Rows}]. + +total_page(TotalNum, PageSize) -> + case TotalNum rem PageSize of + 0 -> TotalNum div PageSize; + _ -> (TotalNum div PageSize) + 1 + end. + +%%TODO: refactor later... +lookup_table(LookupFun, _PageNo, _PageSize) -> + Rows = LookupFun(), + Rows. + +%%-------------------------------------------------------------------- +%% mqtt +%%-------------------------------------------------------------------- +publish({ClientId, Topic, Payload, Qos, Retain}) -> + case validate(topic, Topic) of + true -> + Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), + emqttd:publish(Msg#mqtt_message{retain = Retain}), + ok; + false -> + {error, format_error(Topic, "validate topic: ${0} fail")} + end. + +subscribe({ClientId, Topic, Qos}) -> + case validate(topic, Topic) of + true -> + case emqttd_sm:lookup_session(ClientId) of + undefined -> + {error, format_error(ClientId, "Clientid: ${0} not found")}; + #mqtt_session{sess_pid = SessPid} -> + emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), + ok + end; + false -> + {error, format_error(Topic, "validate topic: ${0} fail")} + end. + +unsubscribe({ClientId, Topic})-> + case validate(topic, Topic) of + true -> + case emqttd_sm:lookup_session(ClientId) of + undefined -> + {error, format_error(ClientId, "Clientid: ${0} not found")}; + #mqtt_session{sess_pid = SessPid} -> + emqttd_session:unsubscribe(SessPid, [{Topic, []}]), + ok + end; + false -> + {error, format_error(Topic, "validate topic: ${0} fail")} + end. + +% publish(Messages) -> +% lists:foldl( +% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) -> +% case validate(topic, Topic) of +% true -> +% Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), +% emqttd:publish(Msg#mqtt_message{retain = Retain}), +% {[[{topic, Topic}]| Success], Failed}; +% false -> +% {Success, [[{topic, Topic}]| Failed]} +% end +% end, {[], []}, Messages). + +% subscribers(Subscribers) -> +% lists:foldl( +% fun({ClientId, Topic, Qos}, {Success, Failed}) -> +% case emqttd_sm:lookup_session(ClientId) of +% undefined -> +% {Success, [[{client_id, ClientId}]|Failed]}; +% #mqtt_session{sess_pid = SessPid} -> +% emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), +% {[[{client_id, ClientId}]| Success], Failed} +% end +% end,{[], []}, Subscribers). + +% unsubscribers(UnSubscribers)-> +% lists:foldl( +% fun({ClientId, Topic}, {Success, Failed}) -> +% case emqttd_sm:lookup_session(ClientId) of +% undefined -> +% {Success, [[{client_id, ClientId}]|Failed]}; +% #mqtt_session{sess_pid = SessPid} -> +% emqttd_session:unsubscriber(SessPid, [{Topic, []}]), +% {[[{client_id, ClientId}]| Success], Failed} +% end +% end, {[], []}, UnSubscribers). + +%%-------------------------------------------------------------------- +%% manager API +%%-------------------------------------------------------------------- +kick_client(ClientId) -> + Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + case lists:any(fun(Item) -> Item =:= ok end, Result) of + true -> {ok, [{status, success}]}; + false -> {ok, [{status, failure}]} + end. + +kick_client(Node, ClientId) when Node =:= node() -> + case emqttd_cm:lookup(ClientId) of + undefined -> error; + #mqtt_client{client_pid = Pid}-> emqttd_client:kick(Pid) + end; +kick_client(Node, ClientId) -> + rpc_call(Node, kick_client, [Node, ClientId]). + +%%-------------------------------------------------------------------- +%% Internel Functions. +%%-------------------------------------------------------------------- + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. + +kmg(Byte) when Byte > ?GB -> + float(Byte / ?GB, "G"); +kmg(Byte) when Byte > ?MB -> + float(Byte / ?MB, "M"); +kmg(Byte) when Byte > ?KB -> + float(Byte / ?MB, "K"); +kmg(Byte) -> + Byte. +float(F, S) -> + iolist_to_binary(io_lib:format("~.2f~s", [F, S])). + +validate(qos, Qos) -> + (Qos >= ?QOS_0) and (Qos =< ?QOS_2); + +validate(topic, Topic) -> + emqttd_topic:validate({name, Topic}). + +client_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) -> + TotalNum = ets:info(mqtt_client, size), + Qh = qlc:q([R || R <- ets:table(mqtt_client)]), + query_table(Qh, PageNo, PageSize, TotalNum); + +client_list(ClientId, PageNo, PageSize) -> + Fun = fun() -> ets:lookup(mqtt_client, ClientId) end, + lookup_table(Fun, PageNo, PageSize). + +session_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) -> + TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_session]]), + Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_session]]), + query_table(Qh, PageNo, PageSize, TotalNum); + +session_list(ClientId, PageNo, PageSize) -> + MP = {ClientId, '_', '_', '_'}, + Fun = fun() -> lists:append([ets:match_object(Tab, MP) || Tab <- [mqtt_local_session]]) end, + lookup_table(Fun, PageNo, PageSize). + +subscription_list(Key, PageNo, PageSize) when ?EMPTY_KEY(Key) -> + TotalNum = ets:info(mqtt_subproperty, size), + Qh = qlc:q([E || E <- ets:table(mqtt_subproperty)]), + query_table(Qh, PageNo, PageSize, TotalNum); + +subscription_list(Key, PageNo, PageSize) -> + Keys = ets:lookup(mqtt_subscription, Key), + Fun = case length(Keys) == 0 of + true -> + MP = {{Key, '_'}, '_'}, + fun() -> ets:match_object(mqtt_subproperty, MP) end; + false -> + fun() -> + lists:map(fun({S, T}) ->[R] = ets:lookup(mqtt_subproperty, {T, S}), R end, Keys) + end + end, + lookup_table(Fun, PageNo, PageSize). + +route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) -> + TotalNum = lists:sum([ets:info(Tab, size) || Tab <- tables()]), + Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- tables()]), + query_table(Qh, PageNo, PageSize, TotalNum); + +route_list(Topic, PageNo, PageSize) -> + Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- tables()]) end, + lookup_table(Fun, PageNo, PageSize). + +tables() -> + [mqtt_route, mqtt_local_route]. + +format_error(Val, Msg) -> + re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]). + diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl new file mode 100644 index 000000000..e776e2210 --- /dev/null +++ b/src/emqttd_rest_api.erl @@ -0,0 +1,399 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module (emqttd_rest_api). + +-include("emqttd.hrl"). + +-include("emqttd_internal.hrl"). + +-http_api({"^nodes/(.+?)/alarms/?$", 'GET', alarm_list, []}). + +-http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}). +-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}). +-http_api({"^clients/(.+?)/?$", 'GET', client, []}). +-http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}). + +-http_api({"^routes?$", 'GET', route_list, []}). +-http_api({"^routes/(.+?)/?$", 'GET', route, []}). + +-http_api({"^nodes/(.+?)/sessions/?$", 'GET', session_list, []}). +-http_api({"^nodes/(.+?)/sessions/(.+?)/?$", 'GET', session_list, []}). +-http_api({"^sessions/(.+?)/?$", 'GET', session, []}). + +-http_api({"^nodes/(.+?)/subscriptions/?$", 'GET', subscription_list, []}). +-http_api({"^nodes/(.+?)/subscriptions/(.+?)/?$", 'GET', subscription_list, []}). +-http_api({"^subscriptions/(.+?)/?$", 'GET', subscription, []}). + +-http_api({"^mqtt/publish?$", 'POST', publish, [{<<"topic">>, binary}]}). +-http_api({"^mqtt/subscribe?$", 'POST', subscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}). +-http_api({"^mqtt/unsubscribe?$", 'POST', unsubscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}). + +-http_api({"^management/nodes/?$", 'GET', brokers, []}). +-http_api({"^management/nodes/(.+?)/?$", 'GET', broker, []}). +-http_api({"^monitoring/nodes/?$", 'GET', nodes, []}). +-http_api({"^monitoring/nodes/(.+?)/?$", 'GET', node, []}). +-http_api({"^monitoring/listeners/?$", 'GET', listeners, []}). +-http_api({"^monitoring/listeners/(.+?)/?$", 'GET', listener, []}). +-http_api({"^monitoring/metrics/?$", 'GET', metrics, []}). +-http_api({"^monitoring/metrics/(.+?)/?$", 'GET', metric, []}). +-http_api({"^monitoring/stats/?$", 'GET', stats, []}). +-http_api({"^monitoring/stats/(.+?)/?$", 'GET', stat, []}). + +-http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}). +-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}). + +-export([alarm_list/3]). +-export([client/3, client_list/3, client_list/4, kick_client/3]). +-export([route/3, route_list/2]). +-export([session/3, session_list/3, session_list/4]). +-export([subscription/3, subscription_list/3, subscription_list/4]). +-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]). +-export([publish/2, subscribe/2, unsubscribe/2]). +-export([plugin_list/3, enabled/4]). + +%%-------------------------------------------------------------------------- +%% alarm +%%-------------------------------------------------------------------------- +alarm_list('GET', _Req, _Node) -> + Alarms = emqttd_mgmt:alarm_list(), + {ok, lists:map(fun alarm_row/1, Alarms)}. + +alarm_row(#mqtt_alarm{id = AlarmId, + severity = Severity, + title = Title, + summary = Summary, + timestamp = Timestamp}) -> + [{id, AlarmId}, + {severity, Severity}, + {title, l2b(Title)}, + {summary, l2b(Summary)}, + {occurred_at, l2b(strftime(Timestamp))}]. + +%%-------------------------------------------------------------------------- +%% client +%%-------------------------------------------------------------------------- +client('GET', _Params, Key) -> + Data = emqttd_mgmt:client(l2b(Key)), + {ok, [{objects, [client_row(Row) || Row <- Data]}]}. + +client_list('GET', Params, Node) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [client_row(Row) || Row <- Rows]}]}. + +client_list('GET', Params, Node, Key) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize), + {ok, [{objects, [client_row(Row) || Row <- Data]}]}. + +kick_client('PUT', _Params, Key) -> + case emqttd_mgmt:kick_client(l2b(Key)) of + ok -> {ok, []}; + error -> {error, [{code, ?ERROR12}]} + end. + +client_row(#mqtt_client{client_id = ClientId, + peername = {IpAddr, Port}, + username = Username, + clean_sess = CleanSess, + proto_ver = ProtoVer, + keepalive = KeepAlvie, + connected_at = ConnectedAt}) -> + [{client_id, ClientId}, + {username, Username}, + {ipaddress, l2b(ntoa(IpAddr))}, + {port, Port}, + {clean_sess, CleanSess}, + {proto_ver, ProtoVer}, + {keepalive, KeepAlvie}, + {connected_at, l2b(strftime(ConnectedAt))}]. + +%%-------------------------------------------------------------------------- +%% route +%%-------------------------------------------------------------------------- +route('GET', _Params, Key) -> + Data = emqttd_mgmt:route(l2b(Key)), + {ok, [{objects, [route_row(Row) || Row <- Data]}]}. + +route_list('GET', Params) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [route_row(Row) || Row <- Rows]}]}. + +route_row(Route) when is_record(Route, mqtt_route) -> + [{topic, Route#mqtt_route.topic}, {node, Route#mqtt_route.node}]; + +route_row({Topic, Node}) -> + [{topic, Topic}, {node, Node}]. + +%%-------------------------------------------------------------------------- +%% session +%%-------------------------------------------------------------------------- +session('GET', _Params, Key) -> + Data = emqttd_mgmt:session(l2b(Key)), + {ok, [{objects, [session_row(Row) || Row <- Data]}]}. + +session_list('GET', Params, Node) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [session_row(Row) || Row <- Rows]}]}. + +session_list('GET', Params, Node, ClientId) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:session_list(l2a(Node), l2b(ClientId), PageNo, PageSize), + {ok, [{objects, [session_row(Row) || Row <- Data]}]}. + +session_row({ClientId, _Pid, _Persistent, Session}) -> + InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue, + message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at], + [{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]]. + +%%-------------------------------------------------------------------------- +%% subscription +%%-------------------------------------------------------------------------- +subscription('GET', _Params, Key) -> + Data = emqttd_mgmt:subscription(l2b(Key)), + {ok, [{objects, [subscription_row(Row) || Row <- Data]}]}. + +subscription_list('GET', Params, Node) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize), + Rows = proplists:get_value(result, Data), + TotalPage = proplists:get_value(totalPage, Data), + TotalNum = proplists:get_value(totalNum, Data), + {ok, [{current_page, PageNo}, + {page_size, PageSize}, + {total_num, TotalNum}, + {total_page, TotalPage}, + {objects, [subscription_row(Row) || Row <- Rows]}]}. + +subscription_list('GET', Params, Node, Key) -> + {PageNo, PageSize} = page_params(Params), + Data = emqttd_mgmt:subscription_list(l2a(Node), l2b(Key), PageNo, PageSize), + {ok, [{objects, [subscription_row(Row) || Row <- Data]}]}. + +subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) -> + subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option}); +subscription_row({{Topic, ClientId}, Option}) -> + Qos = proplists:get_value(qos, Option), + [{client_id, ClientId}, {topic, Topic}, {qos, Qos}]. + +%%-------------------------------------------------------------------------- +%% management/monitoring +%%-------------------------------------------------------------------------- +nodes('GET', _Params) -> + Data = emqttd_mgmt:nodes_info(), + {ok, format_broker(Data)}. + +node('GET', _Params, Node) -> + Data = emqttd_mgmt:node_info(l2a(Node)), + {ok, format_broker(Data)}. + +brokers('GET', _Params) -> + Data = emqttd_mgmt:brokers(), + {ok, [format_broker(Node, Broker) || {Node, Broker} <- Data]}. + +broker('GET', _Params, Node) -> + Data = emqttd_mgmt:broker(l2a(Node)), + {ok, format_broker(Data)}. + +listeners('GET', _Params) -> + Data = emqttd_mgmt:listeners(), + {ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}. + +listener('GET', _Params, Node) -> + Data = emqttd_mgmt:listener(l2a(Node)), + {ok, [format_listener(Listeners) || Listeners <- Data]}. + +metrics('GET', _Params) -> + Data = emqttd_mgmt:metrics(), + {ok, Data}. + +metric('GET', _Params, Node) -> + Data = emqttd_mgmt:metrics(l2a(Node)), + {ok, Data}. + +stats('GET', _Params) -> + Data = emqttd_mgmt:stats(), + {ok, Data}. + +stat('GET', _Params, Node) -> + Data = emqttd_mgmt:stats(l2a(Node)), + {ok, Data}. + +format_broker(Node, Broker) -> + OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), + [{name, Node}, + {version, bin(proplists:get_value(version, Broker))}, + {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, + {uptime, bin(proplists:get_value(uptime, Broker))}, + {datetime, bin(proplists:get_value(datetime, Broker))}, + {otp_release, l2b(OtpRel)}, + {node_status, 'Running'}]. + +format_broker(Broker) -> + OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), + [{version, bin(proplists:get_value(version, Broker))}, + {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, + {uptime, bin(proplists:get_value(uptime, Broker))}, + {datetime, bin(proplists:get_value(datetime, Broker))}, + {otp_release, l2b(OtpRel)}, + {node_status, 'Running'}]. + +format_listeners([], Acc) -> + Acc; +format_listeners([{Protocol, ListenOn, Info}| Listeners], Acc) -> + format_listeners(Listeners, [format_listener({Protocol, ListenOn, Info}) | Acc]). + +format_listener({Protocol, ListenOn, Info}) -> + Listen = l2b(esockd:to_string(ListenOn)), + lists:append([{protocol, Protocol}, {listen, Listen}], Info). + +%%-------------------------------------------------------------------------- +%% mqtt +%%-------------------------------------------------------------------------- +publish('POST', Params) -> + Topic = proplists:get_value(<<"topic">>, Params), + ClientId = proplists:get_value(<<"client_id">>, Params, http), + Payload = proplists:get_value(<<"payload">>, Params, <<>>), + Qos = proplists:get_value(<<"qos">>, Params, 0), + Retain = proplists:get_value(<<"retain">>, Params, false), + case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of + ok -> + {ok, []}; + {error, Error} -> + {error, [{code, ?ERROR2}, {message, Error}]} + end. + +subscribe('POST', Params) -> + ClientId = proplists:get_value(<<"client_id">>, Params), + Topic = proplists:get_value(<<"topic">>, Params), + Qos = proplists:get_value(<<"qos">>, Params, 0), + case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of + ok -> + {ok, []}; + {error, Error} -> + {error, [{code, ?ERROR2}, {message, Error}]} + end. + +unsubscribe('POST', Params) -> + ClientId = proplists:get_value(<<"client_id">>, Params), + Topic = proplists:get_value(<<"topic">>, Params), + case emqttd_mgmt:unsubscribe({ClientId, Topic})of + ok -> + {ok, []}; + {error, Error} -> + {error, [{code, ?ERROR2}, {message, Error}]} + end. + +%%-------------------------------------------------------------------------- +%% plugins +%%-------------------------------------------------------------------------- +plugin_list('GET', _Params, Node) -> + Plugins = lists:map(fun plugin/1, emqttd_mgmt:plugin_list(l2a(Node))), + {ok, Plugins}. + +enabled('PUT', Params, Node, PluginName) -> + Active = proplists:get_value(<<"active">>, Params), + case Active of + true -> + return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName))); + false -> + return(emqttd_mgmt:plugin_unload(l2a(Node), l2a(PluginName))) + end. + +return(Result) -> + case Result of + {ok, _} -> + {ok, []}; + {error, already_started} -> + {error, [{code, ?ERROR10}, {message, <<"already_started">>}]}; + {error, not_started} -> + {error, [{code, ?ERROR11}, {message, <<"not_started">>}]}; + Error -> + lager:error("error:~p", [Error]), + {error, [{code, ?ERROR2}, {message, <<"unknown">>}]} + end. +plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr, + active = Active}) -> + [{name, Name}, + {version, iolist_to_binary(Ver)}, + {description, iolist_to_binary(Descr)}, + {active, Active}]. + +%%-------------------------------------------------------------------------- +%% Inner function +%%-------------------------------------------------------------------------- +format(created_at, Val) -> + l2b(strftime(Val)); +format(_, Val) -> + Val. + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); +ntoa(IP) -> + inet_parse:ntoa(IP). + +%%-------------------------------------------------------------------- +%% Strftime +%%-------------------------------------------------------------------- +strftime({MegaSecs, Secs, _MicroSecs}) -> + strftime(datetime(MegaSecs * 1000000 + Secs)); + +strftime({{Y,M,D}, {H,MM,S}}) -> + lists:flatten( + io_lib:format( + "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). + +datetime(Timestamp) when is_integer(Timestamp) -> + Universal = calendar:gregorian_seconds_to_datetime(Timestamp + + calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})), + calendar:universal_time_to_local_time(Universal). + +bin(S) when is_list(S) -> l2b(S); +bin(A) when is_atom(A) -> bin(atom_to_list(A)); +bin(B) when is_binary(B) -> B; +bin(undefined) -> <<>>. +int(L) -> list_to_integer(L). +l2a(L) -> list_to_atom(L). +l2b(L) -> list_to_binary(L). + + +page_params(Params) -> + PageNo = int(proplists:get_value("curr_page", Params, "1")), + PageSize = int(proplists:get_value("page_size", Params, "20")), + {PageNo, PageSize}. \ No newline at end of file From 701ee3e8dcbc1af06d8bfd7329ee9f3733d492ed Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 10 Aug 2017 18:19:20 +0800 Subject: [PATCH 07/14] Add CLI set/get env --- Makefile | 4 +- src/emqttd_cli.erl | 3 +- src/emqttd_cli_config.erl | 235 ++++++++++++++++++++++++++++++++++++++ src/emqttd_ctl.erl | 6 + 4 files changed, 246 insertions(+), 2 deletions(-) create mode 100644 src/emqttd_cli_config.erl diff --git a/Makefile b/Makefile index 9270a562e..9944cb5ae 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.3 -DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt +DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc @@ -14,6 +14,8 @@ dep_mochiweb = git https://github.com/emqtt/mochiweb master dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master +dep_clique = git https://github.com/turtleDeng/clique +dep_jsx = git https://github.com/talentdeficit/jsx ERLC_OPTS += +debug_info ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index fa3f7687d..a4029d46f 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -48,7 +48,8 @@ load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], - [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds]. + [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds], + emqttd_cli_config:register_config(). is_cmd(Fun) -> not lists:member(Fun, [init, load, module_info]). diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl new file mode 100644 index 000000000..bd353ff5f --- /dev/null +++ b/src/emqttd_cli_config.erl @@ -0,0 +1,235 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module (emqttd_cli_config). + +-export ([register_config_cli/0, register_config/0, run/1]). + +-define(APP, emqttd). + +register_config() -> + F = fun() -> ekka_mnesia:running_nodes() end, + clique:register_node_finder(F), + emqttd_cli_config:register_config_cli(). + +run(Cmd) -> + clique:run(Cmd). + +register_config_cli() -> + ok = clique_config:load_schema([code:priv_dir(?APP)], ?APP), + register_protocol_formatter(), + register_client_formatter(), + register_session_formatter(), + register_queue_formatter(), + register_lager_formatter(), + + register_auth_config(), + register_protocol_config(), + register_connection_config(), + register_client_config(), + register_session_config(), + register_queue_config(), + register_broker_config(), + register_lager_config(). + +%%-------------------------------------------------------------------- +%% Auth/Acl +%%-------------------------------------------------------------------- +register_auth_config() -> + ConfigKeys = ["mqtt.allow_anonymous", + "mqtt.acl_nomatch", + "mqtt.acl_file", + "mqtt.cache_acl"], + [clique:register_config(Key , fun auth_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +auth_config_callback([_, KeyStr], Value) -> + application:set_env(?APP, l2a(KeyStr), Value), + " successfully\n". + +%%-------------------------------------------------------------------- +%% MQTT Protocol +%%-------------------------------------------------------------------- +register_protocol_formatter() -> + ConfigKeys = ["max_clientid_len", + "max_packet_size", + "websocket_protocol_header", + "keepalive_backoff"], + [clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys]. + +protocol_formatter_callback([_, Key], Params) -> + proplists:get_value(l2a(Key), Params). + +register_protocol_config() -> + ConfigKeys = ["mqtt.max_clientid_len", + "mqtt.max_packet_size", + "mqtt.websocket_protocol_header", + "mqtt.keepalive_backoff"], + [clique:register_config(Key , fun protocol_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +protocol_config_callback([_AppStr, KeyStr], Value) -> + protocol_config_callback(protocol, l2a(KeyStr), Value). +protocol_config_callback(App, Key, Value) -> + {ok, Env} = emqttd:env(App), + application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), + " successfully\n". + +%%-------------------------------------------------------------------- +%% MQTT Connection +%%-------------------------------------------------------------------- +register_connection_config() -> + ConfigKeys = ["mqtt.conn.force_gc_count"], + [clique:register_config(Key , fun connection_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +connection_config_callback([_, KeyStr0, KeyStr1], Value) -> + KeyStr = lists:concat([KeyStr0, "_", KeyStr1]), + application:set_env(?APP, l2a(KeyStr), Value), + " successfully\n". + +%%-------------------------------------------------------------------- +%% MQTT Client +%%-------------------------------------------------------------------- +register_client_formatter() -> + ConfigKeys = ["max_publish_rate", + "idle_timeout", + "enable_stats"], + [clique:register_formatter(["mqtt", "client", Key], fun client_formatter_callback/2) || Key <- ConfigKeys]. + +client_formatter_callback([_, _, Key], Params) -> + proplists:get_value(list_to_atom(Key), Params). + +register_client_config() -> + ConfigKeys = ["mqtt.client.max_publish_rate", + "mqtt.client.idle_timeout", + "mqtt.client.enable_stats"], + [clique:register_config(Key , fun client_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +client_config_callback([_, AppStr, KeyStr], Value) -> + client_config_callback(l2a(AppStr), l2a(KeyStr), Value). +client_config_callback(App, Key, Value) -> + {ok, Env} = emqttd:env(App), + application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), + " successfully\n". + +%%-------------------------------------------------------------------- +%% session +%%-------------------------------------------------------------------- +register_session_formatter() -> + ConfigKeys = ["max_subscriptions", + "upgrade_qos", + "max_inflight", + "retry_interval", + "max_awaiting_rel", + "await_rel_timeout", + "enable_stats", + "expiry_interval", + "ignore_loop_deliver"], + [clique:register_formatter(["mqtt", "session", Key], fun session_formatter_callback/2) || Key <- ConfigKeys]. + +session_formatter_callback([_, _, Key], Params) -> + proplists:get_value(list_to_atom(Key), Params). + +register_session_config() -> + ConfigKeys = ["mqtt.session.max_subscriptions", + "mqtt.session.upgrade_qos", + "mqtt.session.max_inflight", + "mqtt.session.retry_interval", + "mqtt.session.max_awaiting_rel", + "mqtt.session.await_rel_timeout", + "mqtt.session.enable_stats", + "mqtt.session.expiry_interval", + "mqtt.session.ignore_loop_deliver"], + [clique:register_config(Key , fun session_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +session_config_callback([_, AppStr, KeyStr], Value) -> + session_config_callback(l2a(AppStr), l2a(KeyStr), Value). +session_config_callback(App, Key, Value) -> + {ok, Env} = emqttd:env(App), + application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), + " successfully\n". + +l2a(List) -> list_to_atom(List). + +%%-------------------------------------------------------------------- +%% MQTT MQueue +%%-------------------------------------------------------------------- +register_queue_formatter() -> + ConfigKeys = ["type", + "priority", + "max_length", + "low_watermark", + "high_watermark", + "store_qos0"], + [clique:register_formatter(["mqtt", "mqueue", Key], fun queue_formatter_callback/2) || Key <- ConfigKeys]. + +queue_formatter_callback([_, _, Key], Params) -> + proplists:get_value(list_to_atom(Key), Params). + +register_queue_config() -> + ConfigKeys = ["mqtt.mqueue.type", + "mqtt.mqueue.priority", + "mqtt.mqueue.max_length", + "mqtt.mqueue.low_watermark", + "mqtt.mqueue.high_watermark", + "mqtt.mqueue.store_qos0"], + [clique:register_config(Key , fun queue_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +queue_config_callback([_, AppStr, KeyStr], Value) -> + queue_config_callback(l2a(AppStr), l2a(KeyStr), Value). +queue_config_callback(App, Key, Value) -> + {ok, Env} = emqttd:env(App), + application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), + " successfully\n". + +%%-------------------------------------------------------------------- +%% MQTT Broker +%%-------------------------------------------------------------------- +register_broker_config() -> + ConfigKeys = ["mqtt.broker.sys_interval"], + [clique:register_config(Key , fun broker_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +broker_config_callback([_, KeyStr0, KeyStr1], Value) -> + KeyStr = lists:concat([KeyStr0, "_", KeyStr1]), + application:set_env(?APP, l2a(KeyStr), Value), + " successfully\n". + +%%-------------------------------------------------------------------- +%% MQTT Lager +%%-------------------------------------------------------------------- +register_lager_formatter() -> + ConfigKeys = ["level"], + [clique:register_formatter(["log", "console", Key], fun lager_formatter_callback/2) || Key <- ConfigKeys]. + +lager_formatter_callback(_, Params) -> + proplists:get_value(lager_console_backend, Params). + +register_lager_config() -> + ConfigKeys = ["log.console.level"], + [clique:register_config(Key , fun lager_config_callback/2) || Key <- ConfigKeys], + ok = register_config_whitelist(ConfigKeys). + +lager_config_callback(_, Value) -> + lager:set_loglevel(lager_console_backend, Value), + " successfully\n". + +register_config_whitelist(ConfigKeys) -> + clique:register_config_whitelist(ConfigKeys, ?APP). \ No newline at end of file diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 195d3dea0..0cec222fd 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -68,6 +68,12 @@ run([]) -> usage(), ok; run(["help"]) -> usage(), ok; +run(["set" | _] = CmdS) -> + emqttd_cli_config:run(["config" | CmdS]), ok; + +run(["show" | _] = CmdS) -> + emqttd_cli_config:run(["config" | CmdS]), ok; + run([CmdS|Args]) -> case lookup(list_to_atom(CmdS)) of [{Mod, Fun}] -> From 51c7c9b6888abaa36314107387c15aa0b9140fb4 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Thu, 10 Aug 2017 18:30:07 +0800 Subject: [PATCH 08/14] Add http management APIs tests cases --- test/emqttd_SUITE.erl | 60 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 160e023f2..f89f25c91 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -36,6 +36,24 @@ {cacertfile, "certs/cacert.pem"}, {certfile, "certs/client-cert.pem"}]). +-define(URL, "http://localhost:8080/api/v2/"). + +-define(APPL_JSON, "application/json"). + +-define(PRINT(PATH), lists:flatten(io_lib:format(PATH, [atom_to_list(node())]))). + +-define(GET_API, ["management/nodes", + ?PRINT("management/nodes/~s"), + "monitoring/nodes", + ?PRINT("monitoring/nodes/~s"), + "monitoring/listeners", + ?PRINT("monitoring/listeners/~s"), + "monitoring/metrics", + ?PRINT("monitoring/metrics/~s"), + "monitoring/stats", + ?PRINT("monitoring/stats/~s"), + ?PRINT("nodes/~s/clients"), + "routes"]). all() -> [{group, protocol}, @@ -49,6 +67,7 @@ all() -> {group, http}, {group, alarms}, {group, cli}, + {group, get_api}, {group, cleanSession}]. groups() -> @@ -103,13 +122,14 @@ groups() -> ]}, cli_vm]}, {cleanSession, [sequence], - [cleanSession_validate - ]}]. + [cleanSession_validate]}, + {get_api, [sequence], [get_api_lists]}]. init_per_suite(Config) -> NewConfig = generate_config(), lists:foreach(fun set_app_env/1, NewConfig), application:ensure_all_started(?APP), + timer:sleep(6000), Config. end_per_suite(_Config) -> @@ -137,6 +157,7 @@ mqtt_ssl_oneway(_) -> emqttd:stop(), change_opts(ssl_oneway), emqttd:start(), + timer:sleep(6000), {ok, SslOneWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {client_id, <<"ssloneway">>}, ssl]), @@ -158,6 +179,7 @@ mqtt_ssl_twoway(_) -> emqttd:stop(), change_opts(ssl_twoway), emqttd:start(), + timer:sleep(6000), ClientSSl = [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT], {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"}, @@ -593,6 +615,9 @@ cleanSession_validate(_) -> emqttc:disconnect(Pub), emqttc:disconnect(C11). +get_api_lists(_Config) -> + lists:foreach(fun request/1, ?GET_API). + change_opts(SslType) -> {ok, Listeners} = application:get_env(?APP, listeners), NewListeners = @@ -646,3 +671,34 @@ set_app_env({App, Lists}) -> lists:foreach(fun({Par, Var}) -> application:set_env(App, Par, Var) end, Lists). + +request(Path) -> + http_get(get, Path). + +http_get(Method, Path) -> + req(Method, Path, []). + +http_put(Method, Path, Params) -> + req(Method, Path, format_for_upload(Params)). + +http_post(Method, Path, Params) -> + req(Method, Path, format_for_upload(Params)). + +req(Method, Path, Body) -> + Url = ?URL ++ Path, + Headers = auth_header_("", ""), + case httpc:request(Method, {Url, [Headers]}, [], []) of + {error, socket_closed_remotely} -> + false; + {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> + true; + {ok, {{"HTTP/1.1", 400, _}, _, []}} -> + false; + {ok, {{"HTTP/1.1", 404, _}, _, []}} -> + false + end. + +format_for_upload(none) -> + <<"">>; +format_for_upload(List) -> + iolist_to_binary(mochijson2:encode(List)). From f4381155f8a13bbb43f822563269f60cddfab623 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 11 Aug 2017 09:53:06 +0800 Subject: [PATCH 09/14] Format code --- src/emqttd_cli_config.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl index bd353ff5f..b23d3e766 100644 --- a/src/emqttd_cli_config.erl +++ b/src/emqttd_cli_config.erl @@ -21,9 +21,10 @@ -define(APP, emqttd). register_config() -> - F = fun() -> ekka_mnesia:running_nodes() end, - clique:register_node_finder(F), - emqttd_cli_config:register_config_cli(). + application:start(clique), + F = fun() -> ekka_mnesia:running_nodes() end, + clique:register_node_finder(F), + register_config_cli(). run(Cmd) -> clique:run(Cmd). From f11967288e67f3de4bc11390fd6f3ac3a3eff5c3 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 11 Aug 2017 14:56:56 +0800 Subject: [PATCH 10/14] Add emqttd_broker info API --- src/emqttd_broker.erl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index c35f44ae3..9f939a45e 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -31,7 +31,7 @@ -export([subscribe/1, notify/2]). %% Broker API --export([version/0, uptime/0, datetime/0, sysdescr/0]). +-export([version/0, uptime/0, datetime/0, sysdescr/0, info/0]). %% Tick API -export([start_tick/1, stop_tick/1]). @@ -75,6 +75,14 @@ subscribe(EventType) -> notify(EventType, Event) -> gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}). +%% @doc Get broker info +-spec(info() -> list(tuple())). +info() -> + [{version, version()}, + {sysdescr, sysdescr()}, + {uptime, uptime()}, + {datetime, datetime()}]. + %% @doc Get broker version -spec(version() -> string()). version() -> From 9b61fea1d19803aa346aeec31482ec7766a8fbd3 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Fri, 11 Aug 2017 15:40:33 +0800 Subject: [PATCH 11/14] Update auth spec --- src/emqttd_access_control.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 283d42a78..6cfcc03cf 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -48,7 +48,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Authenticate MQTT Client. --spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {error, any()}). +-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, any()}). auth(Client, Password) when is_record(Client, mqtt_client) -> auth(Client, Password, lookup_mods(auth)). auth(_Client, _Password, []) -> From 0c3c1c3788446366439e975a890918b848536c17 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Fri, 11 Aug 2017 15:42:55 +0800 Subject: [PATCH 12/14] Add restApi related test case --- .gitignore | 1 + test/emqttd_SUITE.erl | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index b78d533e5..aa8378d0d 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ data/ _build .rebar3 rebar3.crashdump +.DS_Store diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index f89f25c91..ed8896f0d 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -26,7 +26,7 @@ -define(APP, emqttd). --define(CONTENT_TYPE, "application/x-www-form-urlencoded"). +-define(CONTENT_TYPE, "application/json"). -define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"}, {verify, verify_peer}, @@ -449,13 +449,21 @@ request_status(_) -> ?assertEqual(binary_to_list(Status), Return). request_publish(_) -> + emqttc:start_link([{host, "localhost"}, + {port, 1883}, + {client_id, <<"random">>}, + {clean_sess, false}]), + SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", + ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("", ""))), ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), - Params = "qos=1&retain=0&topic=a/b/c&message=hello", - ?assert(connect_emqttd_publish_(post, "mqtt/publish", Params, auth_header_("", ""))), + Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}", + ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("", ""))), ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), - emqttd:unsubscribe(<<"a/b/c">>). -connect_emqttd_publish_(Method, Api, Params, Auth) -> + UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", + ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("", ""))). + +connect_emqttd_pubsub_(Method, Api, Params, Auth) -> Url = "http://127.0.0.1:8080/" ++ Api, case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of {error, socket_closed_remotely} -> From 8cb07b62990e05621c8cf67827f4bc23072a63f3 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Fri, 11 Aug 2017 17:15:53 +0800 Subject: [PATCH 13/14] Update Depends of clique --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 9944cb5ae..d313936ac 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ dep_mochiweb = git https://github.com/emqtt/mochiweb master dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master -dep_clique = git https://github.com/turtleDeng/clique +dep_clique = git https://github.com/emqtt/clique dep_jsx = git https://github.com/talentdeficit/jsx ERLC_OPTS += +debug_info From f28db8b4c7b733be45bbc2337156185dd3f3d024 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 12 Aug 2017 12:51:02 +0800 Subject: [PATCH 14/14] Format code --- src/emqttd_cli_config.erl | 50 +++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl index b23d3e766..0e0e29c4b 100644 --- a/src/emqttd_cli_config.erl +++ b/src/emqttd_cli_config.erl @@ -49,6 +49,7 @@ register_config_cli() -> %%-------------------------------------------------------------------- %% Auth/Acl %%-------------------------------------------------------------------- + register_auth_config() -> ConfigKeys = ["mqtt.allow_anonymous", "mqtt.acl_nomatch", @@ -58,17 +59,17 @@ register_auth_config() -> ok = register_config_whitelist(ConfigKeys). auth_config_callback([_, KeyStr], Value) -> - application:set_env(?APP, l2a(KeyStr), Value), - " successfully\n". + application:set_env(?APP, l2a(KeyStr), Value), " successfully\n". %%-------------------------------------------------------------------- %% MQTT Protocol %%-------------------------------------------------------------------- + register_protocol_formatter() -> ConfigKeys = ["max_clientid_len", - "max_packet_size", - "websocket_protocol_header", - "keepalive_backoff"], + "max_packet_size", + "websocket_protocol_header", + "keepalive_backoff"], [clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys]. protocol_formatter_callback([_, Key], Params) -> @@ -92,6 +93,7 @@ protocol_config_callback(App, Key, Value) -> %%-------------------------------------------------------------------- %% MQTT Connection %%-------------------------------------------------------------------- + register_connection_config() -> ConfigKeys = ["mqtt.conn.force_gc_count"], [clique:register_config(Key , fun connection_config_callback/2) || Key <- ConfigKeys], @@ -105,10 +107,11 @@ connection_config_callback([_, KeyStr0, KeyStr1], Value) -> %%-------------------------------------------------------------------- %% MQTT Client %%-------------------------------------------------------------------- + register_client_formatter() -> ConfigKeys = ["max_publish_rate", - "idle_timeout", - "enable_stats"], + "idle_timeout", + "enable_stats"], [clique:register_formatter(["mqtt", "client", Key], fun client_formatter_callback/2) || Key <- ConfigKeys]. client_formatter_callback([_, _, Key], Params) -> @@ -131,16 +134,17 @@ client_config_callback(App, Key, Value) -> %%-------------------------------------------------------------------- %% session %%-------------------------------------------------------------------- + register_session_formatter() -> ConfigKeys = ["max_subscriptions", - "upgrade_qos", - "max_inflight", - "retry_interval", - "max_awaiting_rel", - "await_rel_timeout", - "enable_stats", - "expiry_interval", - "ignore_loop_deliver"], + "upgrade_qos", + "max_inflight", + "retry_interval", + "max_awaiting_rel", + "await_rel_timeout", + "enable_stats", + "expiry_interval", + "ignore_loop_deliver"], [clique:register_formatter(["mqtt", "session", Key], fun session_formatter_callback/2) || Key <- ConfigKeys]. session_formatter_callback([_, _, Key], Params) -> @@ -171,13 +175,14 @@ l2a(List) -> list_to_atom(List). %%-------------------------------------------------------------------- %% MQTT MQueue %%-------------------------------------------------------------------- + register_queue_formatter() -> ConfigKeys = ["type", - "priority", - "max_length", - "low_watermark", - "high_watermark", - "store_qos0"], + "priority", + "max_length", + "low_watermark", + "high_watermark", + "store_qos0"], [clique:register_formatter(["mqtt", "mqueue", Key], fun queue_formatter_callback/2) || Key <- ConfigKeys]. queue_formatter_callback([_, _, Key], Params) -> @@ -203,6 +208,7 @@ queue_config_callback(App, Key, Value) -> %%-------------------------------------------------------------------- %% MQTT Broker %%-------------------------------------------------------------------- + register_broker_config() -> ConfigKeys = ["mqtt.broker.sys_interval"], [clique:register_config(Key , fun broker_config_callback/2) || Key <- ConfigKeys], @@ -216,6 +222,7 @@ broker_config_callback([_, KeyStr0, KeyStr1], Value) -> %%-------------------------------------------------------------------- %% MQTT Lager %%-------------------------------------------------------------------- + register_lager_formatter() -> ConfigKeys = ["level"], [clique:register_formatter(["log", "console", Key], fun lager_formatter_callback/2) || Key <- ConfigKeys]. @@ -233,4 +240,5 @@ lager_config_callback(_, Value) -> " successfully\n". register_config_whitelist(ConfigKeys) -> - clique:register_config_whitelist(ConfigKeys, ?APP). \ No newline at end of file + clique:register_config_whitelist(ConfigKeys, ?APP). +